You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mm...@apache.org on 2022/09/05 07:11:14 UTC

[beam] branch master updated: Generalize interface of InfluxDBPublisher to support more use cases (#22238) (#22260)

This is an automated email from the ASF dual-hosted git repository.

mmack pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c91e7b24a5 Generalize interface of InfluxDBPublisher to support more use cases (#22238) (#22260)
3c91e7b24a5 is described below

commit 3c91e7b24a53a6a5b929ede58231bbc57c9ddced
Author: Moritz Mack <mm...@talend.com>
AuthorDate: Mon Sep 5 09:11:05 2022 +0200

    Generalize interface of InfluxDBPublisher to support more use cases (#22238) (#22260)
---
 .../java/org/apache/beam/sdk/nexmark/Main.java     |  72 +++------
 .../apache/beam/sdk/testutils/NamedTestResult.java |  19 ++-
 .../testutils/publishing/InfluxDBPublisher.java    | 177 ++++++++++++---------
 .../publishing/InfluxDBPublisherTest.java          |  76 +++++++++
 4 files changed, 221 insertions(+), 123 deletions(-)

diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
index 0ac009712af..95554c0c809 100644
--- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
+++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.nexmark;
 
 import static java.util.stream.Collectors.toList;
-import static java.util.stream.Collectors.toMap;
 import static org.apache.beam.sdk.nexmark.NexmarkUtils.processingMode;
 
 import java.io.IOException;
@@ -27,6 +26,7 @@ import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.nexmark.model.Bid;
 import org.apache.beam.sdk.nexmark.model.Person;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testutils.publishing.InfluxDBPublisher;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBPublisher.DataPoint;
 import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.checkerframework.checker.nullness.qual.Nullable;
@@ -150,17 +151,9 @@ public class Main {
         saveSummary(null, configurations, actual, baseline, start);
       }
 
-      final ImmutableMap<String, String> schema =
-          ImmutableMap.<String, String>builder()
-              .put("timestamp", "timestamp")
-              .put("runtimeSec", "float")
-              .put("eventsPerSec", "float")
-              .put("numResults", "integer")
-              .build();
-
       if (options.getExportSummaryToInfluxDB()) {
         final long timestamp = start.getMillis() / 1000; // seconds
-        savePerfsToInfluxDB(options, schema, actual, timestamp);
+        savePerfsToInfluxDB(options, actual, timestamp);
       }
 
     } finally {
@@ -179,24 +172,18 @@ public class Main {
 
   private static void savePerfsToInfluxDB(
       final NexmarkOptions options,
-      final Map<String, String> schema,
       final Map<NexmarkConfiguration, NexmarkPerf> results,
       final long timestamp) {
     final InfluxDBSettings settings = getInfluxSettings(options);
-    final Map<String, String> tags = options.getInfluxTags();
-    final String runner = options.getRunner().getSimpleName();
-    final List<Map<String, Object>> schemaResults =
+    final Map<String, String> tags =
+        options.getInfluxTags() != null ? new HashMap<>(options.getInfluxTags()) : new HashMap<>();
+    tags.put("runner", options.getRunner().getSimpleName());
+
+    final List<DataPoint> dataPoints =
         results.entrySet().stream()
-            .map(
-                entry ->
-                    getResultsFromSchema(
-                        entry.getValue(),
-                        schema,
-                        timestamp,
-                        runner,
-                        produceMeasurement(options, entry)))
+            .map(entry -> createInfluxDBDataPoint(options, entry, tags, timestamp))
             .collect(toList());
-    InfluxDBPublisher.publishNexmarkResults(schemaResults, settings, tags);
+    InfluxDBPublisher.publish(settings, dataPoints);
   }
 
   private static InfluxDBSettings getInfluxSettings(final NexmarkOptions options) {
@@ -208,38 +195,25 @@ public class Main {
         .get();
   }
 
-  private static String produceMeasurement(
-      final NexmarkOptions options, Map.Entry<NexmarkConfiguration, NexmarkPerf> entry) {
+  private static String generateMeasurementName(
+      final NexmarkOptions options, NexmarkConfiguration config) {
     final String queryName =
-        NexmarkUtils.fullQueryName(
-            options.getQueryLanguage(), entry.getKey().query.getNumberOrName());
+        NexmarkUtils.fullQueryName(options.getQueryLanguage(), config.query.getNumberOrName());
     return String.format(
         "%s_%s_%s",
         options.getBaseInfluxMeasurement(), queryName, processingMode(options.isStreaming()));
   }
 
-  private static Map<String, Object> getResultsFromSchema(
-      final NexmarkPerf results,
-      final Map<String, String> schema,
-      final long timestamp,
-      final String runner,
-      final String measurement) {
-    final Map<String, Object> schemaResults =
-        results.toMap().entrySet().stream()
-            .filter(element -> schema.containsKey(element.getKey()))
-            .collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
-    final int runtimeMs =
-        (int) ((double) schemaResults.get("runtimeSec") * 1000); // change sec to ms
-    schemaResults.put("timestamp", timestamp);
-    schemaResults.put("runner", runner);
-    schemaResults.put("measurement", measurement);
-
-    // By default, InfluxDB treats all number values as floats. We need to add 'i' suffix to
-    // interpret the value as an integer.
-    schemaResults.put("runtimeMs", runtimeMs + "i");
-    schemaResults.put("numResults", schemaResults.get("numResults") + "i");
-
-    return schemaResults;
+  private static InfluxDBPublisher.DataPoint createInfluxDBDataPoint(
+      final NexmarkOptions options,
+      final Map.Entry<NexmarkConfiguration, NexmarkPerf> entry,
+      final Map<String, String> tags,
+      final long timestamp) {
+    String measurement = generateMeasurementName(options, entry.getKey());
+    int runtimeMs = (int) (entry.getValue().runtimeSec * 1000); // change sec to ms
+    Map<String, Number> fields =
+        ImmutableMap.of("runtimeMs", runtimeMs, "numResults", entry.getValue().numResults);
+    return InfluxDBPublisher.dataPoint(measurement, tags, fields, timestamp);
   }
 
   /** Append the pair of {@code configuration} and {@code perf} to perf file. */
diff --git a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/NamedTestResult.java b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/NamedTestResult.java
index 7855a292501..72200ab54f9 100644
--- a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/NamedTestResult.java
+++ b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/NamedTestResult.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.testutils;
 
 import com.google.cloud.bigquery.LegacySQLTypeName;
 import java.util.Map;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBPublisher;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -76,10 +77,9 @@ public class NamedTestResult implements TestResult {
   @Override
   public Map<String, Object> toMap() {
     return ImmutableMap.<String, Object>builder()
-        .put("test_id", testId)
+        .putAll(tags())
+        .putAll(fields())
         .put("timestamp", timestamp)
-        .put("metric", metric)
-        .put("value", value)
         .build();
   }
 
@@ -94,4 +94,17 @@ public class NamedTestResult implements TestResult {
   public double getValue() {
     return value;
   }
+
+  public Map<String, String> tags() {
+    return ImmutableMap.of("test_id", testId, "metric", metric);
+  }
+
+  public Map<String, Number> fields() {
+    return ImmutableMap.of("value", value);
+  }
+
+  /** Convert this result to InfluxDB data point. */
+  public InfluxDBPublisher.DataPoint toInfluxDBDataPoint(String measurement) {
+    return InfluxDBPublisher.dataPoint(measurement, tags(), fields(), null);
+  }
 }
diff --git a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/InfluxDBPublisher.java b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/InfluxDBPublisher.java
index 02c721e3eff..d00580c88e5 100644
--- a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/InfluxDBPublisher.java
+++ b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/InfluxDBPublisher.java
@@ -20,9 +20,13 @@ package org.apache.beam.sdk.testutils.publishing;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.Objects.isNull;
 import static java.util.Objects.requireNonNull;
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.commons.lang3.StringUtils.isNoneBlank;
 
+import com.google.auto.value.AutoValue;
 import com.google.gson.Gson;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
@@ -31,7 +35,14 @@ import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.testutils.NamedTestResult;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Collections2;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
 import org.apache.commons.compress.utils.Charsets;
 import org.apache.http.Header;
 import org.apache.http.HttpEntity;
@@ -46,35 +57,87 @@ import org.apache.http.entity.ByteArrayEntity;
 import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.util.EntityUtils;
+import org.checkerframework.dataflow.qual.Pure;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@SuppressWarnings({
-  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
 public final class InfluxDBPublisher {
   private static final Logger LOG = LoggerFactory.getLogger(InfluxDBPublisher.class);
 
   private InfluxDBPublisher() {}
 
+  /** InfluxDB data point. */
+  @AutoValue
+  public abstract static class DataPoint {
+    DataPoint() {}
+
+    public abstract @Pure String measurement();
+
+    public abstract @Pure Map<String, String> tags();
+
+    public abstract @Pure Map<String, Number> fields();
+
+    @Nullable
+    public abstract @Pure Long timestamp();
+
+    public abstract @Pure TimeUnit timestampUnit();
+
+    @Override
+    public final String toString() {
+      return append(new StringBuilder()).toString();
+    }
+
+    private @Nullable Long timestampSecs() {
+      return timestamp() != null ? timestampUnit().toSeconds(timestamp()) : null;
+    }
+
+    private StringBuilder append(StringBuilder builder) {
+      return addMeasurement(builder, measurement(), tags(), fields(), timestampSecs());
+    }
+  }
+
+  /** Creates an InfluxDB data point using an optional custom Unix timestamp in seconds. */
+  public static DataPoint dataPoint(
+      String measurement,
+      Map<String, String> tags,
+      Map<String, Number> fields,
+      @Nullable Long timestampSecs) {
+    return new AutoValue_InfluxDBPublisher_DataPoint(
+        measurement, tags, fields, timestampSecs, TimeUnit.SECONDS);
+  }
+
+  /** @deprecated Use {@link #publish} instead. */
+  @Deprecated
   public static void publishNexmarkResults(
       final Collection<Map<String, Object>> results,
       final InfluxDBSettings settings,
       final Map<String, String> tags) {
-    publishWithCheck(settings, () -> publishNexmark(results, settings, tags));
+    publishWithCheck(settings, nexmarkDataPoints(results, tags));
   }
 
   public static void publishWithSettings(
       final Collection<NamedTestResult> results, final InfluxDBSettings settings) {
-    publishWithCheck(settings, () -> publishCommon(results, settings));
+    @SuppressWarnings("nullness")
+    Collection<DataPoint> dataPoints =
+        Collections2.transform(results, res -> res.toInfluxDBDataPoint(settings.measurement));
+    publish(settings, dataPoints);
+  }
+
+  public static void publish(
+      final InfluxDBSettings settings, final Collection<DataPoint> dataPoints) {
+    final StringBuilder builder = new StringBuilder();
+    dataPoints.forEach(m -> m.append(builder).append('\n'));
+    publishWithCheck(settings, builder.toString());
   }
 
-  private static void publishWithCheck(
-      final InfluxDBSettings settings, final PublishFunction publishFunction) {
+  private static void publishWithCheck(final InfluxDBSettings settings, final String data) {
     requireNonNull(settings, "InfluxDB settings must not be null");
     if (isNoneBlank(settings.measurement, settings.database)) {
       try {
-        publishFunction.publish();
+        final HttpClientBuilder builder = provideHttpBuilder(settings);
+        final HttpPost postRequest = providePOSTRequest(settings);
+        postRequest.setEntity(new GzipCompressingEntity(new ByteArrayEntity(data.getBytes(UTF_8))));
+        executeWithVerification(postRequest, builder);
       } catch (Exception exception) {
         LOG.warn("Unable to publish metrics due to error: {}", exception.getMessage());
       }
@@ -83,73 +146,50 @@ public final class InfluxDBPublisher {
     }
   }
 
-  private static void publishNexmark(
-      final Collection<Map<String, Object>> results,
-      final InfluxDBSettings settings,
-      final Map<String, String> tags)
-      throws Exception {
-
-    final HttpClientBuilder builder = provideHttpBuilder(settings);
-    final HttpPost postRequest = providePOSTRequest(settings);
-    final StringBuilder metricBuilder = new StringBuilder();
-
+  /** @deprecated To be removed, kept for legacy interface {@link #publishNexmarkResults} */
+  @VisibleForTesting
+  @Deprecated
+  static String nexmarkDataPoints(
+      final Collection<Map<String, Object>> results, final Map<String, String> tags) {
+    final StringBuilder builder = new StringBuilder();
+    final Set<String> fields = ImmutableSet.of("runtimeMs", "numResults");
     results.forEach(
         map -> {
-          metricBuilder.append(map.get("measurement")).append(",").append(getKV(map, "runner"));
-          if (tags != null && !tags.isEmpty()) {
-            tags.entrySet().stream()
-                .forEach(
-                    entry -> {
-                      metricBuilder
-                          .append(",")
-                          .append(entry.getKey())
-                          .append("=")
-                          .append(entry.getValue());
-                    });
-          }
-          metricBuilder
-              .append(" ")
-              .append(getKV(map, "runtimeMs"))
-              .append(",")
-              .append(getKV(map, "numResults"))
-              .append(" ")
-              .append(map.get("timestamp"))
+          String measurement = checkArgumentNotNull(map.get("measurement")).toString();
+          addMeasurement(builder, measurement, tags, filterKeys(map, fields), map.get("timestamp"))
               .append('\n');
         });
+    return builder.toString();
+  }
 
-    postRequest.setEntity(
-        new GzipCompressingEntity(new ByteArrayEntity(metricBuilder.toString().getBytes(UTF_8))));
-
-    executeWithVerification(postRequest, builder);
+  @SuppressWarnings("nullness")
+  private static <K, V> Map<K, V> filterKeys(final Map<K, V> map, final Set<K> keys) {
+    return Maps.filterKeys(map, keys::contains);
   }
 
-  private static String getKV(final Map<String, Object> map, final String key) {
-    return key + "=" + map.get(key);
+  // fix types once nexmarkDataPoints is removed
+  private static StringBuilder addMeasurement(
+      StringBuilder builder,
+      String measurement,
+      Map<String, ?> tags,
+      Map<String, ?> fields,
+      @Nullable Object timestampSecs) {
+    checkState(!fields.isEmpty(), "fields cannot be empty");
+    builder.append(measurement);
+    tags.forEach((k, v) -> builder.append(',').append(k).append('=').append(v));
+    builder.append(' ');
+    fields.forEach((k, v) -> builder.append(k).append('=').append(fieldValue(v)).append(','));
+    builder.setLength(builder.length() - 1); // skip last comma
+    if (timestampSecs != null) {
+      builder.append(' ').append(timestampSecs);
+    }
+    return builder;
   }
 
-  private static void publishCommon(
-      final Collection<NamedTestResult> results, final InfluxDBSettings settings) throws Exception {
-
-    final HttpClientBuilder builder = provideHttpBuilder(settings);
-    final HttpPost postRequest = providePOSTRequest(settings);
-    final StringBuilder metricBuilder = new StringBuilder();
-    results.stream()
-        .map(NamedTestResult::toMap)
-        .forEach(
-            map ->
-                metricBuilder
-                    .append(settings.measurement)
-                    .append(",")
-                    .append(getKV(map, "test_id"))
-                    .append(",")
-                    .append(getKV(map, "metric"))
-                    .append(" ")
-                    .append(getKV(map, "value"))
-                    .append('\n'));
-
-    postRequest.setEntity(new ByteArrayEntity(metricBuilder.toString().getBytes(UTF_8)));
-
-    executeWithVerification(postRequest, builder);
+  private static String fieldValue(@Nullable Object value) {
+    checkStateNotNull(value, "field value cannot be null");
+    // append 'i' suffix so InfluxDB stores Integer/Long values as 64-bit integers; default is float
+    return (value instanceof Integer || value instanceof Long) ? value + "i" : value.toString();
   }
 
   private static HttpClientBuilder provideHttpBuilder(final InfluxDBSettings settings) {
@@ -197,9 +237,4 @@ public final class InfluxDBPublisher {
         new Gson().fromJson(EntityUtils.toString(entity, encoding), JsonObject.class).get("error");
     return isNull(errorElement) ? "[Unable to get error message]" : errorElement.toString();
   }
-
-  @FunctionalInterface
-  private interface PublishFunction {
-    void publish() throws Exception;
-  }
 }
diff --git a/sdks/java/testing/test-utils/src/test/java/org/apache/beam/sdk/testutils/publishing/InfluxDBPublisherTest.java b/sdks/java/testing/test-utils/src/test/java/org/apache/beam/sdk/testutils/publishing/InfluxDBPublisherTest.java
new file mode 100644
index 00000000000..b796af6a106
--- /dev/null
+++ b/sdks/java/testing/test-utils/src/test/java/org/apache/beam/sdk/testutils/publishing/InfluxDBPublisherTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testutils.publishing;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.testutils.NamedTestResult;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+
+public final class InfluxDBPublisherTest {
+
+  @Test
+  public void testNexmarkDataPoints() {
+    Map<String, Object> measurement =
+        ImmutableMap.<String, Object>builder()
+            .put("measurement", "name")
+            .put("timestamp", 9999L)
+            .put("runtimeMs", "1000i")
+            .put("numResults", "10i")
+            .build();
+    List<Map<String, Object>> measurements =
+        ImmutableList.of(measurement, measurement, measurement);
+
+    Map<String, String> tags =
+        ImmutableMap.of(
+            "runner", "test",
+            "tag", "value");
+
+    String actual = InfluxDBPublisher.nexmarkDataPoints(measurements, tags);
+    String expected = "name,runner=test,tag=value runtimeMs=1000i,numResults=10i 9999\n";
+
+    assertEquals(expected + expected + expected, actual);
+  }
+
+  @Test
+  public void testNamedTestResultToDataPoint() {
+    NamedTestResult result = NamedTestResult.create("id1", "9999", "metric1", 100);
+
+    String actual = result.toInfluxDBDataPoint("name").toString();
+    String expected = "name,test_id=id1,metric=metric1 value=100.0";
+
+    assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testDataPointToString() {
+    Map<String, String> tags = ImmutableMap.of("tag1", "t1", "tag2", "t2");
+    Map<String, Number> fields = ImmutableMap.of("integer", 100, "float", 100.0);
+
+    assertEquals(
+        "m1,tag1=t1,tag2=t2 integer=100i,float=100.0 999",
+        InfluxDBPublisher.dataPoint("m1", tags, fields, 999L).toString());
+    assertEquals(
+        "m1,tag1=t1,tag2=t2 integer=100i,float=100.0",
+        InfluxDBPublisher.dataPoint("m1", tags, fields, null).toString());
+  }
+}