You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jo...@apache.org on 2022/08/01 19:44:47 UTC
[beam] branch master updated: Merge PR #22304 fixing #22331 fixing JDBC IO IT
This is an automated email from the ASF dual-hosted git repository.
johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 324e86db849 Merge PR #22304 fixing #22331 fixing JDBC IO IT
324e86db849 is described below
commit 324e86db8499b6d52e7d3c6c2c6672749c842389
Author: Yi Hu <ya...@google.com>
AuthorDate: Mon Aug 1 15:44:40 2022 -0400
Merge PR #22304 fixing #22331 fixing JDBC IO IT
* Fix JdbcIOIT, which seems to have never worked
* Fix bugs in JDBCIOIT
* Use pooled connection for tests to avoid freeze
* Use stream pcoll for testWriteWithAutosharding
* Move testWriteThenRead() specific class member into local
Co-authored-by: Pablo E <pa...@apache.org>
---
.../java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java | 193 ++++++++++++++-------
1 file changed, 134 insertions(+), 59 deletions(-)
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
index 97efd4bd6cb..e08f7be5c4f 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
@@ -19,8 +19,8 @@ package org.apache.beam.sdk.io.jdbc;
import static org.apache.beam.sdk.io.common.DatabaseTestHelper.assertRowCount;
import static org.apache.beam.sdk.io.common.DatabaseTestHelper.getTestDataToWrite;
-import static org.apache.beam.sdk.io.common.IOITHelper.executeWithRetry;
import static org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects.firstNonNull;
import com.google.cloud.Timestamp;
import java.sql.SQLException;
@@ -32,17 +32,17 @@ import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.common.DatabaseTestHelper;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.common.PostgresIOTestPipelineOptions;
import org.apache.beam.sdk.io.common.TestRow;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testutils.NamedTestResult;
import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
import org.apache.beam.sdk.testutils.metrics.MetricsReader;
@@ -51,12 +51,19 @@ import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.PeriodicSequence;
+import org.apache.beam.sdk.transforms.PeriodicSequence.SequenceDefinition;
import org.apache.beam.sdk.transforms.Top;
import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.joda.time.Duration;
import org.joda.time.Instant;
-import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
@@ -88,18 +95,19 @@ import org.postgresql.ds.PGSimpleDataSource;
@RunWith(JUnit4.class)
public class JdbcIOIT {
+ // the number of rows written to table in normal integration tests (not the performance test).
private static final int EXPECTED_ROW_COUNT = 1000;
private static final String NAMESPACE = JdbcIOIT.class.getName();
+ // the number of rows written to table in the performance test.
private static int numberOfRows;
private static PGSimpleDataSource dataSource;
private static String tableName;
- private static Long tableSize;
private static InfluxDBSettings settings;
@Rule public TestPipeline pipelineWrite = TestPipeline.create();
@Rule public TestPipeline pipelineRead = TestPipeline.create();
@BeforeClass
- public static void setup() throws Exception {
+ public static void setup() {
PostgresIOTestPipelineOptions options;
try {
options = readIOTestPipelineOptions(PostgresIOTestPipelineOptions.class);
@@ -107,12 +115,9 @@ public class JdbcIOIT {
options = null;
}
org.junit.Assume.assumeNotNull(options);
-
numberOfRows = options.getNumberOfRecords();
dataSource = DatabaseTestHelper.getPostgresDataSource(options);
tableName = DatabaseTestHelper.getTestTableName("IT");
- executeWithRetry(JdbcIOIT::createTable);
- tableSize = DatabaseTestHelper.getPostgresTableSize(dataSource, tableName).orElse(0L);
settings =
InfluxDBSettings.builder()
.withHost(options.getInfluxHost())
@@ -121,27 +126,22 @@ public class JdbcIOIT {
.get();
}
- private static void createTable() throws SQLException {
- DatabaseTestHelper.createTable(dataSource, tableName);
- }
-
- @AfterClass
- public static void tearDown() throws Exception {
- executeWithRetry(JdbcIOIT::deleteTable);
- }
-
- private static void deleteTable() throws SQLException {
- DatabaseTestHelper.deleteTable(dataSource, tableName);
- }
-
- /** Tests writing then reading data for a postgres database. */
+ /**
+ * Tests writing then reading data for a postgres database. Also used as a performance test of
+ * JDBCIO.
+ */
@Test
- public void testWriteThenRead() {
- PipelineResult writeResult = runWrite();
- writeResult.waitUntilFinish();
- PipelineResult readResult = runRead();
- readResult.waitUntilFinish();
- gatherAndPublishMetrics(writeResult, readResult);
+ public void testWriteThenRead() throws SQLException {
+ DatabaseTestHelper.createTable(dataSource, tableName);
+ try {
+ PipelineResult writeResult = runWrite();
+ writeResult.waitUntilFinish();
+ PipelineResult readResult = runRead();
+ readResult.waitUntilFinish();
+ gatherAndPublishMetrics(writeResult, readResult);
+ } finally {
+ DatabaseTestHelper.deleteTable(dataSource, tableName);
+ }
}
private void gatherAndPublishMetrics(PipelineResult writeResult, PipelineResult readResult) {
@@ -177,9 +177,7 @@ public class JdbcIOIT {
postgresTableSize.ifPresent(
tableFinalSize ->
suppliers.add(
- ignore ->
- NamedTestResult.create(
- uuid, timestamp, "total_size", tableFinalSize - tableSize)));
+ ignore -> NamedTestResult.create(uuid, timestamp, "total_size", tableFinalSize)));
return suppliers;
}
@@ -264,43 +262,120 @@ public class JdbcIOIT {
return pipelineRead.run();
}
+ /** An integration test of auto sharding functionality using a generated stream of records. */
@Test
public void testWriteWithAutosharding() throws Exception {
- String firstTableName = DatabaseTestHelper.getTestTableName("UT_WRITE");
+ String firstTableName = DatabaseTestHelper.getTestTableName("JDBCIT_AUTOSHARD");
DatabaseTestHelper.createTable(dataSource, firstTableName);
try {
- List<KV<Integer, String>> data = getTestDataToWrite(EXPECTED_ROW_COUNT);
- TestStream.Builder<KV<Integer, String>> ts =
- TestStream.create(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of()))
- .advanceWatermarkTo(Instant.now());
- for (KV<Integer, String> elm : data) {
- ts.addElements(elm);
+ PCollection<TestRow> dataCollection =
+ pipelineWrite.apply(
+ // emit 50_000 elements per second.
+ new GenerateRecordsStream(numberOfRows, 50_000, Duration.standardSeconds(1)));
+ dataCollection
+ .apply(ParDo.of(new TimeMonitor<>(NAMESPACE, "write_time")))
+ .apply(
+ JdbcIO.<TestRow>write()
+ .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
+ .withStatement(String.format("insert into %s values(?, ?)", firstTableName))
+ .withAutoSharding()
+ .withPreparedStatementSetter(new JdbcTestHelper.PrepareStatementFromTestRow()));
+
+ List<String> additionalArgs = Lists.newArrayList("--streaming");
+ if (pipelineWrite
+ .getOptions()
+ .getRunner()
+ .getCanonicalName()
+ .startsWith("org.apache.beam.runners.dataflow")) {
+ // enableStreamingEngine is a gcp option.
+ additionalArgs.add("--enableStreamingEngine");
}
+ pipelineWrite.runWithAdditionalOptionArgs(additionalArgs).waitUntilFinish();
- PCollection<KV<Integer, String>> dataCollection =
- pipelineWrite.apply(ts.advanceWatermarkToInfinity());
- dataCollection.apply(
- JdbcIO.<KV<Integer, String>>write()
- .withDataSourceProviderFn(voidInput -> dataSource)
- .withStatement(String.format("insert into %s values(?, ?) returning *", tableName))
- .withAutoSharding()
- .withPreparedStatementSetter(
- (element, statement) -> {
- statement.setInt(1, element.getKey());
- statement.setString(2, element.getValue());
- }));
-
- pipelineWrite.run().waitUntilFinish();
-
- runRead();
+ assertRowCount(dataSource, firstTableName, numberOfRows);
} finally {
DatabaseTestHelper.deleteTable(dataSource, firstTableName);
}
}
+ /** Generates a stream of {@link TestRow} records for testing: ids are produced in batches of {@code numPerPeriod}, one batch per periodic pulse, and stay below {@code numRecords}. */
+ private static class GenerateRecordsStream extends PTransform<PBegin, PCollection<TestRow>> {
+ private final long numRecords; // upper bound (exclusive) on the ids emitted by EmitSequenceFn
+ private final long numPerPeriod; // number of ids emitted for each pulse of the periodic sequence
+ // NOTE(review): periodLength is accepted by the constructor below but never stored or used; the pulse interval is fixed at 1 second in GenerateSequenceDefinitionFn.
+ public GenerateRecordsStream(long numRecords, long numPerPeriod, Duration periodLength) {
+ this.numRecords = numRecords;
+ this.numPerPeriod = numPerPeriod;
+ }
+ // Pipeline shape: impulse -> sequence definition -> periodic pulses -> constant key -> ids -> TestRow.
+ @Override
+ public PCollection<TestRow> expand(PBegin pBegin) {
+ PCollection<TestRow> pcoll =
+ pBegin
+ .apply(Impulse.create())
+ .apply(ParDo.of(new GenerateSequenceDefinitionFn(numRecords / numPerPeriod))) // long division (truncates); used as the pulse window length in seconds
+ .apply(PeriodicSequence.create())
+ .apply(
+ "Add dumb key",
+ ParDo.of(
+ new DoFn<Instant, KV<Integer, Instant>>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ c.output(KV.of(0, c.element())); // constant key 0, so the stateful EmitSequenceFn keeps one shared counter (Beam state is scoped per key)
+ }
+ }))
+ .apply(ParDo.of(new EmitSequenceFn(numRecords, numPerPeriod)))
+ .apply(ParDo.of(new TestRow.DeterministicallyConstructTestRowFn())); // turns each Long id into a TestRow
+ return pcoll;
+ }
+ }
+
+ /** Sets the periodic sequence's start time to the moment pipeline execution begins, i.e. when this DoFn processes its input element. */
+ private static class GenerateSequenceDefinitionFn extends DoFn<byte[], SequenceDefinition> {
+ private final long numPulses; // length of the sequence window, in seconds
+ // Emits one SequenceDefinition per input element: first = now, last = now + numPulses seconds, interval = 1 second.
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ Instant now = Instant.now(); // read at execution time, not at pipeline construction time
+ c.output(
+ new SequenceDefinition(
+ now, now.plus(Duration.standardSeconds(numPulses)), Duration.standardSeconds(1)));
+ }
+
+ public GenerateSequenceDefinitionFn(long numPulses) {
+ this.numPulses = numPulses;
+ }
+ }
+ /** For each input pulse, emits the next batch of up to {@code numPerPeriod} consecutive ids; ids are always below {@code numRecords}. */
+ private static class EmitSequenceFn extends DoFn<KV<Integer, Instant>, Long> {
+ private final long numRecords; // exclusive upper bound on emitted ids
+ private final long numPerPeriod; // batch size emitted per input element
+
+ public EmitSequenceFn(long numRecords, long numPerPeriod) {
+ this.numRecords = numRecords;
+ this.numPerPeriod = numPerPeriod;
+ }
+ // State cell counting how many input elements have been processed so far; reads as null before the first one.
+ @StateId("count")
+ @SuppressWarnings("unused") // the field is not referenced by name in this class; it is bound through @StateId("count")
+ private final StateSpec<ValueState<Integer>> countSpec = StateSpecs.value(VarIntCoder.of());
+
+ @ProcessElement
+ public void processElement(ProcessContext c, @StateId("count") ValueState<Integer> count) {
+ int current = firstNonNull(count.read(), 0); // null state on the first element is treated as 0
+ count.write(current + 1);
+ long startId = current * numPerPeriod; // multiplication is done in long because numPerPeriod is long
+ long endId = Math.min((current + 1) * numPerPeriod, numRecords); // clamp so no id >= numRecords is emitted
+ for (long id = startId; id < endId; ++id) { // emits nothing once startId has reached numRecords
+ c.output(id);
+ }
+ }
+ }
+
+ /** An integration test of the write-with-results functionality. */
@Test
public void testWriteWithWriteResults() throws Exception {
- String firstTableName = DatabaseTestHelper.getTestTableName("UT_WRITE");
+ String firstTableName = DatabaseTestHelper.getTestTableName("JDBCIT_WRITE");
DatabaseTestHelper.createTable(dataSource, firstTableName);
try {
ArrayList<KV<Integer, String>> data = getTestDataToWrite(EXPECTED_ROW_COUNT);
@@ -325,7 +400,7 @@ public class JdbcIOIT {
PAssert.that(resultSetCollection).containsInAnyOrder(expectedResult);
- pipelineWrite.run();
+ pipelineWrite.run().waitUntilFinish();
assertRowCount(dataSource, firstTableName, EXPECTED_ROW_COUNT);
} finally {
@@ -339,7 +414,7 @@ public class JdbcIOIT {
*/
private static JdbcIO.Write<KV<Integer, String>> getJdbcWriteWithReturning(String tableName) {
return JdbcIO.<KV<Integer, String>>write()
- .withDataSourceProviderFn(voidInput -> dataSource)
+ .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
.withStatement(String.format("insert into %s values(?, ?) returning *", tableName))
.withPreparedStatementSetter(
(element, statement) -> {