You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by yh...@apache.org on 2023/06/21 15:32:37 UTC

[beam] 01/01: Revert "Adding error tag and metrics in SpannerWriteSchemaTransformProvider (#27021)"

This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch revert-27021-more-dlq
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 2c3afccc5e26ddcc3de4cd18a539491eb838ccf2
Author: Yi Hu <hu...@gmail.com>
AuthorDate: Wed Jun 21 11:32:28 2023 -0400

    Revert "Adding error tag and metrics in SpannerWriteSchemaTransformProvider (#27021)"
    
    This reverts commit 7ecd8e514448406a15bedc85a680b1e7153774f9.
---
 .../SpannerWriteSchemaTransformProvider.java       | 37 ++--------------------
 1 file changed, 2 insertions(+), 35 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java
index a21be9be671..43e0de3b903 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java
@@ -21,13 +21,10 @@ import com.google.auto.service.AutoService;
 import com.google.auto.value.AutoValue;
 import com.google.cloud.spanner.Mutation;
 import java.io.Serializable;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;
-import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.schemas.AutoValueSchema;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
@@ -35,11 +32,9 @@ import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
 import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.FlatMapElements;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionRowTuple;
@@ -74,31 +69,6 @@ public class SpannerWriteSchemaTransformProvider
       this.configuration = configuration;
     }
 
-    // A generic counter for PCollection of Row. Will be initialized with the given
-    // name argument. Performs element-wise counter of the input PCollection.
-    private static class ElementCounterFn extends DoFn<Row, Row> {
-
-      private Counter spannerGenericElementCounter;
-      private Long elementsInBundle = 0L;
-
-      ElementCounterFn(String name) {
-        this.spannerGenericElementCounter =
-            Metrics.counter(SpannerSchemaTransformWrite.class, name);
-      }
-
-      @ProcessElement
-      public void process(ProcessContext c) {
-        this.elementsInBundle += 1;
-        c.output(c.element());
-      }
-
-      @FinishBundle
-      public void finish(FinishBundleContext c) {
-        this.spannerGenericElementCounter.inc(this.elementsInBundle);
-        this.elementsInBundle = 0L;
-      }
-    }
-
     @Override
     public @UnknownKeyFor @NonNull @Initialized PTransform<
             @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple,
@@ -156,11 +126,8 @@ public class SpannerWriteSchemaTransformProvider
                                                           mutation.getValues().iterator()))
                                                   .build())
                                       .collect(Collectors.toList())))
-                  .apply(
-                      "error-count", ParDo.of(new ElementCounterFn("Spanner-write-error-counter")))
                   .setRowSchema(failureSchema);
-
-          return PCollectionRowTuple.of("failures", failures).and("errors", failures);
+          return PCollectionRowTuple.of("failures", failures);
         }
       };
     }
@@ -180,7 +147,7 @@ public class SpannerWriteSchemaTransformProvider
   @Override
   public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
       outputCollectionNames() {
-    return Arrays.asList("failures", "errors");
+    return Collections.singletonList("failures");
   }
 
   @AutoValue