You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ka...@apache.org on 2020/05/07 08:33:44 UTC
[beam] branch master updated: [BEAM-8133] Push IOIT test metrics to InfluxDB
This is an automated email from the ASF dual-hosted git repository.
kamilwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 81cc7eb [BEAM-8133] Push ioit tests metrics to influxdb
81cc7eb is described below
commit 81cc7eb249e759337630b51f29616483929a57b6
Author: pawelpasterz <32...@users.noreply.github.com>
AuthorDate: Thu May 7 10:33:30 2020 +0200
[BEAM-8133] Push ioit tests metrics to influxdb
---
.../provider/bigquery/BigQueryIOPushDownIT.java | 9 ++
.../beam/sdk/bigqueryioperftests/BigQueryIOIT.java | 20 ++--
.../beam/sdk/io/common/IOTestPipelineOptions.java | 18 ++++
.../java/org/apache/beam/sdk/io/avro/AvroIOIT.java | 14 ++-
.../apache/beam/sdk/io/parquet/ParquetIOIT.java | 14 ++-
.../java/org/apache/beam/sdk/io/text/TextIOIT.java | 14 ++-
.../apache/beam/sdk/io/tfrecord/TFRecordIOIT.java | 11 ++-
.../java/org/apache/beam/sdk/io/xml/XmlIOIT.java | 14 ++-
.../sdk/io/hadoop/format/HadoopFormatIOIT.java | 10 ++
.../java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java | 10 ++
.../org/apache/beam/sdk/io/kafka/KafkaIOIT.java | 13 ++-
.../apache/beam/sdk/io/mongodb/MongoDBIOIT.java | 10 ++
.../org/apache/beam/sdk/loadtests/LoadTest.java | 48 ++++++++--
.../apache/beam/sdk/loadtests/LoadTestOptions.java | 24 +++++
.../beam/sdk/testutils/metrics/IOITMetrics.java | 28 ++++--
.../testutils/publishing/InfluxDBPublisher.java | 103 +++++++++++++++++++++
.../sdk/testutils/publishing/InfluxDBSettings.java | 79 ++++++++++++++++
17 files changed, 407 insertions(+), 32 deletions(-)
diff --git a/sdks/java/extensions/sql/perf-tests/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryIOPushDownIT.java b/sdks/java/extensions/sql/perf-tests/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryIOPushDownIT.java
index caa5497..3da48a2 100644
--- a/sdks/java/extensions/sql/perf-tests/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryIOPushDownIT.java
+++ b/sdks/java/extensions/sql/perf-tests/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryIOPushDownIT.java
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.testutils.NamedTestResult;
import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
import org.apache.beam.sdk.testutils.metrics.MetricsReader;
import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
@@ -89,6 +90,7 @@ public class BigQueryIOPushDownIT {
private static SQLBigQueryPerfTestOptions options;
private static String metricsBigQueryDataset;
private static String metricsBigQueryTable;
+ private static InfluxDBSettings settings;
private Pipeline pipeline = Pipeline.create(options);
private BeamSqlEnv sqlEnv;
@@ -97,6 +99,12 @@ public class BigQueryIOPushDownIT {
options = IOITHelper.readIOTestPipelineOptions(SQLBigQueryPerfTestOptions.class);
metricsBigQueryDataset = options.getMetricsBigQueryDataset();
metricsBigQueryTable = options.getMetricsBigQueryTable();
+ settings =
+ InfluxDBSettings.builder()
+ .withHost(options.getInfluxHost())
+ .withDatabase(options.getInfluxDatabase())
+ .withMeasurement(options.getInfluxMeasurement())
+ .get();
}
@Before
@@ -169,6 +177,7 @@ public class BigQueryIOPushDownIT {
IOITMetrics readMetrics =
new IOITMetrics(readSuppliers, readResult, NAMESPACE, uuid, timestamp);
readMetrics.publish(metricsBigQueryDataset, metricsBigQueryTable + postfix);
+ readMetrics.publishToInflux(settings.copyWithMeasurement(settings.measurement + postfix));
}
private Set<Function<MetricsReader, NamedTestResult>> getReadSuppliers(
diff --git a/sdks/java/io/bigquery-io-perf-tests/src/test/java/org/apache/beam/sdk/bigqueryioperftests/BigQueryIOIT.java b/sdks/java/io/bigquery-io-perf-tests/src/test/java/org/apache/beam/sdk/bigqueryioperftests/BigQueryIOIT.java
index a12dd41..41cad8d 100644
--- a/sdks/java/io/bigquery-io-perf-tests/src/test/java/org/apache/beam/sdk/bigqueryioperftests/BigQueryIOIT.java
+++ b/sdks/java/io/bigquery-io-perf-tests/src/test/java/org/apache/beam/sdk/bigqueryioperftests/BigQueryIOIT.java
@@ -27,6 +27,7 @@ import com.google.cloud.bigquery.TableId;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
+import java.util.List;
import java.util.UUID;
import java.util.function.Function;
import org.apache.avro.generic.GenericData;
@@ -47,6 +48,7 @@ import org.apache.beam.sdk.testutils.NamedTestResult;
import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
import org.apache.beam.sdk.testutils.metrics.MetricsReader;
import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
@@ -92,6 +94,7 @@ public class BigQueryIOIT {
private static String tempRoot;
private static BigQueryPerfTestOptions options;
private static WriteFormat writeFormat;
+ private static InfluxDBSettings settings;
@BeforeClass
public static void setup() throws IOException {
@@ -108,6 +111,12 @@ public class BigQueryIOIT {
tableQualifier =
String.format(
"%s:%s.%s", bigQueryOptions.getProjectId(), testBigQueryDataset, testBigQueryTable);
+ settings =
+ InfluxDBSettings.builder()
+ .withHost(options.getInfluxHost())
+ .withDatabase(options.getInfluxDatabase())
+ .withMeasurement(options.getInfluxMeasurement())
+ .get();
}
@AfterClass
@@ -193,14 +202,11 @@ public class BigQueryIOIT {
}
private void extractAndPublishTime(PipelineResult pipelineResult, String writeTimeMetricName) {
- NamedTestResult metricResult =
+ final NamedTestResult metricResult =
getMetricSupplier(writeTimeMetricName).apply(new MetricsReader(pipelineResult, NAMESPACE));
- IOITMetrics.publish(
- TEST_ID,
- TEST_TIMESTAMP,
- metricsBigQueryDataset,
- metricsBigQueryTable,
- Collections.singletonList(metricResult));
+ final List<NamedTestResult> listResults = Collections.singletonList(metricResult);
+ IOITMetrics.publish(metricsBigQueryDataset, metricsBigQueryTable, listResults);
+ IOITMetrics.publishToInflux(TEST_ID, TEST_TIMESTAMP, listResults, settings);
}
private static Function<MetricsReader, NamedTestResult> getMetricSupplier(String metricName) {
diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
index 4735ade..058dbbd 100644
--- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
+++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
@@ -43,4 +43,22 @@ public interface IOTestPipelineOptions extends TestPipelineOptions {
String getBigQueryTable();
void setBigQueryTable(@Nullable String tableName);
+
+ @Description("InfluxDB measurement to publish results to.")
+ @Nullable
+ String getInfluxMeasurement();
+
+ void setInfluxMeasurement(@Nullable String measurement);
+
+ @Description("InfluxDB host.")
+ @Nullable
+ String getInfluxHost();
+
+ void setInfluxHost(@Nullable String host);
+
+ @Description("InfluxDB database.")
+ @Nullable
+ String getInfluxDatabase();
+
+ void setInfluxDatabase(@Nullable String database);
}
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java
index 0b78688..0d48480 100644
--- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java
@@ -44,6 +44,7 @@ import org.apache.beam.sdk.testutils.NamedTestResult;
import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
import org.apache.beam.sdk.testutils.metrics.MetricsReader;
import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
@@ -98,6 +99,7 @@ public class AvroIOIT {
private static Integer numberOfTextLines;
private static Integer datasetSize;
private static String expectedHash;
+ private static InfluxDBSettings settings;
@Rule public TestPipeline pipeline = TestPipeline.create();
@@ -111,6 +113,12 @@ public class AvroIOIT {
datasetSize = options.getDatasetSize();
expectedHash = options.getExpectedHash();
numberOfTextLines = options.getNumberOfRecords();
+ settings =
+ InfluxDBSettings.builder()
+ .withHost(options.getInfluxHost())
+ .withDatabase(options.getInfluxDatabase())
+ .withMeasurement(options.getInfluxMeasurement())
+ .get();
}
@Test
@@ -164,8 +172,10 @@ public class AvroIOIT {
Set<Function<MetricsReader, NamedTestResult>> metricSuppliers =
fillMetricSuppliers(uuid, timestamp);
- new IOITMetrics(metricSuppliers, result, AVRO_NAMESPACE, uuid, timestamp)
- .publish(bigQueryDataset, bigQueryTable);
+ final IOITMetrics metrics =
+ new IOITMetrics(metricSuppliers, result, AVRO_NAMESPACE, uuid, timestamp);
+ metrics.publish(bigQueryDataset, bigQueryTable);
+ metrics.publishToInflux(settings);
}
private Set<Function<MetricsReader, NamedTestResult>> fillMetricSuppliers(
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOIT.java
index 90af13c..7a36ac8 100644
--- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOIT.java
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOIT.java
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.testutils.NamedTestResult;
import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
import org.apache.beam.sdk.testutils.metrics.MetricsReader;
import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
@@ -97,6 +98,7 @@ public class ParquetIOIT {
private static Integer numberOfTextLines;
private static Integer datasetSize;
private static String expectedHash;
+ private static InfluxDBSettings settings;
@Rule public TestPipeline pipeline = TestPipeline.create();
private static final String PARQUET_NAMESPACE = ParquetIOIT.class.getName();
@@ -110,6 +112,12 @@ public class ParquetIOIT {
filenamePrefix = appendTimestampSuffix(options.getFilenamePrefix());
bigQueryDataset = options.getBigQueryDataset();
bigQueryTable = options.getBigQueryTable();
+ settings =
+ InfluxDBSettings.builder()
+ .withHost(options.getInfluxHost())
+ .withDatabase(options.getInfluxDatabase())
+ .withMeasurement(options.getInfluxMeasurement())
+ .get();
}
@Test
@@ -169,8 +177,10 @@ public class ParquetIOIT {
String timestamp = Timestamp.now().toString();
Set<Function<MetricsReader, NamedTestResult>> metricSuppliers =
fillMetricSuppliers(uuid, timestamp);
- new IOITMetrics(metricSuppliers, result, PARQUET_NAMESPACE, uuid, timestamp)
- .publish(bigQueryDataset, bigQueryTable);
+ final IOITMetrics metrics =
+ new IOITMetrics(metricSuppliers, result, PARQUET_NAMESPACE, uuid, timestamp);
+ metrics.publish(bigQueryDataset, bigQueryTable);
+ metrics.publishToInflux(settings);
}
private Set<Function<MetricsReader, NamedTestResult>> fillMetricSuppliers(
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
index 5625a28..c28bd93 100644
--- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
@@ -41,6 +41,7 @@ import org.apache.beam.sdk.testutils.NamedTestResult;
import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
import org.apache.beam.sdk.testutils.metrics.MetricsReader;
import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
@@ -88,6 +89,7 @@ public class TextIOIT {
private static String bigQueryDataset;
private static String bigQueryTable;
private static boolean gatherGcsPerformanceMetrics;
+ private static InfluxDBSettings settings;
private static final String FILEIOIT_NAMESPACE = TextIOIT.class.getName();
@Rule public TestPipeline pipeline = TestPipeline.create();
@@ -104,6 +106,12 @@ public class TextIOIT {
bigQueryDataset = options.getBigQueryDataset();
bigQueryTable = options.getBigQueryTable();
gatherGcsPerformanceMetrics = options.getReportGcsPerformanceMetrics();
+ settings =
+ InfluxDBSettings.builder()
+ .withHost(options.getInfluxHost())
+ .withDatabase(options.getInfluxDatabase())
+ .withMeasurement(options.getInfluxMeasurement())
+ .get();
}
@Test
@@ -161,8 +169,10 @@ public class TextIOIT {
Set<Function<MetricsReader, NamedTestResult>> metricSuppliers =
fillMetricSuppliers(uuid, timestamp.toString());
- new IOITMetrics(metricSuppliers, result, FILEIOIT_NAMESPACE, uuid, timestamp.toString())
- .publish(bigQueryDataset, bigQueryTable);
+ final IOITMetrics metrics =
+ new IOITMetrics(metricSuppliers, result, FILEIOIT_NAMESPACE, uuid, timestamp.toString());
+ metrics.publish(bigQueryDataset, bigQueryTable);
+ metrics.publishToInflux(settings);
}
private Set<Function<MetricsReader, NamedTestResult>> fillMetricSuppliers(
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
index 5473926..395000b 100644
--- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
@@ -41,6 +41,7 @@ import org.apache.beam.sdk.testutils.NamedTestResult;
import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
import org.apache.beam.sdk.testutils.metrics.MetricsReader;
import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
@@ -93,6 +94,7 @@ public class TFRecordIOIT {
private static Integer datasetSize;
private static String expectedHash;
private static Compression compressionType;
+ private static InfluxDBSettings settings;
@Rule public TestPipeline writePipeline = TestPipeline.create();
@@ -108,6 +110,12 @@ public class TFRecordIOIT {
filenamePrefix = appendTimestampSuffix(options.getFilenamePrefix());
bigQueryDataset = options.getBigQueryDataset();
bigQueryTable = options.getBigQueryTable();
+ settings =
+ InfluxDBSettings.builder()
+ .withHost(options.getInfluxHost())
+ .withDatabase(options.getInfluxDatabase())
+ .withMeasurement(options.getInfluxMeasurement())
+ .get();
}
private static String createFilenamePattern() {
@@ -176,7 +184,8 @@ public class TFRecordIOIT {
MetricsReader.ofResults(readResults, TFRECORD_NAMESPACE)
.readAll(getReadMetricSuppliers(uuid, timestamp)));
- IOITMetrics.publish(uuid, timestamp, bigQueryDataset, bigQueryTable, results);
+ IOITMetrics.publish(bigQueryDataset, bigQueryTable, results);
+ IOITMetrics.publishToInflux(uuid, timestamp, results, settings);
}
private static Set<Function<MetricsReader, NamedTestResult>> getWriteMetricSuppliers(
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java
index 5d8163e..c4884cc 100644
--- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java
@@ -44,6 +44,7 @@ import org.apache.beam.sdk.testutils.NamedTestResult;
import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
import org.apache.beam.sdk.testutils.metrics.MetricsReader;
import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
@@ -100,6 +101,7 @@ public class XmlIOIT {
private static final String XMLIOIT_NAMESPACE = XmlIOIT.class.getName();
private static Charset charset;
+ private static InfluxDBSettings settings;
@Rule public TestPipeline pipeline = TestPipeline.create();
@@ -113,6 +115,12 @@ public class XmlIOIT {
datasetSize = options.getDatasetSize();
expectedHash = options.getExpectedHash();
numberOfTextLines = options.getNumberOfRecords();
+ settings =
+ InfluxDBSettings.builder()
+ .withHost(options.getInfluxHost())
+ .withDatabase(options.getInfluxDatabase())
+ .withMeasurement(options.getInfluxMeasurement())
+ .get();
}
@Test
@@ -176,8 +184,10 @@ public class XmlIOIT {
Set<Function<MetricsReader, NamedTestResult>> metricSuppliers =
fillMetricSuppliers(uuid, timestamp);
- new IOITMetrics(metricSuppliers, result, XMLIOIT_NAMESPACE, uuid, timestamp)
- .publish(bigQueryDataset, bigQueryTable);
+ final IOITMetrics metrics =
+ new IOITMetrics(metricSuppliers, result, XMLIOIT_NAMESPACE, uuid, timestamp);
+ metrics.publish(bigQueryDataset, bigQueryTable);
+ metrics.publishToInflux(settings);
}
private Set<Function<MetricsReader, NamedTestResult>> fillMetricSuppliers(
diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOIT.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOIT.java
index 1a693ba..cbfbb28 100644
--- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOIT.java
+++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOIT.java
@@ -40,6 +40,7 @@ import org.apache.beam.sdk.testutils.NamedTestResult;
import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
import org.apache.beam.sdk.testutils.metrics.MetricsReader;
import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
@@ -99,6 +100,7 @@ public class HadoopFormatIOIT {
private static SerializableConfiguration hadoopConfiguration;
private static String bigQueryDataset;
private static String bigQueryTable;
+ private static InfluxDBSettings settings;
@Rule public TestPipeline writePipeline = TestPipeline.create();
@Rule public TestPipeline readPipeline = TestPipeline.create();
@@ -114,6 +116,12 @@ public class HadoopFormatIOIT {
tableName = DatabaseTestHelper.getTestTableName("HadoopFormatIOIT");
bigQueryDataset = options.getBigQueryDataset();
bigQueryTable = options.getBigQueryTable();
+ settings =
+ InfluxDBSettings.builder()
+ .withHost(options.getInfluxHost())
+ .withDatabase(options.getInfluxDatabase())
+ .withMeasurement(options.getInfluxMeasurement())
+ .get();
executeWithRetry(HadoopFormatIOIT::createTable);
setupHadoopConfiguration(options);
@@ -214,7 +222,9 @@ public class HadoopFormatIOIT {
IOITMetrics writeMetrics =
new IOITMetrics(writeSuppliers, writeResult, NAMESPACE, uuid, timestamp);
readMetrics.publish(bigQueryDataset, bigQueryTable);
+ readMetrics.publishToInflux(settings);
writeMetrics.publish(bigQueryDataset, bigQueryTable);
+ writeMetrics.publishToInflux(settings);
}
private Set<Function<MetricsReader, NamedTestResult>> getWriteSuppliers(
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
index 81b9799..6c80002 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
@@ -41,6 +41,7 @@ import org.apache.beam.sdk.testutils.NamedTestResult;
import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
import org.apache.beam.sdk.testutils.metrics.MetricsReader;
import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.ParDo;
@@ -85,6 +86,7 @@ public class JdbcIOIT {
private static String bigQueryDataset;
private static String bigQueryTable;
private static Long tableSize;
+ private static InfluxDBSettings settings;
@Rule public TestPipeline pipelineWrite = TestPipeline.create();
@Rule public TestPipeline pipelineRead = TestPipeline.create();
@@ -100,6 +102,12 @@ public class JdbcIOIT {
tableName = DatabaseTestHelper.getTestTableName("IT");
executeWithRetry(JdbcIOIT::createTable);
tableSize = DatabaseTestHelper.getPostgresTableSize(dataSource, tableName).orElse(0L);
+ settings =
+ InfluxDBSettings.builder()
+ .withHost(options.getInfluxHost())
+ .withDatabase(options.getInfluxDatabase())
+ .withMeasurement(options.getInfluxMeasurement())
+ .get();
}
private static void createTable() throws SQLException {
@@ -134,11 +142,13 @@ public class JdbcIOIT {
IOITMetrics writeMetrics =
new IOITMetrics(metricSuppliers, writeResult, NAMESPACE, uuid, timestamp);
writeMetrics.publish(bigQueryDataset, bigQueryTable);
+ writeMetrics.publishToInflux(settings);
IOITMetrics readMetrics =
new IOITMetrics(
getReadMetricSuppliers(uuid, timestamp), readResult, NAMESPACE, uuid, timestamp);
readMetrics.publish(bigQueryDataset, bigQueryTable);
+ readMetrics.publishToInflux(settings);
}
private Set<Function<MetricsReader, NamedTestResult>> getWriteMetricSuppliers(
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
index 012c93b..feb62c1 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.testutils.NamedTestResult;
import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
import org.apache.beam.sdk.testutils.metrics.MetricsReader;
import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
@@ -87,6 +88,8 @@ public class KafkaIOIT {
private static Options options;
+ private static InfluxDBSettings settings;
+
@Rule public TestPipeline writePipeline = TestPipeline.create();
@Rule public TestPipeline readPipeline = TestPipeline.create();
@@ -101,6 +104,12 @@ public class KafkaIOIT {
1000L, "4507649971ee7c51abbb446e65a5c660",
100_000_000L, "0f12c27c9a7672e14775594be66cad9a");
expectedHashcode = getHashForRecordCount(sourceOptions.numRecords, expectedHashes);
+ settings =
+ InfluxDBSettings.builder()
+ .withHost(options.getInfluxHost())
+ .withDatabase(options.getInfluxDatabase())
+ .withMeasurement(options.getInfluxMeasurement())
+ .get();
}
@Test
@@ -130,8 +139,8 @@ public class KafkaIOIT {
cancelIfTimeouted(readResult, readState);
Set<NamedTestResult> metrics = readMetrics(writeResult, readResult);
- IOITMetrics.publish(
- TEST_ID, TIMESTAMP, options.getBigQueryDataset(), options.getBigQueryTable(), metrics);
+ IOITMetrics.publish(options.getBigQueryDataset(), options.getBigQueryTable(), metrics);
+ IOITMetrics.publishToInflux(TEST_ID, TIMESTAMP, metrics, settings);
}
private Set<NamedTestResult> readMetrics(PipelineResult writeResult, PipelineResult readResult) {
diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBIOIT.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBIOIT.java
index 121f538..086d9a6 100644
--- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBIOIT.java
+++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBIOIT.java
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.testutils.NamedTestResult;
import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
import org.apache.beam.sdk.testutils.metrics.MetricsReader;
import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
@@ -84,6 +85,7 @@ public class MongoDBIOIT {
private static String bigQueryTable;
private static String mongoUrl;
private static MongoClient mongoClient;
+ private static InfluxDBSettings settings;
private double initialCollectionSize;
private double finalCollectionSize;
@@ -131,6 +133,12 @@ public class MongoDBIOIT {
mongoUrl =
String.format("mongodb://%s:%s", options.getMongoDBHostName(), options.getMongoDBPort());
mongoClient = MongoClients.create(mongoUrl);
+ settings =
+ InfluxDBSettings.builder()
+ .withHost(options.getInfluxHost())
+ .withDatabase(options.getInfluxDatabase())
+ .withMeasurement(options.getInfluxMeasurement())
+ .get();
}
@After
@@ -208,7 +216,9 @@ public class MongoDBIOIT {
IOITMetrics writeMetrics =
new IOITMetrics(writeSuppliers, writeResult, NAMESPACE, uuid, timestamp);
readMetrics.publish(bigQueryDataset, bigQueryTable);
+ readMetrics.publishToInflux(settings);
writeMetrics.publish(bigQueryDataset, bigQueryTable);
+ writeMetrics.publishToInflux(settings);
}
private Set<Function<MetricsReader, NamedTestResult>> getWriteSuppliers(
diff --git a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTest.java b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTest.java
index 40a172c..9dff968 100644
--- a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTest.java
+++ b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTest.java
@@ -26,6 +26,8 @@ import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
@@ -39,6 +41,8 @@ import org.apache.beam.sdk.testutils.metrics.MetricsReader;
import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
import org.apache.beam.sdk.testutils.publishing.BigQueryResultsPublisher;
import org.apache.beam.sdk.testutils.publishing.ConsoleResultPublisher;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBPublisher;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
@@ -49,6 +53,8 @@ import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Base class for all load tests. Provides common operations such as initializing source/step
@@ -56,7 +62,9 @@ import org.joda.time.Duration;
*/
abstract class LoadTest<OptionsT extends LoadTestOptions> {
- private String metricsNamespace;
+ private static final Logger LOG = LoggerFactory.getLogger(LoadTest.class);
+
+ private final String metricsNamespace;
protected TimeMonitor<KV<byte[], byte[]>> runtimeMonitor;
@@ -66,13 +74,33 @@ abstract class LoadTest<OptionsT extends LoadTestOptions> {
protected Pipeline pipeline;
+ private final String runner;
+
+ private final InfluxDBSettings settings;
+
LoadTest(String[] args, Class<OptionsT> testOptions, String metricsNamespace) throws IOException {
this.metricsNamespace = metricsNamespace;
this.runtimeMonitor = new TimeMonitor<>(metricsNamespace, "runtime");
this.options = LoadTestOptions.readFromArgs(args, testOptions);
this.sourceOptions = fromJsonString(options.getSourceOptions(), SyntheticSourceOptions.class);
-
this.pipeline = Pipeline.create(options);
+ this.runner = getRunnerName(options.getRunner().getName());
+ settings =
+ InfluxDBSettings.builder()
+ .withHost(options.getInfluxHost())
+ .withDatabase(options.getInfluxDatabase())
+ .withMeasurement(options.getInfluxMeasurement())
+ .get();
+ }
+
+ private static String getRunnerName(final String runnerName) {
+ final Matcher matcher = Pattern.compile("((.*)\\.)?(.*?)Runner").matcher(runnerName);
+ if (matcher.matches()) {
+ return matcher.group(3).toLowerCase() + "_";
+ } else {
+ LOG.warn("Unable to get runner name, no prefix used for metrics");
+ return "";
+ }
}
PTransform<PBegin, PCollection<KV<byte[], byte[]>>> readFromSource(
@@ -91,15 +119,15 @@ abstract class LoadTest<OptionsT extends LoadTestOptions> {
* Runs the load test, collects and publishes test results to various data store and/or console.
*/
public PipelineResult run() throws IOException {
- Timestamp timestamp = Timestamp.now();
+ final Timestamp timestamp = Timestamp.now();
loadTest();
- PipelineResult pipelineResult = pipeline.run();
+ final PipelineResult pipelineResult = pipeline.run();
pipelineResult.waitUntilFinish(Duration.standardMinutes(options.getLoadTestTimeout()));
- String testId = UUID.randomUUID().toString();
- List metrics = readMetrics(timestamp, pipelineResult, testId);
+ final String testId = UUID.randomUUID().toString();
+ final List<NamedTestResult> metrics = readMetrics(timestamp, pipelineResult, testId);
ConsoleResultPublisher.publish(metrics, testId, timestamp.toString());
@@ -109,6 +137,10 @@ abstract class LoadTest<OptionsT extends LoadTestOptions> {
publishResultsToBigQuery(metrics);
}
+ if (options.getPublishToInfluxDB()) {
+ InfluxDBPublisher.publishWithSettings(metrics, settings);
+ }
+
return pipelineResult;
}
@@ -120,14 +152,14 @@ abstract class LoadTest<OptionsT extends LoadTestOptions> {
NamedTestResult.create(
testId,
timestamp.toString(),
- "runtime_sec",
+ runner + "runtime_sec",
(reader.getEndTimeMetric("runtime") - reader.getStartTimeMetric("runtime")) / 1000D);
NamedTestResult totalBytes =
NamedTestResult.create(
testId,
timestamp.toString(),
- "total_bytes_count",
+ runner + "total_bytes_count",
reader.getCounterMetric("totalBytes.count"));
return Arrays.asList(runtime, totalBytes);
diff --git a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTestOptions.java b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTestOptions.java
index 3353e43..71da5f4 100644
--- a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTestOptions.java
+++ b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTestOptions.java
@@ -68,6 +68,30 @@ public interface LoadTestOptions extends PipelineOptions, ApplicationNameOptions
void setInputWindowDurationSec(Long windowSizeSec);
+ @Description("InfluxDB measurement to publish results to.")
+ @Nullable
+ String getInfluxMeasurement();
+
+ void setInfluxMeasurement(@Nullable String measurement);
+
+ @Description("InfluxDB host.")
+ @Nullable
+ String getInfluxHost();
+
+ void setInfluxHost(@Nullable String host);
+
+ @Description("InfluxDB database.")
+ @Nullable
+ String getInfluxDatabase();
+
+ void setInfluxDatabase(@Nullable String database);
+
+ @Description("Whether the results should be published to InfluxDB")
+ @Default.Boolean(false)
+ Boolean getPublishToInfluxDB();
+
+ void setPublishToInfluxDB(Boolean publishToInfluxDB);
+
static <T extends LoadTestOptions> T readFromArgs(String[] args, Class<T> optionsClass) {
return PipelineOptionsFactory.fromArgs(args).withValidation().as(optionsClass);
}
diff --git a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/metrics/IOITMetrics.java b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/metrics/IOITMetrics.java
index f7b1559..05482c3 100644
--- a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/metrics/IOITMetrics.java
+++ b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/metrics/IOITMetrics.java
@@ -24,6 +24,8 @@ import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.testutils.NamedTestResult;
import org.apache.beam.sdk.testutils.publishing.BigQueryResultsPublisher;
import org.apache.beam.sdk.testutils.publishing.ConsoleResultPublisher;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBPublisher;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
/**
* Contains a flexible mechanism of publishing metrics to BQ and console using suppliers provided in
@@ -54,20 +56,34 @@ public class IOITMetrics {
MetricsReader reader = new MetricsReader(result, namespace);
Collection<NamedTestResult> namedTestResults = reader.readAll(metricSuppliers);
- publish(uuid, timestamp, bigQueryDataset, bigQueryTable, namedTestResults);
+ publish(bigQueryDataset, bigQueryTable, namedTestResults);
}
public static void publish(
- String uuid,
- String timestamp,
- String bigQueryDataset,
- String bigQueryTable,
- Collection<NamedTestResult> results) {
+ final String bigQueryDataset,
+ final String bigQueryTable,
+ final Collection<NamedTestResult> results) {
if (bigQueryDataset != null && bigQueryTable != null) {
BigQueryResultsPublisher.create(bigQueryDataset, NamedTestResult.getSchema())
.publish(results, bigQueryTable);
}
+ }
+
+ public void publishToInflux(final InfluxDBSettings settings) {
+ MetricsReader reader = new MetricsReader(result, namespace);
+ Collection<NamedTestResult> namedTestResults = reader.readAll(metricSuppliers);
+
+ publishToInflux(uuid, timestamp, namedTestResults, settings);
+ }
+
+ public static void publishToInflux(
+ final String uuid,
+ final String timestamp,
+ final Collection<NamedTestResult> results,
+ final InfluxDBSettings settings) {
+
ConsoleResultPublisher.publish(results, uuid, timestamp);
+ InfluxDBPublisher.publishWithSettings(results, settings);
}
}
diff --git a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/InfluxDBPublisher.java b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/InfluxDBPublisher.java
new file mode 100644
index 0000000..1fe8ada
--- /dev/null
+++ b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/InfluxDBPublisher.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testutils.publishing;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Objects.requireNonNull;
+import static org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils.isNoneBlank;
+
+import java.io.IOException;
+import java.util.Collection;
+import org.apache.beam.sdk.testutils.NamedTestResult;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class InfluxDBPublisher {
+ private static final Logger LOG = LoggerFactory.getLogger(InfluxDBPublisher.class);
+
+ private InfluxDBPublisher() {}
+
+ public static void publishWithSettings(
+ final Collection<NamedTestResult> results, final InfluxDBSettings settings) {
+ requireNonNull(settings, "InfluxDB settings must not be null");
+ if (isNoneBlank(settings.measurement, settings.database)) {
+ try {
+ publish(results, settings);
+ } catch (final Exception exception) {
+ LOG.warn("Unable to publish metrics due to error: {}", exception.getMessage(), exception);
+ }
+ } else {
+ LOG.warn("Missing property -- measurement/database. Metrics won't be published.");
+ }
+ }
+
+ private static void publish(
+ final Collection<NamedTestResult> results, final InfluxDBSettings settings) throws Exception {
+
+ final HttpClientBuilder builder = HttpClientBuilder.create();
+
+ if (isNoneBlank(settings.userName, settings.userPassword)) {
+ final CredentialsProvider provider = new BasicCredentialsProvider();
+ provider.setCredentials(
+ AuthScope.ANY, new UsernamePasswordCredentials(settings.userName, settings.userPassword));
+ builder.setDefaultCredentialsProvider(provider);
+ }
+
+ final HttpPost postRequest = new HttpPost(settings.host + "write?db=" + settings.database);
+
+ final StringBuilder metricBuilder = new StringBuilder();
+ results.stream()
+ .map(NamedTestResult::toMap)
+ .forEach(
+ map ->
+ metricBuilder
+ .append(settings.measurement)
+ .append(",")
+ .append("test_id")
+ .append("=")
+ .append(map.get("test_id"))
+ .append(",")
+ .append("metric")
+ .append("=")
+ .append(map.get("metric"))
+ .append(" ")
+ .append("value")
+ .append("=")
+ .append(map.get("value"))
+ .append('\n'));
+
+ postRequest.setEntity(new ByteArrayEntity(metricBuilder.toString().getBytes(UTF_8)));
+ try (final CloseableHttpResponse response = builder.build().execute(postRequest)) {
+ is2xx(response.getStatusLine().getStatusCode());
+ }
+ }
+
+ private static void is2xx(final int code) throws IOException {
+ if (code < 200 || code >= 300) {
+ throw new IOException("Response code: " + code);
+ }
+ }
+}
diff --git a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/InfluxDBSettings.java b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/InfluxDBSettings.java
new file mode 100644
index 0000000..927f542
--- /dev/null
+++ b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/InfluxDBSettings.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testutils.publishing;
+
+import static org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils.isBlank;
+
+public final class InfluxDBSettings {
+
+ public final String host;
+ public final String userName;
+ public final String userPassword;
+ public final String measurement;
+ public final String database;
+
+ private InfluxDBSettings(
+ String host, String userName, String userPassword, String measurement, String database) {
+ this.host = host;
+ this.userName = userName;
+ this.userPassword = userPassword;
+ this.measurement = measurement;
+ this.database = database;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public InfluxDBSettings copyWithMeasurement(final String newMeasurement) {
+ return new InfluxDBSettings(host, userName, userPassword, newMeasurement, database);
+ }
+
+ public static class Builder {
+ private static final String DEFAULT_HOST = "http://localhost:8086/";
+ private static final String INFLUX_USER = "INFLUXDB_USER";
+ private static final String INFLUX_PASSWORD = "INFLUXDB_USER_PASSWORD";
+
+ private String host;
+ private String measurement;
+ private String database;
+
+ public Builder withHost(final String host) {
+ this.host = host;
+ return this;
+ }
+
+ public Builder withMeasurement(final String measurement) {
+ this.measurement = measurement;
+ return this;
+ }
+
+ public Builder withDatabase(final String database) {
+ this.database = database;
+ return this;
+ }
+
+ public InfluxDBSettings get() {
+ final String userName = System.getenv(INFLUX_USER);
+ final String userPassword = System.getenv(INFLUX_PASSWORD);
+ final String influxHost = isBlank(host) ? DEFAULT_HOST : host;
+
+ return new InfluxDBSettings(influxHost, userName, userPassword, measurement, database);
+ }
+ }
+}