You are viewing a plain-text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this plain-text version.
Posted to commits@beam.apache.org by mm...@apache.org on 2022/09/05 07:11:14 UTC
[beam] branch master updated: Generalize interface of InfluxDBPublisher to support more use cases (#22238) (#22260)
This is an automated email from the ASF dual-hosted git repository.
mmack pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 3c91e7b24a5 Generalize interface of InfluxDBPublisher to support more use cases (#22238) (#22260)
3c91e7b24a5 is described below
commit 3c91e7b24a53a6a5b929ede58231bbc57c9ddced
Author: Moritz Mack <mm...@talend.com>
AuthorDate: Mon Sep 5 09:11:05 2022 +0200
Generalize interface of InfluxDBPublisher to support more use cases (#22238) (#22260)
---
.../java/org/apache/beam/sdk/nexmark/Main.java | 72 +++------
.../apache/beam/sdk/testutils/NamedTestResult.java | 19 ++-
.../testutils/publishing/InfluxDBPublisher.java | 177 ++++++++++++---------
.../publishing/InfluxDBPublisherTest.java | 76 +++++++++
4 files changed, 221 insertions(+), 123 deletions(-)
diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
index 0ac009712af..95554c0c809 100644
--- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
+++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
@@ -18,7 +18,6 @@
package org.apache.beam.sdk.nexmark;
import static java.util.stream.Collectors.toList;
-import static java.util.stream.Collectors.toMap;
import static org.apache.beam.sdk.nexmark.NexmarkUtils.processingMode;
import java.io.IOException;
@@ -27,6 +26,7 @@ import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.nexmark.model.Bid;
import org.apache.beam.sdk.nexmark.model.Person;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testutils.publishing.InfluxDBPublisher;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBPublisher.DataPoint;
import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -150,17 +151,9 @@ public class Main {
saveSummary(null, configurations, actual, baseline, start);
}
- final ImmutableMap<String, String> schema =
- ImmutableMap.<String, String>builder()
- .put("timestamp", "timestamp")
- .put("runtimeSec", "float")
- .put("eventsPerSec", "float")
- .put("numResults", "integer")
- .build();
-
if (options.getExportSummaryToInfluxDB()) {
final long timestamp = start.getMillis() / 1000; // seconds
- savePerfsToInfluxDB(options, schema, actual, timestamp);
+ savePerfsToInfluxDB(options, actual, timestamp);
}
} finally {
@@ -179,24 +172,18 @@ public class Main {
private static void savePerfsToInfluxDB(
final NexmarkOptions options,
- final Map<String, String> schema,
final Map<NexmarkConfiguration, NexmarkPerf> results,
final long timestamp) {
final InfluxDBSettings settings = getInfluxSettings(options);
- final Map<String, String> tags = options.getInfluxTags();
- final String runner = options.getRunner().getSimpleName();
- final List<Map<String, Object>> schemaResults =
+ final Map<String, String> tags =
+ options.getInfluxTags() != null ? new HashMap<>(options.getInfluxTags()) : new HashMap<>();
+ tags.put("runner", options.getRunner().getSimpleName());
+
+ final List<DataPoint> dataPoints =
results.entrySet().stream()
- .map(
- entry ->
- getResultsFromSchema(
- entry.getValue(),
- schema,
- timestamp,
- runner,
- produceMeasurement(options, entry)))
+ .map(entry -> createInfluxDBDataPoint(options, entry, tags, timestamp))
.collect(toList());
- InfluxDBPublisher.publishNexmarkResults(schemaResults, settings, tags);
+ InfluxDBPublisher.publish(settings, dataPoints);
}
private static InfluxDBSettings getInfluxSettings(final NexmarkOptions options) {
@@ -208,38 +195,25 @@ public class Main {
.get();
}
- private static String produceMeasurement(
- final NexmarkOptions options, Map.Entry<NexmarkConfiguration, NexmarkPerf> entry) {
+ private static String generateMeasurementName(
+ final NexmarkOptions options, NexmarkConfiguration config) {
final String queryName =
- NexmarkUtils.fullQueryName(
- options.getQueryLanguage(), entry.getKey().query.getNumberOrName());
+ NexmarkUtils.fullQueryName(options.getQueryLanguage(), config.query.getNumberOrName());
return String.format(
"%s_%s_%s",
options.getBaseInfluxMeasurement(), queryName, processingMode(options.isStreaming()));
}
- private static Map<String, Object> getResultsFromSchema(
- final NexmarkPerf results,
- final Map<String, String> schema,
- final long timestamp,
- final String runner,
- final String measurement) {
- final Map<String, Object> schemaResults =
- results.toMap().entrySet().stream()
- .filter(element -> schema.containsKey(element.getKey()))
- .collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
- final int runtimeMs =
- (int) ((double) schemaResults.get("runtimeSec") * 1000); // change sec to ms
- schemaResults.put("timestamp", timestamp);
- schemaResults.put("runner", runner);
- schemaResults.put("measurement", measurement);
-
- // By default, InfluxDB treats all number values as floats. We need to add 'i' suffix to
- // interpret the value as an integer.
- schemaResults.put("runtimeMs", runtimeMs + "i");
- schemaResults.put("numResults", schemaResults.get("numResults") + "i");
-
- return schemaResults;
+ private static InfluxDBPublisher.DataPoint createInfluxDBDataPoint(
+ final NexmarkOptions options,
+ final Map.Entry<NexmarkConfiguration, NexmarkPerf> entry,
+ final Map<String, String> tags,
+ final long timestamp) {
+ String measurement = generateMeasurementName(options, entry.getKey());
+ int runtimeMs = (int) (entry.getValue().runtimeSec * 1000); // change sec to ms
+ Map<String, Number> fields =
+ ImmutableMap.of("runtimeMs", runtimeMs, "numResults", entry.getValue().numResults);
+ return InfluxDBPublisher.dataPoint(measurement, tags, fields, timestamp);
}
/** Append the pair of {@code configuration} and {@code perf} to perf file. */
diff --git a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/NamedTestResult.java b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/NamedTestResult.java
index 7855a292501..72200ab54f9 100644
--- a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/NamedTestResult.java
+++ b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/NamedTestResult.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.testutils;
import com.google.cloud.bigquery.LegacySQLTypeName;
import java.util.Map;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBPublisher;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,10 +77,9 @@ public class NamedTestResult implements TestResult {
@Override
public Map<String, Object> toMap() {
return ImmutableMap.<String, Object>builder()
- .put("test_id", testId)
+ .putAll(tags())
+ .putAll(fields())
.put("timestamp", timestamp)
- .put("metric", metric)
- .put("value", value)
.build();
}
@@ -94,4 +94,17 @@ public class NamedTestResult implements TestResult {
public double getValue() {
return value;
}
+
+ public Map<String, String> tags() {
+ return ImmutableMap.of("test_id", testId, "metric", metric);
+ }
+
+ public Map<String, Number> fields() {
+ return ImmutableMap.of("value", value);
+ }
+
+ /** Convert this result to InfluxDB data point. */
+ public InfluxDBPublisher.DataPoint toInfluxDBDataPoint(String measurement) {
+ return InfluxDBPublisher.dataPoint(measurement, tags(), fields(), null);
+ }
}
diff --git a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/InfluxDBPublisher.java b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/InfluxDBPublisher.java
index 02c721e3eff..d00580c88e5 100644
--- a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/InfluxDBPublisher.java
+++ b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/InfluxDBPublisher.java
@@ -20,9 +20,13 @@ package org.apache.beam.sdk.testutils.publishing;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.isNull;
import static java.util.Objects.requireNonNull;
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNoneBlank;
+import com.google.auto.value.AutoValue;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
@@ -31,7 +35,14 @@ import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.testutils.NamedTestResult;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Collections2;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.apache.commons.compress.utils.Charsets;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
@@ -46,35 +57,87 @@ import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
+import org.checkerframework.dataflow.qual.Pure;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-@SuppressWarnings({
- "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
public final class InfluxDBPublisher {
private static final Logger LOG = LoggerFactory.getLogger(InfluxDBPublisher.class);
private InfluxDBPublisher() {}
+ /** InfluxDB data point. */
+ @AutoValue
+ public abstract static class DataPoint {
+ DataPoint() {}
+
+ public abstract @Pure String measurement();
+
+ public abstract @Pure Map<String, String> tags();
+
+ public abstract @Pure Map<String, Number> fields();
+
+ @Nullable
+ public abstract @Pure Long timestamp();
+
+ public abstract @Pure TimeUnit timestampUnit();
+
+ @Override
+ public final String toString() {
+ return append(new StringBuilder()).toString();
+ }
+
+ private @Nullable Long timestampSecs() {
+ return timestamp() != null ? timestampUnit().toSeconds(timestamp()) : null;
+ }
+
+ private StringBuilder append(StringBuilder builder) {
+ return addMeasurement(builder, measurement(), tags(), fields(), timestampSecs());
+ }
+ }
+
+ /** Creates an InfluxDB data point using optional custom Unix timestamp in seconds. */
+ public static DataPoint dataPoint(
+ String measurement,
+ Map<String, String> tags,
+ Map<String, Number> fields,
+ @Nullable Long timestampSecs) {
+ return new AutoValue_InfluxDBPublisher_DataPoint(
+ measurement, tags, fields, timestampSecs, TimeUnit.SECONDS);
+ }
+
+ /** @deprecated Use {@link #publish} instead. */
+ @Deprecated
public static void publishNexmarkResults(
final Collection<Map<String, Object>> results,
final InfluxDBSettings settings,
final Map<String, String> tags) {
- publishWithCheck(settings, () -> publishNexmark(results, settings, tags));
+ publishWithCheck(settings, nexmarkDataPoints(results, tags));
}
public static void publishWithSettings(
final Collection<NamedTestResult> results, final InfluxDBSettings settings) {
- publishWithCheck(settings, () -> publishCommon(results, settings));
+ @SuppressWarnings("nullness")
+ Collection<DataPoint> dataPoints =
+ Collections2.transform(results, res -> res.toInfluxDBDataPoint(settings.measurement));
+ publish(settings, dataPoints);
+ }
+
+ public static void publish(
+ final InfluxDBSettings settings, final Collection<DataPoint> dataPoints) {
+ final StringBuilder builder = new StringBuilder();
+ dataPoints.forEach(m -> m.append(builder).append('\n'));
+ publishWithCheck(settings, builder.toString());
}
- private static void publishWithCheck(
- final InfluxDBSettings settings, final PublishFunction publishFunction) {
+ private static void publishWithCheck(final InfluxDBSettings settings, final String data) {
requireNonNull(settings, "InfluxDB settings must not be null");
if (isNoneBlank(settings.measurement, settings.database)) {
try {
- publishFunction.publish();
+ final HttpClientBuilder builder = provideHttpBuilder(settings);
+ final HttpPost postRequest = providePOSTRequest(settings);
+ postRequest.setEntity(new GzipCompressingEntity(new ByteArrayEntity(data.getBytes(UTF_8))));
+ executeWithVerification(postRequest, builder);
} catch (Exception exception) {
LOG.warn("Unable to publish metrics due to error: {}", exception.getMessage());
}
@@ -83,73 +146,50 @@ public final class InfluxDBPublisher {
}
}
- private static void publishNexmark(
- final Collection<Map<String, Object>> results,
- final InfluxDBSettings settings,
- final Map<String, String> tags)
- throws Exception {
-
- final HttpClientBuilder builder = provideHttpBuilder(settings);
- final HttpPost postRequest = providePOSTRequest(settings);
- final StringBuilder metricBuilder = new StringBuilder();
-
+ /** @deprecated To be removed, kept for legacy interface {@link #publishNexmarkResults} */
+ @VisibleForTesting
+ @Deprecated
+ static String nexmarkDataPoints(
+ final Collection<Map<String, Object>> results, final Map<String, String> tags) {
+ final StringBuilder builder = new StringBuilder();
+ final Set<String> fields = ImmutableSet.of("runtimeMs", "numResults");
results.forEach(
map -> {
- metricBuilder.append(map.get("measurement")).append(",").append(getKV(map, "runner"));
- if (tags != null && !tags.isEmpty()) {
- tags.entrySet().stream()
- .forEach(
- entry -> {
- metricBuilder
- .append(",")
- .append(entry.getKey())
- .append("=")
- .append(entry.getValue());
- });
- }
- metricBuilder
- .append(" ")
- .append(getKV(map, "runtimeMs"))
- .append(",")
- .append(getKV(map, "numResults"))
- .append(" ")
- .append(map.get("timestamp"))
+ String measurement = checkArgumentNotNull(map.get("measurement")).toString();
+ addMeasurement(builder, measurement, tags, filterKeys(map, fields), map.get("timestamp"))
.append('\n');
});
+ return builder.toString();
+ }
- postRequest.setEntity(
- new GzipCompressingEntity(new ByteArrayEntity(metricBuilder.toString().getBytes(UTF_8))));
-
- executeWithVerification(postRequest, builder);
+ @SuppressWarnings("nullness")
+ private static <K, V> Map<K, V> filterKeys(final Map<K, V> map, final Set<K> keys) {
+ return Maps.filterKeys(map, keys::contains);
}
- private static String getKV(final Map<String, Object> map, final String key) {
- return key + "=" + map.get(key);
+ // fix types once nexmarkMeasurements is removed
+ private static StringBuilder addMeasurement(
+ StringBuilder builder,
+ String measurement,
+ Map<String, ?> tags,
+ Map<String, ?> fields,
+ @Nullable Object timestampSecs) {
+ checkState(!fields.isEmpty(), "fields cannot be empty");
+ builder.append(measurement);
+ tags.forEach((k, v) -> builder.append(',').append(k).append('=').append(v));
+ builder.append(' ');
+ fields.forEach((k, v) -> builder.append(k).append('=').append(fieldValue(v)).append(','));
+ builder.setLength(builder.length() - 1); // skip last comma
+ if (timestampSecs != null) {
+ builder.append(' ').append(timestampSecs);
+ }
+ return builder;
}
- private static void publishCommon(
- final Collection<NamedTestResult> results, final InfluxDBSettings settings) throws Exception {
-
- final HttpClientBuilder builder = provideHttpBuilder(settings);
- final HttpPost postRequest = providePOSTRequest(settings);
- final StringBuilder metricBuilder = new StringBuilder();
- results.stream()
- .map(NamedTestResult::toMap)
- .forEach(
- map ->
- metricBuilder
- .append(settings.measurement)
- .append(",")
- .append(getKV(map, "test_id"))
- .append(",")
- .append(getKV(map, "metric"))
- .append(" ")
- .append(getKV(map, "value"))
- .append('\n'));
-
- postRequest.setEntity(new ByteArrayEntity(metricBuilder.toString().getBytes(UTF_8)));
-
- executeWithVerification(postRequest, builder);
+ private static String fieldValue(@Nullable Object value) {
+ checkStateNotNull(value, "field value cannot be null");
+ // append 'i' suffix for 64-bit integer, default is float
+ return (value instanceof Integer || value instanceof Long) ? value + "i" : value.toString();
}
private static HttpClientBuilder provideHttpBuilder(final InfluxDBSettings settings) {
@@ -197,9 +237,4 @@ public final class InfluxDBPublisher {
new Gson().fromJson(EntityUtils.toString(entity, encoding), JsonObject.class).get("error");
return isNull(errorElement) ? "[Unable to get error message]" : errorElement.toString();
}
-
- @FunctionalInterface
- private interface PublishFunction {
- void publish() throws Exception;
- }
}
diff --git a/sdks/java/testing/test-utils/src/test/java/org/apache/beam/sdk/testutils/publishing/InfluxDBPublisherTest.java b/sdks/java/testing/test-utils/src/test/java/org/apache/beam/sdk/testutils/publishing/InfluxDBPublisherTest.java
new file mode 100644
index 00000000000..b796af6a106
--- /dev/null
+++ b/sdks/java/testing/test-utils/src/test/java/org/apache/beam/sdk/testutils/publishing/InfluxDBPublisherTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testutils.publishing;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.testutils.NamedTestResult;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+
+public final class InfluxDBPublisherTest {
+
+ @Test
+ public void testNexmarkDataPoints() {
+ Map<String, Object> measurement =
+ ImmutableMap.<String, Object>builder()
+ .put("measurement", "name")
+ .put("timestamp", 9999L)
+ .put("runtimeMs", "1000i")
+ .put("numResults", "10i")
+ .build();
+ List<Map<String, Object>> measurements =
+ ImmutableList.of(measurement, measurement, measurement);
+
+ Map<String, String> tags =
+ ImmutableMap.of(
+ "runner", "test",
+ "tag", "value");
+
+ String actual = InfluxDBPublisher.nexmarkDataPoints(measurements, tags);
+ String expected = "name,runner=test,tag=value runtimeMs=1000i,numResults=10i 9999\n";
+
+ assertEquals(expected + expected + expected, actual);
+ }
+
+ @Test
+ public void testNamedTestResultToDataPoint() {
+ NamedTestResult result = NamedTestResult.create("id1", "9999", "metric1", 100);
+
+ String actual = result.toInfluxDBDataPoint("name").toString();
+ String expected = "name,test_id=id1,metric=metric1 value=100.0";
+
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testDataPointToString() {
+ Map<String, String> tags = ImmutableMap.of("tag1", "t1", "tag2", "t2");
+ Map<String, Number> fields = ImmutableMap.of("integer", 100, "float", 100.0);
+
+ assertEquals(
+ "m1,tag1=t1,tag2=t2 integer=100i,float=100.0 999",
+ InfluxDBPublisher.dataPoint("m1", tags, fields, 999L).toString());
+ assertEquals(
+ "m1,tag1=t1,tag2=t2 integer=100i,float=100.0",
+ InfluxDBPublisher.dataPoint("m1", tags, fields, null).toString());
+ }
+}