You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ka...@apache.org on 2020/05/07 08:33:44 UTC

[beam] branch master updated: [BEAM-8133] Push ioit tests metrics to influxdb

This is an automated email from the ASF dual-hosted git repository.

kamilwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 81cc7eb  [BEAM-8133] Push ioit tests metrics to influxdb
81cc7eb is described below

commit 81cc7eb249e759337630b51f29616483929a57b6
Author: pawelpasterz <32...@users.noreply.github.com>
AuthorDate: Thu May 7 10:33:30 2020 +0200

    [BEAM-8133] Push ioit tests metrics to influxdb
---
 .../provider/bigquery/BigQueryIOPushDownIT.java    |   9 ++
 .../beam/sdk/bigqueryioperftests/BigQueryIOIT.java |  20 ++--
 .../beam/sdk/io/common/IOTestPipelineOptions.java  |  18 ++++
 .../java/org/apache/beam/sdk/io/avro/AvroIOIT.java |  14 ++-
 .../apache/beam/sdk/io/parquet/ParquetIOIT.java    |  14 ++-
 .../java/org/apache/beam/sdk/io/text/TextIOIT.java |  14 ++-
 .../apache/beam/sdk/io/tfrecord/TFRecordIOIT.java  |  11 ++-
 .../java/org/apache/beam/sdk/io/xml/XmlIOIT.java   |  14 ++-
 .../sdk/io/hadoop/format/HadoopFormatIOIT.java     |  10 ++
 .../java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java |  10 ++
 .../org/apache/beam/sdk/io/kafka/KafkaIOIT.java    |  13 ++-
 .../apache/beam/sdk/io/mongodb/MongoDBIOIT.java    |  10 ++
 .../org/apache/beam/sdk/loadtests/LoadTest.java    |  48 ++++++++--
 .../apache/beam/sdk/loadtests/LoadTestOptions.java |  24 +++++
 .../beam/sdk/testutils/metrics/IOITMetrics.java    |  28 ++++--
 .../testutils/publishing/InfluxDBPublisher.java    | 103 +++++++++++++++++++++
 .../sdk/testutils/publishing/InfluxDBSettings.java |  79 ++++++++++++++++
 17 files changed, 407 insertions(+), 32 deletions(-)

diff --git a/sdks/java/extensions/sql/perf-tests/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryIOPushDownIT.java b/sdks/java/extensions/sql/perf-tests/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryIOPushDownIT.java
index caa5497..3da48a2 100644
--- a/sdks/java/extensions/sql/perf-tests/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryIOPushDownIT.java
+++ b/sdks/java/extensions/sql/perf-tests/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryIOPushDownIT.java
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.testutils.NamedTestResult;
 import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
 import org.apache.beam.sdk.testutils.metrics.MetricsReader;
 import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
@@ -89,6 +90,7 @@ public class BigQueryIOPushDownIT {
   private static SQLBigQueryPerfTestOptions options;
   private static String metricsBigQueryDataset;
   private static String metricsBigQueryTable;
+  private static InfluxDBSettings settings;
   private Pipeline pipeline = Pipeline.create(options);
   private BeamSqlEnv sqlEnv;
 
@@ -97,6 +99,12 @@ public class BigQueryIOPushDownIT {
     options = IOITHelper.readIOTestPipelineOptions(SQLBigQueryPerfTestOptions.class);
     metricsBigQueryDataset = options.getMetricsBigQueryDataset();
     metricsBigQueryTable = options.getMetricsBigQueryTable();
+    settings =
+        InfluxDBSettings.builder()
+            .withHost(options.getInfluxHost())
+            .withDatabase(options.getInfluxDatabase())
+            .withMeasurement(options.getInfluxMeasurement())
+            .get();
   }
 
   @Before
@@ -169,6 +177,7 @@ public class BigQueryIOPushDownIT {
     IOITMetrics readMetrics =
         new IOITMetrics(readSuppliers, readResult, NAMESPACE, uuid, timestamp);
     readMetrics.publish(metricsBigQueryDataset, metricsBigQueryTable + postfix);
+    readMetrics.publishToInflux(settings.copyWithMeasurement(settings.measurement + postfix));
   }
 
   private Set<Function<MetricsReader, NamedTestResult>> getReadSuppliers(
diff --git a/sdks/java/io/bigquery-io-perf-tests/src/test/java/org/apache/beam/sdk/bigqueryioperftests/BigQueryIOIT.java b/sdks/java/io/bigquery-io-perf-tests/src/test/java/org/apache/beam/sdk/bigqueryioperftests/BigQueryIOIT.java
index a12dd41..41cad8d 100644
--- a/sdks/java/io/bigquery-io-perf-tests/src/test/java/org/apache/beam/sdk/bigqueryioperftests/BigQueryIOIT.java
+++ b/sdks/java/io/bigquery-io-perf-tests/src/test/java/org/apache/beam/sdk/bigqueryioperftests/BigQueryIOIT.java
@@ -27,6 +27,7 @@ import com.google.cloud.bigquery.TableId;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
+import java.util.List;
 import java.util.UUID;
 import java.util.function.Function;
 import org.apache.avro.generic.GenericData;
@@ -47,6 +48,7 @@ import org.apache.beam.sdk.testutils.NamedTestResult;
 import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
 import org.apache.beam.sdk.testutils.metrics.MetricsReader;
 import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.KV;
@@ -92,6 +94,7 @@ public class BigQueryIOIT {
   private static String tempRoot;
   private static BigQueryPerfTestOptions options;
   private static WriteFormat writeFormat;
+  private static InfluxDBSettings settings;
 
   @BeforeClass
   public static void setup() throws IOException {
@@ -108,6 +111,12 @@ public class BigQueryIOIT {
     tableQualifier =
         String.format(
             "%s:%s.%s", bigQueryOptions.getProjectId(), testBigQueryDataset, testBigQueryTable);
+    settings =
+        InfluxDBSettings.builder()
+            .withHost(options.getInfluxHost())
+            .withDatabase(options.getInfluxDatabase())
+            .withMeasurement(options.getInfluxMeasurement())
+            .get();
   }
 
   @AfterClass
@@ -193,14 +202,11 @@ public class BigQueryIOIT {
   }
 
   private void extractAndPublishTime(PipelineResult pipelineResult, String writeTimeMetricName) {
-    NamedTestResult metricResult =
+    final NamedTestResult metricResult =
         getMetricSupplier(writeTimeMetricName).apply(new MetricsReader(pipelineResult, NAMESPACE));
-    IOITMetrics.publish(
-        TEST_ID,
-        TEST_TIMESTAMP,
-        metricsBigQueryDataset,
-        metricsBigQueryTable,
-        Collections.singletonList(metricResult));
+    final List<NamedTestResult> listResults = Collections.singletonList(metricResult);
+    IOITMetrics.publish(metricsBigQueryDataset, metricsBigQueryTable, listResults);
+    IOITMetrics.publishToInflux(TEST_ID, TEST_TIMESTAMP, listResults, settings);
   }
 
   private static Function<MetricsReader, NamedTestResult> getMetricSupplier(String metricName) {
diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
index 4735ade..058dbbd 100644
--- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
+++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
@@ -43,4 +43,22 @@ public interface IOTestPipelineOptions extends TestPipelineOptions {
   String getBigQueryTable();
 
   void setBigQueryTable(@Nullable String tableName);
+
+  @Description("InfluxDB measurement to publish results to.")
+  @Nullable
+  String getInfluxMeasurement();
+
+  void setInfluxMeasurement(@Nullable String measurement);
+
+  @Description("InfluxDB host.")
+  @Nullable
+  String getInfluxHost();
+
+  void setInfluxHost(@Nullable String host);
+
+  @Description("InfluxDB database.")
+  @Nullable
+  String getInfluxDatabase();
+
+  void setInfluxDatabase(@Nullable String database);
 }
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java
index 0b78688..0d48480 100644
--- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java
@@ -44,6 +44,7 @@ import org.apache.beam.sdk.testutils.NamedTestResult;
 import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
 import org.apache.beam.sdk.testutils.metrics.MetricsReader;
 import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -98,6 +99,7 @@ public class AvroIOIT {
   private static Integer numberOfTextLines;
   private static Integer datasetSize;
   private static String expectedHash;
+  private static InfluxDBSettings settings;
 
   @Rule public TestPipeline pipeline = TestPipeline.create();
 
@@ -111,6 +113,12 @@ public class AvroIOIT {
     datasetSize = options.getDatasetSize();
     expectedHash = options.getExpectedHash();
     numberOfTextLines = options.getNumberOfRecords();
+    settings =
+        InfluxDBSettings.builder()
+            .withHost(options.getInfluxHost())
+            .withDatabase(options.getInfluxDatabase())
+            .withMeasurement(options.getInfluxMeasurement())
+            .get();
   }
 
   @Test
@@ -164,8 +172,10 @@ public class AvroIOIT {
 
     Set<Function<MetricsReader, NamedTestResult>> metricSuppliers =
         fillMetricSuppliers(uuid, timestamp);
-    new IOITMetrics(metricSuppliers, result, AVRO_NAMESPACE, uuid, timestamp)
-        .publish(bigQueryDataset, bigQueryTable);
+    final IOITMetrics metrics =
+        new IOITMetrics(metricSuppliers, result, AVRO_NAMESPACE, uuid, timestamp);
+    metrics.publish(bigQueryDataset, bigQueryTable);
+    metrics.publishToInflux(settings);
   }
 
   private Set<Function<MetricsReader, NamedTestResult>> fillMetricSuppliers(
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOIT.java
index 90af13c..7a36ac8 100644
--- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOIT.java
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOIT.java
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.testutils.NamedTestResult;
 import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
 import org.apache.beam.sdk.testutils.metrics.MetricsReader;
 import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
@@ -97,6 +98,7 @@ public class ParquetIOIT {
   private static Integer numberOfTextLines;
   private static Integer datasetSize;
   private static String expectedHash;
+  private static InfluxDBSettings settings;
 
   @Rule public TestPipeline pipeline = TestPipeline.create();
   private static final String PARQUET_NAMESPACE = ParquetIOIT.class.getName();
@@ -110,6 +112,12 @@ public class ParquetIOIT {
     filenamePrefix = appendTimestampSuffix(options.getFilenamePrefix());
     bigQueryDataset = options.getBigQueryDataset();
     bigQueryTable = options.getBigQueryTable();
+    settings =
+        InfluxDBSettings.builder()
+            .withHost(options.getInfluxHost())
+            .withDatabase(options.getInfluxDatabase())
+            .withMeasurement(options.getInfluxMeasurement())
+            .get();
   }
 
   @Test
@@ -169,8 +177,10 @@ public class ParquetIOIT {
     String timestamp = Timestamp.now().toString();
     Set<Function<MetricsReader, NamedTestResult>> metricSuppliers =
         fillMetricSuppliers(uuid, timestamp);
-    new IOITMetrics(metricSuppliers, result, PARQUET_NAMESPACE, uuid, timestamp)
-        .publish(bigQueryDataset, bigQueryTable);
+    final IOITMetrics metrics =
+        new IOITMetrics(metricSuppliers, result, PARQUET_NAMESPACE, uuid, timestamp);
+    metrics.publish(bigQueryDataset, bigQueryTable);
+    metrics.publishToInflux(settings);
   }
 
   private Set<Function<MetricsReader, NamedTestResult>> fillMetricSuppliers(
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
index 5625a28..c28bd93 100644
--- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
@@ -41,6 +41,7 @@ import org.apache.beam.sdk.testutils.NamedTestResult;
 import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
 import org.apache.beam.sdk.testutils.metrics.MetricsReader;
 import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Values;
@@ -88,6 +89,7 @@ public class TextIOIT {
   private static String bigQueryDataset;
   private static String bigQueryTable;
   private static boolean gatherGcsPerformanceMetrics;
+  private static InfluxDBSettings settings;
   private static final String FILEIOIT_NAMESPACE = TextIOIT.class.getName();
 
   @Rule public TestPipeline pipeline = TestPipeline.create();
@@ -104,6 +106,12 @@ public class TextIOIT {
     bigQueryDataset = options.getBigQueryDataset();
     bigQueryTable = options.getBigQueryTable();
     gatherGcsPerformanceMetrics = options.getReportGcsPerformanceMetrics();
+    settings =
+        InfluxDBSettings.builder()
+            .withHost(options.getInfluxHost())
+            .withDatabase(options.getInfluxDatabase())
+            .withMeasurement(options.getInfluxMeasurement())
+            .get();
   }
 
   @Test
@@ -161,8 +169,10 @@ public class TextIOIT {
     Set<Function<MetricsReader, NamedTestResult>> metricSuppliers =
         fillMetricSuppliers(uuid, timestamp.toString());
 
-    new IOITMetrics(metricSuppliers, result, FILEIOIT_NAMESPACE, uuid, timestamp.toString())
-        .publish(bigQueryDataset, bigQueryTable);
+    final IOITMetrics metrics =
+        new IOITMetrics(metricSuppliers, result, FILEIOIT_NAMESPACE, uuid, timestamp.toString());
+    metrics.publish(bigQueryDataset, bigQueryTable);
+    metrics.publishToInflux(settings);
   }
 
   private Set<Function<MetricsReader, NamedTestResult>> fillMetricSuppliers(
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
index 5473926..395000b 100644
--- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
@@ -41,6 +41,7 @@ import org.apache.beam.sdk.testutils.NamedTestResult;
 import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
 import org.apache.beam.sdk.testutils.metrics.MetricsReader;
 import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
@@ -93,6 +94,7 @@ public class TFRecordIOIT {
   private static Integer datasetSize;
   private static String expectedHash;
   private static Compression compressionType;
+  private static InfluxDBSettings settings;
 
   @Rule public TestPipeline writePipeline = TestPipeline.create();
 
@@ -108,6 +110,12 @@ public class TFRecordIOIT {
     filenamePrefix = appendTimestampSuffix(options.getFilenamePrefix());
     bigQueryDataset = options.getBigQueryDataset();
     bigQueryTable = options.getBigQueryTable();
+    settings =
+        InfluxDBSettings.builder()
+            .withHost(options.getInfluxHost())
+            .withDatabase(options.getInfluxDatabase())
+            .withMeasurement(options.getInfluxMeasurement())
+            .get();
   }
 
   private static String createFilenamePattern() {
@@ -176,7 +184,8 @@ public class TFRecordIOIT {
         MetricsReader.ofResults(readResults, TFRECORD_NAMESPACE)
             .readAll(getReadMetricSuppliers(uuid, timestamp)));
 
-    IOITMetrics.publish(uuid, timestamp, bigQueryDataset, bigQueryTable, results);
+    IOITMetrics.publish(bigQueryDataset, bigQueryTable, results);
+    IOITMetrics.publishToInflux(uuid, timestamp, results, settings);
   }
 
   private static Set<Function<MetricsReader, NamedTestResult>> getWriteMetricSuppliers(
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java
index 5d8163e..c4884cc 100644
--- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java
@@ -44,6 +44,7 @@ import org.apache.beam.sdk.testutils.NamedTestResult;
 import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
 import org.apache.beam.sdk.testutils.metrics.MetricsReader;
 import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -100,6 +101,7 @@ public class XmlIOIT {
   private static final String XMLIOIT_NAMESPACE = XmlIOIT.class.getName();
 
   private static Charset charset;
+  private static InfluxDBSettings settings;
 
   @Rule public TestPipeline pipeline = TestPipeline.create();
 
@@ -113,6 +115,12 @@ public class XmlIOIT {
     datasetSize = options.getDatasetSize();
     expectedHash = options.getExpectedHash();
     numberOfTextLines = options.getNumberOfRecords();
+    settings =
+        InfluxDBSettings.builder()
+            .withHost(options.getInfluxHost())
+            .withDatabase(options.getInfluxDatabase())
+            .withMeasurement(options.getInfluxMeasurement())
+            .get();
   }
 
   @Test
@@ -176,8 +184,10 @@ public class XmlIOIT {
 
     Set<Function<MetricsReader, NamedTestResult>> metricSuppliers =
         fillMetricSuppliers(uuid, timestamp);
-    new IOITMetrics(metricSuppliers, result, XMLIOIT_NAMESPACE, uuid, timestamp)
-        .publish(bigQueryDataset, bigQueryTable);
+    final IOITMetrics metrics =
+        new IOITMetrics(metricSuppliers, result, XMLIOIT_NAMESPACE, uuid, timestamp);
+    metrics.publish(bigQueryDataset, bigQueryTable);
+    metrics.publishToInflux(settings);
   }
 
   private Set<Function<MetricsReader, NamedTestResult>> fillMetricSuppliers(
diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOIT.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOIT.java
index 1a693ba..cbfbb28 100644
--- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOIT.java
+++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOIT.java
@@ -40,6 +40,7 @@ import org.apache.beam.sdk.testutils.NamedTestResult;
 import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
 import org.apache.beam.sdk.testutils.metrics.MetricsReader;
 import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -99,6 +100,7 @@ public class HadoopFormatIOIT {
   private static SerializableConfiguration hadoopConfiguration;
   private static String bigQueryDataset;
   private static String bigQueryTable;
+  private static InfluxDBSettings settings;
 
   @Rule public TestPipeline writePipeline = TestPipeline.create();
   @Rule public TestPipeline readPipeline = TestPipeline.create();
@@ -114,6 +116,12 @@ public class HadoopFormatIOIT {
     tableName = DatabaseTestHelper.getTestTableName("HadoopFormatIOIT");
     bigQueryDataset = options.getBigQueryDataset();
     bigQueryTable = options.getBigQueryTable();
+    settings =
+        InfluxDBSettings.builder()
+            .withHost(options.getInfluxHost())
+            .withDatabase(options.getInfluxDatabase())
+            .withMeasurement(options.getInfluxMeasurement())
+            .get();
 
     executeWithRetry(HadoopFormatIOIT::createTable);
     setupHadoopConfiguration(options);
@@ -214,7 +222,9 @@ public class HadoopFormatIOIT {
     IOITMetrics writeMetrics =
         new IOITMetrics(writeSuppliers, writeResult, NAMESPACE, uuid, timestamp);
     readMetrics.publish(bigQueryDataset, bigQueryTable);
+    readMetrics.publishToInflux(settings);
     writeMetrics.publish(bigQueryDataset, bigQueryTable);
+    writeMetrics.publishToInflux(settings);
   }
 
   private Set<Function<MetricsReader, NamedTestResult>> getWriteSuppliers(
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
index 81b9799..6c80002 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
@@ -41,6 +41,7 @@ import org.apache.beam.sdk.testutils.NamedTestResult;
 import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
 import org.apache.beam.sdk.testutils.metrics.MetricsReader;
 import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -85,6 +86,7 @@ public class JdbcIOIT {
   private static String bigQueryDataset;
   private static String bigQueryTable;
   private static Long tableSize;
+  private static InfluxDBSettings settings;
   @Rule public TestPipeline pipelineWrite = TestPipeline.create();
   @Rule public TestPipeline pipelineRead = TestPipeline.create();
 
@@ -100,6 +102,12 @@ public class JdbcIOIT {
     tableName = DatabaseTestHelper.getTestTableName("IT");
     executeWithRetry(JdbcIOIT::createTable);
     tableSize = DatabaseTestHelper.getPostgresTableSize(dataSource, tableName).orElse(0L);
+    settings =
+        InfluxDBSettings.builder()
+            .withHost(options.getInfluxHost())
+            .withDatabase(options.getInfluxDatabase())
+            .withMeasurement(options.getInfluxMeasurement())
+            .get();
   }
 
   private static void createTable() throws SQLException {
@@ -134,11 +142,13 @@ public class JdbcIOIT {
     IOITMetrics writeMetrics =
         new IOITMetrics(metricSuppliers, writeResult, NAMESPACE, uuid, timestamp);
     writeMetrics.publish(bigQueryDataset, bigQueryTable);
+    writeMetrics.publishToInflux(settings);
 
     IOITMetrics readMetrics =
         new IOITMetrics(
             getReadMetricSuppliers(uuid, timestamp), readResult, NAMESPACE, uuid, timestamp);
     readMetrics.publish(bigQueryDataset, bigQueryTable);
+    readMetrics.publishToInflux(settings);
   }
 
   private Set<Function<MetricsReader, NamedTestResult>> getWriteMetricSuppliers(
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
index 012c93b..feb62c1 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.testutils.NamedTestResult;
 import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
 import org.apache.beam.sdk.testutils.metrics.MetricsReader;
 import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -87,6 +88,8 @@ public class KafkaIOIT {
 
   private static Options options;
 
+  private static InfluxDBSettings settings;
+
   @Rule public TestPipeline writePipeline = TestPipeline.create();
 
   @Rule public TestPipeline readPipeline = TestPipeline.create();
@@ -101,6 +104,12 @@ public class KafkaIOIT {
             1000L, "4507649971ee7c51abbb446e65a5c660",
             100_000_000L, "0f12c27c9a7672e14775594be66cad9a");
     expectedHashcode = getHashForRecordCount(sourceOptions.numRecords, expectedHashes);
+    settings =
+        InfluxDBSettings.builder()
+            .withHost(options.getInfluxHost())
+            .withDatabase(options.getInfluxDatabase())
+            .withMeasurement(options.getInfluxMeasurement())
+            .get();
   }
 
   @Test
@@ -130,8 +139,8 @@ public class KafkaIOIT {
     cancelIfTimeouted(readResult, readState);
 
     Set<NamedTestResult> metrics = readMetrics(writeResult, readResult);
-    IOITMetrics.publish(
-        TEST_ID, TIMESTAMP, options.getBigQueryDataset(), options.getBigQueryTable(), metrics);
+    IOITMetrics.publish(options.getBigQueryDataset(), options.getBigQueryTable(), metrics);
+    IOITMetrics.publishToInflux(TEST_ID, TIMESTAMP, metrics, settings);
   }
 
   private Set<NamedTestResult> readMetrics(PipelineResult writeResult, PipelineResult readResult) {
diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBIOIT.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBIOIT.java
index 121f538..086d9a6 100644
--- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBIOIT.java
+++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBIOIT.java
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.testutils.NamedTestResult;
 import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
 import org.apache.beam.sdk.testutils.metrics.MetricsReader;
 import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -84,6 +85,7 @@ public class MongoDBIOIT {
   private static String bigQueryTable;
   private static String mongoUrl;
   private static MongoClient mongoClient;
+  private static InfluxDBSettings settings;
 
   private double initialCollectionSize;
   private double finalCollectionSize;
@@ -131,6 +133,12 @@ public class MongoDBIOIT {
     mongoUrl =
         String.format("mongodb://%s:%s", options.getMongoDBHostName(), options.getMongoDBPort());
     mongoClient = MongoClients.create(mongoUrl);
+    settings =
+        InfluxDBSettings.builder()
+            .withHost(options.getInfluxHost())
+            .withDatabase(options.getInfluxDatabase())
+            .withMeasurement(options.getInfluxMeasurement())
+            .get();
   }
 
   @After
@@ -208,7 +216,9 @@ public class MongoDBIOIT {
     IOITMetrics writeMetrics =
         new IOITMetrics(writeSuppliers, writeResult, NAMESPACE, uuid, timestamp);
     readMetrics.publish(bigQueryDataset, bigQueryTable);
+    readMetrics.publishToInflux(settings);
     writeMetrics.publish(bigQueryDataset, bigQueryTable);
+    writeMetrics.publishToInflux(settings);
   }
 
   private Set<Function<MetricsReader, NamedTestResult>> getWriteSuppliers(
diff --git a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTest.java b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTest.java
index 40a172c..9dff968 100644
--- a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTest.java
+++ b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTest.java
@@ -26,6 +26,8 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 import java.util.UUID;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
@@ -39,6 +41,8 @@ import org.apache.beam.sdk.testutils.metrics.MetricsReader;
 import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
 import org.apache.beam.sdk.testutils.publishing.BigQueryResultsPublisher;
 import org.apache.beam.sdk.testutils.publishing.ConsoleResultPublisher;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBPublisher;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
@@ -49,6 +53,8 @@ import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
 import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Base class for all load tests. Provides common operations such as initializing source/step
@@ -56,7 +62,9 @@ import org.joda.time.Duration;
  */
 abstract class LoadTest<OptionsT extends LoadTestOptions> {
 
-  private String metricsNamespace;
+  private static final Logger LOG = LoggerFactory.getLogger(LoadTest.class);
+
+  private final String metricsNamespace;
 
   protected TimeMonitor<KV<byte[], byte[]>> runtimeMonitor;
 
@@ -66,13 +74,33 @@ abstract class LoadTest<OptionsT extends LoadTestOptions> {
 
   protected Pipeline pipeline;
 
+  private final String runner;
+
+  private final InfluxDBSettings settings;
+
   LoadTest(String[] args, Class<OptionsT> testOptions, String metricsNamespace) throws IOException {
     this.metricsNamespace = metricsNamespace;
     this.runtimeMonitor = new TimeMonitor<>(metricsNamespace, "runtime");
     this.options = LoadTestOptions.readFromArgs(args, testOptions);
     this.sourceOptions = fromJsonString(options.getSourceOptions(), SyntheticSourceOptions.class);
-
     this.pipeline = Pipeline.create(options);
+    this.runner = getRunnerName(options.getRunner().getName());
+    settings =
+        InfluxDBSettings.builder()
+            .withHost(options.getInfluxHost())
+            .withDatabase(options.getInfluxDatabase())
+            .withMeasurement(options.getInfluxMeasurement())
+            .get();
+  }
+
+  private static String getRunnerName(final String runnerName) {
+    final Matcher matcher = Pattern.compile("((.*)\\.)?(.*?)Runner").matcher(runnerName);
+    if (matcher.matches()) {
+      return matcher.group(3).toLowerCase() + "_";
+    } else {
+      LOG.warn("Unable to get runner name, no prefix used for metrics");
+      return "";
+    }
   }
 
   PTransform<PBegin, PCollection<KV<byte[], byte[]>>> readFromSource(
@@ -91,15 +119,15 @@ abstract class LoadTest<OptionsT extends LoadTestOptions> {
    * Runs the load test, collects and publishes test results to various data store and/or console.
    */
   public PipelineResult run() throws IOException {
-    Timestamp timestamp = Timestamp.now();
+    final Timestamp timestamp = Timestamp.now();
 
     loadTest();
 
-    PipelineResult pipelineResult = pipeline.run();
+    final PipelineResult pipelineResult = pipeline.run();
     pipelineResult.waitUntilFinish(Duration.standardMinutes(options.getLoadTestTimeout()));
 
-    String testId = UUID.randomUUID().toString();
-    List metrics = readMetrics(timestamp, pipelineResult, testId);
+    final String testId = UUID.randomUUID().toString();
+    final List<NamedTestResult> metrics = readMetrics(timestamp, pipelineResult, testId);
 
     ConsoleResultPublisher.publish(metrics, testId, timestamp.toString());
 
@@ -109,6 +137,10 @@ abstract class LoadTest<OptionsT extends LoadTestOptions> {
       publishResultsToBigQuery(metrics);
     }
 
+    if (options.getPublishToInfluxDB()) {
+      InfluxDBPublisher.publishWithSettings(metrics, settings);
+    }
+
     return pipelineResult;
   }
 
@@ -120,14 +152,14 @@ abstract class LoadTest<OptionsT extends LoadTestOptions> {
         NamedTestResult.create(
             testId,
             timestamp.toString(),
-            "runtime_sec",
+            runner + "runtime_sec",
             (reader.getEndTimeMetric("runtime") - reader.getStartTimeMetric("runtime")) / 1000D);
 
     NamedTestResult totalBytes =
         NamedTestResult.create(
             testId,
             timestamp.toString(),
-            "total_bytes_count",
+            runner + "total_bytes_count",
             reader.getCounterMetric("totalBytes.count"));
 
     return Arrays.asList(runtime, totalBytes);
diff --git a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTestOptions.java b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTestOptions.java
index 3353e43..71da5f4 100644
--- a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTestOptions.java
+++ b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTestOptions.java
@@ -68,6 +68,30 @@ public interface LoadTestOptions extends PipelineOptions, ApplicationNameOptions
 
   void setInputWindowDurationSec(Long windowSizeSec);
 
+  @Description("InfluxDB measurement to publish results to.")
+  @Nullable
+  String getInfluxMeasurement();
+
+  void setInfluxMeasurement(@Nullable String measurement);
+
+  @Description("InfluxDB host.")
+  @Nullable
+  String getInfluxHost();
+
+  void setInfluxHost(@Nullable String host);
+
+  @Description("InfluxDB database.")
+  @Nullable
+  String getInfluxDatabase();
+
+  void setInfluxDatabase(@Nullable String database);
+
+  @Description("Whether the results should be published to InfluxDB")
+  @Default.Boolean(false)
+  Boolean getPublishToInfluxDB();
+
+  void setPublishToInfluxDB(Boolean publishToInfluxDB);
+
   static <T extends LoadTestOptions> T readFromArgs(String[] args, Class<T> optionsClass) {
     return PipelineOptionsFactory.fromArgs(args).withValidation().as(optionsClass);
   }
diff --git a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/metrics/IOITMetrics.java b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/metrics/IOITMetrics.java
index f7b1559..05482c3 100644
--- a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/metrics/IOITMetrics.java
+++ b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/metrics/IOITMetrics.java
@@ -24,6 +24,8 @@ import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.testutils.NamedTestResult;
 import org.apache.beam.sdk.testutils.publishing.BigQueryResultsPublisher;
 import org.apache.beam.sdk.testutils.publishing.ConsoleResultPublisher;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBPublisher;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
 
 /**
  * Contains a flexible mechanism of publishing metrics to BQ and console using suppliers provided in
@@ -54,20 +56,34 @@ public class IOITMetrics {
     MetricsReader reader = new MetricsReader(result, namespace);
     Collection<NamedTestResult> namedTestResults = reader.readAll(metricSuppliers);
 
-    publish(uuid, timestamp, bigQueryDataset, bigQueryTable, namedTestResults);
+    publish(bigQueryDataset, bigQueryTable, namedTestResults);
   }
 
   public static void publish(
-      String uuid,
-      String timestamp,
-      String bigQueryDataset,
-      String bigQueryTable,
-      Collection<NamedTestResult> results) {
+      final String bigQueryDataset,
+      final String bigQueryTable,
+      final Collection<NamedTestResult> results) {
 
     if (bigQueryDataset != null && bigQueryTable != null) {
       BigQueryResultsPublisher.create(bigQueryDataset, NamedTestResult.getSchema())
           .publish(results, bigQueryTable);
     }
+  }
+
+  public void publishToInflux(final InfluxDBSettings settings) {
+    MetricsReader reader = new MetricsReader(result, namespace);
+    Collection<NamedTestResult> namedTestResults = reader.readAll(metricSuppliers);
+
+    publishToInflux(uuid, timestamp, namedTestResults, settings);
+  }
+
+  public static void publishToInflux(
+      final String uuid,
+      final String timestamp,
+      final Collection<NamedTestResult> results,
+      final InfluxDBSettings settings) {
+
     ConsoleResultPublisher.publish(results, uuid, timestamp);
+    InfluxDBPublisher.publishWithSettings(results, settings);
   }
 }
diff --git a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/InfluxDBPublisher.java b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/InfluxDBPublisher.java
new file mode 100644
index 0000000..1fe8ada
--- /dev/null
+++ b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/InfluxDBPublisher.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testutils.publishing;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Objects.requireNonNull;
+import static org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils.isNoneBlank;
+
+import java.io.IOException;
+import java.util.Collection;
+import org.apache.beam.sdk.testutils.NamedTestResult;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class InfluxDBPublisher {
+  private static final Logger LOG = LoggerFactory.getLogger(InfluxDBPublisher.class);
+
+  private InfluxDBPublisher() {}
+
+  public static void publishWithSettings(
+      final Collection<NamedTestResult> results, final InfluxDBSettings settings) {
+    requireNonNull(settings, "InfluxDB settings must not be null");
+    if (isNoneBlank(settings.measurement, settings.database)) {
+      try {
+        publish(results, settings);
+      } catch (final Exception exception) {
+        LOG.warn("Unable to publish metrics due to error: {}", exception.getMessage(), exception);
+      }
+    } else {
+      LOG.warn("Missing property -- measurement/database. Metrics won't be published.");
+    }
+  }
+
+  private static void publish(
+      final Collection<NamedTestResult> results, final InfluxDBSettings settings) throws Exception {
+
+    final HttpClientBuilder builder = HttpClientBuilder.create();
+
+    if (isNoneBlank(settings.userName, settings.userPassword)) {
+      final CredentialsProvider provider = new BasicCredentialsProvider();
+      provider.setCredentials(
+          AuthScope.ANY, new UsernamePasswordCredentials(settings.userName, settings.userPassword));
+      builder.setDefaultCredentialsProvider(provider);
+    }
+
+    final HttpPost postRequest = new HttpPost(settings.host + "write?db=" + settings.database);
+
+    final StringBuilder metricBuilder = new StringBuilder();
+    results.stream()
+        .map(NamedTestResult::toMap)
+        .forEach(
+            map ->
+                metricBuilder
+                    .append(settings.measurement)
+                    .append(",")
+                    .append("test_id")
+                    .append("=")
+                    .append(map.get("test_id"))
+                    .append(",")
+                    .append("metric")
+                    .append("=")
+                    .append(map.get("metric"))
+                    .append(" ")
+                    .append("value")
+                    .append("=")
+                    .append(map.get("value"))
+                    .append('\n'));
+
+    postRequest.setEntity(new ByteArrayEntity(metricBuilder.toString().getBytes(UTF_8)));
+    try (final CloseableHttpResponse response = builder.build().execute(postRequest)) {
+      is2xx(response.getStatusLine().getStatusCode());
+    }
+  }
+
+  private static void is2xx(final int code) throws IOException {
+    if (code < 200 || code >= 300) {
+      throw new IOException("Response code: " + code);
+    }
+  }
+}
diff --git a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/InfluxDBSettings.java b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/InfluxDBSettings.java
new file mode 100644
index 0000000..927f542
--- /dev/null
+++ b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/InfluxDBSettings.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testutils.publishing;
+
+import static org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils.isBlank;
+
+public final class InfluxDBSettings {
+
+  public final String host;
+  public final String userName;
+  public final String userPassword;
+  public final String measurement;
+  public final String database;
+
+  private InfluxDBSettings(
+      String host, String userName, String userPassword, String measurement, String database) {
+    this.host = host;
+    this.userName = userName;
+    this.userPassword = userPassword;
+    this.measurement = measurement;
+    this.database = database;
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public InfluxDBSettings copyWithMeasurement(final String newMeasurement) {
+    return new InfluxDBSettings(host, userName, userPassword, newMeasurement, database);
+  }
+
+  public static class Builder {
+    private static final String DEFAULT_HOST = "http://localhost:8086/";
+    private static final String INFLUX_USER = "INFLUXDB_USER";
+    private static final String INFLUX_PASSWORD = "INFLUXDB_USER_PASSWORD";
+
+    private String host;
+    private String measurement;
+    private String database;
+
+    public Builder withHost(final String host) {
+      this.host = host;
+      return this;
+    }
+
+    public Builder withMeasurement(final String measurement) {
+      this.measurement = measurement;
+      return this;
+    }
+
+    public Builder withDatabase(final String database) {
+      this.database = database;
+      return this;
+    }
+
+    public InfluxDBSettings get() {
+      final String userName = System.getenv(INFLUX_USER);
+      final String userPassword = System.getenv(INFLUX_PASSWORD);
+      final String influxHost = isBlank(host) ? DEFAULT_HOST : host;
+
+      return new InfluxDBSettings(influxHost, userName, userPassword, measurement, database);
+    }
+  }
+}