You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by yh...@apache.org on 2023/06/21 15:32:37 UTC
[beam] 01/01: Revert "Adding error tag and metrics in SpannerWriteSchemaTransformProvider (#27021)"
This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch revert-27021-more-dlq
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 2c3afccc5e26ddcc3de4cd18a539491eb838ccf2
Author: Yi Hu <hu...@gmail.com>
AuthorDate: Wed Jun 21 11:32:28 2023 -0400
Revert "Adding error tag and metrics in SpannerWriteSchemaTransformProvider (#27021)"
This reverts commit 7ecd8e514448406a15bedc85a680b1e7153774f9.
---
.../SpannerWriteSchemaTransformProvider.java | 37 ++--------------------
1 file changed, 2 insertions(+), 35 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java
index a21be9be671..43e0de3b903 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java
@@ -21,13 +21,10 @@ import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import com.google.cloud.spanner.Mutation;
import java.io.Serializable;
-import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
-import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
@@ -35,11 +32,9 @@ import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
@@ -74,31 +69,6 @@ public class SpannerWriteSchemaTransformProvider
this.configuration = configuration;
}
- // A generic counter for PCollection of Row. Will be initialized with the given
- // name argument. Performs element-wise counter of the input PCollection.
- private static class ElementCounterFn extends DoFn<Row, Row> {
-
- private Counter spannerGenericElementCounter;
- private Long elementsInBundle = 0L;
-
- ElementCounterFn(String name) {
- this.spannerGenericElementCounter =
- Metrics.counter(SpannerSchemaTransformWrite.class, name);
- }
-
- @ProcessElement
- public void process(ProcessContext c) {
- this.elementsInBundle += 1;
- c.output(c.element());
- }
-
- @FinishBundle
- public void finish(FinishBundleContext c) {
- this.spannerGenericElementCounter.inc(this.elementsInBundle);
- this.elementsInBundle = 0L;
- }
- }
-
@Override
public @UnknownKeyFor @NonNull @Initialized PTransform<
@UnknownKeyFor @NonNull @Initialized PCollectionRowTuple,
@@ -156,11 +126,8 @@ public class SpannerWriteSchemaTransformProvider
mutation.getValues().iterator()))
.build())
.collect(Collectors.toList())))
- .apply(
- "error-count", ParDo.of(new ElementCounterFn("Spanner-write-error-counter")))
.setRowSchema(failureSchema);
-
- return PCollectionRowTuple.of("failures", failures).and("errors", failures);
+ return PCollectionRowTuple.of("failures", failures);
}
};
}
@@ -180,7 +147,7 @@ public class SpannerWriteSchemaTransformProvider
@Override
public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
outputCollectionNames() {
- return Arrays.asList("failures", "errors");
+ return Collections.singletonList("failures");
}
@AutoValue