Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/08/11 17:40:04 UTC

[GitHub] [beam] lukecwik commented on a diff in pull request #22260: Generalize interface of InfluxDBPublisher to support more use cases (test-utils)

lukecwik commented on code in PR #22260:
URL: https://github.com/apache/beam/pull/22260#discussion_r943748046


##########
sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/InfluxDBPublisher.java:
##########
@@ -83,73 +138,48 @@ private static void publishWithCheck(
     }
   }
 
-  private static void publishNexmark(
-      final Collection<Map<String, Object>> results,
-      final InfluxDBSettings settings,
-      final Map<String, String> tags)
-      throws Exception {
-
-    final HttpClientBuilder builder = provideHttpBuilder(settings);
-    final HttpPost postRequest = providePOSTRequest(settings);
-    final StringBuilder metricBuilder = new StringBuilder();
-
+  @VisibleForTesting
+  static String nexmarkDataPoints(
+      final Collection<Map<String, Object>> results, final Map<String, String> tags) {
+    final StringBuilder builder = new StringBuilder();
+    final Set<String> fields = ImmutableSet.of("runtimeMs", "numResults");
     results.forEach(
         map -> {
-          metricBuilder.append(map.get("measurement")).append(",").append(getKV(map, "runner"));
-          if (tags != null && !tags.isEmpty()) {
-            tags.entrySet().stream()
-                .forEach(
-                    entry -> {
-                      metricBuilder
-                          .append(",")
-                          .append(entry.getKey())
-                          .append("=")
-                          .append(entry.getValue());
-                    });
-          }
-          metricBuilder
-              .append(" ")
-              .append(getKV(map, "runtimeMs"))
-              .append(",")
-              .append(getKV(map, "numResults"))
-              .append(" ")
-              .append(map.get("timestamp"))
+          String measurement = checkArgumentNotNull(map.get("measurement")).toString();
+          addMeasurement(builder, measurement, tags, filterKeys(map, fields), map.get("timestamp"))
               .append('\n');
         });
+    return builder.toString();
+  }
 
-    postRequest.setEntity(
-        new GzipCompressingEntity(new ByteArrayEntity(metricBuilder.toString().getBytes(UTF_8))));
-
-    executeWithVerification(postRequest, builder);
+  @SuppressWarnings("nullness")
+  private static <K, V> Map<K, V> filterKeys(final Map<K, V> map, final Set<K> keys) {
+    return Maps.filterKeys(map, keys::contains);
   }
 
-  private static String getKV(final Map<String, Object> map, final String key) {
-    return key + "=" + map.get(key);
+  // fix types once nexmarkMeasurements is removed
+  private static StringBuilder addMeasurement(
+      StringBuilder builder,
+      String measurement,
+      Map<String, ?> tags,
+      Map<String, ?> fields,
+      @Nullable Object timestamp) {
+    checkState(!fields.isEmpty(), "fields cannot be empty");
+    builder.append(measurement);
+    tags.forEach((k, v) -> builder.append(',').append(k).append('=').append(v));
+    builder.append(' ');
+    fields.forEach((k, v) -> builder.append(k).append('=').append(fieldValue(v)).append(','));
+    builder.setLength(builder.length() - 1); // skip last comma
+    if (timestamp != null) {
+      builder.append(' ').append(timestamp);

Review Comment:
   Per the specification, this timestamp should be in nanoseconds, but from my reading of the code it is in milliseconds.
   
   The spec is:
   ```
   measurement(,tag_key=tag_val)* field_key=field_val(,field_key_n=field_value_n)* (nanoseconds-timestamp)?
   ```
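
If the value is indeed epoch milliseconds, the conversion could look like the minimal sketch below. The class and method names are hypothetical and not part of this PR, and the assumption that the input is in milliseconds comes only from the reading of the code described above.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the PR.
final class TimestampNanos {
  private TimestampNanos() {}

  /** Converts an epoch-millisecond timestamp (assumed unit) to epoch nanoseconds. */
  static long millisToNanos(long epochMillis) {
    return TimeUnit.MILLISECONDS.toNanos(epochMillis);
  }

  public static void main(String[] args) {
    // 1 second after the epoch, expressed in milliseconds.
    System.out.println(millisToNanos(1_000L)); // prints 1000000000
  }
}
```

The result would then be appended as the trailing timestamp of the data point line instead of the raw value.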



##########
sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/NamedTestResult.java:
##########
@@ -85,4 +85,17 @@ public String getMetric() {
   public double getValue() {
     return value;
   }
+
+  public Map<String, String> tags() {
+    return ImmutableMap.of("test_id", testId, "metric", metric);
+  }
+
+  public Map<String, Number> fields() {
+    return ImmutableMap.of("value", value);
+  }
+
+  /** Convert this result to InfluxDB data point. */
+  public InfluxDBPublisher.DataPoint toInfluxDBDataPoint(String measurement) {
+    return InfluxDBPublisher.dataPoint(measurement, tags(), fields(), null);

Review Comment:
   It seems like we are ignoring the timestamp that is part of this test. Should we be passing it into the data point?



##########
sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/InfluxDBPublisher.java:
##########
@@ -46,35 +56,80 @@
 import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.util.EntityUtils;
+import org.checkerframework.dataflow.qual.Pure;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@SuppressWarnings({
-  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
 public final class InfluxDBPublisher {
   private static final Logger LOG = LoggerFactory.getLogger(InfluxDBPublisher.class);
 
   private InfluxDBPublisher() {}
 
+  /** InfluxDB data point. */
+  @AutoValue
+  public abstract static class DataPoint {
+    DataPoint() {}
+
+    public abstract @Pure String measurement();
+
+    public abstract @Pure Map<String, String> tags();
+
+    public abstract @Pure Map<String, Number> fields();
+
+    @Nullable
+    public abstract @Pure Long timestamp();

Review Comment:
   What unit is this timestamp in?
   
   Consider a name that states the unit, such as timestampMillis or timestampNanos, or use a date-time type that we then convert to nanos in `addMeasurement`.
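
A minimal sketch of the date-time variant, assuming `java.time.Instant` as the type (the PR may prefer a different one). `toEpochNanos` and `appendTimestamp` are hypothetical names that only illustrate where the conversion would sit relative to `addMeasurement`.

```java
import java.time.Instant;

// Hypothetical helper, not part of the PR.
final class InstantTimestamps {
  private InstantTimestamps() {}

  /** Converts an Instant to nanoseconds since the epoch, failing loudly on overflow. */
  static long toEpochNanos(Instant instant) {
    long secondsAsNanos = Math.multiplyExact(instant.getEpochSecond(), 1_000_000_000L);
    return Math.addExact(secondsAsNanos, (long) instant.getNano());
  }

  /** Appends the optional trailing timestamp of a data point line, in nanoseconds. */
  static StringBuilder appendTimestamp(StringBuilder builder, Instant timestamp) {
    if (timestamp != null) {
      builder.append(' ').append(toEpochNanos(timestamp));
    }
    return builder;
  }
}
```

With a typed timestamp the unit no longer depends on naming conventions, and the nanosecond conversion happens in exactly one place.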


