You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/04/19 17:39:06 UTC
[1/2] incubator-beam git commit: Add CounterNameAndMetadata to
support structured counter name
Repository: incubator-beam
Updated Branches:
refs/heads/master 70e6a1310 -> 861562239
Add CounterNameAndMetadata to support structured counter name
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bd40bff4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bd40bff4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bd40bff4
Branch: refs/heads/master
Commit: bd40bff4e4696a76adac08e89275862e119ec141
Parents: 70e6a13
Author: Pei He <pe...@google.com>
Authored: Wed Mar 30 22:38:34 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Tue Apr 19 08:24:33 2016 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/util/CounterAggregator.java | 2 +-
.../apache/beam/sdk/util/common/Counter.java | 84 +++++++---
.../beam/sdk/util/common/CounterName.java | 153 +++++++++++++++++++
.../apache/beam/sdk/util/common/CounterSet.java | 11 +-
.../sdk/util/common/worker/StateSampler.java | 2 +-
5 files changed, 220 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bd40bff4/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CounterAggregator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CounterAggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CounterAggregator.java
index de0c251..5fd04f5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CounterAggregator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CounterAggregator.java
@@ -87,7 +87,7 @@ public class CounterAggregator<InputT, AccumT, OutputT> implements Aggregator<In
@Override
public String getName() {
- return counter.getName();
+ return counter.getFlatName();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bd40bff4/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java
index 6cdacc5..6024576 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java
@@ -107,11 +107,21 @@ public abstract class Counter<T> {
* @return the newly constructed Counter
* @throws IllegalArgumentException if the aggregation kind is not supported
*/
- public static Counter<Integer> ints(String name, AggregationKind kind) {
+ public static Counter<Integer> ints(CounterName name, AggregationKind kind) {
return new IntegerCounter(name, kind);
}
/**
+ * Constructs a new {@code Counter<Integer>} with an unstructured name.
+ *
+ * @deprecated use {@link #ints(CounterName, AggregationKind)}.
+ */
+ @Deprecated
+ public static Counter<Integer> ints(String name, AggregationKind kind) {
+ return new IntegerCounter(CounterName.named(name), kind);
+ }
+
+ /**
* Constructs a new {@link Counter} that aggregates {@link Long} values
* according to the desired aggregation kind. The supported aggregation kinds
* are {@link AggregationKind#SUM}, {@link AggregationKind#MIN},
@@ -122,11 +132,21 @@ public abstract class Counter<T> {
* @return the newly constructed Counter
* @throws IllegalArgumentException if the aggregation kind is not supported
*/
- public static Counter<Long> longs(String name, AggregationKind kind) {
+ public static Counter<Long> longs(CounterName name, AggregationKind kind) {
return new LongCounter(name, kind);
}
/**
+ * Constructs a new {@code Counter<Long>} with an unstructured name.
+ *
+ * @deprecated use {@link #longs(CounterName, AggregationKind)}.
+ */
+ @Deprecated
+ public static Counter<Long> longs(String name, AggregationKind kind) {
+ return new LongCounter(CounterName.named(name), kind);
+ }
+
+ /**
* Constructs a new {@link Counter} that aggregates {@link Double} values
* according to the desired aggregation kind. The supported aggregation kinds
* are {@link AggregationKind#SUM}, {@link AggregationKind#MIN},
@@ -137,11 +157,21 @@ public abstract class Counter<T> {
* @return the newly constructed Counter
* @throws IllegalArgumentException if the aggregation kind is not supported
*/
- public static Counter<Double> doubles(String name, AggregationKind kind) {
+ public static Counter<Double> doubles(CounterName name, AggregationKind kind) {
return new DoubleCounter(name, kind);
}
/**
+ * Constructs a new {@code Counter<Double>} with an unstructured name.
+ *
+ * @deprecated use {@link #doubles(CounterName, AggregationKind)}.
+ */
+ @Deprecated
+ public static Counter<Double> doubles(String name, AggregationKind kind) {
+ return new DoubleCounter(CounterName.named(name), kind);
+ }
+
+ /**
* Constructs a new {@link Counter} that aggregates {@link Boolean} values
* according to the desired aggregation kind. The only supported aggregation
* kinds are {@link AggregationKind#AND} and {@link AggregationKind#OR}.
@@ -151,26 +181,20 @@ public abstract class Counter<T> {
* @return the newly constructed Counter
* @throws IllegalArgumentException if the aggregation kind is not supported
*/
- public static Counter<Boolean> booleans(String name, AggregationKind kind) {
+ public static Counter<Boolean> booleans(CounterName name, AggregationKind kind) {
return new BooleanCounter(name, kind);
}
/**
- * Constructs a new {@link Counter} that aggregates {@link String} values
- * according to the desired aggregation kind. The only supported aggregation
- * kind is {@link AggregationKind#MIN} and {@link AggregationKind#MAX}.
+ * Constructs a new {@code Counter<Boolean>} with an unstructured name.
*
- * @param name the name of the new counter
- * @param kind the new counter's aggregation kind
- * @return the newly constructed Counter
- * @throws IllegalArgumentException if the aggregation kind is not supported
+ * @deprecated use {@link #booleans(CounterName, AggregationKind)}.
*/
- @SuppressWarnings("unused")
- private static Counter<String> strings(String name, AggregationKind kind) {
- return new StringCounter(name, kind);
+ @Deprecated
+ public static Counter<Boolean> booleans(String name, AggregationKind kind) {
+ return new BooleanCounter(CounterName.named(name), kind);
}
-
//////////////////////////////////////////////////////////////////////////////
/**
@@ -209,10 +233,20 @@ public abstract class Counter<T> {
public abstract CounterMean<T> getAndResetMeanDelta();
/**
+ * Returns the counter's flat name.
+ */
+ public String getFlatName() {
+ return name.getFlatName();
+ }
+
+ /**
* Returns the counter's name.
+ *
+ * @deprecated use {@link #getFlatName}.
*/
+ @Deprecated
public String getName() {
- return name;
+ return name.getFlatName();
}
/**
@@ -267,7 +301,7 @@ public abstract class Counter<T> {
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
- sb.append(getName());
+ sb.append(getFlatName());
sb.append(":");
sb.append(getKind());
sb.append("(");
@@ -342,13 +376,13 @@ public abstract class Counter<T> {
//////////////////////////////////////////////////////////////////////////////
- /** The name of this counter. */
- protected final String name;
+ /** The name and metadata of this counter. */
+ protected final CounterName name;
/** The kind of aggregation function to apply to this counter. */
protected final AggregationKind kind;
- protected Counter(String name, AggregationKind kind) {
+ protected Counter(CounterName name, AggregationKind kind) {
this.name = name;
this.kind = kind;
}
@@ -365,7 +399,7 @@ public abstract class Counter<T> {
private final AtomicReference<LongCounterMean> deltaMean;
/** Initializes a new {@link Counter} for {@link Long} values. */
- private LongCounter(String name, AggregationKind kind) {
+ private LongCounter(CounterName name, AggregationKind kind) {
super(name, kind);
switch (kind) {
case MEAN:
@@ -560,7 +594,7 @@ public abstract class Counter<T> {
AtomicReference<DoubleCounterMean> deltaMean;
/** Initializes a new {@link Counter} for {@link Double} values. */
- private DoubleCounter(String name, AggregationKind kind) {
+ private DoubleCounter(CounterName name, AggregationKind kind) {
super(name, kind);
switch (kind) {
case MEAN:
@@ -753,7 +787,7 @@ public abstract class Counter<T> {
private final AtomicBoolean deltaAggregate;
/** Initializes a new {@link Counter} for {@link Boolean} values. */
- private BooleanCounter(String name, AggregationKind kind) {
+ private BooleanCounter(CounterName name, AggregationKind kind) {
super(name, kind);
aggregate = new AtomicBoolean();
deltaAggregate = new AtomicBoolean();
@@ -825,7 +859,7 @@ public abstract class Counter<T> {
*/
private static class StringCounter extends Counter<String> {
/** Initializes a new {@link Counter} for {@link String} values. */
- private StringCounter(String name, AggregationKind kind) {
+ private StringCounter(CounterName name, AggregationKind kind) {
super(name, kind);
// TODO: Support MIN, MAX of Strings.
throw illegalArgumentException();
@@ -908,7 +942,7 @@ public abstract class Counter<T> {
private final AtomicReference<IntegerCounterMean> deltaMean;
/** Initializes a new {@link Counter} for {@link Integer} values. */
- private IntegerCounter(String name, AggregationKind kind) {
+ private IntegerCounter(CounterName name, AggregationKind kind) {
super(name, kind);
switch (kind) {
case MEAN:
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bd40bff4/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterName.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterName.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterName.java
new file mode 100644
index 0000000..b46be98
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterName.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util.common;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Strings;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A structured counter name: the user-specified name together with an origin,
+ * the step the counter is associated with, and a prefix to add to the name.
+ *
+ * <p>For backwards compatibility, a {@link CounterName} can be converted to
+ * a flat name (string) via {@link #getFlatName()} during the migration.
+ */
+public class CounterName {
+ /**
+ * Returns a {@link CounterName} with the given name and an empty origin, step name and prefix.
+ */
+ public static CounterName named(String name) {
+ return new CounterName(name, "", "", "");
+ }
+
+ /**
+ * Returns a {@link CounterName} whose name is the given name suffixed with {@code "-msecs"}.
+ */
+ public static CounterName msecs(String name) {
+ return named(name + "-msecs");
+ }
+
+ /**
+ * Returns a new {@link CounterName} identical to this one, but with the given origin.
+ */
+ public CounterName withOrigin(String origin) {
+ return new CounterName(this.name, origin, this.stepName, this.prefix);
+ }
+
+ /**
+ * Returns a new {@link CounterName} identical to this one, but with the given step name.
+ */
+ public CounterName withStepName(String stepName) {
+ return new CounterName(this.name, this.origin, stepName, this.prefix);
+ }
+
+ /**
+ * Returns a new {@link CounterName} identical to this one, but with the given prefix.
+ */
+ public CounterName withPrefix(String prefix) {
+ return new CounterName(this.name, this.origin, this.stepName, prefix);
+ }
+
+ /**
+ * Name of the counter.
+ *
+ * <p>For example, {@code process-msecs} or {@code ElementCount}.
+ */
+ private final String name;
+
+ /**
+ * Origin (namespace) of the counter name.
+ *
+ * <p>For example, "user" for user-defined counters.
+ * It is empty for counters defined by the SDK or the runner.
+ */
+ private final String origin;
+
+ /**
+ * System-defined step name or the named output of a step.
+ *
+ * <p>For example, {@code s1} or {@code s2.out}.
+ * It may be empty when the counter is not associated with a step.
+ */
+ private final String stepName;
+
+ /**
+ * Prefix shared by a group of counters.
+ *
+ * <p>It is empty when the counter has no common prefix.
+ */
+ private final String prefix;
+
+ /**
+ * Cached flat name, i.e. the equivalent unstructured name.
+ *
+ * <p>The reference holds {@code null} until {@link #getFlatName()} is first called.
+ */
+ private AtomicReference<String> flatName;
+
+ private CounterName(String name, String origin, String stepName, String prefix) {
+ this.name = checkNotNull(name, "name");
+ this.origin = checkNotNull(origin, "origin");
+ this.stepName = checkNotNull(stepName, "stepName");
+ this.prefix = checkNotNull(prefix, "prefix");
+ this.flatName = new AtomicReference<String>();
+ }
+
+ /**
+ * Returns the flat name: non-empty prefix, origin + "-" and stepName + "-" parts, followed by name.
+ */
+ public String getFlatName() {
+ String ret = flatName.get();
+ if (ret == null) {
+ StringBuilder sb = new StringBuilder();
+ if (!Strings.isNullOrEmpty(prefix)) {
+ // The prefix is appended without "-": not all runner versions use "-" here, and it may already contain one.
+ sb.append(prefix);
+ }
+ if (!Strings.isNullOrEmpty(origin)) {
+ sb.append(origin + "-");
+ }
+ if (!Strings.isNullOrEmpty(stepName)) {
+ sb.append(stepName + "-");
+ }
+ sb.append(name);
+ flatName.compareAndSet(null, sb.toString());
+ ret = flatName.get();
+ }
+ return ret;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ } else if (o instanceof CounterName) {
+ CounterName that = (CounterName) o;
+ return this.getFlatName().equals(that.getFlatName());
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return getFlatName().hashCode();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bd40bff4/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterSet.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterSet.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterSet.java
index 699d7d3..cb0ffe5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterSet.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterSet.java
@@ -74,10 +74,11 @@ public class CounterSet extends AbstractSet<Counter<?>> {
* name but an incompatible kind had already been added
*/
public synchronized <T> Counter<T> addOrReuseCounter(Counter<T> counter) {
- Counter<?> oldCounter = counters.get(counter.getName());
+ String flatName = counter.getFlatName();
+ Counter<?> oldCounter = counters.get(flatName);
if (oldCounter == null) {
// A new counter.
- counters.put(counter.getName(), counter);
+ counters.put(flatName, counter);
return counter;
}
if (counter.isCompatibleWith(oldCounter)) {
@@ -125,16 +126,16 @@ public class CounterSet extends AbstractSet<Counter<?>> {
if (null == e) {
return false;
}
- if (counters.containsKey(e.getName())) {
+ if (counters.containsKey(e.getFlatName())) {
return false;
}
- counters.put(e.getName(), e);
+ counters.put(e.getFlatName(), e);
return true;
}
public synchronized void merge(CounterSet that) {
for (Counter<?> theirCounter : that) {
- Counter<?> myCounter = counters.get(theirCounter.getName());
+ Counter<?> myCounter = counters.get(theirCounter.getFlatName());
if (myCounter != null) {
mergeCounters(myCounter, theirCounter);
} else {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bd40bff4/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/worker/StateSampler.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/worker/StateSampler.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/worker/StateSampler.java
index bc16bdd..ee95260 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/worker/StateSampler.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/worker/StateSampler.java
@@ -262,7 +262,7 @@ public class StateSampler implements AutoCloseable {
*/
public synchronized StateSamplerInfo getInfo() {
return currentState == DO_NOT_SAMPLE ? null
- : new StateSamplerInfo(countersByState.get(currentState).getName(),
+ : new StateSamplerInfo(countersByState.get(currentState).getFlatName(),
stateTransitionCount, null);
}
[2/2] incubator-beam git commit: This closes #102
Posted by bc...@apache.org.
This closes #102
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/86156223
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/86156223
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/86156223
Branch: refs/heads/master
Commit: 861562239aafdaa7c4196e8c36b2dc63f9d2953a
Parents: 70e6a13 bd40bff
Author: bchambers <bc...@google.com>
Authored: Tue Apr 19 08:24:38 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Tue Apr 19 08:24:38 2016 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/util/CounterAggregator.java | 2 +-
.../apache/beam/sdk/util/common/Counter.java | 84 +++++++---
.../beam/sdk/util/common/CounterName.java | 153 +++++++++++++++++++
.../apache/beam/sdk/util/common/CounterSet.java | 11 +-
.../sdk/util/common/worker/StateSampler.java | 2 +-
5 files changed, 220 insertions(+), 32 deletions(-)
----------------------------------------------------------------------