You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:15 UTC
[07/55] [abbrv] beam git commit: Remove Accumulators and switch to the Metrics API
Remove Accumulators and switch to the Metrics API
Fix compile after sideOutput and split refactor
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b438fa7d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b438fa7d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b438fa7d
Branch: refs/heads/master
Commit: b438fa7df16e5181f73b6103ac2f57430cd9e6f3
Parents: e10d578
Author: Ismaël Mejía <ie...@apache.org>
Authored: Wed Apr 19 11:22:42 2017 +0200
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Wed Aug 23 19:07:27 2017 +0200
----------------------------------------------------------------------
integration/java/nexmark/pom.xml | 6 +-
.../beam/integration/nexmark/Monitor.java | 77 ++--
.../beam/integration/nexmark/NexmarkQuery.java | 16 +-
.../beam/integration/nexmark/NexmarkRunner.java | 129 +++++--
.../beam/integration/nexmark/NexmarkUtils.java | 107 +++---
.../beam/integration/nexmark/WinningBids.java | 102 +++---
.../nexmark/drivers/NexmarkGoogleRunner.java | 4 +-
.../integration/nexmark/queries/Query0.java | 10 +-
.../integration/nexmark/queries/Query10.java | 363 +++++++++----------
.../integration/nexmark/queries/Query3.java | 73 ++--
.../nexmark/sources/BoundedEventSource.java | 2 +-
.../nexmark/sources/UnboundedEventSource.java | 2 +-
.../nexmark/sources/BoundedEventSourceTest.java | 2 +-
13 files changed, 448 insertions(+), 445 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b438fa7d/integration/java/nexmark/pom.xml
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/pom.xml b/integration/java/nexmark/pom.xml
index 67d6117..103c18f 100644
--- a/integration/java/nexmark/pom.xml
+++ b/integration/java/nexmark/pom.xml
@@ -29,7 +29,6 @@
<artifactId>beam-integration-java-nexmark</artifactId>
<name>Apache Beam :: Integration Tests :: Java :: Nexmark</name>
-
<packaging>jar</packaging>
<properties>
@@ -227,6 +226,11 @@
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-extensions-gcp-core</artifactId>
+ </dependency>
+
<!-- Extra libraries -->
<dependency>
<groupId>com.google.apis</groupId>
http://git-wip-us.apache.org/repos/asf/beam/blob/b438fa7d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java
index 6370e41..cb4d71c 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java
@@ -20,54 +20,55 @@ package org.apache.beam.integration.nexmark;
import java.io.Serializable;
import org.apache.beam.integration.nexmark.model.KnownSize;
-import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Max;
-import org.apache.beam.sdk.transforms.Min;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.PCollection;
/**
- * A monitor of elements with support for later retrieving their aggregators.
+ * A monitor of elements with support for later retrieving their metrics.
*
* @param <T> Type of element we are monitoring.
*/
public class Monitor<T extends KnownSize> implements Serializable {
private class MonitorDoFn extends DoFn<T, T> {
- public final Aggregator<Long, Long> elementCounter =
- createAggregator(counterNamePrefix + "_elements", Sum.ofLongs());
- public final Aggregator<Long, Long> bytesCounter =
- createAggregator(counterNamePrefix + "_bytes", Sum.ofLongs());
- public final Aggregator<Long, Long> startTime =
- createAggregator(counterNamePrefix + "_startTime", Min.ofLongs());
- public final Aggregator<Long, Long> endTime =
- createAggregator(counterNamePrefix + "_endTime", Max.ofLongs());
- public final Aggregator<Long, Long> startTimestamp =
- createAggregator("startTimestamp", Min.ofLongs());
- public final Aggregator<Long, Long> endTimestamp =
- createAggregator("endTimestamp", Max.ofLongs());
+ final Counter elementCounter =
+ Metrics.counter(name , prefix + ".elements");
+ final Counter bytesCounter =
+ Metrics.counter(name , prefix + ".bytes");
+ final Distribution startTime =
+ Metrics.distribution(name , prefix + ".startTime");
+ final Distribution endTime =
+ Metrics.distribution(name , prefix + ".endTime");
+ final Distribution startTimestamp =
+ Metrics.distribution(name , prefix + ".startTimestamp");
+ final Distribution endTimestamp =
+ Metrics.distribution(name , prefix + ".endTimestamp");
@ProcessElement
public void processElement(ProcessContext c) {
- elementCounter.addValue(1L);
- bytesCounter.addValue(c.element().sizeInBytes());
+ elementCounter.inc();
+ bytesCounter.inc(c.element().sizeInBytes());
long now = System.currentTimeMillis();
- startTime.addValue(now);
- endTime.addValue(now);
- startTimestamp.addValue(c.timestamp().getMillis());
- endTimestamp.addValue(c.timestamp().getMillis());
+ startTime.update(now);
+ endTime.update(now);
+ startTimestamp.update(c.timestamp().getMillis());
+ endTimestamp.update(c.timestamp().getMillis());
c.output(c.element());
}
}
+ public final String name;
+ public final String prefix;
final MonitorDoFn doFn;
final PTransform<PCollection<? extends T>, PCollection<T>> transform;
- private String counterNamePrefix;
- public Monitor(String name, String counterNamePrefix) {
- this.counterNamePrefix = counterNamePrefix;
+ public Monitor(String name, String prefix) {
+ this.name = name;
+ this.prefix = prefix;
doFn = new MonitorDoFn();
transform = ParDo.of(doFn);
}
@@ -75,28 +76,4 @@ public class Monitor<T extends KnownSize> implements Serializable {
public PTransform<PCollection<? extends T>, PCollection<T>> getTransform() {
return transform;
}
-
- public Aggregator<Long, Long> getElementCounter() {
- return doFn.elementCounter;
- }
-
- public Aggregator<Long, Long> getBytesCounter() {
- return doFn.bytesCounter;
- }
-
- public Aggregator<Long, Long> getStartTime() {
- return doFn.startTime;
- }
-
- public Aggregator<Long, Long> getEndTime() {
- return doFn.endTime;
- }
-
- public Aggregator<Long, Long> getStartTimestamp() {
- return doFn.startTimestamp;
- }
-
- public Aggregator<Long, Long> getEndTimestamp() {
- return doFn.endTimestamp;
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b438fa7d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java
index e1cd493..ab1c305 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java
@@ -17,13 +17,13 @@
*/
package org.apache.beam.integration.nexmark;
-import javax.annotation.Nullable;
import org.apache.beam.integration.nexmark.model.Auction;
import org.apache.beam.integration.nexmark.model.Bid;
import org.apache.beam.integration.nexmark.model.Event;
import org.apache.beam.integration.nexmark.model.KnownSize;
import org.apache.beam.integration.nexmark.model.Person;
-import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.PTransform;
@@ -206,6 +206,7 @@ public abstract class NexmarkQuery
public final Monitor<Event> eventMonitor;
public final Monitor<KnownSize> resultMonitor;
public final Monitor<Event> endOfStreamMonitor;
+ protected final Counter fatalCounter;
protected NexmarkQuery(NexmarkConfiguration configuration, String name) {
super(name);
@@ -214,23 +215,16 @@ public abstract class NexmarkQuery
eventMonitor = new Monitor<>(name + ".Events", "event");
resultMonitor = new Monitor<>(name + ".Results", "result");
endOfStreamMonitor = new Monitor<>(name + ".EndOfStream", "end");
+ fatalCounter = Metrics.counter(name , "fatal");
} else {
eventMonitor = null;
resultMonitor = null;
endOfStreamMonitor = null;
+ fatalCounter = null;
}
}
/**
- * Return the aggregator which counts fatal errors in this query. Return null if no such
- * aggregator.
- */
- @Nullable
- public Aggregator<Long, Long> getFatalCount() {
- return null;
- }
-
- /**
* Implement the actual query. All we know about the result is it has a known encoded size.
*/
protected abstract PCollection<KnownSize> applyPrim(PCollection<Event> events);
http://git-wip-us.apache.org/repos/asf/beam/blob/b438fa7d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
index ef5f0e2..87314ce 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
@@ -24,14 +24,13 @@ import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.NoSuchElementException;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.beam.integration.nexmark.io.PubsubHelper;
@@ -63,15 +62,18 @@ import org.apache.beam.integration.nexmark.queries.Query8;
import org.apache.beam.integration.nexmark.queries.Query8Model;
import org.apache.beam.integration.nexmark.queries.Query9;
import org.apache.beam.integration.nexmark.queries.Query9Model;
-import org.apache.beam.sdk.AggregatorRetrievalException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
+import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.MetricNameFilter;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
@@ -186,38 +188,59 @@ public abstract class NexmarkRunner<OptionT extends NexmarkOptions> {
protected abstract int maxNumWorkers();
/**
- * Return the current value for a long counter, or -1 if can't be retrieved.
+ * Return the current value for a long counter, or a default value if can't be retrieved.
+ * Note this uses only attempted metrics because some runners don't support committed metrics.
*/
- protected long getLong(PipelineResult job, Aggregator<Long, Long> aggregator) {
+ protected long getCounterMetric(PipelineResult result, String namespace, String name,
+ long defaultValue) {
+ //TODO Ismael calc this only once
+ MetricQueryResults metrics = result.metrics().queryMetrics(
+ MetricsFilter.builder().addNameFilter(MetricNameFilter.named(namespace, name)).build());
+ Iterable<MetricResult<Long>> counters = metrics.counters();
try {
- Collection<Long> values = job.getAggregatorValues(aggregator).getValues();
- if (values.size() != 1) {
- return -1;
- }
- return Iterables.getOnlyElement(values);
- } catch (AggregatorRetrievalException e) {
- return -1;
+ MetricResult<Long> metricResult = counters.iterator().next();
+ return metricResult.attempted();
+ } catch (NoSuchElementException e) {
+ //TODO Ismael
}
+ return defaultValue;
}
/**
- * Return the current value for a time counter, or -1 if can't be retrieved.
+ * Return the current value for a long counter, or a default value if can't be retrieved.
+ * Note this uses only attempted metrics because some runners don't support committed metrics.
*/
- protected long getTimestamp(
- long now, PipelineResult job, Aggregator<Long, Long> aggregator) {
+ protected long getDistributionMetric(PipelineResult result, String namespace, String name,
+ DistributionType distType, long defaultValue) {
+ MetricQueryResults metrics = result.metrics().queryMetrics(
+ MetricsFilter.builder().addNameFilter(MetricNameFilter.named(namespace, name)).build());
+ Iterable<MetricResult<DistributionResult>> distributions = metrics.distributions();
try {
- Collection<Long> values = job.getAggregatorValues(aggregator).getValues();
- if (values.size() != 1) {
- return -1;
- }
- long value = Iterables.getOnlyElement(values);
- if (Math.abs(value - now) > Duration.standardDays(10000).getMillis()) {
- return -1;
+ MetricResult<DistributionResult> distributionResult = distributions.iterator().next();
+ if (distType.equals(DistributionType.MIN)) {
+ return distributionResult.attempted().min();
+ } else if (distType.equals(DistributionType.MAX)) {
+ return distributionResult.attempted().max();
+ } else {
+ //TODO Ismael
}
- return value;
- } catch (AggregatorRetrievalException e) {
+ } catch (NoSuchElementException e) {
+ //TODO Ismael
+ }
+ return defaultValue;
+ }
+
+ private enum DistributionType {MIN, MAX}
+
+ /**
+ * Return the current value for a time counter, or -1 if can't be retrieved.
+ */
+ protected long getTimestampMetric(long now, long value) {
+ //TODO Ismael improve doc
+ if (Math.abs(value - now) > Duration.standardDays(10000).getMillis()) {
return -1;
}
+ return value;
}
/**
@@ -294,21 +317,46 @@ public abstract class NexmarkRunner<OptionT extends NexmarkOptions> {
* Return the current performance given {@code eventMonitor} and {@code resultMonitor}.
*/
private NexmarkPerf currentPerf(
- long startMsSinceEpoch, long now, PipelineResult job,
+ long startMsSinceEpoch, long now, PipelineResult result,
List<NexmarkPerf.ProgressSnapshot> snapshots, Monitor<?> eventMonitor,
Monitor<?> resultMonitor) {
NexmarkPerf perf = new NexmarkPerf();
- long numEvents = getLong(job, eventMonitor.getElementCounter());
- long numEventBytes = getLong(job, eventMonitor.getBytesCounter());
- long eventStart = getTimestamp(now, job, eventMonitor.getStartTime());
- long eventEnd = getTimestamp(now, job, eventMonitor.getEndTime());
- long numResults = getLong(job, resultMonitor.getElementCounter());
- long numResultBytes = getLong(job, resultMonitor.getBytesCounter());
- long resultStart = getTimestamp(now, job, resultMonitor.getStartTime());
- long resultEnd = getTimestamp(now, job, resultMonitor.getEndTime());
- long timestampStart = getTimestamp(now, job, resultMonitor.getStartTimestamp());
- long timestampEnd = getTimestamp(now, job, resultMonitor.getEndTimestamp());
+ long numEvents =
+ getCounterMetric(result, eventMonitor.name, eventMonitor.prefix + ".elements", -1);
+ long numEventBytes =
+ getCounterMetric(result, eventMonitor.name, eventMonitor.prefix + ".bytes", -1);
+ long eventStart =
+ getTimestampMetric(now,
+ getDistributionMetric(result, eventMonitor.name, eventMonitor.prefix + ".startTime",
+ DistributionType.MIN, -1));
+ long eventEnd =
+ getTimestampMetric(now,
+ getDistributionMetric(result, eventMonitor.name, eventMonitor.prefix + ".endTime",
+ DistributionType.MAX, -1));
+
+ long numResults =
+ getCounterMetric(result, resultMonitor.name, resultMonitor.prefix + ".elements", -1);
+ long numResultBytes =
+ getCounterMetric(result, resultMonitor.name, resultMonitor.prefix + ".bytes", -1);
+ long resultStart =
+ getTimestampMetric(now,
+ getDistributionMetric(result, resultMonitor.name, resultMonitor.prefix + ".startTime",
+ DistributionType.MIN, -1));
+ long resultEnd =
+ getTimestampMetric(now,
+ getDistributionMetric(result, resultMonitor.name, resultMonitor.prefix + ".endTime",
+ DistributionType.MAX, -1));
+ long timestampStart =
+ getTimestampMetric(now,
+ getDistributionMetric(result,
+ resultMonitor.name, resultMonitor.prefix + ".startTimestamp",
+ DistributionType.MIN, -1));
+ long timestampEnd =
+ getTimestampMetric(now,
+ getDistributionMetric(result,
+ resultMonitor.name, resultMonitor.prefix + ".endTimestamp",
+ DistributionType.MAX, -1));
long effectiveEnd = -1;
if (eventEnd >= 0 && resultEnd >= 0) {
@@ -372,7 +420,7 @@ public abstract class NexmarkRunner<OptionT extends NexmarkOptions> {
perf.shutdownDelaySec = (now - resultEnd) / 1000.0;
}
- perf.jobId = getJobId(job);
+ perf.jobId = getJobId(result);
// As soon as available, try to capture cumulative cost at this point too.
NexmarkPerf.ProgressSnapshot snapshot = new NexmarkPerf.ProgressSnapshot();
@@ -574,9 +622,10 @@ public abstract class NexmarkRunner<OptionT extends NexmarkOptions> {
if (options.isStreaming() && !waitingForShutdown) {
Duration quietFor = new Duration(lastActivityMsSinceEpoch, now);
- if (query.getFatalCount() != null && getLong(job, query.getFatalCount()) > 0) {
+ long fatalCount = getCounterMetric(job, query.getName(), "fatal", 0);
+ if (fatalCount > 0) {
NexmarkUtils.console("job has fatal errors, cancelling.");
- errors.add(String.format("Pipeline reported %s fatal errors", query.getFatalCount()));
+ errors.add(String.format("Pipeline reported %s fatal errors", fatalCount));
waitingForShutdown = true;
} else if (configuration.debug && configuration.numEvents > 0
&& currPerf.numEvents == configuration.numEvents
@@ -1033,7 +1082,7 @@ public abstract class NexmarkRunner<OptionT extends NexmarkOptions> {
if (c.element().hashCode() % 2 == 0) {
c.output(c.element());
} else {
- c.sideOutput(SIDE, c.element());
+ c.output(SIDE, c.element());
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b438fa7d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
index a47ebcc..18589c4 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
@@ -53,12 +53,12 @@ import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
@@ -419,48 +419,42 @@ public class NexmarkUtils {
*/
public static ParDo.SingleOutput<Event, Event> snoop(final String name) {
return ParDo.of(new DoFn<Event, Event>() {
- final Aggregator<Long, Long> eventCounter =
- createAggregator("events", Sum.ofLongs());
- final Aggregator<Long, Long> newPersonCounter =
- createAggregator("newPersons", Sum.ofLongs());
- final Aggregator<Long, Long> newAuctionCounter =
- createAggregator("newAuctions", Sum.ofLongs());
- final Aggregator<Long, Long> bidCounter =
- createAggregator("bids", Sum.ofLongs());
- final Aggregator<Long, Long> endOfStreamCounter =
- createAggregator("endOfStream", Sum.ofLongs());
-
- @ProcessElement
- public void processElement(ProcessContext c) {
- eventCounter.addValue(1L);
- if (c.element().newPerson != null) {
- newPersonCounter.addValue(1L);
- } else if (c.element().newAuction != null) {
- newAuctionCounter.addValue(1L);
- } else if (c.element().bid != null) {
- bidCounter.addValue(1L);
- } else {
- endOfStreamCounter.addValue(1L);
- }
- info("%s snooping element %s", name, c.element());
- c.output(c.element());
- }
- });
+ final Counter eventCounter = Metrics.counter(name, "events");
+ final Counter newPersonCounter = Metrics.counter(name, "newPersons");
+ final Counter newAuctionCounter = Metrics.counter(name, "newAuctions");
+ final Counter bidCounter = Metrics.counter(name, "bids");
+ final Counter endOfStreamCounter = Metrics.counter(name, "endOfStream");
+
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ eventCounter.inc();
+ if (c.element().newPerson != null) {
+ newPersonCounter.inc();
+ } else if (c.element().newAuction != null) {
+ newAuctionCounter.inc();
+ } else if (c.element().bid != null) {
+ bidCounter.inc();
+ } else {
+ endOfStreamCounter.inc();
+ }
+ info("%s snooping element %s", name, c.element());
+ c.output(c.element());
+ }
+ });
}
/**
* Return a transform to count and discard each element.
*/
- public static <T> ParDo.SingleOutput<T, Void> devNull(String name) {
+ public static <T> ParDo.SingleOutput<T, Void> devNull(final String name) {
return ParDo.of(new DoFn<T, Void>() {
- final Aggregator<Long, Long> discardCounter =
- createAggregator("discarded", Sum.ofLongs());
+ final Counter discardedCounterMetric = Metrics.counter(name, "discarded");
- @ProcessElement
- public void processElement(ProcessContext c) {
- discardCounter.addValue(1L);
- }
- });
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ discardedCounterMetric.inc();
+ }
+ });
}
/**
@@ -468,28 +462,27 @@ public class NexmarkUtils {
*/
public static <T> ParDo.SingleOutput<T, T> log(final String name) {
return ParDo.of(new DoFn<T, T>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- LOG.info("%s: %s", name, c.element());
- c.output(c.element());
- }
- });
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ LOG.info("%s: %s", name, c.element());
+ c.output(c.element());
+ }
+ });
}
/**
* Return a transform to format each element as a string.
*/
- public static <T> ParDo.SingleOutput<T, String> format(String name) {
+ public static <T> ParDo.SingleOutput<T, String> format(final String name) {
return ParDo.of(new DoFn<T, String>() {
- final Aggregator<Long, Long> recordCounter =
- createAggregator("records", Sum.ofLongs());
+ final Counter recordCounterMetric = Metrics.counter(name, "records");
- @ProcessElement
- public void processElement(ProcessContext c) {
- recordCounter.addValue(1L);
- c.output(c.element().toString());
- }
- });
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ recordCounterMetric.inc();
+ c.output(c.element().toString());
+ }
+ });
}
/**
@@ -497,11 +490,11 @@ public class NexmarkUtils {
*/
public static <T> ParDo.SingleOutput<T, TimestampedValue<T>> stamp(String name) {
return ParDo.of(new DoFn<T, TimestampedValue<T>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(TimestampedValue.of(c.element(), c.timestamp()));
- }
- });
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ c.output(TimestampedValue.of(c.element(), c.timestamp()));
+ }
+ });
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/b438fa7d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java
index 9f1ddf8..f2566b8 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java
@@ -40,11 +40,11 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
@@ -323,56 +323,52 @@ public class WinningBids extends PTransform<PCollection<Event>, PCollection<Auct
// Find the highest price valid bid for each closed auction.
return
- // Join auctions and bids.
- KeyedPCollectionTuple.of(NexmarkQuery.AUCTION_TAG, auctionsById)
- .and(NexmarkQuery.BID_TAG, bidsByAuctionId)
- .apply(CoGroupByKey.<Long>create())
-
- // Filter and select.
- .apply(name + ".Join",
- ParDo.of(new DoFn<KV<Long, CoGbkResult>, AuctionBid>() {
- final Aggregator<Long, Long> noAuctionCounter =
- createAggregator("noAuction", Sum.ofLongs());
- final Aggregator<Long, Long> underReserveCounter =
- createAggregator("underReserve", Sum.ofLongs());
- final Aggregator<Long, Long> noValidBidsCounter =
- createAggregator("noValidBids", Sum.ofLongs());
-
-
- @ProcessElement
- public void processElement(ProcessContext c) {
- Auction auction =
- c.element().getValue().getOnly(NexmarkQuery.AUCTION_TAG, null);
- if (auction == null) {
- // We have bids without a matching auction. Give up.
- noAuctionCounter.addValue(1L);
- return;
- }
- // Find the current winning bid for auction.
- // The earliest bid with the maximum price above the reserve wins.
- Bid bestBid = null;
- for (Bid bid : c.element().getValue().getAll(NexmarkQuery.BID_TAG)) {
- // Bids too late for their auction will have been
- // filtered out by the window merge function.
- checkState(bid.dateTime < auction.expires);
- if (bid.price < auction.reserve) {
- // Bid price is below auction reserve.
- underReserveCounter.addValue(1L);
- continue;
- }
-
- if (bestBid == null
- || Bid.PRICE_THEN_DESCENDING_TIME.compare(bid, bestBid) > 0) {
- bestBid = bid;
- }
- }
- if (bestBid == null) {
- // We don't have any valid bids for auction.
- noValidBidsCounter.addValue(1L);
- return;
- }
- c.output(new AuctionBid(auction, bestBid));
- }
- }));
+ // Join auctions and bids.
+ KeyedPCollectionTuple.of(NexmarkQuery.AUCTION_TAG, auctionsById)
+ .and(NexmarkQuery.BID_TAG, bidsByAuctionId)
+ .apply(CoGroupByKey.<Long>create())
+ // Filter and select.
+ .apply(name + ".Join",
+ ParDo.of(new DoFn<KV<Long, CoGbkResult>, AuctionBid>() {
+ private final Counter noAuctionCounter = Metrics.counter(name, "noAuction");
+ private final Counter underReserveCounter = Metrics.counter(name, "underReserve");
+ private final Counter noValidBidsCounter = Metrics.counter(name, "noValidBids");
+
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ Auction auction =
+ c.element().getValue().getOnly(NexmarkQuery.AUCTION_TAG, null);
+ if (auction == null) {
+ // We have bids without a matching auction. Give up.
+ noAuctionCounter.inc();
+ return;
+ }
+ // Find the current winning bid for auction.
+ // The earliest bid with the maximum price above the reserve wins.
+ Bid bestBid = null;
+ for (Bid bid : c.element().getValue().getAll(NexmarkQuery.BID_TAG)) {
+ // Bids too late for their auction will have been
+ // filtered out by the window merge function.
+ checkState(bid.dateTime < auction.expires);
+ if (bid.price < auction.reserve) {
+ // Bid price is below auction reserve.
+ underReserveCounter.inc();
+ continue;
+ }
+
+ if (bestBid == null
+ || Bid.PRICE_THEN_DESCENDING_TIME.compare(bid, bestBid) > 0) {
+ bestBid = bid;
+ }
+ }
+ if (bestBid == null) {
+ // We don't have any valid bids for auction.
+ noValidBidsCounter.inc();
+ return;
+ }
+ c.output(new AuctionBid(auction, bestBid));
+ }
+ }
+ ));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b438fa7d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleRunner.java
index 7ffd47a..935bf0d 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleRunner.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleRunner.java
@@ -130,7 +130,9 @@ class NexmarkGoogleRunner extends NexmarkRunner<NexmarkGoogleDriver.NexmarkGoogl
NexmarkUtils.console("%s publisher (%d events)", state, numEvents);
return;
case RUNNING:
- numEvents = getLong(job, publisherMonitor.getElementCounter());
+ //TODO Ismael Validate that this counter is ok
+ numEvents =
+ getCounterMetric(job, publisherMonitor.name, publisherMonitor.prefix + ".elements", -1);
if (startMsSinceEpoch < 0 && numEvents > 0) {
startMsSinceEpoch = System.currentTimeMillis();
endMsSinceEpoch = startMsSinceEpoch
http://git-wip-us.apache.org/repos/asf/beam/blob/b438fa7d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0.java
index f60d5de..84696c4 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0.java
@@ -28,10 +28,10 @@ import org.apache.beam.integration.nexmark.model.Event;
import org.apache.beam.integration.nexmark.model.KnownSize;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.PCollection;
/**
@@ -49,15 +49,15 @@ public class Query0 extends NexmarkQuery {
// Force round trip through coder.
.apply(name + ".Serialize",
ParDo.of(new DoFn<Event, Event>() {
- private final Aggregator<Long, Long> bytes =
- createAggregator("bytes", Sum.ofLongs());
+ private final Counter bytesMetric =
+ Metrics.counter(name , "bytes");
@ProcessElement
public void processElement(ProcessContext c) throws CoderException, IOException {
ByteArrayOutputStream outStream = new ByteArrayOutputStream();
coder.encode(c.element(), outStream, Coder.Context.OUTER);
byte[] byteArray = outStream.toByteArray();
- bytes.addValue((long) byteArray.length);
+ bytesMetric.inc((long) byteArray.length);
ByteArrayInputStream inStream = new ByteArrayInputStream(byteArray);
Event event = coder.decode(inStream, Coder.Context.OUTER);
c.output(event);
http://git-wip-us.apache.org/repos/asf/beam/blob/b438fa7d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
index 5246427..d9b3557 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
@@ -34,12 +34,12 @@ import org.apache.beam.integration.nexmark.model.Done;
import org.apache.beam.integration.nexmark.model.Event;
import org.apache.beam.integration.nexmark.model.KnownSize;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.GcsOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.AfterEach;
import org.apache.beam.sdk.transforms.windowing.AfterFirst;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
@@ -184,196 +184,189 @@ public class Query10 extends NexmarkQuery {
private PCollection<Done> applyTyped(PCollection<Event> events) {
final int numLogShards = maxNumWorkers * NUM_SHARDS_PER_WORKER;
- return events.apply(name + ".ShardEvents",
- ParDo.of(new DoFn<Event, KV<String, Event>>() {
- final Aggregator<Long, Long> lateCounter =
- createAggregator("actuallyLateEvent", Sum.ofLongs());
- final Aggregator<Long, Long> onTimeCounter =
- createAggregator("actuallyOnTimeEvent", Sum.ofLongs());
+ return events
+ .apply(name + ".ShardEvents",
+ ParDo.of(new DoFn<Event, KV<String, Event>>() {
+ private final Counter lateCounter = Metrics.counter(name, "actuallyLateEvent");
+ private final Counter onTimeCounter = Metrics.counter(name, "actuallyOnTimeEvent");
- @ProcessElement
- public void processElement(ProcessContext c) {
- if (c.element().hasAnnotation("LATE")) {
- lateCounter.addValue(1L);
- LOG.error("Observed late: %s", c.element());
- } else {
- onTimeCounter.addValue(1L);
- }
- int shardNum = (int) Math.abs((long) c.element().hashCode() % numLogShards);
- String shard = String.format("shard-%05d-of-%05d", shardNum, numLogShards);
- c.output(KV.of(shard, c.element()));
- }
- }))
- .apply(name + ".WindowEvents",
- Window.<KV<String, Event>>into(
- FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec)))
- .triggering(AfterEach.inOrder(
- Repeatedly
- .forever(AfterPane.elementCountAtLeast(configuration.maxLogEvents))
- .orFinally(AfterWatermark.pastEndOfWindow()),
- Repeatedly.forever(
- AfterFirst.of(AfterPane.elementCountAtLeast(configuration.maxLogEvents),
- AfterProcessingTime.pastFirstElementInPane()
- .plusDelayOf(LATE_BATCHING_PERIOD)))))
- .discardingFiredPanes()
- // Use a 1 day allowed lateness so that any forgotten hold will stall the
- // pipeline for that period and be very noticeable.
- .withAllowedLateness(Duration.standardDays(1)))
- .apply(name + ".GroupByKey", GroupByKey.<String, Event>create())
- .apply(name + ".CheckForLateEvents",
- ParDo.of(new DoFn<KV<String, Iterable<Event>>,
- KV<String, Iterable<Event>>>() {
- final Aggregator<Long, Long> earlyCounter =
- createAggregator("earlyShard", Sum.ofLongs());
- final Aggregator<Long, Long> onTimeCounter =
- createAggregator("onTimeShard", Sum.ofLongs());
- final Aggregator<Long, Long> lateCounter =
- createAggregator("lateShard", Sum.ofLongs());
- final Aggregator<Long, Long> unexpectedLatePaneCounter =
- createAggregator("ERROR_unexpectedLatePane", Sum.ofLongs());
- final Aggregator<Long, Long> unexpectedOnTimeElementCounter =
- createAggregator("ERROR_unexpectedOnTimeElement", Sum.ofLongs());
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ if (c.element().hasAnnotation("LATE")) {
+ lateCounter.inc();
+ LOG.error("Observed late: {}", c.element());
+ } else {
+ onTimeCounter.inc();
+ }
+ int shardNum = (int) Math.abs((long) c.element().hashCode() % numLogShards);
+ String shard = String.format("shard-%05d-of-%05d", shardNum, numLogShards);
+ c.output(KV.of(shard, c.element()));
+ }
+ }))
+ .apply(name + ".WindowEvents",
+ Window.<KV<String, Event>>into(
+ FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec)))
+ .triggering(AfterEach.inOrder(
+ Repeatedly
+ .forever(AfterPane.elementCountAtLeast(configuration.maxLogEvents))
+ .orFinally(AfterWatermark.pastEndOfWindow()),
+ Repeatedly.forever(
+ AfterFirst.of(AfterPane.elementCountAtLeast(configuration.maxLogEvents),
+ AfterProcessingTime.pastFirstElementInPane()
+ .plusDelayOf(LATE_BATCHING_PERIOD)))))
+ .discardingFiredPanes()
+ // Use a 1 day allowed lateness so that any forgotten hold will stall the
+ // pipeline for that period and be very noticeable.
+ .withAllowedLateness(Duration.standardDays(1)))
+ .apply(name + ".GroupByKey", GroupByKey.<String, Event>create())
+ .apply(name + ".CheckForLateEvents",
+ ParDo.of(new DoFn<KV<String, Iterable<Event>>,
+ KV<String, Iterable<Event>>>() {
+ private final Counter earlyCounter = Metrics.counter(name, "earlyShard");
+ private final Counter onTimeCounter = Metrics.counter(name, "onTimeShard");
+ private final Counter lateCounter = Metrics.counter(name, "lateShard");
+ private final Counter unexpectedLatePaneCounter =
+ Metrics.counter(name, "ERROR_unexpectedLatePane");
+ private final Counter unexpectedOnTimeElementCounter =
+ Metrics.counter(name, "ERROR_unexpectedOnTimeElement");
- @ProcessElement
- public void processElement(ProcessContext c, BoundedWindow window) {
- int numLate = 0;
- int numOnTime = 0;
- for (Event event : c.element().getValue()) {
- if (event.hasAnnotation("LATE")) {
- numLate++;
- } else {
- numOnTime++;
- }
- }
- String shard = c.element().getKey();
- LOG.error(
- "%s with timestamp %s has %d actually late and %d on-time "
- + "elements in pane %s for window %s",
- shard, c.timestamp(), numLate, numOnTime, c.pane(),
- window.maxTimestamp());
- if (c.pane().getTiming() == PaneInfo.Timing.LATE) {
- if (numLate == 0) {
- LOG.error(
- "ERROR! No late events in late pane for %s", shard);
- unexpectedLatePaneCounter.addValue(1L);
- }
- if (numOnTime > 0) {
- LOG.error(
- "ERROR! Have %d on-time events in late pane for %s",
- numOnTime, shard);
- unexpectedOnTimeElementCounter.addValue(1L);
- }
- lateCounter.addValue(1L);
- } else if (c.pane().getTiming() == PaneInfo.Timing.EARLY) {
- if (numOnTime + numLate < configuration.maxLogEvents) {
- LOG.error(
- "ERROR! Only have %d events in early pane for %s",
- numOnTime + numLate, shard);
- }
- earlyCounter.addValue(1L);
- } else {
- onTimeCounter.addValue(1L);
- }
- c.output(c.element());
- }
- }))
- .apply(name + ".UploadEvents",
- ParDo.of(new DoFn<KV<String, Iterable<Event>>,
- KV<Void, OutputFile>>() {
- final Aggregator<Long, Long> savedFileCounter =
- createAggregator("savedFile", Sum.ofLongs());
- final Aggregator<Long, Long> writtenRecordsCounter =
- createAggregator("writtenRecords", Sum.ofLongs());
+ @ProcessElement
+ public void processElement(ProcessContext c, BoundedWindow window) {
+ int numLate = 0;
+ int numOnTime = 0;
+ for (Event event : c.element().getValue()) {
+ if (event.hasAnnotation("LATE")) {
+ numLate++;
+ } else {
+ numOnTime++;
+ }
+ }
+ String shard = c.element().getKey();
+ LOG.error(
+ "{} with timestamp {} has {} actually late and {} on-time "
+ + "elements in pane {} for window {}",
+ shard, c.timestamp(), numLate, numOnTime, c.pane(),
+ window.maxTimestamp());
+ if (c.pane().getTiming() == PaneInfo.Timing.LATE) {
+ if (numLate == 0) {
+ LOG.error(
+ "ERROR! No late events in late pane for {}", shard);
+ unexpectedLatePaneCounter.inc();
+ }
+ if (numOnTime > 0) {
+ LOG.error(
+ "ERROR! Have {} on-time events in late pane for {}",
+ numOnTime, shard);
+ unexpectedOnTimeElementCounter.inc();
+ }
+ lateCounter.inc();
+ } else if (c.pane().getTiming() == PaneInfo.Timing.EARLY) {
+ if (numOnTime + numLate < configuration.maxLogEvents) {
+ LOG.error(
+ "ERROR! Only have {} events in early pane for {}",
+ numOnTime + numLate, shard);
+ }
+ earlyCounter.inc();
+ } else {
+ onTimeCounter.inc();
+ }
+ c.output(c.element());
+ }
+ }))
+ .apply(name + ".UploadEvents",
+ ParDo.of(new DoFn<KV<String, Iterable<Event>>,
+ KV<Void, OutputFile>>() {
+ private final Counter savedFileCounter = Metrics.counter(name, "savedFile");
+ private final Counter writtenRecordsCounter = Metrics.counter(name, "writtenRecords");
- @ProcessElement
- public void processElement(ProcessContext c, BoundedWindow window)
- throws IOException {
- String shard = c.element().getKey();
- GcsOptions options = c.getPipelineOptions().as(GcsOptions.class);
- OutputFile outputFile = outputFileFor(window, shard, c.pane());
- LOG.error(
- "Writing %s with record timestamp %s, window timestamp %s, pane %s",
- shard, c.timestamp(), window.maxTimestamp(), c.pane());
- if (outputFile.filename != null) {
- LOG.error("Beginning write to '%s'", outputFile.filename);
- int n = 0;
- try (OutputStream output =
- Channels.newOutputStream(openWritableGcsFile(options, outputFile
- .filename))) {
- for (Event event : c.element().getValue()) {
- Event.CODER.encode(event, output, Coder.Context.OUTER);
- writtenRecordsCounter.addValue(1L);
- if (++n % 10000 == 0) {
- LOG.error("So far written %d records to '%s'", n,
- outputFile.filename);
- }
- }
- }
- LOG.error("Written all %d records to '%s'", n, outputFile.filename);
- }
- savedFileCounter.addValue(1L);
- c.output(KV.<Void, OutputFile>of(null, outputFile));
- }
- }))
- // Clear fancy triggering from above.
- .apply(name + ".WindowLogFiles", Window.<KV<Void, OutputFile>>into(
- FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec)))
- .triggering(AfterWatermark.pastEndOfWindow())
- // We expect no late data here, but we'll assume the worst so we can detect any.
- .withAllowedLateness(Duration.standardDays(1))
- .discardingFiredPanes())
+ @ProcessElement
+ public void processElement(ProcessContext c, BoundedWindow window)
+ throws IOException {
+ String shard = c.element().getKey();
+ GcsOptions options = c.getPipelineOptions().as(GcsOptions.class);
+ OutputFile outputFile = outputFileFor(window, shard, c.pane());
+ LOG.error(
+ "Writing {} with record timestamp {}, window timestamp {}, pane {}",
+ shard, c.timestamp(), window.maxTimestamp(), c.pane());
+ if (outputFile.filename != null) {
+ LOG.error("Beginning write to '{}'", outputFile.filename);
+ int n = 0;
+ try (OutputStream output =
+ Channels.newOutputStream(openWritableGcsFile(options, outputFile
+ .filename))) {
+ for (Event event : c.element().getValue()) {
+ Event.CODER.encode(event, output, Coder.Context.OUTER);
+ writtenRecordsCounter.inc();
+ if (++n % 10000 == 0) {
+ LOG.error("So far written {} records to '{}'", n,
+ outputFile.filename);
+ }
+ }
+ }
+ LOG.error("Written all {} records to '{}'", n, outputFile.filename);
+ }
+ savedFileCounter.inc();
+ c.output(KV.<Void, OutputFile>of(null, outputFile));
+ }
+ }))
+ // Clear fancy triggering from above.
+ .apply(name + ".WindowLogFiles", Window.<KV<Void, OutputFile>>into(
+ FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec)))
+ .triggering(AfterWatermark.pastEndOfWindow())
+ // We expect no late data here, but we'll assume the worst so we can detect any.
+ .withAllowedLateness(Duration.standardDays(1))
+ .discardingFiredPanes())
// this GroupByKey allows to have one file per window
.apply(name + ".GroupByKey2", GroupByKey.<Void, OutputFile>create())
- .apply(name + ".Index",
- ParDo.of(new DoFn<KV<Void, Iterable<OutputFile>>, Done>() {
- final Aggregator<Long, Long> unexpectedLateCounter =
- createAggregator("ERROR_unexpectedLate", Sum.ofLongs());
- final Aggregator<Long, Long> unexpectedEarlyCounter =
- createAggregator("ERROR_unexpectedEarly", Sum.ofLongs());
- final Aggregator<Long, Long> unexpectedIndexCounter =
- createAggregator("ERROR_unexpectedIndex", Sum.ofLongs());
- final Aggregator<Long, Long> finalizedCounter =
- createAggregator("indexed", Sum.ofLongs());
+ .apply(name + ".Index",
+ ParDo.of(new DoFn<KV<Void, Iterable<OutputFile>>, Done>() {
+ private final Counter unexpectedLateCounter =
+ Metrics.counter(name, "ERROR_unexpectedLate");
+ private final Counter unexpectedEarlyCounter =
+ Metrics.counter(name, "ERROR_unexpectedEarly");
+ private final Counter unexpectedIndexCounter =
+ Metrics.counter(name, "ERROR_unexpectedIndex");
+ private final Counter finalizedCounter = Metrics.counter(name, "indexed");
- @ProcessElement
- public void processElement(ProcessContext c, BoundedWindow window)
- throws IOException {
- if (c.pane().getTiming() == Timing.LATE) {
- unexpectedLateCounter.addValue(1L);
- LOG.error("ERROR! Unexpected LATE pane: %s", c.pane());
- } else if (c.pane().getTiming() == Timing.EARLY) {
- unexpectedEarlyCounter.addValue(1L);
- LOG.error("ERROR! Unexpected EARLY pane: %s", c.pane());
- } else if (c.pane().getTiming() == Timing.ON_TIME
- && c.pane().getIndex() != 0) {
- unexpectedIndexCounter.addValue(1L);
- LOG.error("ERROR! Unexpected ON_TIME pane index: %s", c.pane());
- } else {
- GcsOptions options = c.getPipelineOptions().as(GcsOptions.class);
- LOG.error(
- "Index with record timestamp %s, window timestamp %s, pane %s",
- c.timestamp(), window.maxTimestamp(), c.pane());
+ @ProcessElement
+ public void processElement(ProcessContext c, BoundedWindow window)
+ throws IOException {
+ if (c.pane().getTiming() == Timing.LATE) {
+ unexpectedLateCounter.inc();
+ LOG.error("ERROR! Unexpected LATE pane: {}", c.pane());
+ } else if (c.pane().getTiming() == Timing.EARLY) {
+ unexpectedEarlyCounter.inc();
+ LOG.error("ERROR! Unexpected EARLY pane: {}", c.pane());
+ } else if (c.pane().getTiming() == Timing.ON_TIME
+ && c.pane().getIndex() != 0) {
+ unexpectedIndexCounter.inc();
+ LOG.error("ERROR! Unexpected ON_TIME pane index: {}", c.pane());
+ } else {
+ GcsOptions options = c.getPipelineOptions().as(GcsOptions.class);
+ LOG.error(
+ "Index with record timestamp {}, window timestamp {}, pane {}",
+ c.timestamp(), window.maxTimestamp(), c.pane());
- @Nullable String filename = indexPathFor(window);
- if (filename != null) {
- LOG.error("Beginning write to '%s'", filename);
- int n = 0;
- try (OutputStream output =
- Channels.newOutputStream(
- openWritableGcsFile(options, filename))) {
- for (OutputFile outputFile : c.element().getValue()) {
- output.write(outputFile.toString().getBytes());
- n++;
- }
- }
- LOG.error("Written all %d lines to '%s'", n, filename);
- }
- c.output(
- new Done("written for timestamp " + window.maxTimestamp()));
- finalizedCounter.addValue(1L);
- }
- }
- }));
+ @Nullable String filename = indexPathFor(window);
+ if (filename != null) {
+ LOG.error("Beginning write to '{}'", filename);
+ int n = 0;
+ try (OutputStream output =
+ Channels.newOutputStream(
+ openWritableGcsFile(options, filename))) {
+ for (OutputFile outputFile : c.element().getValue()) {
+ output.write(outputFile.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8));
+ n++;
+ }
+ }
+ LOG.error("Written all {} lines to '{}'", n, filename);
+ }
+ c.output(
+ new Done("written for timestamp " + window.maxTimestamp()));
+ finalizedCounter.inc();
+ }
+ }
+ }));
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/b438fa7d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java
index ba31e9f..12b16f1 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java
@@ -20,7 +20,6 @@ package org.apache.beam.integration.nexmark.queries;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import javax.annotation.Nullable;
import org.apache.beam.integration.nexmark.NexmarkConfiguration;
import org.apache.beam.integration.nexmark.NexmarkQuery;
import org.apache.beam.integration.nexmark.NexmarkUtils;
@@ -30,12 +29,12 @@ import org.apache.beam.integration.nexmark.model.KnownSize;
import org.apache.beam.integration.nexmark.model.NameCityStateId;
import org.apache.beam.integration.nexmark.model.Person;
import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
@@ -81,14 +80,7 @@ public class Query3 extends NexmarkQuery {
public Query3(NexmarkConfiguration configuration) {
super(configuration, "Query3");
- joinDoFn = new JoinDoFn(configuration.maxAuctionsWaitingTime);
-
- }
-
- @Override
- @Nullable
- public Aggregator<Long, Long> getFatalCount() {
- return joinDoFn.fatalCounter;
+ joinDoFn = new JoinDoFn(name, configuration.maxAuctionsWaitingTime);
}
private PCollection<NameCityStateId> applyTyped(PCollection<Event> events) {
@@ -195,8 +187,6 @@ public class Query3 extends NexmarkQuery {
private static final String PERSON_STATE_EXPIRING = "personStateExpiring";
- public final Aggregator<Long, Long> fatalCounter = createAggregator("fatal", Sum.ofLongs());
-
@StateId(AUCTIONS)
private final StateSpec<Object, ValueState<List<Auction>>> auctionsSpec =
StateSpecs.value(ListCoder.of(Auction.CODER));
@@ -204,19 +194,25 @@ public class Query3 extends NexmarkQuery {
@TimerId(PERSON_STATE_EXPIRING)
private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
- private final Aggregator<Long, Long> newAuctionCounter =
- createAggregator("newAuction", Sum.ofLongs());
- private final Aggregator<Long, Long> newPersonCounter =
- createAggregator("newPerson", Sum.ofLongs());
- private final Aggregator<Long, Long> newNewOutputCounter =
- createAggregator("newNewOutput", Sum.ofLongs());
- private final Aggregator<Long, Long> newOldOutputCounter =
- createAggregator("newOldOutput", Sum.ofLongs());
- private final Aggregator<Long, Long> oldNewOutputCounter =
- createAggregator("oldNewOutput", Sum.ofLongs());
+ // Metrics namespace under which the counters below are registered.
+ private final String name;
- private JoinDoFn(int maxAuctionsWaitingTime) {
+ private final Counter newAuctionCounter;
+ private final Counter newPersonCounter;
+ private final Counter newNewOutputCounter;
+ private final Counter newOldOutputCounter;
+ private final Counter oldNewOutputCounter;
+ private final Counter fatalCounter;
+
+ private JoinDoFn(String name, int maxAuctionsWaitingTime) {
+ this.name = name;
this.maxAuctionsWaitingTime = maxAuctionsWaitingTime;
+ newAuctionCounter = Metrics.counter(name, "newAuction");
+ newPersonCounter = Metrics.counter(name, "newPerson");
+ newNewOutputCounter = Metrics.counter(name, "newNewOutput");
+ newOldOutputCounter = Metrics.counter(name, "newOldOutput");
+ oldNewOutputCounter = Metrics.counter(name, "oldNewOutput");
+ fatalCounter = Metrics.counter(name, "fatal");
}
@ProcessElement
@@ -232,14 +228,13 @@ public class Query3 extends NexmarkQuery {
// we need to wait for the pending ReduceFn API.
Person existingPerson = personState.read();
-
if (existingPerson != null) {
// We've already seen the new person event for this person id.
// We can join with any new auctions on-the-fly without needing any
// additional persistent state.
for (Auction newAuction : c.element().getValue().getAll(AUCTION_TAG)) {
- newAuctionCounter.addValue(1L);
- newOldOutputCounter.addValue(1L);
+ newAuctionCounter.inc();
+ newOldOutputCounter.inc();
c.output(KV.of(newAuction, existingPerson));
}
return;
@@ -255,24 +250,24 @@ public class Query3 extends NexmarkQuery {
} else {
LOG.error("**** conflicting persons {} and {} ****", theNewPerson, newPerson);
}
- fatalCounter.addValue(1L);
+ fatalCounter.inc();
continue;
}
- newPersonCounter.addValue(1L);
+ newPersonCounter.inc();
// We've now seen the person for this person id so can flush any
// pending auctions for the same seller id (an auction is done by only one seller).
List<Auction> pendingAuctions = auctionsState.read();
if (pendingAuctions != null) {
for (Auction pendingAuction : pendingAuctions) {
- oldNewOutputCounter.addValue(1L);
+ oldNewOutputCounter.inc();
c.output(KV.of(pendingAuction, newPerson));
}
auctionsState.clear();
}
// Also deal with any new auctions.
for (Auction newAuction : c.element().getValue().getAll(AUCTION_TAG)) {
- newAuctionCounter.addValue(1L);
- newNewOutputCounter.addValue(1L);
+ newAuctionCounter.inc();
+ newNewOutputCounter.inc();
c.output(KV.of(newAuction, newPerson));
}
// Remember this person for any future auctions.
@@ -293,17 +288,17 @@ public class Query3 extends NexmarkQuery {
pendingAuctions = new ArrayList<>();
}
for (Auction newAuction : c.element().getValue().getAll(AUCTION_TAG)) {
- newAuctionCounter.addValue(1L);
+ newAuctionCounter.inc();
pendingAuctions.add(newAuction);
}
auctionsState.write(pendingAuctions);
}
- @OnTimer(PERSON_STATE_EXPIRING)
- public void onTimerCallback(
- OnTimerContext context,
- @StateId(PERSON) ValueState<Person> personState) {
- personState.clear();
- }
+ @OnTimer(PERSON_STATE_EXPIRING)
+ public void onTimerCallback(
+ OnTimerContext context,
+ @StateId(PERSON) ValueState<Person> personState) {
+ personState.clear();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b438fa7d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/BoundedEventSource.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/BoundedEventSource.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/BoundedEventSource.java
index be74151..43d6690 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/BoundedEventSource.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/BoundedEventSource.java
@@ -156,7 +156,7 @@ public class BoundedEventSource extends BoundedSource<Event> {
}
@Override
- public List<BoundedEventSource> splitIntoBundles(
+ public List<BoundedEventSource> split(
long desiredBundleSizeBytes, PipelineOptions options) {
NexmarkUtils.info("slitting bounded source %s into %d sub-sources", config, numEventGenerators);
List<BoundedEventSource> results = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/beam/blob/b438fa7d/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSource.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSource.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSource.java
index 286c576..c3c6eb0 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSource.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSource.java
@@ -289,7 +289,7 @@ public class UnboundedEventSource extends UnboundedSource<Event, Generator.Check
}
@Override
- public List<UnboundedEventSource> generateInitialSplits(
+ public List<UnboundedEventSource> split(
int desiredNumSplits, PipelineOptions options) {
LOG.trace("splitting unbounded source into {} sub-sources", numEventGenerators);
List<UnboundedEventSource> results = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/beam/blob/b438fa7d/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/BoundedEventSourceTest.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/BoundedEventSourceTest.java b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/BoundedEventSourceTest.java
index 3f85bab..c5d7725 100644
--- a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/BoundedEventSourceTest.java
+++ b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/BoundedEventSourceTest.java
@@ -66,6 +66,6 @@ public class BoundedEventSourceTest {
long n = 200L;
BoundedEventSource source = new BoundedEventSource(makeConfig(n), 1);
SourceTestUtils.assertSourcesEqualReferenceSource(
- source, source.splitIntoBundles(10, options), options);
+ source, source.split(10, options), options);
}
}