You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink on the original page and was not preserved in this copy.
Posted to commits@beam.apache.org by ah...@apache.org on 2023/12/27 09:16:59 UTC

(beam) branch master updated: Allow large timestamp skew for at-least-once streaming (#29858)

This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 989a2198a31 Allow large timestamp skew for at-least-once streaming (#29858)
989a2198a31 is described below

commit 989a2198a3174a66cd733834383f3603de70d1fa
Author: Ahmed Abualsaud <65...@users.noreply.github.com>
AuthorDate: Wed Dec 27 12:16:53 2023 +0300

    Allow large timestamp skew for at-least-once streaming (#29858)
    
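    * large skew: override getAllowedTimestampSkew() in StorageApiWriteUnshardedRecords to allow a very large timestamp skew
    
    * test: add testStorageWriteReturnsAppendSerializationError to BigQueryIOWriteTest
    
    * use AppendSerializationError everywhere (replacing the misspelled AppendSerializtionError)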
---
 .../bigquery/StorageApiWriteUnshardedRecords.java  |  11 ++-
 .../bigquery/StorageApiWritesShardedRecords.java   |   6 +-
 .../sdk/io/gcp/testing/FakeDatasetService.java     |   2 +-
 .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java   | 109 +++++++++++++++++++++
 .../io/gcp/bigquery/BigQuerySinkMetricsTest.java   |   4 +-
 5 files changed, 123 insertions(+), 9 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index 8f24ebc8ad9..3c6c73dd021 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -670,10 +670,10 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
                   BigQuerySinkMetrics.throwableToGRPCCodeString(failedContext.getError());
 
               if (failedContext.getError() != null
-                  && failedContext.getError() instanceof Exceptions.AppendSerializtionError) {
-                Exceptions.AppendSerializtionError error =
+                  && failedContext.getError() instanceof Exceptions.AppendSerializationError) {
+                Exceptions.AppendSerializationError error =
                     Preconditions.checkStateNotNull(
-                        (Exceptions.AppendSerializtionError) failedContext.getError());
+                        (Exceptions.AppendSerializationError) failedContext.getError());
 
                 Set<Integer> failedRowIndices = error.getRowIndexToErrorMessage().keySet();
                 for (int failedIndex : failedRowIndices) {
@@ -1164,5 +1164,10 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
         throw new RuntimeException(e);
       }
     }
+
+    @Override
+    public Duration getAllowedTimestampSkew() {
+      return Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+    }
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index 8cf8ad0ee02..0f9b07d0c40 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -662,10 +662,10 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object
             // failedInserts
             // PCollection, and retry with the remaining rows.
             if (failedContext.getError() != null
-                && failedContext.getError() instanceof Exceptions.AppendSerializtionError) {
-              Exceptions.AppendSerializtionError error =
+                && failedContext.getError() instanceof Exceptions.AppendSerializationError) {
+              Exceptions.AppendSerializationError error =
                   Preconditions.checkArgumentNotNull(
-                      (Exceptions.AppendSerializtionError) failedContext.getError());
+                      (Exceptions.AppendSerializationError) failedContext.getError());
 
               Set<Integer> failedRowIndices = error.getRowIndexToErrorMessage().keySet();
               for (int failedIndex : failedRowIndices) {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
index 6a50127acd8..1e746d7f96b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
@@ -680,7 +680,7 @@ public class FakeDatasetService implements DatasetService, WriteStreamService, S
           }
           if (!rowIndexToErrorMessage.isEmpty()) {
             return ApiFutures.immediateFailedFuture(
-                new Exceptions.AppendSerializtionError(
+                new Exceptions.AppendSerializationError(
                     Code.INVALID_ARGUMENT.getNumber(),
                     "Append serialization failed for writer: " + streamName,
                     stream.streamName,
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index 720419f2227..55269342155 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -35,6 +35,8 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeTrue;
 
+import com.google.api.core.ApiFuture;
+import com.google.api.core.SettableApiFuture;
 import com.google.api.services.bigquery.model.Clustering;
 import com.google.api.services.bigquery.model.ErrorProto;
 import com.google.api.services.bigquery.model.Job;
@@ -48,7 +50,12 @@ import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.api.services.bigquery.model.TimePartitioning;
 import com.google.auto.value.AutoValue;
+import com.google.cloud.bigquery.storage.v1.AppendRowsRequest;
+import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
+import com.google.cloud.bigquery.storage.v1.Exceptions;
+import com.google.cloud.bigquery.storage.v1.ProtoRows;
 import com.google.protobuf.ByteString;
+import com.google.protobuf.DescriptorProtos;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -124,14 +131,18 @@ import org.apache.beam.sdk.transforms.DoFnTester;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.PeriodicImpulse;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException;
 import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -3104,6 +3115,104 @@ public class BigQueryIOWriteTest implements Serializable {
         containsInAnyOrder(Iterables.toArray(rows, TableRow.class)));
   }
 
+  public static class ThrowingFakeDatasetServices extends FakeDatasetService {
+    @Override
+    public BigQueryServices.StreamAppendClient getStreamAppendClient(
+        String streamName,
+        DescriptorProtos.DescriptorProto descriptor,
+        boolean useConnectionPool,
+        AppendRowsRequest.MissingValueInterpretation missingValueInterpretation) {
+      return new BigQueryServices.StreamAppendClient() {
+        @Override
+        public ApiFuture<AppendRowsResponse> appendRows(long offset, ProtoRows rows) {
+          Map<Integer, String> errorMap = new HashMap<>();
+          for (int i = 0; i < rows.getSerializedRowsCount(); i++) {
+            errorMap.put(i, "some serialization error");
+          }
+          SettableApiFuture<AppendRowsResponse> appendResult = SettableApiFuture.create();
+          appendResult.setException(
+              new Exceptions.AppendSerializationError(
+                  404, "some description", "some stream", errorMap));
+          return appendResult;
+        }
+
+        @Override
+        public com.google.cloud.bigquery.storage.v1.@Nullable TableSchema getUpdatedSchema() {
+          return null;
+        }
+
+        @Override
+        public void pin() {}
+
+        @Override
+        public void unpin() {}
+
+        @Override
+        public void close() {}
+      };
+    }
+  }
+
+  @Test
+  public void testStorageWriteReturnsAppendSerializationError() throws Exception {
+    assumeTrue(useStorageApi);
+    assumeTrue(useStreaming);
+    p.getOptions().as(BigQueryOptions.class).setStorageApiAppendThresholdRecordCount(5);
+
+    TableSchema schema =
+        new TableSchema()
+            .setFields(Arrays.asList(new TableFieldSchema().setType("INTEGER").setName("long")));
+    Table fakeTable = new Table();
+    TableReference ref =
+        new TableReference()
+            .setProjectId("project-id")
+            .setDatasetId("dataset-id")
+            .setTableId("table-id");
+    fakeTable.setSchema(schema);
+    fakeTable.setTableReference(ref);
+
+    ThrowingFakeDatasetServices throwingService = new ThrowingFakeDatasetServices();
+    throwingService.createTable(fakeTable);
+
+    int numRows = 100;
+
+    WriteResult res =
+        p.apply(
+                PeriodicImpulse.create()
+                    .startAt(Instant.ofEpochMilli(0))
+                    .stopAfter(Duration.millis(numRows - 1))
+                    .withInterval(Duration.millis(1)))
+            .apply(
+                "Convert to longs",
+                MapElements.into(TypeDescriptor.of(TableRow.class))
+                    .via(instant -> new TableRow().set("long", instant.getMillis())))
+            .apply(
+                BigQueryIO.writeTableRows()
+                    .to(ref)
+                    .withSchema(schema)
+                    .withTestServices(
+                        new FakeBigQueryServices()
+                            .withDatasetService(throwingService)
+                            .withJobService(fakeJobService)));
+
+    PCollection<Integer> numErrors =
+        res.getFailedStorageApiInserts()
+            .apply(
+                "Count errors",
+                MapElements.into(TypeDescriptors.integers())
+                    .via(err -> err.getErrorMessage().equals("some serialization error") ? 1 : 0))
+            .apply(
+                Window.<Integer>into(new GlobalWindows())
+                    .triggering(AfterWatermark.pastEndOfWindow())
+                    .discardingFiredPanes()
+                    .withAllowedLateness(Duration.ZERO))
+            .apply(Sum.integersGlobally());
+
+    PAssert.that(numErrors).containsInAnyOrder(numRows);
+
+    p.run().waitUntilFinish();
+  }
+
   @Test
   public void testWriteProtos() throws Exception {
     BigQueryIO.Write.Method method =
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java
index d754927bdb3..9c6fae164fc 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java
@@ -144,7 +144,7 @@ public class BigQuerySinkMetricsTest {
 
     int notFoundVal = Status.Code.NOT_FOUND.value();
     Throwable grpcError =
-        new Exceptions.AppendSerializtionError(notFoundVal, "Test Error", "Stream name", null);
+        new Exceptions.AppendSerializationError(notFoundVal, "Test Error", "Stream name", null);
     assertThat(BigQuerySinkMetrics.throwableToGRPCCodeString(grpcError), equalTo("NOT_FOUND"));
   }
 
@@ -220,7 +220,7 @@ public class BigQuerySinkMetricsTest {
     c.setOperationEndTime(t1.plusMillis(5));
     int notFoundVal = Status.Code.NOT_FOUND.value();
     Throwable grpcError =
-        new Exceptions.AppendSerializtionError(notFoundVal, "Test Error", "Stream name", null);
+        new Exceptions.AppendSerializationError(notFoundVal, "Test Error", "Stream name", null);
     c.setError(grpcError);
 
     // Test disabled SupportMetricsDeletion