Posted to commits@beam.apache.org by jo...@apache.org on 2022/08/01 19:44:47 UTC

[beam] branch master updated: Merge PR #22304 fixing #22331 fixing JDBC IO IT

This is an automated email from the ASF dual-hosted git repository.

johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 324e86db849 Merge PR #22304 fixing #22331 fixing JDBC IO IT
324e86db849 is described below

commit 324e86db8499b6d52e7d3c6c2c6672749c842389
Author: Yi Hu <ya...@google.com>
AuthorDate: Mon Aug 1 15:44:40 2022 -0400

    Merge PR #22304 fixing #22331 fixing JDBC IO IT
    
    * Fix JdbcIOIT, which seems to have never worked
    
    * Fix bugs in JDBCIOIT
    
    * Use pooled connection for tests to avoid freeze (see the first sketch below)
    
    * Use stream pcoll for testWriteWithAutosharding (see the second sketch below)
    
    * Move testWriteThenRead() specific class member into local
    
    Co-authored-by: Pablo E <pa...@apache.org>
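
A minimal sketch of one way to pool connections in a JdbcIO test, using
JdbcIO's built-in PoolableDataSourceProvider. This is illustrative only:
the commit's own pooling change is not visible in the excerpt below, and
the driver, URL, and table name here are placeholder values.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.jdbc.JdbcIO;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.KV;

    public class PooledWriteSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        p.apply(Create.of(KV.of(1, "a"), KV.of(2, "b")))
            .apply(
                JdbcIO.<KV<Integer, String>>write()
                    // PoolableDataSourceProvider memoizes one pooled DataSource
                    // per worker instead of handing out raw connections per
                    // bundle, avoiding the connection exhaustion ("freeze")
                    // the bullet above refers to.
                    .withDataSourceProviderFn(
                        JdbcIO.PoolableDataSourceProvider.of(
                            JdbcIO.DataSourceConfiguration.create(
                                "org.postgresql.Driver",
                                "jdbc:postgresql://localhost:5432/postgres")))
                    .withStatement("insert into example_table values(?, ?)")
                    .withPreparedStatementSetter(
                        (element, statement) -> {
                          statement.setInt(1, element.getKey());
                          statement.setString(2, element.getValue());
                        }));
        p.run().waitUntilFinish();
      }
    }
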
---
 .../java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java | 193 ++++++++++++++-------
 1 file changed, 134 insertions(+), 59 deletions(-)
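
The "stream pcoll" bullet replaces the old TestStream-based input with an
unbounded PCollection built from Impulse, PeriodicSequence, and a stateful
DoFn; the full version is in the diff below. A condensed sketch of just the
periodic-tick part (the 60-second horizon is an illustrative value):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.Impulse;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.PeriodicSequence;
    import org.apache.beam.sdk.transforms.PeriodicSequence.SequenceDefinition;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;
    import org.joda.time.Instant;

    public class PeriodicTicksSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        // Impulse fires a single element at pipeline start; the DoFn turns it
        // into a SequenceDefinition whose start time is resolved at execution
        // time (the same trick GenerateSequenceDefinitionFn uses below), and
        // PeriodicSequence then emits one Instant per second.
        PCollection<Instant> ticks =
            p.apply(Impulse.create())
                .apply(
                    ParDo.of(
                        new DoFn<byte[], SequenceDefinition>() {
                          @ProcessElement
                          public void processElement(ProcessContext c) {
                            Instant now = Instant.now();
                            c.output(
                                new SequenceDefinition(
                                    now,
                                    now.plus(Duration.standardSeconds(60)),
                                    Duration.standardSeconds(1)));
                          }
                        }))
                .apply(PeriodicSequence.create());
        // Note: ticks is unbounded, so a streaming-capable runner is required
        // and the pipeline runs until the sequence ends or it is cancelled.
        p.run().waitUntilFinish();
      }
    }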

diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
index 97efd4bd6cb..e08f7be5c4f 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
@@ -19,8 +19,8 @@ package org.apache.beam.sdk.io.jdbc;
 
 import static org.apache.beam.sdk.io.common.DatabaseTestHelper.assertRowCount;
 import static org.apache.beam.sdk.io.common.DatabaseTestHelper.getTestDataToWrite;
-import static org.apache.beam.sdk.io.common.IOITHelper.executeWithRetry;
 import static org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects.firstNonNull;
 
 import com.google.cloud.Timestamp;
 import java.sql.SQLException;
@@ -32,17 +32,17 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.function.Function;
 import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.common.DatabaseTestHelper;
 import org.apache.beam.sdk.io.common.HashingFn;
 import org.apache.beam.sdk.io.common.PostgresIOTestPipelineOptions;
 import org.apache.beam.sdk.io.common.TestRow;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.ValueState;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.testutils.NamedTestResult;
 import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
 import org.apache.beam.sdk.testutils.metrics.MetricsReader;
@@ -51,12 +51,19 @@ import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.PeriodicSequence;
+import org.apache.beam.sdk.transforms.PeriodicSequence.SequenceDefinition;
 import org.apache.beam.sdk.transforms.Top;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.joda.time.Duration;
 import org.joda.time.Instant;
-import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
@@ -88,18 +95,19 @@ import org.postgresql.ds.PGSimpleDataSource;
 @RunWith(JUnit4.class)
 public class JdbcIOIT {
 
+  // The number of rows written to the table in normal integration tests (not the performance test).
   private static final int EXPECTED_ROW_COUNT = 1000;
   private static final String NAMESPACE = JdbcIOIT.class.getName();
+  // The number of rows written to the table in the performance test.
   private static int numberOfRows;
   private static PGSimpleDataSource dataSource;
   private static String tableName;
-  private static Long tableSize;
   private static InfluxDBSettings settings;
   @Rule public TestPipeline pipelineWrite = TestPipeline.create();
   @Rule public TestPipeline pipelineRead = TestPipeline.create();
 
   @BeforeClass
-  public static void setup() throws Exception {
+  public static void setup() {
     PostgresIOTestPipelineOptions options;
     try {
       options = readIOTestPipelineOptions(PostgresIOTestPipelineOptions.class);
@@ -107,12 +115,9 @@ public class JdbcIOIT {
       options = null;
     }
     org.junit.Assume.assumeNotNull(options);
-
     numberOfRows = options.getNumberOfRecords();
     dataSource = DatabaseTestHelper.getPostgresDataSource(options);
     tableName = DatabaseTestHelper.getTestTableName("IT");
-    executeWithRetry(JdbcIOIT::createTable);
-    tableSize = DatabaseTestHelper.getPostgresTableSize(dataSource, tableName).orElse(0L);
     settings =
         InfluxDBSettings.builder()
             .withHost(options.getInfluxHost())
@@ -121,27 +126,22 @@ public class JdbcIOIT {
             .get();
   }
 
-  private static void createTable() throws SQLException {
-    DatabaseTestHelper.createTable(dataSource, tableName);
-  }
-
-  @AfterClass
-  public static void tearDown() throws Exception {
-    executeWithRetry(JdbcIOIT::deleteTable);
-  }
-
-  private static void deleteTable() throws SQLException {
-    DatabaseTestHelper.deleteTable(dataSource, tableName);
-  }
-
-  /** Tests writing then reading data for a postgres database. */
+  /**
+   * Tests writing then reading data for a Postgres database. Also used as a performance test of
+   * JdbcIO.
+   */
   @Test
-  public void testWriteThenRead() {
-    PipelineResult writeResult = runWrite();
-    writeResult.waitUntilFinish();
-    PipelineResult readResult = runRead();
-    readResult.waitUntilFinish();
-    gatherAndPublishMetrics(writeResult, readResult);
+  public void testWriteThenRead() throws SQLException {
+    DatabaseTestHelper.createTable(dataSource, tableName);
+    try {
+      PipelineResult writeResult = runWrite();
+      writeResult.waitUntilFinish();
+      PipelineResult readResult = runRead();
+      readResult.waitUntilFinish();
+      gatherAndPublishMetrics(writeResult, readResult);
+    } finally {
+      DatabaseTestHelper.deleteTable(dataSource, tableName);
+    }
   }
 
   private void gatherAndPublishMetrics(PipelineResult writeResult, PipelineResult readResult) {
@@ -177,9 +177,7 @@ public class JdbcIOIT {
     postgresTableSize.ifPresent(
         tableFinalSize ->
             suppliers.add(
-                ignore ->
-                    NamedTestResult.create(
-                        uuid, timestamp, "total_size", tableFinalSize - tableSize)));
+                ignore -> NamedTestResult.create(uuid, timestamp, "total_size", tableFinalSize)));
     return suppliers;
   }
 
@@ -264,43 +262,120 @@ public class JdbcIOIT {
     return pipelineRead.run();
   }
 
+  /** An integration test of the auto-sharding functionality using a streaming PCollection. */
   @Test
   public void testWriteWithAutosharding() throws Exception {
-    String firstTableName = DatabaseTestHelper.getTestTableName("UT_WRITE");
+    String firstTableName = DatabaseTestHelper.getTestTableName("JDBCIT_AUTOSHARD");
     DatabaseTestHelper.createTable(dataSource, firstTableName);
     try {
-      List<KV<Integer, String>> data = getTestDataToWrite(EXPECTED_ROW_COUNT);
-      TestStream.Builder<KV<Integer, String>> ts =
-          TestStream.create(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of()))
-              .advanceWatermarkTo(Instant.now());
-      for (KV<Integer, String> elm : data) {
-        ts.addElements(elm);
+      PCollection<TestRow> dataCollection =
+          pipelineWrite.apply(
+              // Emit 50_000 elements per second.
+              new GenerateRecordsStream(numberOfRows, 50_000, Duration.standardSeconds(1)));
+      dataCollection
+          .apply(ParDo.of(new TimeMonitor<>(NAMESPACE, "write_time")))
+          .apply(
+              JdbcIO.<TestRow>write()
+                  .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
+                  .withStatement(String.format("insert into %s values(?, ?)", firstTableName))
+                  .withAutoSharding()
+                  .withPreparedStatementSetter(new JdbcTestHelper.PrepareStatementFromTestRow()));
+
+      List<String> additionalArgs = Lists.newArrayList("--streaming");
+      if (pipelineWrite
+          .getOptions()
+          .getRunner()
+          .getCanonicalName()
+          .startsWith("org.apache.beam.runners.dataflow")) {
+        // enableStreamingEngine is a gcp option.
+        additionalArgs.add("--enableStreamingEngine");
       }
+      pipelineWrite.runWithAdditionalOptionArgs(additionalArgs).waitUntilFinish();
 
-      PCollection<KV<Integer, String>> dataCollection =
-          pipelineWrite.apply(ts.advanceWatermarkToInfinity());
-      dataCollection.apply(
-          JdbcIO.<KV<Integer, String>>write()
-              .withDataSourceProviderFn(voidInput -> dataSource)
-              .withStatement(String.format("insert into %s values(?, ?) returning *", tableName))
-              .withAutoSharding()
-              .withPreparedStatementSetter(
-                  (element, statement) -> {
-                    statement.setInt(1, element.getKey());
-                    statement.setString(2, element.getValue());
-                  }));
-
-      pipelineWrite.run().waitUntilFinish();
-
-      runRead();
+      assertRowCount(dataSource, firstTableName, numberOfRows);
     } finally {
       DatabaseTestHelper.deleteTable(dataSource, firstTableName);
     }
   }
 
+  /** Generates a stream of records for testing. */
+  private static class GenerateRecordsStream extends PTransform<PBegin, PCollection<TestRow>> {
+    private final long numRecords;
+    private final long numPerPeriod;
+
+    public GenerateRecordsStream(long numRecords, long numPerPeriod, Duration periodLength) {
+      this.numRecords = numRecords;
+      this.numPerPeriod = numPerPeriod;
+    }
+
+    @Override
+    public PCollection<TestRow> expand(PBegin pBegin) {
+      PCollection<TestRow> pcoll =
+          pBegin
+              .apply(Impulse.create())
+              .apply(ParDo.of(new GenerateSequenceDefinitionFn(numRecords / numPerPeriod)))
+              .apply(PeriodicSequence.create())
+              .apply(
+                  "Add dumb key",
+                  ParDo.of(
+                      new DoFn<Instant, KV<Integer, Instant>>() {
+                        @ProcessElement
+                        public void processElement(ProcessContext c) {
+                          c.output(KV.of(0, c.element()));
+                        }
+                      }))
+              .apply(ParDo.of(new EmitSequenceFn(numRecords, numPerPeriod)))
+              .apply(ParDo.of(new TestRow.DeterministicallyConstructTestRowFn()));
+      return pcoll;
+    }
+  }
+
+  /** Sets the PeriodicSequence start time when pipeline execution begins. */
+  private static class GenerateSequenceDefinitionFn extends DoFn<byte[], SequenceDefinition> {
+    private final long numPulses;
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      Instant now = Instant.now();
+      c.output(
+          new SequenceDefinition(
+              now, now.plus(Duration.standardSeconds(numPulses)), Duration.standardSeconds(1)));
+    }
+
+    public GenerateSequenceDefinitionFn(long numPulses) {
+      this.numPulses = numPulses;
+    }
+  }
+
+  private static class EmitSequenceFn extends DoFn<KV<Integer, Instant>, Long> {
+    private final long numRecords;
+    private final long numPerPeriod;
+
+    public EmitSequenceFn(long numRecords, long numPerPeriod) {
+      this.numRecords = numRecords;
+      this.numPerPeriod = numPerPeriod;
+    }
+
+    @StateId("count")
+    @SuppressWarnings("unused")
+    private final StateSpec<ValueState<Integer>> countSpec = StateSpecs.value(VarIntCoder.of());
+
+    @ProcessElement
+    public void processElement(ProcessContext c, @StateId("count") ValueState<Integer> count) {
+      int current = firstNonNull(count.read(), 0);
+      count.write(current + 1);
+      long startId = current * numPerPeriod;
+      long endId = Math.min((current + 1) * numPerPeriod, numRecords);
+      for (long id = startId; id < endId; ++id) {
+        c.output(id);
+      }
+    }
+  }
+
+  /** An integration test of the write-results functionality. */
   @Test
   public void testWriteWithWriteResults() throws Exception {
-    String firstTableName = DatabaseTestHelper.getTestTableName("UT_WRITE");
+    String firstTableName = DatabaseTestHelper.getTestTableName("JDBCIT_WRITE");
     DatabaseTestHelper.createTable(dataSource, firstTableName);
     try {
       ArrayList<KV<Integer, String>> data = getTestDataToWrite(EXPECTED_ROW_COUNT);
@@ -325,7 +400,7 @@ public class JdbcIOIT {
 
       PAssert.that(resultSetCollection).containsInAnyOrder(expectedResult);
 
-      pipelineWrite.run();
+      pipelineWrite.run().waitUntilFinish();
 
       assertRowCount(dataSource, firstTableName, EXPECTED_ROW_COUNT);
     } finally {
@@ -339,7 +414,7 @@ public class JdbcIOIT {
    */
   private static JdbcIO.Write<KV<Integer, String>> getJdbcWriteWithReturning(String tableName) {
     return JdbcIO.<KV<Integer, String>>write()
-        .withDataSourceProviderFn(voidInput -> dataSource)
+        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
         .withStatement(String.format("insert into %s values(?, ?) returning *", tableName))
         .withPreparedStatementSetter(
             (element, statement) -> {