Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:15 UTC

[07/55] [abbrv] beam git commit: Remove Accumulators and switch to the Metrics API

Remove Accumulators and switch to the Metrics API

Fix compilation after the sideOutput and split refactor


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b438fa7d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b438fa7d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b438fa7d

Branch: refs/heads/master
Commit: b438fa7df16e5181f73b6103ac2f57430cd9e6f3
Parents: e10d578
Author: Ismaël Mejía <ie...@apache.org>
Authored: Wed Apr 19 11:22:42 2017 +0200
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Wed Aug 23 19:07:27 2017 +0200

----------------------------------------------------------------------
 integration/java/nexmark/pom.xml                |   6 +-
 .../beam/integration/nexmark/Monitor.java       |  77 ++--
 .../beam/integration/nexmark/NexmarkQuery.java  |  16 +-
 .../beam/integration/nexmark/NexmarkRunner.java | 129 +++++--
 .../beam/integration/nexmark/NexmarkUtils.java  | 107 +++---
 .../beam/integration/nexmark/WinningBids.java   | 102 +++---
 .../nexmark/drivers/NexmarkGoogleRunner.java    |   4 +-
 .../integration/nexmark/queries/Query0.java     |  10 +-
 .../integration/nexmark/queries/Query10.java    | 363 +++++++++----------
 .../integration/nexmark/queries/Query3.java     |  73 ++--
 .../nexmark/sources/BoundedEventSource.java     |   2 +-
 .../nexmark/sources/UnboundedEventSource.java   |   2 +-
 .../nexmark/sources/BoundedEventSourceTest.java |   2 +-
 13 files changed, 448 insertions(+), 445 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b438fa7d/integration/java/nexmark/pom.xml
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/pom.xml b/integration/java/nexmark/pom.xml
index 67d6117..103c18f 100644
--- a/integration/java/nexmark/pom.xml
+++ b/integration/java/nexmark/pom.xml
@@ -29,7 +29,6 @@
 
   <artifactId>beam-integration-java-nexmark</artifactId>
   <name>Apache Beam :: Integration Tests :: Java :: Nexmark</name>
-
   <packaging>jar</packaging>
 
   <properties>
@@ -227,6 +226,11 @@
       <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-extensions-gcp-core</artifactId>
+    </dependency>
+
     <!-- Extra libraries -->
     <dependency>
       <groupId>com.google.apis</groupId>

http://git-wip-us.apache.org/repos/asf/beam/blob/b438fa7d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java
index 6370e41..cb4d71c 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java
@@ -20,54 +20,55 @@ package org.apache.beam.integration.nexmark;
 import java.io.Serializable;
 
 import org.apache.beam.integration.nexmark.model.KnownSize;
-import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Max;
-import org.apache.beam.sdk.transforms.Min;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.values.PCollection;
 
 /**
- * A monitor of elements with support for later retrieving their aggregators.
+ * A monitor of elements with support for later retrieving their metrics.
  *
  * @param <T> Type of element we are monitoring.
  */
 public class Monitor<T extends KnownSize> implements Serializable {
   private class MonitorDoFn extends DoFn<T, T> {
-    public final Aggregator<Long, Long> elementCounter =
-        createAggregator(counterNamePrefix + "_elements", Sum.ofLongs());
-    public final Aggregator<Long, Long> bytesCounter =
-        createAggregator(counterNamePrefix + "_bytes", Sum.ofLongs());
-    public final Aggregator<Long, Long> startTime =
-        createAggregator(counterNamePrefix + "_startTime", Min.ofLongs());
-    public final Aggregator<Long, Long> endTime =
-        createAggregator(counterNamePrefix + "_endTime", Max.ofLongs());
-    public final Aggregator<Long, Long> startTimestamp =
-        createAggregator("startTimestamp", Min.ofLongs());
-    public final Aggregator<Long, Long> endTimestamp =
-        createAggregator("endTimestamp", Max.ofLongs());
+    final Counter elementCounter =
+      Metrics.counter(name, prefix + ".elements");
+    final Counter bytesCounter =
+      Metrics.counter(name, prefix + ".bytes");
+    final Distribution startTime =
+      Metrics.distribution(name, prefix + ".startTime");
+    final Distribution endTime =
+      Metrics.distribution(name, prefix + ".endTime");
+    final Distribution startTimestamp =
+      Metrics.distribution(name, prefix + ".startTimestamp");
+    final Distribution endTimestamp =
+      Metrics.distribution(name, prefix + ".endTimestamp");
 
     @ProcessElement
     public void processElement(ProcessContext c) {
-      elementCounter.addValue(1L);
-      bytesCounter.addValue(c.element().sizeInBytes());
+      elementCounter.inc();
+      bytesCounter.inc(c.element().sizeInBytes());
       long now = System.currentTimeMillis();
-      startTime.addValue(now);
-      endTime.addValue(now);
-      startTimestamp.addValue(c.timestamp().getMillis());
-      endTimestamp.addValue(c.timestamp().getMillis());
+      startTime.update(now);
+      endTime.update(now);
+      startTimestamp.update(c.timestamp().getMillis());
+      endTimestamp.update(c.timestamp().getMillis());
       c.output(c.element());
     }
   }
 
+  public final String name;
+  public final String prefix;
   final MonitorDoFn doFn;
   final PTransform<PCollection<? extends T>, PCollection<T>> transform;
-  private String counterNamePrefix;
 
-  public Monitor(String name, String counterNamePrefix) {
-    this.counterNamePrefix = counterNamePrefix;
+  public Monitor(String name, String prefix) {
+    this.name = name;
+    this.prefix = prefix;
     doFn = new MonitorDoFn();
     transform = ParDo.of(doFn);
   }
@@ -75,28 +76,4 @@ public class Monitor<T extends KnownSize> implements Serializable {
   public PTransform<PCollection<? extends T>, PCollection<T>> getTransform() {
     return transform;
   }
-
-  public Aggregator<Long, Long> getElementCounter() {
-    return doFn.elementCounter;
-  }
-
-  public Aggregator<Long, Long> getBytesCounter() {
-    return doFn.bytesCounter;
-  }
-
-  public Aggregator<Long, Long> getStartTime() {
-    return doFn.startTime;
-  }
-
-  public Aggregator<Long, Long> getEndTime() {
-    return doFn.endTime;
-  }
-
-  public Aggregator<Long, Long> getStartTimestamp() {
-    return doFn.startTimestamp;
-  }
-
-  public Aggregator<Long, Long> getEndTimestamp() {
-    return doFn.endTimestamp;
-  }
 }
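
The Monitor change above is the template for the rest of this patch: Aggregator fields built
with createAggregator(...) become Counter and Distribution fields obtained from Metrics, keyed
by a (namespace, name) pair and updated with inc()/update(). A minimal sketch of that pattern
follows; the class name and the "example" namespace are illustrative, not part of this commit.

    import org.apache.beam.sdk.metrics.Counter;
    import org.apache.beam.sdk.metrics.Distribution;
    import org.apache.beam.sdk.metrics.Metrics;
    import org.apache.beam.sdk.transforms.DoFn;

    // Illustrative DoFn following the same pattern as MonitorDoFn above.
    class SizeSnoopFn extends DoFn<String, String> {
      private final Counter elements = Metrics.counter("example", "snoop.elements");
      private final Distribution sizes = Metrics.distribution("example", "snoop.sizes");

      @ProcessElement
      public void processElement(ProcessContext c) {
        elements.inc();                      // was elementCounter.addValue(1L)
        sizes.update(c.element().length());  // min/max/mean come from the distribution result
        c.output(c.element());
      }
    }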

http://git-wip-us.apache.org/repos/asf/beam/blob/b438fa7d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java
index e1cd493..ab1c305 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java
@@ -17,13 +17,13 @@
  */
 package org.apache.beam.integration.nexmark;
 
-import javax.annotation.Nullable;
 import org.apache.beam.integration.nexmark.model.Auction;
 import org.apache.beam.integration.nexmark.model.Bid;
 import org.apache.beam.integration.nexmark.model.Event;
 import org.apache.beam.integration.nexmark.model.KnownSize;
 import org.apache.beam.integration.nexmark.model.Person;
-import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Filter;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -206,6 +206,7 @@ public abstract class NexmarkQuery
   public final Monitor<Event> eventMonitor;
   public final Monitor<KnownSize> resultMonitor;
   public final Monitor<Event> endOfStreamMonitor;
+  protected final Counter fatalCounter;
 
   protected NexmarkQuery(NexmarkConfiguration configuration, String name) {
     super(name);
@@ -214,23 +215,16 @@ public abstract class NexmarkQuery
       eventMonitor = new Monitor<>(name + ".Events", "event");
       resultMonitor = new Monitor<>(name + ".Results", "result");
       endOfStreamMonitor = new Monitor<>(name + ".EndOfStream", "end");
+      fatalCounter = Metrics.counter(name, "fatal");
     } else {
       eventMonitor = null;
       resultMonitor = null;
       endOfStreamMonitor = null;
+      fatalCounter = null;
     }
   }
 
   /**
-   * Return the aggregator which counts fatal errors in this query. Return null if no such
-   * aggregator.
-   */
-  @Nullable
-  public Aggregator<Long, Long> getFatalCount() {
-    return null;
-  }
-
-  /**
    * Implement the actual query. All we know about the result is it has a known encoded size.
    */
   protected abstract PCollection<KnownSize> applyPrim(PCollection<Event> events);
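
Note that the nullable getFatalCount() accessor is gone: the query now creates the counter
directly as Metrics.counter(name, "fatal"), and the runner later finds it by that same
(namespace, name) pair (see NexmarkRunner below). A sketch of how a concrete query's DoFn
could feed it; FatalReportingFn and its trivial body are illustrative, not from this commit.

    import org.apache.beam.sdk.metrics.Counter;
    import org.apache.beam.sdk.metrics.Metrics;
    import org.apache.beam.sdk.transforms.DoFn;

    // Illustrative: report fatal errors through the same (queryName, "fatal") metric
    // that NexmarkQuery creates and NexmarkRunner polls.
    class FatalReportingFn extends DoFn<String, String> {
      private final Counter fatalCounter;

      FatalReportingFn(String queryName) {
        this.fatalCounter = Metrics.counter(queryName, "fatal");
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
        try {
          c.output(c.element());
        } catch (RuntimeException e) {
          fatalCounter.inc();  // surfaces to the runner, which cancels the job
          throw e;
        }
      }
    }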

http://git-wip-us.apache.org/repos/asf/beam/blob/b438fa7d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
index ef5f0e2..87314ce 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
@@ -24,14 +24,13 @@ import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.NoSuchElementException;
 import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.Nullable;
 import org.apache.beam.integration.nexmark.io.PubsubHelper;
@@ -63,15 +62,18 @@ import org.apache.beam.integration.nexmark.queries.Query8;
 import org.apache.beam.integration.nexmark.queries.Query8Model;
 import org.apache.beam.integration.nexmark.queries.Query9;
 import org.apache.beam.integration.nexmark.queries.Query9Model;
-import org.apache.beam.sdk.AggregatorRetrievalException;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.AvroIO;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
+import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.MetricNameFilter;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricsFilter;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
@@ -186,38 +188,59 @@ public abstract class NexmarkRunner<OptionT extends NexmarkOptions> {
   protected abstract int maxNumWorkers();
 
   /**
-   * Return the current value for a long counter, or -1 if can't be retrieved.
+   * Return the current value for a long counter, or a default value if it cannot be retrieved.
+   * Note this uses only attempted metrics because some runners don't support committed metrics.
    */
-  protected long getLong(PipelineResult job, Aggregator<Long, Long> aggregator) {
+  protected long getCounterMetric(PipelineResult result, String namespace, String name,
+    long defaultValue) {
+    //TODO Ismael calc this only once
+    MetricQueryResults metrics = result.metrics().queryMetrics(
+        MetricsFilter.builder().addNameFilter(MetricNameFilter.named(namespace, name)).build());
+    Iterable<MetricResult<Long>> counters = metrics.counters();
     try {
-      Collection<Long> values = job.getAggregatorValues(aggregator).getValues();
-      if (values.size() != 1) {
-        return -1;
-      }
-      return Iterables.getOnlyElement(values);
-    } catch (AggregatorRetrievalException e) {
-      return -1;
+      MetricResult<Long> metricResult = counters.iterator().next();
+      return metricResult.attempted();
+    } catch (NoSuchElementException e) {
+      //TODO Ismael
     }
+    return defaultValue;
   }
 
   /**
-   * Return the current value for a time counter, or -1 if can't be retrieved.
+   * Return the min or max of a distribution, or a default value if it cannot be retrieved.
+   * Note this uses only attempted metrics because some runners don't support committed metrics.
    */
-  protected long getTimestamp(
-    long now, PipelineResult job, Aggregator<Long, Long> aggregator) {
+  protected long getDistributionMetric(PipelineResult result, String namespace, String name,
+      DistributionType distType, long defaultValue) {
+    MetricQueryResults metrics = result.metrics().queryMetrics(
+        MetricsFilter.builder().addNameFilter(MetricNameFilter.named(namespace, name)).build());
+    Iterable<MetricResult<DistributionResult>> distributions = metrics.distributions();
     try {
-      Collection<Long> values = job.getAggregatorValues(aggregator).getValues();
-      if (values.size() != 1) {
-        return -1;
-      }
-      long value = Iterables.getOnlyElement(values);
-      if (Math.abs(value - now) > Duration.standardDays(10000).getMillis()) {
-        return -1;
+      MetricResult<DistributionResult> distributionResult = distributions.iterator().next();
+      if (distType.equals(DistributionType.MIN)) {
+        return distributionResult.attempted().min();
+      } else if (distType.equals(DistributionType.MAX)) {
+        return distributionResult.attempted().max();
+      } else {
+        //TODO Ismael
       }
-      return value;
-    } catch (AggregatorRetrievalException e) {
+    } catch (NoSuchElementException e) {
+      //TODO Ismael
+    }
+    return defaultValue;
+  }
+
+  private enum DistributionType {MIN, MAX}
+
+  /**
+   * Return {@code value} if it is a plausible timestamp near {@code now}, or -1 otherwise.
+   */
+  protected long getTimestampMetric(long now, long value) {
+    //TODO Ismael improve doc
+    if (Math.abs(value - now) > Duration.standardDays(10000).getMillis()) {
       return -1;
     }
+    return value;
   }
 
   /**
@@ -294,21 +317,46 @@ public abstract class NexmarkRunner<OptionT extends NexmarkOptions> {
    * Return the current performance given {@code eventMonitor} and {@code resultMonitor}.
    */
   private NexmarkPerf currentPerf(
-      long startMsSinceEpoch, long now, PipelineResult job,
+      long startMsSinceEpoch, long now, PipelineResult result,
       List<NexmarkPerf.ProgressSnapshot> snapshots, Monitor<?> eventMonitor,
       Monitor<?> resultMonitor) {
     NexmarkPerf perf = new NexmarkPerf();
 
-    long numEvents = getLong(job, eventMonitor.getElementCounter());
-    long numEventBytes = getLong(job, eventMonitor.getBytesCounter());
-    long eventStart = getTimestamp(now, job, eventMonitor.getStartTime());
-    long eventEnd = getTimestamp(now, job, eventMonitor.getEndTime());
-    long numResults = getLong(job, resultMonitor.getElementCounter());
-    long numResultBytes = getLong(job, resultMonitor.getBytesCounter());
-    long resultStart = getTimestamp(now, job, resultMonitor.getStartTime());
-    long resultEnd = getTimestamp(now, job, resultMonitor.getEndTime());
-    long timestampStart = getTimestamp(now, job, resultMonitor.getStartTimestamp());
-    long timestampEnd = getTimestamp(now, job, resultMonitor.getEndTimestamp());
+    long numEvents =
+      getCounterMetric(result, eventMonitor.name, eventMonitor.prefix + ".elements", -1);
+    long numEventBytes =
+      getCounterMetric(result, eventMonitor.name, eventMonitor.prefix + ".bytes", -1);
+    long eventStart =
+      getTimestampMetric(now,
+        getDistributionMetric(result, eventMonitor.name, eventMonitor.prefix + ".startTime",
+          DistributionType.MIN, -1));
+    long eventEnd =
+      getTimestampMetric(now,
+        getDistributionMetric(result, eventMonitor.name, eventMonitor.prefix + ".endTime",
+          DistributionType.MAX, -1));
+
+    long numResults =
+      getCounterMetric(result, resultMonitor.name, resultMonitor.prefix + ".elements", -1);
+    long numResultBytes =
+      getCounterMetric(result, resultMonitor.name, resultMonitor.prefix + ".bytes", -1);
+    long resultStart =
+      getTimestampMetric(now,
+        getDistributionMetric(result, resultMonitor.name, resultMonitor.prefix + ".startTime",
+          DistributionType.MIN, -1));
+    long resultEnd =
+      getTimestampMetric(now,
+        getDistributionMetric(result, resultMonitor.name, resultMonitor.prefix + ".endTime",
+          DistributionType.MAX, -1));
+    long timestampStart =
+      getTimestampMetric(now,
+        getDistributionMetric(result,
+          resultMonitor.name, resultMonitor.prefix + ".startTimestamp",
+          DistributionType.MIN, -1));
+    long timestampEnd =
+      getTimestampMetric(now,
+        getDistributionMetric(result,
+          resultMonitor.name, resultMonitor.prefix + ".endTimestamp",
+          DistributionType.MAX, -1));
 
     long effectiveEnd = -1;
     if (eventEnd >= 0 && resultEnd >= 0) {
@@ -372,7 +420,7 @@ public abstract class NexmarkRunner<OptionT extends NexmarkOptions> {
       perf.shutdownDelaySec = (now - resultEnd) / 1000.0;
     }
 
-    perf.jobId = getJobId(job);
+    perf.jobId = getJobId(result);
     // As soon as available, try to capture cumulative cost at this point too.
 
     NexmarkPerf.ProgressSnapshot snapshot = new NexmarkPerf.ProgressSnapshot();
@@ -574,9 +622,10 @@ public abstract class NexmarkRunner<OptionT extends NexmarkOptions> {
 
       if (options.isStreaming() && !waitingForShutdown) {
         Duration quietFor = new Duration(lastActivityMsSinceEpoch, now);
-        if (query.getFatalCount() != null && getLong(job, query.getFatalCount()) > 0) {
+        long fatalCount = getCounterMetric(job, query.getName(), "fatal", 0);
+        if (fatalCount > 0) {
           NexmarkUtils.console("job has fatal errors, cancelling.");
-          errors.add(String.format("Pipeline reported %s fatal errors", query.getFatalCount()));
+          errors.add(String.format("Pipeline reported %s fatal errors", fatalCount));
           waitingForShutdown = true;
         } else if (configuration.debug && configuration.numEvents > 0
                    && currPerf.numEvents == configuration.numEvents
@@ -1033,7 +1082,7 @@ public abstract class NexmarkRunner<OptionT extends NexmarkOptions> {
       if (c.element().hashCode() % 2 == 0) {
         c.output(c.element());
       } else {
-        c.sideOutput(SIDE, c.element());
+        c.output(SIDE, c.element());
       }
     }
   }
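
On the retrieval side, job.getAggregatorValues(...) is replaced by a metrics query against the
PipelineResult, as getCounterMetric and getDistributionMetric above show; only attempted()
values are read since not every runner reports committed metrics. A condensed sketch of that
lookup (the MetricReader class name is illustrative, not part of this commit):

    import org.apache.beam.sdk.PipelineResult;
    import org.apache.beam.sdk.metrics.MetricNameFilter;
    import org.apache.beam.sdk.metrics.MetricQueryResults;
    import org.apache.beam.sdk.metrics.MetricResult;
    import org.apache.beam.sdk.metrics.MetricsFilter;

    // Illustrative helper mirroring getCounterMetric above.
    class MetricReader {
      static long readCounter(PipelineResult result, String namespace, String name,
          long defaultValue) {
        MetricQueryResults metrics = result.metrics().queryMetrics(
            MetricsFilter.builder()
                .addNameFilter(MetricNameFilter.named(namespace, name))
                .build());
        // Use attempted values; committed values are not supported by all runners.
        for (MetricResult<Long> counter : metrics.counters()) {
          return counter.attempted();
        }
        return defaultValue;
      }
    }

The final hunk in this file (c.sideOutput(SIDE, ...) becoming c.output(SIDE, ...)) is unrelated
to metrics: it picks up the SDK's rename of sideOutput to output for additional outputs, which
is what the "Fix compilation after the sideOutput and split refactor" line in the commit
message refers to.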

http://git-wip-us.apache.org/repos/asf/beam/blob/b438fa7d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
index a47ebcc..18589c4 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
@@ -53,12 +53,12 @@ import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.AfterPane;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
@@ -419,48 +419,42 @@ public class NexmarkUtils {
    */
   public static ParDo.SingleOutput<Event, Event> snoop(final String name) {
     return ParDo.of(new DoFn<Event, Event>() {
-                  final Aggregator<Long, Long> eventCounter =
-                      createAggregator("events", Sum.ofLongs());
-                  final Aggregator<Long, Long> newPersonCounter =
-                      createAggregator("newPersons", Sum.ofLongs());
-                  final Aggregator<Long, Long> newAuctionCounter =
-                      createAggregator("newAuctions", Sum.ofLongs());
-                  final Aggregator<Long, Long> bidCounter =
-                      createAggregator("bids", Sum.ofLongs());
-                  final Aggregator<Long, Long> endOfStreamCounter =
-                      createAggregator("endOfStream", Sum.ofLongs());
-
-                  @ProcessElement
-                  public void processElement(ProcessContext c) {
-                    eventCounter.addValue(1L);
-                    if (c.element().newPerson != null) {
-                      newPersonCounter.addValue(1L);
-                    } else if (c.element().newAuction != null) {
-                      newAuctionCounter.addValue(1L);
-                    } else if (c.element().bid != null) {
-                      bidCounter.addValue(1L);
-                    } else {
-                      endOfStreamCounter.addValue(1L);
-                    }
-                    info("%s snooping element %s", name, c.element());
-                    c.output(c.element());
-                  }
-                });
+      final Counter eventCounter = Metrics.counter(name, "events");
+      final Counter newPersonCounter = Metrics.counter(name, "newPersons");
+      final Counter newAuctionCounter = Metrics.counter(name, "newAuctions");
+      final Counter bidCounter = Metrics.counter(name, "bids");
+      final Counter endOfStreamCounter = Metrics.counter(name, "endOfStream");
+
+      @ProcessElement
+      public void processElement(ProcessContext c) {
+        eventCounter.inc();
+        if (c.element().newPerson != null) {
+          newPersonCounter.inc();
+        } else if (c.element().newAuction != null) {
+          newAuctionCounter.inc();
+        } else if (c.element().bid != null) {
+          bidCounter.inc();
+        } else {
+          endOfStreamCounter.inc();
+        }
+        info("%s snooping element %s", name, c.element());
+        c.output(c.element());
+      }
+    });
   }
 
   /**
    * Return a transform to count and discard each element.
    */
-  public static <T> ParDo.SingleOutput<T, Void> devNull(String name) {
+  public static <T> ParDo.SingleOutput<T, Void> devNull(final String name) {
     return ParDo.of(new DoFn<T, Void>() {
-                  final Aggregator<Long, Long> discardCounter =
-                      createAggregator("discarded", Sum.ofLongs());
+      final Counter discardedCounterMetric = Metrics.counter(name, "discarded");
 
-                  @ProcessElement
-                  public void processElement(ProcessContext c) {
-                    discardCounter.addValue(1L);
-                  }
-                });
+      @ProcessElement
+      public void processElement(ProcessContext c) {
+        discardedCounterMetric.inc();
+      }
+    });
   }
 
   /**
@@ -468,28 +462,27 @@ public class NexmarkUtils {
    */
   public static <T> ParDo.SingleOutput<T, T> log(final String name) {
     return ParDo.of(new DoFn<T, T>() {
-                  @ProcessElement
-                  public void processElement(ProcessContext c) {
-                    LOG.info("%s: %s", name, c.element());
-                    c.output(c.element());
-                  }
-                });
+      @ProcessElement
+      public void processElement(ProcessContext c) {
+        LOG.info("%s: %s", name, c.element());
+        c.output(c.element());
+      }
+    });
   }
 
   /**
    * Return a transform to format each element as a string.
    */
-  public static <T> ParDo.SingleOutput<T, String> format(String name) {
+  public static <T> ParDo.SingleOutput<T, String> format(final String name) {
     return ParDo.of(new DoFn<T, String>() {
-                  final Aggregator<Long, Long> recordCounter =
-                      createAggregator("records", Sum.ofLongs());
+      final Counter recordCounterMetric = Metrics.counter(name, "records");
 
-                  @ProcessElement
-                  public void processElement(ProcessContext c) {
-                    recordCounter.addValue(1L);
-                    c.output(c.element().toString());
-                  }
-                });
+      @ProcessElement
+      public void processElement(ProcessContext c) {
+        recordCounterMetric.inc();
+        c.output(c.element().toString());
+      }
+    });
   }
 
   /**
@@ -497,11 +490,11 @@ public class NexmarkUtils {
    */
   public static <T> ParDo.SingleOutput<T, TimestampedValue<T>> stamp(String name) {
     return ParDo.of(new DoFn<T, TimestampedValue<T>>() {
-                  @ProcessElement
-                  public void processElement(ProcessContext c) {
-                    c.output(TimestampedValue.of(c.element(), c.timestamp()));
-                  }
-                });
+      @ProcessElement
+      public void processElement(ProcessContext c) {
+        c.output(TimestampedValue.of(c.element(), c.timestamp()));
+      }
+    });
   }
 
   /**

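The signature changes in this file (devNull and format now take a final String name) exist so
the anonymous DoFns can capture the name and use it as the metrics namespace. A sketch of that
shape; CountingTransforms and the "seen" counter are illustrative, not part of this commit.

    import org.apache.beam.sdk.metrics.Counter;
    import org.apache.beam.sdk.metrics.Metrics;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;

    // Illustrative factory in the style of devNull/format above; the parameter is final
    // so the anonymous DoFn can capture it as the metric namespace.
    class CountingTransforms {
      static <T> ParDo.SingleOutput<T, T> counted(final String name) {
        return ParDo.of(new DoFn<T, T>() {
          private final Counter seen = Metrics.counter(name, "seen");

          @ProcessElement
          public void processElement(ProcessContext c) {
            seen.inc();
            c.output(c.element());
          }
        });
      }
    }
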
http://git-wip-us.apache.org/repos/asf/beam/blob/b438fa7d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java
index 9f1ddf8..f2566b8 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java
@@ -40,11 +40,11 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.join.CoGbkResult;
 import org.apache.beam.sdk.transforms.join.CoGroupByKey;
 import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
@@ -323,56 +323,52 @@ public class WinningBids extends PTransform<PCollection<Event>, PCollection<Auct
 
     // Find the highest price valid bid for each closed auction.
     return
-        // Join auctions and bids.
-        KeyedPCollectionTuple.of(NexmarkQuery.AUCTION_TAG, auctionsById)
-            .and(NexmarkQuery.BID_TAG, bidsByAuctionId)
-            .apply(CoGroupByKey.<Long>create())
-
-            // Filter and select.
-            .apply(name + ".Join",
-                ParDo.of(new DoFn<KV<Long, CoGbkResult>, AuctionBid>() {
-                      final Aggregator<Long, Long> noAuctionCounter =
-                          createAggregator("noAuction", Sum.ofLongs());
-                      final Aggregator<Long, Long> underReserveCounter =
-                          createAggregator("underReserve", Sum.ofLongs());
-                      final Aggregator<Long, Long> noValidBidsCounter =
-                          createAggregator("noValidBids", Sum.ofLongs());
-
-
-                      @ProcessElement
-                      public void processElement(ProcessContext c) {
-                        Auction auction =
-                            c.element().getValue().getOnly(NexmarkQuery.AUCTION_TAG, null);
-                        if (auction == null) {
-                          // We have bids without a matching auction. Give up.
-                          noAuctionCounter.addValue(1L);
-                          return;
-                        }
-                        // Find the current winning bid for auction.
-                        // The earliest bid with the maximum price above the reserve wins.
-                        Bid bestBid = null;
-                        for (Bid bid : c.element().getValue().getAll(NexmarkQuery.BID_TAG)) {
-                          // Bids too late for their auction will have been
-                          // filtered out by the window merge function.
-                          checkState(bid.dateTime < auction.expires);
-                          if (bid.price < auction.reserve) {
-                            // Bid price is below auction reserve.
-                            underReserveCounter.addValue(1L);
-                            continue;
-                          }
-
-                          if (bestBid == null
-                              || Bid.PRICE_THEN_DESCENDING_TIME.compare(bid, bestBid) > 0) {
-                            bestBid = bid;
-                          }
-                        }
-                        if (bestBid == null) {
-                          // We don't have any valid bids for auction.
-                          noValidBidsCounter.addValue(1L);
-                          return;
-                        }
-                        c.output(new AuctionBid(auction, bestBid));
-                      }
-                    }));
+      // Join auctions and bids.
+      KeyedPCollectionTuple.of(NexmarkQuery.AUCTION_TAG, auctionsById)
+        .and(NexmarkQuery.BID_TAG, bidsByAuctionId)
+        .apply(CoGroupByKey.<Long>create())
+        // Filter and select.
+        .apply(name + ".Join",
+          ParDo.of(new DoFn<KV<Long, CoGbkResult>, AuctionBid>() {
+            private final Counter noAuctionCounter = Metrics.counter(name, "noAuction");
+            private final Counter underReserveCounter = Metrics.counter(name, "underReserve");
+            private final Counter noValidBidsCounter = Metrics.counter(name, "noValidBids");
+
+            @ProcessElement
+            public void processElement(ProcessContext c) {
+              Auction auction =
+                  c.element().getValue().getOnly(NexmarkQuery.AUCTION_TAG, null);
+              if (auction == null) {
+                // We have bids without a matching auction. Give up.
+                noAuctionCounter.inc();
+                return;
+              }
+              // Find the current winning bid for auction.
+              // The earliest bid with the maximum price above the reserve wins.
+              Bid bestBid = null;
+              for (Bid bid : c.element().getValue().getAll(NexmarkQuery.BID_TAG)) {
+                // Bids too late for their auction will have been
+                // filtered out by the window merge function.
+                checkState(bid.dateTime < auction.expires);
+                if (bid.price < auction.reserve) {
+                  // Bid price is below auction reserve.
+                  underReserveCounter.inc();
+                  continue;
+                }
+
+                if (bestBid == null
+                    || Bid.PRICE_THEN_DESCENDING_TIME.compare(bid, bestBid) > 0) {
+                  bestBid = bid;
+                }
+              }
+              if (bestBid == null) {
+                // We don't have any valid bids for auction.
+                noValidBidsCounter.inc();
+                return;
+              }
+              c.output(new AuctionBid(auction, bestBid));
+            }
+          }
+        ));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/b438fa7d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleRunner.java
index 7ffd47a..935bf0d 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleRunner.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleRunner.java
@@ -130,7 +130,9 @@ class NexmarkGoogleRunner extends NexmarkRunner<NexmarkGoogleDriver.NexmarkGoogl
           NexmarkUtils.console("%s publisher (%d events)", state, numEvents);
           return;
         case RUNNING:
-          numEvents = getLong(job, publisherMonitor.getElementCounter());
+          //TODO Ismael Validate that this counter is ok
+          numEvents =
+            getCounterMetric(job, publisherMonitor.name, publisherMonitor.prefix + ".elements", -1);
           if (startMsSinceEpoch < 0 && numEvents > 0) {
             startMsSinceEpoch = System.currentTimeMillis();
             endMsSinceEpoch = startMsSinceEpoch

http://git-wip-us.apache.org/repos/asf/beam/blob/b438fa7d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0.java
index f60d5de..84696c4 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0.java
@@ -28,10 +28,10 @@ import org.apache.beam.integration.nexmark.model.Event;
 import org.apache.beam.integration.nexmark.model.KnownSize;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.values.PCollection;
 
 /**
@@ -49,15 +49,15 @@ public class Query0 extends NexmarkQuery {
         // Force round trip through coder.
         .apply(name + ".Serialize",
             ParDo.of(new DoFn<Event, Event>() {
-                  private final Aggregator<Long, Long> bytes =
-                      createAggregator("bytes", Sum.ofLongs());
+                  private final Counter bytesMetric =
+                    Metrics.counter(name, "bytes");
 
                   @ProcessElement
                   public void processElement(ProcessContext c) throws CoderException, IOException {
                     ByteArrayOutputStream outStream = new ByteArrayOutputStream();
                     coder.encode(c.element(), outStream, Coder.Context.OUTER);
                     byte[] byteArray = outStream.toByteArray();
-                    bytes.addValue((long) byteArray.length);
+                    bytesMetric.inc((long) byteArray.length);
                     ByteArrayInputStream inStream = new ByteArrayInputStream(byteArray);
                     Event event = coder.decode(inStream, Coder.Context.OUTER);
                     c.output(event);

http://git-wip-us.apache.org/repos/asf/beam/blob/b438fa7d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
index 5246427..d9b3557 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
@@ -34,12 +34,12 @@ import org.apache.beam.integration.nexmark.model.Done;
 import org.apache.beam.integration.nexmark.model.Event;
 import org.apache.beam.integration.nexmark.model.KnownSize;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.GcsOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.AfterEach;
 import org.apache.beam.sdk.transforms.windowing.AfterFirst;
 import org.apache.beam.sdk.transforms.windowing.AfterPane;
@@ -184,196 +184,189 @@ public class Query10 extends NexmarkQuery {
   private PCollection<Done> applyTyped(PCollection<Event> events) {
     final int numLogShards = maxNumWorkers * NUM_SHARDS_PER_WORKER;
 
-    return events.apply(name + ".ShardEvents",
-            ParDo.of(new DoFn<Event, KV<String, Event>>() {
-                      final Aggregator<Long, Long> lateCounter =
-                          createAggregator("actuallyLateEvent", Sum.ofLongs());
-                      final Aggregator<Long, Long> onTimeCounter =
-                          createAggregator("actuallyOnTimeEvent", Sum.ofLongs());
+    return events
+      .apply(name + ".ShardEvents",
+        ParDo.of(new DoFn<Event, KV<String, Event>>() {
+          private final Counter lateCounter = Metrics.counter(name, "actuallyLateEvent");
+          private final Counter onTimeCounter = Metrics.counter(name, "onTimeCounter");
 
-                      @ProcessElement
-                      public void processElement(ProcessContext c) {
-                        if (c.element().hasAnnotation("LATE")) {
-                          lateCounter.addValue(1L);
-                          LOG.error("Observed late: %s", c.element());
-                        } else {
-                          onTimeCounter.addValue(1L);
-                        }
-                        int shardNum = (int) Math.abs((long) c.element().hashCode() % numLogShards);
-                        String shard = String.format("shard-%05d-of-%05d", shardNum, numLogShards);
-                        c.output(KV.of(shard, c.element()));
-                      }
-                    }))
-        .apply(name + ".WindowEvents",
-                Window.<KV<String, Event>>into(
-            FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec)))
-            .triggering(AfterEach.inOrder(
-                Repeatedly
-                    .forever(AfterPane.elementCountAtLeast(configuration.maxLogEvents))
-                    .orFinally(AfterWatermark.pastEndOfWindow()),
-                Repeatedly.forever(
-                    AfterFirst.of(AfterPane.elementCountAtLeast(configuration.maxLogEvents),
-                        AfterProcessingTime.pastFirstElementInPane()
-                                           .plusDelayOf(LATE_BATCHING_PERIOD)))))
-            .discardingFiredPanes()
-            // Use a 1 day allowed lateness so that any forgotten hold will stall the
-            // pipeline for that period and be very noticeable.
-            .withAllowedLateness(Duration.standardDays(1)))
-        .apply(name + ".GroupByKey", GroupByKey.<String, Event>create())
-        .apply(name + ".CheckForLateEvents",
-            ParDo.of(new DoFn<KV<String, Iterable<Event>>,
-                     KV<String, Iterable<Event>>>() {
-                   final Aggregator<Long, Long> earlyCounter =
-                       createAggregator("earlyShard", Sum.ofLongs());
-                   final Aggregator<Long, Long> onTimeCounter =
-                       createAggregator("onTimeShard", Sum.ofLongs());
-                   final Aggregator<Long, Long> lateCounter =
-                       createAggregator("lateShard", Sum.ofLongs());
-                   final Aggregator<Long, Long> unexpectedLatePaneCounter =
-                       createAggregator("ERROR_unexpectedLatePane", Sum.ofLongs());
-                   final Aggregator<Long, Long> unexpectedOnTimeElementCounter =
-                       createAggregator("ERROR_unexpectedOnTimeElement", Sum.ofLongs());
+          @ProcessElement
+          public void processElement(ProcessContext c) {
+            if (c.element().hasAnnotation("LATE")) {
+              lateCounter.inc();
+              LOG.error("Observed late: %s", c.element());
+            } else {
+              onTimeCounter.inc();
+            }
+            int shardNum = (int) Math.abs((long) c.element().hashCode() % numLogShards);
+            String shard = String.format("shard-%05d-of-%05d", shardNum, numLogShards);
+            c.output(KV.of(shard, c.element()));
+          }
+        }))
+      .apply(name + ".WindowEvents",
+        Window.<KV<String, Event>>into(
+          FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec)))
+          .triggering(AfterEach.inOrder(
+              Repeatedly
+                  .forever(AfterPane.elementCountAtLeast(configuration.maxLogEvents))
+                  .orFinally(AfterWatermark.pastEndOfWindow()),
+              Repeatedly.forever(
+                  AfterFirst.of(AfterPane.elementCountAtLeast(configuration.maxLogEvents),
+                      AfterProcessingTime.pastFirstElementInPane()
+                                         .plusDelayOf(LATE_BATCHING_PERIOD)))))
+          .discardingFiredPanes()
+          // Use a 1 day allowed lateness so that any forgotten hold will stall the
+          // pipeline for that period and be very noticeable.
+          .withAllowedLateness(Duration.standardDays(1)))
+      .apply(name + ".GroupByKey", GroupByKey.<String, Event>create())
+      .apply(name + ".CheckForLateEvents",
+        ParDo.of(new DoFn<KV<String, Iterable<Event>>,
+                 KV<String, Iterable<Event>>>() {
+          private final Counter earlyCounter = Metrics.counter(name, "earlyShard");
+          private final Counter onTimeCounter = Metrics.counter(name, "onTimeShard");
+          private final Counter lateCounter = Metrics.counter(name, "lateShard");
+          private final Counter unexpectedLatePaneCounter =
+            Metrics.counter(name, "ERROR_unexpectedLatePane");
+          private final Counter unexpectedOnTimeElementCounter =
+            Metrics.counter(name, "ERROR_unexpectedOnTimeElement");
 
-                   @ProcessElement
-                   public void processElement(ProcessContext c, BoundedWindow window) {
-                     int numLate = 0;
-                     int numOnTime = 0;
-                     for (Event event : c.element().getValue()) {
-                       if (event.hasAnnotation("LATE")) {
-                         numLate++;
-                       } else {
-                         numOnTime++;
-                       }
-                     }
-                     String shard = c.element().getKey();
-                     LOG.error(
-                         "%s with timestamp %s has %d actually late and %d on-time "
-                             + "elements in pane %s for window %s",
-                         shard, c.timestamp(), numLate, numOnTime, c.pane(),
-                         window.maxTimestamp());
-                     if (c.pane().getTiming() == PaneInfo.Timing.LATE) {
-                       if (numLate == 0) {
-                         LOG.error(
-                             "ERROR! No late events in late pane for %s", shard);
-                         unexpectedLatePaneCounter.addValue(1L);
-                       }
-                       if (numOnTime > 0) {
-                         LOG.error(
-                             "ERROR! Have %d on-time events in late pane for %s",
-                             numOnTime, shard);
-                         unexpectedOnTimeElementCounter.addValue(1L);
-                       }
-                       lateCounter.addValue(1L);
-                     } else if (c.pane().getTiming() == PaneInfo.Timing.EARLY) {
-                       if (numOnTime + numLate < configuration.maxLogEvents) {
-                         LOG.error(
-                             "ERROR! Only have %d events in early pane for %s",
-                             numOnTime + numLate, shard);
-                       }
-                       earlyCounter.addValue(1L);
-                     } else {
-                       onTimeCounter.addValue(1L);
-                     }
-                     c.output(c.element());
-                   }
-                 }))
-        .apply(name + ".UploadEvents",
-            ParDo.of(new DoFn<KV<String, Iterable<Event>>,
-                     KV<Void, OutputFile>>() {
-                   final Aggregator<Long, Long> savedFileCounter =
-                       createAggregator("savedFile", Sum.ofLongs());
-                   final Aggregator<Long, Long> writtenRecordsCounter =
-                       createAggregator("writtenRecords", Sum.ofLongs());
+          @ProcessElement
+          public void processElement(ProcessContext c, BoundedWindow window) {
+            int numLate = 0;
+            int numOnTime = 0;
+            for (Event event : c.element().getValue()) {
+              if (event.hasAnnotation("LATE")) {
+                numLate++;
+              } else {
+                numOnTime++;
+              }
+            }
+            String shard = c.element().getKey();
+            LOG.error(
+                "%s with timestamp %s has %d actually late and %d on-time "
+                    + "elements in pane %s for window %s",
+                shard, c.timestamp(), numLate, numOnTime, c.pane(),
+                window.maxTimestamp());
+            if (c.pane().getTiming() == PaneInfo.Timing.LATE) {
+              if (numLate == 0) {
+                LOG.error(
+                    "ERROR! No late events in late pane for %s", shard);
+                unexpectedLatePaneCounter.inc();
+              }
+              if (numOnTime > 0) {
+                LOG.error(
+                    "ERROR! Have %d on-time events in late pane for %s",
+                    numOnTime, shard);
+                unexpectedOnTimeElementCounter.inc();
+              }
+              lateCounter.inc();
+            } else if (c.pane().getTiming() == PaneInfo.Timing.EARLY) {
+              if (numOnTime + numLate < configuration.maxLogEvents) {
+                LOG.error(
+                    "ERROR! Only have %d events in early pane for %s",
+                    numOnTime + numLate, shard);
+              }
+              earlyCounter.inc();
+            } else {
+              onTimeCounter.inc();
+            }
+            c.output(c.element());
+          }
+        }))
+      .apply(name + ".UploadEvents",
+        ParDo.of(new DoFn<KV<String, Iterable<Event>>,
+                 KV<Void, OutputFile>>() {
+          private final Counter savedFileCounter = Metrics.counter(name, "savedFile");
+          private final Counter writtenRecordsCounter = Metrics.counter(name, "writtenRecords");
 
-                   @ProcessElement
-                   public void processElement(ProcessContext c, BoundedWindow window)
-                           throws IOException {
-                     String shard = c.element().getKey();
-                     GcsOptions options = c.getPipelineOptions().as(GcsOptions.class);
-                     OutputFile outputFile = outputFileFor(window, shard, c.pane());
-                     LOG.error(
-                         "Writing %s with record timestamp %s, window timestamp %s, pane %s",
-                         shard, c.timestamp(), window.maxTimestamp(), c.pane());
-                     if (outputFile.filename != null) {
-                       LOG.error("Beginning write to '%s'", outputFile.filename);
-                       int n = 0;
-                       try (OutputStream output =
-                                Channels.newOutputStream(openWritableGcsFile(options, outputFile
-                                    .filename))) {
-                         for (Event event : c.element().getValue()) {
-                           Event.CODER.encode(event, output, Coder.Context.OUTER);
-                           writtenRecordsCounter.addValue(1L);
-                           if (++n % 10000 == 0) {
-                             LOG.error("So far written %d records to '%s'", n,
-                                 outputFile.filename);
-                           }
-                         }
-                       }
-                       LOG.error("Written all %d records to '%s'", n, outputFile.filename);
-                     }
-                     savedFileCounter.addValue(1L);
-                     c.output(KV.<Void, OutputFile>of(null, outputFile));
-                   }
-                 }))
-        // Clear fancy triggering from above.
-        .apply(name + ".WindowLogFiles", Window.<KV<Void, OutputFile>>into(
-            FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec)))
-            .triggering(AfterWatermark.pastEndOfWindow())
-            // We expect no late data here, but we'll assume the worst so we can detect any.
-            .withAllowedLateness(Duration.standardDays(1))
-            .discardingFiredPanes())
+            @ProcessElement
+            public void processElement(ProcessContext c, BoundedWindow window)
+                    throws IOException {
+              String shard = c.element().getKey();
+              GcsOptions options = c.getPipelineOptions().as(GcsOptions.class);
+              OutputFile outputFile = outputFileFor(window, shard, c.pane());
+              LOG.error(
+                  "Writing %s with record timestamp %s, window timestamp %s, pane %s",
+                  shard, c.timestamp(), window.maxTimestamp(), c.pane());
+              if (outputFile.filename != null) {
+                LOG.error("Beginning write to '%s'", outputFile.filename);
+                int n = 0;
+                try (OutputStream output =
+                         Channels.newOutputStream(openWritableGcsFile(options, outputFile
+                             .filename))) {
+                  for (Event event : c.element().getValue()) {
+                    Event.CODER.encode(event, output, Coder.Context.OUTER);
+                    writtenRecordsCounter.inc();
+                    if (++n % 10000 == 0) {
+                      LOG.error("So far written %d records to '%s'", n,
+                          outputFile.filename);
+                    }
+                  }
+                }
+                LOG.error("Written all %d records to '%s'", n, outputFile.filename);
+              }
+              savedFileCounter.inc();
+              c.output(KV.<Void, OutputFile>of(null, outputFile));
+            }
+          }))
+      // Clear fancy triggering from above.
+      .apply(name + ".WindowLogFiles", Window.<KV<Void, OutputFile>>into(
+        FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec)))
+        .triggering(AfterWatermark.pastEndOfWindow())
+        // We expect no late data here, but we'll assume the worst so we can detect any.
+        .withAllowedLateness(Duration.standardDays(1))
+        .discardingFiredPanes())
       // this GroupByKey allows to have one file per window
       .apply(name + ".GroupByKey2", GroupByKey.<Void, OutputFile>create())
-        .apply(name + ".Index",
-            ParDo.of(new DoFn<KV<Void, Iterable<OutputFile>>, Done>() {
-                   final Aggregator<Long, Long> unexpectedLateCounter =
-                       createAggregator("ERROR_unexpectedLate", Sum.ofLongs());
-                   final Aggregator<Long, Long> unexpectedEarlyCounter =
-                       createAggregator("ERROR_unexpectedEarly", Sum.ofLongs());
-                   final Aggregator<Long, Long> unexpectedIndexCounter =
-                       createAggregator("ERROR_unexpectedIndex", Sum.ofLongs());
-                   final Aggregator<Long, Long> finalizedCounter =
-                       createAggregator("indexed", Sum.ofLongs());
+      .apply(name + ".Index",
+        ParDo.of(new DoFn<KV<Void, Iterable<OutputFile>>, Done>() {
+          private final Counter unexpectedLateCounter =
+            Metrics.counter(name, "ERROR_unexpectedLate");
+          private final Counter unexpectedEarlyCounter =
+            Metrics.counter(name, "ERROR_unexpectedEarly");
+          private final Counter unexpectedIndexCounter =
+            Metrics.counter(name, "ERROR_unexpectedIndex");
+          private final Counter finalizedCounter = Metrics.counter(name, "indexed");
 
-                   @ProcessElement
-                   public void processElement(ProcessContext c, BoundedWindow window)
-                           throws IOException {
-                     if (c.pane().getTiming() == Timing.LATE) {
-                       unexpectedLateCounter.addValue(1L);
-                       LOG.error("ERROR! Unexpected LATE pane: %s", c.pane());
-                     } else if (c.pane().getTiming() == Timing.EARLY) {
-                       unexpectedEarlyCounter.addValue(1L);
-                       LOG.error("ERROR! Unexpected EARLY pane: %s", c.pane());
-                     } else if (c.pane().getTiming() == Timing.ON_TIME
-                         && c.pane().getIndex() != 0) {
-                       unexpectedIndexCounter.addValue(1L);
-                       LOG.error("ERROR! Unexpected ON_TIME pane index: %s", c.pane());
-                     } else {
-                       GcsOptions options = c.getPipelineOptions().as(GcsOptions.class);
-                       LOG.error(
-                           "Index with record timestamp %s, window timestamp %s, pane %s",
-                           c.timestamp(), window.maxTimestamp(), c.pane());
+          @ProcessElement
+          public void processElement(ProcessContext c, BoundedWindow window)
+                  throws IOException {
+            if (c.pane().getTiming() == Timing.LATE) {
+              unexpectedLateCounter.inc();
+              LOG.error("ERROR! Unexpected LATE pane: {}", c.pane());
+            } else if (c.pane().getTiming() == Timing.EARLY) {
+              unexpectedEarlyCounter.inc();
+              LOG.error("ERROR! Unexpected EARLY pane: {}", c.pane());
+            } else if (c.pane().getTiming() == Timing.ON_TIME
+                && c.pane().getIndex() != 0) {
+              unexpectedIndexCounter.inc();
+              LOG.error("ERROR! Unexpected ON_TIME pane index: {}", c.pane());
+            } else {
+              GcsOptions options = c.getPipelineOptions().as(GcsOptions.class);
+              LOG.error(
+                  "Index with record timestamp {}, window timestamp {}, pane {}",
+                  c.timestamp(), window.maxTimestamp(), c.pane());
 
-                       @Nullable String filename = indexPathFor(window);
-                       if (filename != null) {
-                         LOG.error("Beginning write to '%s'", filename);
-                         int n = 0;
-                         try (OutputStream output =
-                                  Channels.newOutputStream(
-                                      openWritableGcsFile(options, filename))) {
-                           for (OutputFile outputFile : c.element().getValue()) {
-                             output.write(outputFile.toString().getBytes());
-                             n++;
-                           }
-                         }
-                         LOG.error("Written all %d lines to '%s'", n, filename);
-                       }
-                       c.output(
-                           new Done("written for timestamp " + window.maxTimestamp()));
-                       finalizedCounter.addValue(1L);
-                     }
-                   }
-                 }));
+              @Nullable String filename = indexPathFor(window);
+              if (filename != null) {
+                LOG.error("Beginning write to '{}'", filename);
+                int n = 0;
+                try (OutputStream output =
+                         Channels.newOutputStream(
+                             openWritableGcsFile(options, filename))) {
+                  for (OutputFile outputFile : c.element().getValue()) {
+                    output.write(outputFile.toString().getBytes());
+                    n++;
+                  }
+                }
+                LOG.error("Written all {} lines to '{}'", n, filename);
+              }
+              c.output(
+                  new Done("written for timestamp " + window.maxTimestamp()));
+              finalizedCounter.inc();
+            }
+          }
+        }));
   }
 
   @Override

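The change above is the core of the Aggregator-to-Metrics migration: each DoFn builds Counter handles once, under a per-transform namespace, and calls inc() wherever addValue(1L) used to appear. A minimal stand-alone sketch of the same pattern follows; the class CountLatePanes and the namespace "MyTransform" are invented for illustration and are not part of this commit.

import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;

public class CountLatePanes extends DoFn<String, String> {
  // A Counter is a lightweight handle; the runner aggregates all increments
  // reported under the same (namespace, name) pair.
  private final Counter latePanes = Metrics.counter("MyTransform", "latePanes");

  @ProcessElement
  public void processElement(ProcessContext c) {
    if (c.pane().getTiming() == PaneInfo.Timing.LATE) {
      latePanes.inc(); // where the old code called Aggregator.addValue(1L)
    }
    c.output(c.element());
  }
}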
http://git-wip-us.apache.org/repos/asf/beam/blob/b438fa7d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java
index ba31e9f..12b16f1 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java
@@ -20,7 +20,6 @@ package org.apache.beam.integration.nexmark.queries;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import javax.annotation.Nullable;
 import org.apache.beam.integration.nexmark.NexmarkConfiguration;
 import org.apache.beam.integration.nexmark.NexmarkQuery;
 import org.apache.beam.integration.nexmark.NexmarkUtils;
@@ -30,12 +29,12 @@ import org.apache.beam.integration.nexmark.model.KnownSize;
 import org.apache.beam.integration.nexmark.model.NameCityStateId;
 import org.apache.beam.integration.nexmark.model.Person;
 import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Filter;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.join.CoGbkResult;
 import org.apache.beam.sdk.transforms.join.CoGroupByKey;
 import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
@@ -81,14 +80,7 @@ public class Query3 extends NexmarkQuery {
 
   public Query3(NexmarkConfiguration configuration) {
     super(configuration, "Query3");
-    joinDoFn = new JoinDoFn(configuration.maxAuctionsWaitingTime);
-
-  }
-
-  @Override
-  @Nullable
-  public Aggregator<Long, Long> getFatalCount() {
-    return joinDoFn.fatalCounter;
+    joinDoFn = new JoinDoFn(name, configuration.maxAuctionsWaitingTime);
   }
 
   private PCollection<NameCityStateId> applyTyped(PCollection<Event> events) {
@@ -195,8 +187,6 @@ public class Query3 extends NexmarkQuery {
 
     private static final String PERSON_STATE_EXPIRING = "personStateExpiring";
 
-    public final Aggregator<Long, Long> fatalCounter = createAggregator("fatal", Sum.ofLongs());
-
     @StateId(AUCTIONS)
     private final StateSpec<Object, ValueState<List<Auction>>> auctionsSpec =
         StateSpecs.value(ListCoder.of(Auction.CODER));
@@ -204,19 +194,25 @@ public class Query3 extends NexmarkQuery {
     @TimerId(PERSON_STATE_EXPIRING)
     private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
 
-    private final Aggregator<Long, Long> newAuctionCounter =
-        createAggregator("newAuction", Sum.ofLongs());
-    private final Aggregator<Long, Long> newPersonCounter =
-        createAggregator("newPerson", Sum.ofLongs());
-    private final Aggregator<Long, Long> newNewOutputCounter =
-        createAggregator("newNewOutput", Sum.ofLongs());
-    private final Aggregator<Long, Long> newOldOutputCounter =
-        createAggregator("newOldOutput", Sum.ofLongs());
-    private final Aggregator<Long, Long> oldNewOutputCounter =
-        createAggregator("oldNewOutput", Sum.ofLongs());
+    // Used as the namespace for the metrics below
+    private final String name;
 
-    private JoinDoFn(int maxAuctionsWaitingTime) {
+    private final Counter newAuctionCounter;
+    private final Counter newPersonCounter;
+    private final Counter newNewOutputCounter;
+    private final Counter newOldOutputCounter;
+    private final Counter oldNewOutputCounter;
+    private final Counter fatalCounter;
+
+    private JoinDoFn(String name, int maxAuctionsWaitingTime) {
+      this.name = name;
       this.maxAuctionsWaitingTime = maxAuctionsWaitingTime;
+      newAuctionCounter = Metrics.counter(name, "newAuction");
+      newPersonCounter = Metrics.counter(name, "newPerson");
+      newNewOutputCounter = Metrics.counter(name, "newNewOutput");
+      newOldOutputCounter = Metrics.counter(name, "newOldOutput");
+      oldNewOutputCounter = Metrics.counter(name, "oldNewOutput");
+      fatalCounter = Metrics.counter(name, "fatal");
     }
 
     @ProcessElement
@@ -232,14 +228,13 @@ public class Query3 extends NexmarkQuery {
       // we need to wait for the pending ReduceFn API.
 
       Person existingPerson = personState.read();
-
       if (existingPerson != null) {
         // We've already seen the new person event for this person id.
         // We can join with any new auctions on-the-fly without needing any
         // additional persistent state.
         for (Auction newAuction : c.element().getValue().getAll(AUCTION_TAG)) {
-          newAuctionCounter.addValue(1L);
-          newOldOutputCounter.addValue(1L);
+          newAuctionCounter.inc();
+          newOldOutputCounter.inc();
           c.output(KV.of(newAuction, existingPerson));
         }
         return;
@@ -255,24 +250,24 @@ public class Query3 extends NexmarkQuery {
           } else {
             LOG.error("**** conflicting persons {} and {} ****", theNewPerson, newPerson);
           }
-          fatalCounter.addValue(1L);
+          fatalCounter.inc();
           continue;
         }
-        newPersonCounter.addValue(1L);
+        newPersonCounter.inc();
         // We've now seen the person for this person id so can flush any
         // pending auctions for the same seller id (an auction is done by only one seller).
         List<Auction> pendingAuctions = auctionsState.read();
         if (pendingAuctions != null) {
           for (Auction pendingAuction : pendingAuctions) {
-            oldNewOutputCounter.addValue(1L);
+            oldNewOutputCounter.inc();
             c.output(KV.of(pendingAuction, newPerson));
           }
           auctionsState.clear();
         }
         // Also deal with any new auctions.
         for (Auction newAuction : c.element().getValue().getAll(AUCTION_TAG)) {
-          newAuctionCounter.addValue(1L);
-          newNewOutputCounter.addValue(1L);
+          newAuctionCounter.inc();
+          newNewOutputCounter.inc();
           c.output(KV.of(newAuction, newPerson));
         }
         // Remember this person for any future auctions.
@@ -293,17 +288,17 @@ public class Query3 extends NexmarkQuery {
         pendingAuctions = new ArrayList<>();
       }
       for (Auction newAuction : c.element().getValue().getAll(AUCTION_TAG)) {
-        newAuctionCounter.addValue(1L);
+        newAuctionCounter.inc();
         pendingAuctions.add(newAuction);
       }
       auctionsState.write(pendingAuctions);
     }
-  @OnTimer(PERSON_STATE_EXPIRING)
-  public void onTimerCallback(
-      OnTimerContext context,
-      @StateId(PERSON) ValueState<Person> personState) {
-      personState.clear();
-  }
 
+    @OnTimer(PERSON_STATE_EXPIRING)
+    public void onTimerCallback(
+        OnTimerContext context,
+        @StateId(PERSON) ValueState<Person> personState) {
+      personState.clear();
+    }
   }
 }

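Dropping getFatalCount() means callers no longer receive an Aggregator handle; the natural replacement is to query the pipeline result for the counter by namespace and name. The sketch below shows one way to do that with the metrics query API of this Beam generation; the class name FatalCountReader is invented, and the accessor names (counters(), attempted()) changed in later Beam releases, so treat them as assumptions rather than the committed code.

import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;

public class FatalCountReader {
  /** Sums the attempted value of the "fatal" counter registered under the "Query3" namespace. */
  public static long readFatalCount(PipelineResult result) {
    MetricQueryResults metrics =
        result.metrics().queryMetrics(
            MetricsFilter.builder()
                .addNameFilter(MetricNameFilter.named("Query3", "fatal"))
                .build());
    long total = 0;
    for (MetricResult<Long> counter : metrics.counters()) {
      // One MetricResult per step that reported the counter.
      total += counter.attempted();
    }
    return total;
  }
}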
http://git-wip-us.apache.org/repos/asf/beam/blob/b438fa7d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/BoundedEventSource.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/BoundedEventSource.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/BoundedEventSource.java
index be74151..43d6690 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/BoundedEventSource.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/BoundedEventSource.java
@@ -156,7 +156,7 @@ public class BoundedEventSource extends BoundedSource<Event> {
   }
 
   @Override
-  public List<BoundedEventSource> splitIntoBundles(
+  public List<BoundedEventSource> split(
       long desiredBundleSizeBytes, PipelineOptions options) {
     NexmarkUtils.info("splitting bounded source %s into %d sub-sources", config, numEventGenerators);
     List<BoundedEventSource> results = new ArrayList<>();

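The splitIntoBundles-to-split rename is a pure API rename; the desired bundle size in bytes is still only a hint. A short sketch of calling the renamed method follows, using the built-in CountingSource as a stand-in for BoundedEventSource so the snippet stays self-contained (the class SplitDemo and the chosen sizes are arbitrary).

import java.util.List;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class SplitDemo {
  public static void main(String[] args) throws Exception {
    PipelineOptions options = PipelineOptionsFactory.create();
    BoundedSource<Long> source = CountingSource.upTo(1000L);
    // split() replaces splitIntoBundles(); the source may return more or fewer
    // sub-sources than the byte-size hint suggests.
    List<? extends BoundedSource<Long>> parts = source.split(100, options);
    System.out.println("Split into " + parts.size() + " sub-sources");
  }
}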
http://git-wip-us.apache.org/repos/asf/beam/blob/b438fa7d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSource.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSource.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSource.java
index 286c576..c3c6eb0 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSource.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSource.java
@@ -289,7 +289,7 @@ public class UnboundedEventSource extends UnboundedSource<Event, Generator.Check
   }
 
   @Override
-  public List<UnboundedEventSource> generateInitialSplits(
+  public List<UnboundedEventSource> split(
       int desiredNumSplits, PipelineOptions options) {
     LOG.trace("splitting unbounded source into {} sub-sources", numEventGenerators);
     List<UnboundedEventSource> results = new ArrayList<>();

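On the unbounded side the old generateInitialSplits hook becomes split(int desiredNumSplits, options), so the caller asks for a number of readers rather than a bundle size. A matching sketch, again substituting a built-in source (CountingSource.unbounded()) for UnboundedEventSource; UnboundedSplitDemo is an invented name.

import java.util.List;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class UnboundedSplitDemo {
  public static void main(String[] args) throws Exception {
    PipelineOptions options = PipelineOptionsFactory.create();
    UnboundedSource<Long, ?> source = CountingSource.unbounded();
    // The desired split count is a hint; a runner may end up using fewer readers.
    List<? extends UnboundedSource<Long, ?>> parts = source.split(4, options);
    System.out.println("Requested 4 splits, got " + parts.size());
  }
}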
http://git-wip-us.apache.org/repos/asf/beam/blob/b438fa7d/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/BoundedEventSourceTest.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/BoundedEventSourceTest.java b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/BoundedEventSourceTest.java
index 3f85bab..c5d7725 100644
--- a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/BoundedEventSourceTest.java
+++ b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/BoundedEventSourceTest.java
@@ -66,6 +66,6 @@ public class BoundedEventSourceTest {
     long n = 200L;
     BoundedEventSource source = new BoundedEventSource(makeConfig(n), 1);
     SourceTestUtils.assertSourcesEqualReferenceSource(
-        source, source.splitIntoBundles(10, options), options);
+        source, source.split(10, options), options);
   }
 }
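The updated test is also a reusable pattern: SourceTestUtils.assertSourcesEqualReferenceSource checks that the union of records read from the splits matches what the un-split source reads. A stand-alone version against CountingSource follows; the test class and method names are invented.

import java.util.List;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.SourceTestUtils;
import org.junit.Test;

public class SplitEqualityTest {
  @Test
  public void splitsReadTheSameRecordsAsTheReference() throws Exception {
    PipelineOptions options = PipelineOptionsFactory.create();
    BoundedSource<Long> reference = CountingSource.upTo(200L);
    List<? extends BoundedSource<Long>> splits = reference.split(10, options);
    // Fails if any record is lost or duplicated across the sub-sources.
    SourceTestUtils.assertSourcesEqualReferenceSource(reference, splits, options);
  }
}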