You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/10/27 17:28:24 UTC
[36/50] [abbrv] incubator-beam git commit: [BEAM-816] Aggregators are not properly named when reported to Graphite.
[BEAM-816] Aggregators are not properly named when reported to Graphite.
Added NamedAggregatorsTest.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6db94249
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6db94249
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6db94249
Branch: refs/heads/python-sdk
Commit: 6db942498a6aa20e9fd253871320d0ee4aa9476d
Parents: dc61a00
Author: Stas Levin <st...@gmail.com>
Authored: Tue Oct 25 18:23:23 2016 +0300
Committer: Sela <an...@paypal.com>
Committed: Wed Oct 26 21:16:01 2016 +0300
----------------------------------------------------------------------
.../metrics/AggregatorMetricSource.java | 9 +-
.../metrics/WithNamedAggregatorsSupport.java | 7 +-
.../spark/translation/SparkRuntimeContext.java | 2 +-
.../runners/spark/ClearAggregatorsRule.java | 33 -------
.../runners/spark/InMemoryMetricsSinkRule.java | 32 -------
.../metrics/sink/ClearAggregatorsRule.java | 33 +++++++
.../metrics/sink/InMemoryMetrics.java | 15 +++-
.../metrics/sink/InMemoryMetricsSinkRule.java | 31 +++++++
.../metrics/sink/NamedAggregatorsTest.java | 92 ++++++++++++++++++++
9 files changed, 179 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6db94249/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetricSource.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetricSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetricSource.java
index 0658e04..2a00aec 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetricSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetricSource.java
@@ -29,17 +29,18 @@ import org.apache.spark.metrics.source.Source;
*/
public class AggregatorMetricSource implements Source {
- private static final String SOURCE_NAME = "NamedAggregators";
+ private final String sourceName;
private final MetricRegistry metricRegistry = new MetricRegistry();
- public AggregatorMetricSource(final NamedAggregators aggregators) {
- metricRegistry.register(SOURCE_NAME, AggregatorMetric.of(aggregators));
+ public AggregatorMetricSource(final String appName, final NamedAggregators aggregators) {
+ sourceName = appName;
+ metricRegistry.register("Beam", AggregatorMetric.of(aggregators));
}
@Override
public String sourceName() {
- return SOURCE_NAME;
+ return sourceName;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6db94249/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/WithNamedAggregatorsSupport.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/WithNamedAggregatorsSupport.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/WithNamedAggregatorsSupport.java
index 88e2211..6932ae6 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/WithNamedAggregatorsSupport.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/WithNamedAggregatorsSupport.java
@@ -118,8 +118,13 @@ public class WithNamedAggregatorsSupport extends MetricRegistry {
@Override
public Map<String, Gauge> apply(final Map.Entry<String, Metric> entry) {
final NamedAggregators agg = ((AggregatorMetric) entry.getValue()).getNamedAggregators();
+ final String parentName = entry.getKey();
final Map<String, Gauge> gaugeMap = Maps.transformEntries(agg.renderAll(), toGauge());
- return Maps.filterValues(gaugeMap, Predicates.notNull());
+ final Map<String, Gauge> fullNameGaugeMap = Maps.newLinkedHashMap();
+ for (String shortName : gaugeMap.keySet()) {
+ fullNameGaugeMap.put(parentName + "." + shortName, gaugeMap.get(shortName));
+ }
+ return Maps.filterValues(fullNameGaugeMap, Predicates.notNull());
}
};
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6db94249/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
index 94c1648..181a111 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
@@ -89,7 +89,7 @@ public class SparkRuntimeContext implements Serializable {
if (opts.getEnableSparkSinks()) {
final MetricsSystem metricsSystem = SparkEnv$.MODULE$.get().metricsSystem();
final AggregatorMetricSource aggregatorMetricSource =
- new AggregatorMetricSource(initialValue);
+ new AggregatorMetricSource(opts.getAppName(), initialValue);
// in case the context was not cleared
metricsSystem.removeSource(aggregatorMetricSource);
metricsSystem.registerSource(aggregatorMetricSource);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6db94249/runners/spark/src/test/java/org/apache/beam/runners/spark/ClearAggregatorsRule.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ClearAggregatorsRule.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ClearAggregatorsRule.java
deleted file mode 100644
index beaae13..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ClearAggregatorsRule.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
-import org.junit.rules.ExternalResource;
-
-/**
- * A rule that clears the {@link org.apache.beam.runners.spark.aggregators.AccumulatorSingleton}
- * which represents the Beam {@link org.apache.beam.sdk.transforms.Aggregator}s.
- */
-class ClearAggregatorsRule extends ExternalResource {
- @Override
- protected void before() throws Throwable {
- AccumulatorSingleton.clear();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6db94249/runners/spark/src/test/java/org/apache/beam/runners/spark/InMemoryMetricsSinkRule.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/InMemoryMetricsSinkRule.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/InMemoryMetricsSinkRule.java
deleted file mode 100644
index 506dbbd..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/InMemoryMetricsSinkRule.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import org.apache.beam.runners.spark.aggregators.metrics.sink.InMemoryMetrics;
-import org.junit.rules.ExternalResource;
-
-/**
- * A rule that cleans the {@link InMemoryMetrics} after the tests has finished.
- */
-class InMemoryMetricsSinkRule extends ExternalResource {
- @Override
- protected void before() throws Throwable {
- InMemoryMetrics.clearAll();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6db94249/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/ClearAggregatorsRule.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/ClearAggregatorsRule.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/ClearAggregatorsRule.java
new file mode 100644
index 0000000..79c58a7
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/ClearAggregatorsRule.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.aggregators.metrics.sink;
+
+import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
+import org.junit.rules.ExternalResource;
+
+/**
+ * A rule that clears the {@link org.apache.beam.runners.spark.aggregators.AccumulatorSingleton}
+ * which represents the Beam {@link org.apache.beam.sdk.transforms.Aggregator}s.
+ */
+class ClearAggregatorsRule extends ExternalResource {
+ @Override
+ protected void before() throws Throwable {
+ AccumulatorSingleton.clear();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6db94249/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java
index 35e6717..389cd03 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java
@@ -20,12 +20,13 @@ package org.apache.beam.runners.spark.aggregators.metrics.sink;
import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
-
+import com.google.common.base.Predicates;
+import com.google.common.collect.Iterables;
import java.util.Properties;
-
import org.apache.beam.runners.spark.aggregators.metrics.WithNamedAggregatorsSupport;
import org.apache.spark.metrics.sink.Sink;
+
/**
* An in-memory {@link Sink} implementation for tests.
*/
@@ -45,9 +46,15 @@ public class InMemoryMetrics implements Sink {
public static <T> T valueOf(final String name) {
final T retVal;
+ // this might fail in case we have multiple aggregators with the same suffix after
+ // the last dot, but it should be good enough for tests.
if (extendedMetricsRegistry != null
- && extendedMetricsRegistry.getGauges().containsKey(name)) {
- retVal = (T) extendedMetricsRegistry.getGauges().get(name).getValue();
+ && Iterables.any(extendedMetricsRegistry.getGauges().keySet(),
+ Predicates.containsPattern(name + "$"))) {
+ String key =
+ Iterables.find(extendedMetricsRegistry.getGauges().keySet(),
+ Predicates.containsPattern(name + "$"));
+ retVal = (T) extendedMetricsRegistry.getGauges().get(key).getValue();
} else {
retVal = null;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6db94249/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetricsSinkRule.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetricsSinkRule.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetricsSinkRule.java
new file mode 100644
index 0000000..5a3d19d
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetricsSinkRule.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.aggregators.metrics.sink;
+
+import org.junit.rules.ExternalResource;
+
+/**
+ * A rule that clears the {@link InMemoryMetrics} before each test runs.
+ */
+class InMemoryMetricsSinkRule extends ExternalResource {
+ @Override
+ protected void before() throws Throwable {
+ InMemoryMetrics.clearAll();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6db94249/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
new file mode 100644
index 0000000..194d66a
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.aggregators.metrics.sink;
+
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.SparkRunner;
+import org.apache.beam.runners.spark.examples.WordCount;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExternalResource;
+
+
+/**
+ * A test for the NamedAggregators mechanism.
+ */
+public class NamedAggregatorsTest {
+
+ @Rule
+ public ExternalResource inMemoryMetricsSink = new InMemoryMetricsSinkRule();
+
+ @Rule
+ public ClearAggregatorsRule clearAggregators = new ClearAggregatorsRule();
+
+ private Pipeline createSparkPipeline() {
+ final SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
+ options.setRunner(SparkRunner.class);
+ return Pipeline.create(options);
+ }
+
+ private void runPipeline() {
+
+ final List<String> words =
+ Arrays.asList("hi there", "hi", "hi sue bob", "hi sue", "", "bob hi");
+
+ final Set<String> expectedCounts =
+ ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2");
+
+ final Pipeline pipeline = createSparkPipeline();
+
+ final PCollection<String> output =
+ pipeline
+ .apply(Create.of(words).withCoder(StringUtf8Coder.of()))
+ .apply(new WordCount.CountWords())
+ .apply(MapElements.via(new WordCount.FormatAsTextFn()));
+
+ PAssert.that(output).containsInAnyOrder(expectedCounts);
+
+ pipeline.run();
+ }
+
+ @Test
+ public void testNamedAggregators() throws Exception {
+
+ assertThat(InMemoryMetrics.valueOf("emptyLines"), is(nullValue()));
+
+ runPipeline();
+
+ assertThat(InMemoryMetrics.<Double>valueOf("emptyLines"), is(1d));
+
+ }
+}