Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/07/13 14:47:46 UTC

[GitHub] [beam] mosche opened a new pull request, #22260: Generalize interface of InfluxDBPublisher to support more use cases (test-utils)

mosche opened a new pull request, #22260:
URL: https://github.com/apache/beam/pull/22260

   Generalize the interface of `InfluxDBPublisher` in `test-utils` to support more use cases (in preparation for #22238).
   
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] mosche commented on pull request #22260: Generalize interface of InfluxDBPublisher to support more use cases (test-utils)

Posted by GitBox <gi...@apache.org>.
mosche commented on PR #22260:
URL: https://github.com/apache/beam/pull/22260#issuecomment-1183389155

   R: @lgajowy 
   R: @kkucharc 



[GitHub] [beam] mosche commented on pull request #22260: Generalize interface of InfluxDBPublisher to support more use cases (test-utils)

Posted by GitBox <gi...@apache.org>.
mosche commented on PR #22260:
URL: https://github.com/apache/beam/pull/22260#issuecomment-1185220719

   @kileys 



[GitHub] [beam] mosche commented on pull request #22260: Generalize interface of InfluxDBPublisher to support more use cases (test-utils)

Posted by GitBox <gi...@apache.org>.
mosche commented on PR #22260:
URL: https://github.com/apache/beam/pull/22260#issuecomment-1190101359

   Also, I'm wondering a bit: shouldn't test-utils just use the InfluxDB Java client for simplicity?



[GitHub] [beam] aromanenko-dev commented on pull request #22260: Generalize interface of InfluxDBPublisher to support more use cases (test-utils)

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on PR #22260:
URL: https://github.com/apache/beam/pull/22260#issuecomment-1210767764

   Run Spark StructuredStreaming ValidatesRunner



[GitHub] [beam] mosche commented on pull request #22260: Generalize interface of InfluxDBPublisher to support more use cases (test-utils)

Posted by GitBox <gi...@apache.org>.
mosche commented on PR #22260:
URL: https://github.com/apache/beam/pull/22260#issuecomment-1212747801

   Thanks a lot @aromanenko-dev and @lukecwik! I'm on PTO for the remainder of this month and will address your comments as soon as I'm back.



[GitHub] [beam] mosche commented on a diff in pull request #22260: Generalize interface of InfluxDBPublisher to support more use cases (test-utils)

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #22260:
URL: https://github.com/apache/beam/pull/22260#discussion_r957109098


##########
sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/InfluxDBPublisher.java:
##########
@@ -83,73 +138,48 @@ private static void publishWithCheck(
     }
   }
 
-  private static void publishNexmark(
-      final Collection<Map<String, Object>> results,
-      final InfluxDBSettings settings,
-      final Map<String, String> tags)
-      throws Exception {
-
-    final HttpClientBuilder builder = provideHttpBuilder(settings);
-    final HttpPost postRequest = providePOSTRequest(settings);
-    final StringBuilder metricBuilder = new StringBuilder();
-
+  @VisibleForTesting
+  static String nexmarkDataPoints(
+      final Collection<Map<String, Object>> results, final Map<String, String> tags) {
+    final StringBuilder builder = new StringBuilder();
+    final Set<String> fields = ImmutableSet.of("runtimeMs", "numResults");
     results.forEach(
         map -> {
-          metricBuilder.append(map.get("measurement")).append(",").append(getKV(map, "runner"));
-          if (tags != null && !tags.isEmpty()) {
-            tags.entrySet().stream()
-                .forEach(
-                    entry -> {
-                      metricBuilder
-                          .append(",")
-                          .append(entry.getKey())
-                          .append("=")
-                          .append(entry.getValue());
-                    });
-          }
-          metricBuilder
-              .append(" ")
-              .append(getKV(map, "runtimeMs"))
-              .append(",")
-              .append(getKV(map, "numResults"))
-              .append(" ")
-              .append(map.get("timestamp"))
+          String measurement = checkArgumentNotNull(map.get("measurement")).toString();
+          addMeasurement(builder, measurement, tags, filterKeys(map, fields), map.get("timestamp"))
               .append('\n');
         });
+    return builder.toString();
+  }
 
-    postRequest.setEntity(
-        new GzipCompressingEntity(new ByteArrayEntity(metricBuilder.toString().getBytes(UTF_8))));
-
-    executeWithVerification(postRequest, builder);
+  @SuppressWarnings("nullness")
+  private static <K, V> Map<K, V> filterKeys(final Map<K, V> map, final Set<K> keys) {
+    return Maps.filterKeys(map, keys::contains);
   }
 
-  private static String getKV(final Map<String, Object> map, final String key) {
-    return key + "=" + map.get(key);
+  // fix types once nexmarkMeasurements is removed
+  private static StringBuilder addMeasurement(
+      StringBuilder builder,
+      String measurement,
+      Map<String, ?> tags,
+      Map<String, ?> fields,
+      @Nullable Object timestamp) {
+    checkState(!fields.isEmpty(), "fields cannot be empty");
+    builder.append(measurement);
+    tags.forEach((k, v) -> builder.append(',').append(k).append('=').append(v));
+    builder.append(' ');
+    fields.forEach((k, v) -> builder.append(k).append('=').append(fieldValue(v)).append(','));
+    builder.setLength(builder.length() - 1); // skip last comma
+    if (timestamp != null) {
+      builder.append(' ').append(timestamp);

Review Comment:
   Thanks for having a close look, @lukecwik!
   It turns out the current precision is seconds. The only place where the actual timestamp of the test is used is Nexmark:
   ```
   if (options.getExportSummaryToInfluxDB()) {
     final long timestamp = start.getMillis() / 1000; // seconds
     savePerfsToInfluxDB(options, actual, timestamp);
   }
   ```
   The publisher sets the `precision` query parameter as follows to override InfluxDB's default precision (nanoseconds):
   ```
   return new HttpPost(settings.host + "/write?db=" + settings.database + "&" + retentionPolicy + "&precision=s");
   ```
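As a minimal sketch of the seconds-precision setup described above (not the publisher's actual code), the helper below builds a write URL in the same shape as the quoted `HttpPost` snippet and converts epoch millis to epoch seconds the same way the Nexmark snippet does. The class name, the database name `beam_test_metrics`, and the assumption that `retentionPolicy` is already formatted as `rp=<name>` are all hypothetical.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper illustrating second-precision writes to the InfluxDB 1.x /write endpoint. */
public final class PrecisionSketch {
  private PrecisionSketch() {}

  /** Builds a write URL; "precision=s" tells InfluxDB to interpret timestamps as epoch seconds. */
  static String writeUrl(String host, String database, String retentionPolicy) {
    // Assumption: retentionPolicy is already formatted as "rp=<name>".
    return host + "/write?db=" + database + "&" + retentionPolicy + "&precision=s";
  }

  /** Converts epoch millis (e.g. from Instant#getMillis()) to epoch seconds, truncating like "/ 1000". */
  static long toEpochSeconds(long epochMillis) {
    return TimeUnit.MILLISECONDS.toSeconds(epochMillis);
  }

  public static void main(String[] args) {
    // prints http://localhost:8086/write?db=beam_test_metrics&rp=autogen&precision=s
    System.out.println(writeUrl("http://localhost:8086", "beam_test_metrics", "rp=autogen"));
    // prints 1657723666
    System.out.println(toEpochSeconds(1_657_723_666_123L));
  }
}
```

If a data point's timestamp is in seconds, it must be paired with `precision=s`; sending the same value without the parameter would be read as nanoseconds.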



##########
sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/InfluxDBPublisher.java:
##########
@@ -46,35 +56,80 @@
 import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.util.EntityUtils;
+import org.checkerframework.dataflow.qual.Pure;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@SuppressWarnings({
-  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
 public final class InfluxDBPublisher {
   private static final Logger LOG = LoggerFactory.getLogger(InfluxDBPublisher.class);
 
   private InfluxDBPublisher() {}
 
+  /** InfluxDB data point. */
+  @AutoValue
+  public abstract static class DataPoint {
+    DataPoint() {}
+
+    public abstract @Pure String measurement();
+
+    public abstract @Pure Map<String, String> tags();
+
+    public abstract @Pure Map<String, Number> fields();
+
+    @Nullable
+    public abstract @Pure Long timestamp();

Review Comment:
   I renamed it to `timestampSecs` and added a comment explaining the unit. Also, see below.




[GitHub] [beam] lukecwik commented on pull request #22260: Generalize interface of InfluxDBPublisher to support more use cases (test-utils)

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22260:
URL: https://github.com/apache/beam/pull/22260#issuecomment-1189363806

   Any of the people you tagged seem like good choices, but if they don't respond I can take a look once I'm back from vacation in two weeks.
   
   Also, I understand that we could customize this, but there seem to be existing tools that already do this for JMH, like [this gradle plugin](https://github.com/mulesoft-labs/jmh-influx-report). Should we adopt one of these existing solutions, or have you ruled them out for some reason?



[GitHub] [beam] aromanenko-dev commented on pull request #22260: Generalize interface of InfluxDBPublisher to support more use cases (test-utils)

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on PR #22260:
URL: https://github.com/apache/beam/pull/22260#issuecomment-1237115177

   @mosche Many thanks for getting it merged!



[GitHub] [beam] mosche commented on pull request #22260: Generalize interface of InfluxDBPublisher to support more use cases (test-utils)

Posted by GitBox <gi...@apache.org>.
mosche commented on PR #22260:
URL: https://github.com/apache/beam/pull/22260#issuecomment-1189050122

   @lukecwik Any suggestions for who could review this? This adds a more general interface to the InfluxDBPublisher so that JMH benchmark results can be published in a follow-up.



[GitHub] [beam] lukecwik commented on a diff in pull request #22260: Generalize interface of InfluxDBPublisher to support more use cases (test-utils)

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22260:
URL: https://github.com/apache/beam/pull/22260#discussion_r943748046


##########
sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/InfluxDBPublisher.java:
##########
@@ -83,73 +138,48 @@ private static void publishWithCheck(
     }
   }
 
-  private static void publishNexmark(
-      final Collection<Map<String, Object>> results,
-      final InfluxDBSettings settings,
-      final Map<String, String> tags)
-      throws Exception {
-
-    final HttpClientBuilder builder = provideHttpBuilder(settings);
-    final HttpPost postRequest = providePOSTRequest(settings);
-    final StringBuilder metricBuilder = new StringBuilder();
-
+  @VisibleForTesting
+  static String nexmarkDataPoints(
+      final Collection<Map<String, Object>> results, final Map<String, String> tags) {
+    final StringBuilder builder = new StringBuilder();
+    final Set<String> fields = ImmutableSet.of("runtimeMs", "numResults");
     results.forEach(
         map -> {
-          metricBuilder.append(map.get("measurement")).append(",").append(getKV(map, "runner"));
-          if (tags != null && !tags.isEmpty()) {
-            tags.entrySet().stream()
-                .forEach(
-                    entry -> {
-                      metricBuilder
-                          .append(",")
-                          .append(entry.getKey())
-                          .append("=")
-                          .append(entry.getValue());
-                    });
-          }
-          metricBuilder
-              .append(" ")
-              .append(getKV(map, "runtimeMs"))
-              .append(",")
-              .append(getKV(map, "numResults"))
-              .append(" ")
-              .append(map.get("timestamp"))
+          String measurement = checkArgumentNotNull(map.get("measurement")).toString();
+          addMeasurement(builder, measurement, tags, filterKeys(map, fields), map.get("timestamp"))
               .append('\n');
         });
+    return builder.toString();
+  }
 
-    postRequest.setEntity(
-        new GzipCompressingEntity(new ByteArrayEntity(metricBuilder.toString().getBytes(UTF_8))));
-
-    executeWithVerification(postRequest, builder);
+  @SuppressWarnings("nullness")
+  private static <K, V> Map<K, V> filterKeys(final Map<K, V> map, final Set<K> keys) {
+    return Maps.filterKeys(map, keys::contains);
   }
 
-  private static String getKV(final Map<String, Object> map, final String key) {
-    return key + "=" + map.get(key);
+  // fix types once nexmarkMeasurements is removed
+  private static StringBuilder addMeasurement(
+      StringBuilder builder,
+      String measurement,
+      Map<String, ?> tags,
+      Map<String, ?> fields,
+      @Nullable Object timestamp) {
+    checkState(!fields.isEmpty(), "fields cannot be empty");
+    builder.append(measurement);
+    tags.forEach((k, v) -> builder.append(',').append(k).append('=').append(v));
+    builder.append(' ');
+    fields.forEach((k, v) -> builder.append(k).append('=').append(fieldValue(v)).append(','));
+    builder.setLength(builder.length() - 1); // skip last comma
+    if (timestamp != null) {
+      builder.append(' ').append(timestamp);

Review Comment:
   Based on the specification, this should be nanoseconds, but from my reading of the code I believe this timestamp is in milliseconds.
   
   The spec is:
   ```
   measurement(,tag_key=tag_val)* field_key=field_val(,field_key_n=field_value_n)* (nanoseconds-timestamp)?
   ```
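For reference, here is a minimal Java sketch of serializing one point according to the quoted spec, with the timestamp treated as nanoseconds (the default when no `precision` parameter is sent). The class and method names are hypothetical and escaping of special characters is deliberately omitted. A `StringJoiner` is used instead of the append-then-truncate handling of the last comma in `addMeasurement`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical sketch of serializing one data point in InfluxDB line protocol. */
public final class LineProtocolSketch {
  private LineProtocolSketch() {}

  /**
   * measurement(,tag_key=tag_val)* field_key=field_val(,field_key_n=field_value_n)* (timestamp)?
   * Escaping of spaces, commas and equals signs is omitted for brevity.
   */
  static String line(
      String measurement, Map<String, String> tags, Map<String, ?> fields, Long timestampNanos) {
    if (fields.isEmpty()) {
      throw new IllegalArgumentException("fields cannot be empty");
    }
    StringBuilder sb = new StringBuilder(measurement);
    // Tags are appended directly after the measurement, each prefixed by a comma.
    tags.forEach((k, v) -> sb.append(',').append(k).append('=').append(v));
    // Fields are comma-separated; StringJoiner avoids a trailing comma.
    StringJoiner joiner = new StringJoiner(",");
    fields.forEach((k, v) -> joiner.add(k + "=" + v));
    sb.append(' ').append(joiner);
    // The timestamp is optional; InfluxDB uses the server time if it is missing.
    if (timestampNanos != null) {
      sb.append(' ').append(timestampNanos);
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    Map<String, String> tags = new LinkedHashMap<>();
    tags.put("runner", "SparkRunner");
    Map<String, Object> fields = new LinkedHashMap<>();
    fields.put("runtimeMs", "1000i");
    fields.put("numResults", "10i");
    // prints nexmark_query0_batch,runner=SparkRunner runtimeMs=1000i,numResults=10i 1657723666000000000
    System.out.println(line("nexmark_query0_batch", tags, fields, 1_657_723_666_000_000_000L));
  }
}
```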



##########
sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/NamedTestResult.java:
##########
@@ -85,4 +85,17 @@ public String getMetric() {
   public double getValue() {
     return value;
   }
+
+  public Map<String, String> tags() {
+    return ImmutableMap.of("test_id", testId, "metric", metric);
+  }
+
+  public Map<String, Number> fields() {
+    return ImmutableMap.of("value", value);
+  }
+
+  /** Convert this result to InfluxDB data point. */
+  public InfluxDBPublisher.DataPoint toInfluxDBDataPoint(String measurement) {
+    return InfluxDBPublisher.dataPoint(measurement, tags(), fields(), null);

Review Comment:
   It seems like we are ignoring the timestamp that is part of this test. Should we be passing it into the datapoint?



##########
sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/InfluxDBPublisher.java:
##########
@@ -46,35 +56,80 @@
 import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.util.EntityUtils;
+import org.checkerframework.dataflow.qual.Pure;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@SuppressWarnings({
-  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
 public final class InfluxDBPublisher {
   private static final Logger LOG = LoggerFactory.getLogger(InfluxDBPublisher.class);
 
   private InfluxDBPublisher() {}
 
+  /** InfluxDB data point. */
+  @AutoValue
+  public abstract static class DataPoint {
+    DataPoint() {}
+
+    public abstract @Pure String measurement();
+
+    public abstract @Pure Map<String, String> tags();
+
+    public abstract @Pure Map<String, Number> fields();
+
+    @Nullable
+    public abstract @Pure Long timestamp();

Review Comment:
   What unit is this timestamp in?
   
   Consider naming it `timestampMillis` or `timestampNanos`, or using a date-time type that we then convert to nanos in `addMeasurement`.
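One way to follow the date-time-type suggestion is sketched below, under the assumption that `java.time.Instant` is the chosen type (the class and helper names are hypothetical). The data point keeps the typed value, and it is converted to epoch nanoseconds only when the line is written.

```java
import java.time.Instant;

/** Hypothetical sketch: keep a typed timestamp and convert it to nanoseconds only when serializing. */
public final class TimestampSketch {
  private TimestampSketch() {}

  /** Epoch nanoseconds, as expected by the line protocol when no precision parameter is set. */
  static long toEpochNanos(Instant instant) {
    // multiplyExact/addExact throw ArithmeticException instead of silently overflowing
    // (epoch nanoseconds only fit in a long until the year 2262).
    return Math.addExact(
        Math.multiplyExact(instant.getEpochSecond(), 1_000_000_000L), instant.getNano());
  }

  public static void main(String[] args) {
    Instant ts = Instant.ofEpochMilli(1_657_723_666_123L);
    System.out.println(toEpochNanos(ts)); // 1657723666123000000
  }
}
```

With a typed timestamp the unit question disappears from the public interface. Only the serializer has to agree with whatever `precision` the write request uses.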




[GitHub] [beam] mosche commented on a diff in pull request #22260: Generalize interface of InfluxDBPublisher to support more use cases (test-utils)

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #22260:
URL: https://github.com/apache/beam/pull/22260#discussion_r957067841


##########
sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/InfluxDBPublisher.java:
##########
@@ -83,73 +138,48 @@ private static void publishWithCheck(
     }
   }
 
-  private static void publishNexmark(
-      final Collection<Map<String, Object>> results,
-      final InfluxDBSettings settings,
-      final Map<String, String> tags)
-      throws Exception {
-
-    final HttpClientBuilder builder = provideHttpBuilder(settings);
-    final HttpPost postRequest = providePOSTRequest(settings);
-    final StringBuilder metricBuilder = new StringBuilder();
-
+  @VisibleForTesting
+  static String nexmarkDataPoints(

Review Comment:
   This is part of the legacy interface and will be removed. I marked it as deprecated and added a comment.




[GitHub] [beam] mosche merged pull request #22260: Generalize interface of InfluxDBPublisher to support more use cases (test-utils)

Posted by GitBox <gi...@apache.org>.
mosche merged PR #22260:
URL: https://github.com/apache/beam/pull/22260



[GitHub] [beam] aromanenko-dev commented on pull request #22260: Generalize interface of InfluxDBPublisher to support more use cases (test-utils)

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on PR #22260:
URL: https://github.com/apache/beam/pull/22260#issuecomment-1212121643

   Run Spark Runner Nexmark Tests



[GitHub] [beam] aromanenko-dev commented on pull request #22260: Generalize interface of InfluxDBPublisher to support more use cases (test-utils)

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on PR #22260:
URL: https://github.com/apache/beam/pull/22260#issuecomment-1212161163

   @mosche Please add a reference to the related GitHub issue.



[GitHub] [beam] mosche commented on a diff in pull request #22260: Generalize interface of InfluxDBPublisher to support more use cases (test-utils)

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #22260:
URL: https://github.com/apache/beam/pull/22260#discussion_r957092728


##########
sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/NamedTestResult.java:
##########
@@ -85,4 +85,17 @@ public String getMetric() {
   public double getValue() {
     return value;
   }
+
+  public Map<String, String> tags() {
+    return ImmutableMap.of("test_id", testId, "metric", metric);
+  }
+
+  public Map<String, Number> fields() {
+    return ImmutableMap.of("value", value);
+  }
+
+  /** Convert this result to InfluxDB data point. */
+  public InfluxDBPublisher.DataPoint toInfluxDBDataPoint(String measurement) {
+    return InfluxDBPublisher.dataPoint(measurement, tags(), fields(), null);

Review Comment:
   At the moment, the `timestamp` of `NamedTestResult` is ignored when writing to InfluxDB. TBH I'm not sure why; maybe its format is incompatible with InfluxDB? For now I'd prefer to keep it as is to avoid introducing any potential problems.
   
   > timestamp: Time at which this result was sampled. Should be in a BigQuery supported timestamp format.
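If `NamedTestResult` timestamps were ever forwarded, they would need to be converted from a string to an epoch value first. The sketch below assumes the string is an ISO-8601 instant such as `2022-07-13T14:47:46Z`. BigQuery also accepts other layouts (for example, a space instead of `T`), and this hypothetical helper rejects those by returning `null` rather than guessing.

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;

/** Hypothetical sketch: convert a result timestamp string to epoch seconds for a precision=s write. */
public final class ResultTimestampSketch {
  private ResultTimestampSketch() {}

  /** Returns epoch seconds for an ISO-8601 instant string, or null if it can't be parsed. */
  static Long toEpochSecondsOrNull(String timestamp) {
    try {
      return Instant.parse(timestamp).getEpochSecond();
    } catch (DateTimeParseException e) {
      // e.g. "2022-07-13 14:47:46" (space separator) is not an ISO-8601 instant
      return null;
    }
  }

  public static void main(String[] args) {
    System.out.println(toEpochSecondsOrNull("2022-07-13T14:47:46Z")); // 1657723666
    System.out.println(toEpochSecondsOrNull("2022-07-13 14:47:46")); // null
  }
}
```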




[GitHub] [beam] aromanenko-dev commented on a diff in pull request #22260: Generalize interface of InfluxDBPublisher to support more use cases (test-utils)

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on code in PR #22260:
URL: https://github.com/apache/beam/pull/22260#discussion_r943610732


##########
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java:
##########
@@ -208,38 +195,24 @@ private static InfluxDBSettings getInfluxSettings(final NexmarkOptions options)
         .get();
   }
 
-  private static String produceMeasurement(
-      final NexmarkOptions options, Map.Entry<NexmarkConfiguration, NexmarkPerf> entry) {
+  private static String measurementName(final NexmarkOptions options, NexmarkConfiguration config) {

Review Comment:
   nit: `generateMeasurementName()`



##########
sdks/java/testing/test-utils/src/test/java/org/apache/beam/sdk/testutils/publishing/InfluxDBPublisherTest.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testutils.publishing;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.testutils.NamedTestResult;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+
+public final class InfluxDBPublisherTest {
+
+  @Test
+  public void testNexmarkDataPoints() {
+    Map<String, Object> measurement =
+        ImmutableMap.<String, Object>builder()
+            .put("measurement", "name")
+            .put("timestamp", 9999L)
+            .put("runtimeMs", "1000i")
+            .put("numResults", "10i")
+            .build();
+    List<Map<String, Object>> measurements =
+        ImmutableList.of(measurement, measurement, measurement);

Review Comment:
   nit: create a list of measurements with different values



##########
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java:
##########
@@ -208,38 +195,24 @@ private static InfluxDBSettings getInfluxSettings(final NexmarkOptions options)
         .get();
   }
 
-  private static String produceMeasurement(
-      final NexmarkOptions options, Map.Entry<NexmarkConfiguration, NexmarkPerf> entry) {
+  private static String measurementName(final NexmarkOptions options, NexmarkConfiguration config) {
     final String queryName =
-        NexmarkUtils.fullQueryName(
-            options.getQueryLanguage(), entry.getKey().query.getNumberOrName());
+        NexmarkUtils.fullQueryName(options.getQueryLanguage(), config.query.getNumberOrName());
     return String.format(
         "%s_%s_%s",
         options.getBaseInfluxMeasurement(), queryName, processingMode(options.isStreaming()));
   }
 
-  private static Map<String, Object> getResultsFromSchema(
-      final NexmarkPerf results,
-      final Map<String, String> schema,
-      final long timestamp,
-      final String runner,
-      final String measurement) {
-    final Map<String, Object> schemaResults =
-        results.toMap().entrySet().stream()
-            .filter(element -> schema.containsKey(element.getKey()))
-            .collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
-    final int runtimeMs =
-        (int) ((double) schemaResults.get("runtimeSec") * 1000); // change sec to ms
-    schemaResults.put("timestamp", timestamp);
-    schemaResults.put("runner", runner);
-    schemaResults.put("measurement", measurement);
-
-    // By default, InfluxDB treats all number values as floats. We need to add 'i' suffix to
-    // interpret the value as an integer.
-    schemaResults.put("runtimeMs", runtimeMs + "i");
-    schemaResults.put("numResults", schemaResults.get("numResults") + "i");
-
-    return schemaResults;
+  private static InfluxDBPublisher.DataPoint influxDBDataPoint(

Review Comment:
   nit: `createInfluxDBDataPoint()`
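The removed `getResultsFromSchema` code notes that InfluxDB treats numeric field values as floats unless they carry an `i` suffix. With typed `Map<String, Number>` fields, a formatter could choose the suffix from the runtime type instead of pre-formatting strings. This is a hypothetical sketch and not the PR's `fieldValue`, whose body isn't shown here.

```java
/** Hypothetical sketch of formatting a numeric field value for InfluxDB line protocol. */
public final class FieldValueSketch {
  private FieldValueSketch() {}

  static String fieldValue(Number value) {
    // Integral types get the 'i' suffix; without it InfluxDB would store them as floats.
    if (value instanceof Long
        || value instanceof Integer
        || value instanceof Short
        || value instanceof Byte) {
      return value + "i";
    }
    // Other numbers (e.g. Double) are written via toString() and stored as floats.
    return value.toString();
  }

  public static void main(String[] args) {
    System.out.println(fieldValue(1000L)); // 1000i
    System.out.println(fieldValue(10)); // 10i
    System.out.println(fieldValue(12.5)); // 12.5
  }
}
```

Deciding the suffix from the type keeps callers from having to know the line-protocol detail, which is what the removed Nexmark code had to do by hand.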



##########
sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/InfluxDBPublisher.java:
##########
@@ -83,73 +138,48 @@ private static void publishWithCheck(
     }
   }
 
-  private static void publishNexmark(
-      final Collection<Map<String, Object>> results,
-      final InfluxDBSettings settings,
-      final Map<String, String> tags)
-      throws Exception {
-
-    final HttpClientBuilder builder = provideHttpBuilder(settings);
-    final HttpPost postRequest = providePOSTRequest(settings);
-    final StringBuilder metricBuilder = new StringBuilder();
-
+  @VisibleForTesting
+  static String nexmarkDataPoints(

Review Comment:
   Should we extract this method into a Nexmark-specific class?




[GitHub] [beam] aromanenko-dev commented on pull request #22260: Generalize interface of InfluxDBPublisher to support more use cases (test-utils)

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on PR #22260:
URL: https://github.com/apache/beam/pull/22260#issuecomment-1210767449

   Run Spark ValidatesRunner



[GitHub] [beam] mosche commented on pull request #22260: Generalize interface of InfluxDBPublisher to support more use cases (test-utils)

Posted by GitBox <gi...@apache.org>.
mosche commented on PR #22260:
URL: https://github.com/apache/beam/pull/22260#issuecomment-1190095360

   Thanks for having a look, @lukecwik! Have a great time off :)
   
   I've had a look at that plugin before. I'm a bit reluctant to use it because it lacks configuration options: it uses the simple name of a benchmark class as the measurement name. That way we risk name collisions, especially across modules if we keep using a single db. Of course, we could instead start using multiple dbs, one per JMH module.
   
   Another small point is that all JMH params and the test name are treated as fields (instead of tags), resulting in just one series per benchmark class, whereas each combination is (at least logically) a separate series of data. That's certainly more cosmetic, though, and probably done to keep series cardinality low. Also, from a query perspective it won't matter much, as data points will be very infrequent.
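To make the tags-versus-fields point concrete: InfluxDB identifies a series by its measurement plus its tag set. JMH params stored as tags therefore produce one series per parameter combination, while params stored as fields collapse into a single series. The sketch below uses hypothetical benchmark and parameter names and sorts tags by key so that the same tag set always yields the same key.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;

/** Hypothetical sketch: count the distinct series produced by a set of data points. */
public final class SeriesSketch {
  private SeriesSketch() {}

  /** Series key = measurement plus tag set, with tags sorted by key. */
  static String seriesKey(String measurement, Map<String, String> tags) {
    return new TreeMap<>(tags).entrySet().stream()
        .map(e -> "," + e.getKey() + "=" + e.getValue())
        .collect(Collectors.joining("", measurement, ""));
  }

  /** Number of distinct series when each map in tagSets is the tag set of one point. */
  static int distinctSeries(String measurement, List<Map<String, String>> tagSets) {
    Set<String> keys = new TreeSet<>();
    for (Map<String, String> tags : tagSets) {
      keys.add(seriesKey(measurement, tags));
    }
    return keys.size();
  }

  private static Map<String, String> params(String benchmark, String size) {
    Map<String, String> m = new HashMap<>();
    m.put("benchmark", benchmark);
    m.put("size", size);
    return m;
  }

  public static void main(String[] args) {
    List<Map<String, String>> runs =
        Arrays.asList(params("testEncode", "10"), params("testEncode", "1000"));
    // JMH params as tags: one series per parameter combination.
    System.out.println(distinctSeries("CoderBenchmark", runs)); // 2
    // JMH params as fields: no tags, so every point lands in the same series.
    List<Map<String, String>> noTags =
        Arrays.asList(
            Collections.<String, String>emptyMap(), Collections.<String, String>emptyMap());
    System.out.println(distinctSeries("CoderBenchmark", noTags)); // 1
  }
}
```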


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] mosche commented on a diff in pull request #22260: Generalize interface of InfluxDBPublisher to support more use cases (test-utils)

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #22260:
URL: https://github.com/apache/beam/pull/22260#discussion_r957097582


##########
sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/InfluxDBPublisher.java:
##########
@@ -46,35 +56,80 @@
 import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.util.EntityUtils;
+import org.checkerframework.dataflow.qual.Pure;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@SuppressWarnings({
-  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
 public final class InfluxDBPublisher {
   private static final Logger LOG = LoggerFactory.getLogger(InfluxDBPublisher.class);
 
   private InfluxDBPublisher() {}
 
+  /** InfluxDB data point. */
+  @AutoValue
+  public abstract static class DataPoint {
+    DataPoint() {}
+
+    public abstract @Pure String measurement();
+
+    public abstract @Pure Map<String, String> tags();
+
+    public abstract @Pure Map<String, Number> fields();
+
+    @Nullable
+    public abstract @Pure Long timestamp();

Review Comment:
   I renamed it to `timestampSecs` and added a comment explaining the unit. Also, see below.




[GitHub] [beam] mosche commented on pull request #22260: Generalize interface of InfluxDBPublisher to support more use cases (test-utils)

Posted by GitBox <gi...@apache.org>.
mosche commented on PR #22260:
URL: https://github.com/apache/beam/pull/22260#issuecomment-1230083100

   @aromanenko-dev I've addressed the comments and pushed some changes to unblock your PR. Please merge once the tests are green.
   The ticket is https://github.com/apache/beam/issues/22238, as mentioned in the description.

