Posted to commits@beam.apache.org by pa...@apache.org on 2022/04/30 19:59:47 UTC

[beam] branch master updated: Merge pull request #17422 from [BEAM-14344]: remove tracing from spannerio change streams

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e4beb6f9208 Merge pull request #17422 from [BEAM-14344]: remove tracing from spannerio change streams
e4beb6f9208 is described below

commit e4beb6f9208d3cc34db5f6fcb636d200683938d2
Author: Thiago Nunes <th...@google.com>
AuthorDate: Sun May 1 05:59:41 2022 +1000

    Merge pull request #17422 from [BEAM-14344]: remove tracing from spannerio change streams
    
    Removes distributed tracing from spannerio change streams. The
    distributed tracing currently adds no value to debugging or
    understanding the process execution, so we remove it here to
    simplify the code.
---
 sdks/java/io/google-cloud-platform/build.gradle    |   1 -
 .../apache/beam/sdk/io/gcp/spanner/SpannerIO.java  | 180 ++++++++++-----------
 .../spanner/changestreams/ChangeStreamMetrics.java |   6 -
 .../action/ChildPartitionsRecordAction.java        | 132 +++++++--------
 .../action/DataChangeRecordAction.java             |  42 ++---
 .../action/HeartbeatRecordAction.java              |  40 ++---
 .../action/QueryChangeStreamAction.java            | 161 +++++++++---------
 .../changestreams/dao/PartitionMetadataDao.java    | 133 +++++----------
 .../dofn/DetectNewPartitionsDoFn.java              |  12 +-
 .../dofn/ReadChangeStreamPartitionDoFn.java        |  28 +---
 10 files changed, 280 insertions(+), 455 deletions(-)
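
Every removal in the diff below follows the same shape: a try-with-resources
OpenCensus scoped span, usually tagged with a PartitionID attribute, is dropped and
only its body is kept. A minimal before/after sketch of that shape in Java; the span
name mirrors the removed code, and processRecord is a hypothetical stand-in for the
real method bodies:

    // Before this commit: the body was wrapped in an OpenCensus scoped span that
    // carried the partition token as an attribute.
    try (Scope scope =
        TRACER.spanBuilder("HeartbeatRecordAction").setRecordEvents(true).startScopedSpan()) {
      TRACER
          .getCurrentSpan()
          .putAttribute(
              PARTITION_ID_ATTRIBUTE_LABEL,
              AttributeValue.stringAttributeValue(partition.getPartitionToken()));
      processRecord(record);
    }

    // After this commit: the span and attribute are gone; only the body remains.
    processRecord(record);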

diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle
index cd71fc35e21..392fe06a931 100644
--- a/sdks/java/io/google-cloud-platform/build.gradle
+++ b/sdks/java/io/google-cloud-platform/build.gradle
@@ -143,7 +143,6 @@ dependencies {
   implementation library.java.arrow_vector
 
   implementation "org.threeten:threetenbp:1.4.4"
-  implementation "io.opencensus:opencensus-api:0.31.0"
 
   testImplementation library.java.arrow_memory_netty
   testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
index 66a9ad1011c..2b76bb0db5b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
@@ -49,13 +49,6 @@ import com.google.cloud.spanner.SpannerOptions;
 import com.google.cloud.spanner.Statement;
 import com.google.cloud.spanner.Struct;
 import com.google.cloud.spanner.TimestampBound;
-import io.opencensus.common.Scope;
-import io.opencensus.trace.Sampler;
-import io.opencensus.trace.Tracer;
-import io.opencensus.trace.Tracing;
-import io.opencensus.trace.config.TraceConfig;
-import io.opencensus.trace.config.TraceParams;
-import io.opencensus.trace.samplers.Samplers;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -1388,6 +1381,8 @@ public class SpannerIO {
 
     abstract @Nullable RpcPriority getRpcPriority();
 
+    /** @deprecated This configuration has no effect, as tracing is not available */
+    @Deprecated
     abstract @Nullable Double getTraceSampleProbability();
 
     abstract Builder toBuilder();
@@ -1489,7 +1484,12 @@ public class SpannerIO {
       return toBuilder().setRpcPriority(rpcPriority).build();
     }
 
-    /** Specifies the sample probability of tracing requests. */
+    /**
+     * Specifies the sample probability of tracing requests.
+     *
+     * @deprecated This configuration has no effect, as tracing is not available.
+     */
+    @Deprecated
     public ReadChangeStream withTraceSampleProbability(Double probability) {
       return toBuilder().setTraceSampleProbability(probability).build();
     }
@@ -1544,102 +1544,84 @@ public class SpannerIO {
           MoreObjects.firstNonNull(
               getMetadataTable(), generatePartitionMetadataTableName(partitionMetadataDatabaseId));
 
-      if (getTraceSampleProbability() != null) {
-        TraceConfig globalTraceConfig = Tracing.getTraceConfig();
-        final Sampler sampler = Samplers.probabilitySampler(getTraceSampleProbability());
-        globalTraceConfig.updateActiveTraceParams(
-            TraceParams.DEFAULT.toBuilder().setSampler(sampler).build());
+      SpannerConfig changeStreamSpannerConfig = getSpannerConfig();
+      // Set default retryable errors for ReadChangeStream
+      if (changeStreamSpannerConfig.getRetryableCodes() == null) {
+        ImmutableSet<Code> defaultRetryableCodes = ImmutableSet.of(Code.UNAVAILABLE, Code.ABORTED);
+        changeStreamSpannerConfig =
+            changeStreamSpannerConfig.toBuilder().setRetryableCodes(defaultRetryableCodes).build();
       }
-      Tracer tracer = Tracing.getTracer();
-      try (Scope scope =
-          tracer
-              .spanBuilder("SpannerIO.ReadChangeStream.expand")
-              .setRecordEvents(true)
-              .startScopedSpan()) {
-        SpannerConfig changeStreamSpannerConfig = getSpannerConfig();
-        // Set default retryable errors for ReadChangeStream
-        if (changeStreamSpannerConfig.getRetryableCodes() == null) {
-          ImmutableSet<Code> defaultRetryableCodes =
-              ImmutableSet.of(Code.UNAVAILABLE, Code.ABORTED);
-          changeStreamSpannerConfig =
-              changeStreamSpannerConfig
-                  .toBuilder()
-                  .setRetryableCodes(defaultRetryableCodes)
-                  .build();
-        }
-        // Set default retry timeouts for ReadChangeStream
-        if (changeStreamSpannerConfig.getExecuteStreamingSqlRetrySettings() == null) {
-          changeStreamSpannerConfig =
-              changeStreamSpannerConfig
-                  .toBuilder()
-                  .setExecuteStreamingSqlRetrySettings(
-                      RetrySettings.newBuilder()
-                          .setTotalTimeout(org.threeten.bp.Duration.ofMinutes(5))
-                          .setInitialRpcTimeout(org.threeten.bp.Duration.ofMinutes(1))
-                          .setMaxRpcTimeout(org.threeten.bp.Duration.ofMinutes(1))
-                          .build())
-                  .build();
-        }
-        final SpannerConfig partitionMetadataSpannerConfig =
+      // Set default retry timeouts for ReadChangeStream
+      if (changeStreamSpannerConfig.getExecuteStreamingSqlRetrySettings() == null) {
+        changeStreamSpannerConfig =
             changeStreamSpannerConfig
                 .toBuilder()
-                .setInstanceId(StaticValueProvider.of(partitionMetadataInstanceId))
-                .setDatabaseId(StaticValueProvider.of(partitionMetadataDatabaseId))
+                .setExecuteStreamingSqlRetrySettings(
+                    RetrySettings.newBuilder()
+                        .setTotalTimeout(org.threeten.bp.Duration.ofMinutes(5))
+                        .setInitialRpcTimeout(org.threeten.bp.Duration.ofMinutes(1))
+                        .setMaxRpcTimeout(org.threeten.bp.Duration.ofMinutes(1))
+                        .build())
                 .build();
-        final String changeStreamName = getChangeStreamName();
-        final Timestamp startTimestamp = getInclusiveStartAt();
-        // Uses (Timestamp.MAX - 1ns) at max for end timestamp, because we add 1ns to transform the
-        // interval into a closed-open in the read change stream restriction (prevents overflow)
-        final Timestamp endTimestamp =
-            getInclusiveEndAt().compareTo(MAX_INCLUSIVE_END_AT) > 0
-                ? MAX_INCLUSIVE_END_AT
-                : getInclusiveEndAt();
-        final MapperFactory mapperFactory = new MapperFactory();
-        final ChangeStreamMetrics metrics = new ChangeStreamMetrics();
-        final ThroughputEstimator throughputEstimator = new ThroughputEstimator();
-        final RpcPriority rpcPriority =
-            MoreObjects.firstNonNull(getRpcPriority(), RpcPriority.HIGH);
-        final DaoFactory daoFactory =
-            new DaoFactory(
-                changeStreamSpannerConfig,
-                changeStreamName,
-                partitionMetadataSpannerConfig,
-                partitionMetadataTableName,
-                rpcPriority,
-                input.getPipeline().getOptions().getJobName());
-        final ActionFactory actionFactory = new ActionFactory();
-
-        final InitializeDoFn initializeDoFn =
-            new InitializeDoFn(daoFactory, mapperFactory, startTimestamp, endTimestamp);
-        final DetectNewPartitionsDoFn detectNewPartitionsDoFn =
-            new DetectNewPartitionsDoFn(daoFactory, mapperFactory, actionFactory, metrics);
-        final ReadChangeStreamPartitionDoFn readChangeStreamPartitionDoFn =
-            new ReadChangeStreamPartitionDoFn(
-                daoFactory, mapperFactory, actionFactory, metrics, throughputEstimator);
-        final PostProcessingMetricsDoFn postProcessingMetricsDoFn =
-            new PostProcessingMetricsDoFn(metrics);
-
-        LOG.info("Partition metadata table that will be used is " + partitionMetadataTableName);
-        input
-            .getPipeline()
-            .getOptions()
-            .as(SpannerChangeStreamOptions.class)
-            .setMetadataTable(partitionMetadataTableName);
-
-        PCollection<byte[]> impulseOut = input.apply(Impulse.create());
-        PCollection<DataChangeRecord> results =
-            impulseOut
-                .apply("Initialize the connector", ParDo.of(initializeDoFn))
-                .apply("Detect new partitions", ParDo.of(detectNewPartitionsDoFn))
-                .apply("Read change stream partition", ParDo.of(readChangeStreamPartitionDoFn))
-                .apply("Gather metrics", ParDo.of(postProcessingMetricsDoFn));
-
-        impulseOut
-            .apply(WithTimestamps.of(e -> GlobalWindow.INSTANCE.maxTimestamp()))
-            .apply(Wait.on(results))
-            .apply(ParDo.of(new CleanUpReadChangeStreamDoFn(daoFactory)));
-        return results;
       }
+      final SpannerConfig partitionMetadataSpannerConfig =
+          changeStreamSpannerConfig
+              .toBuilder()
+              .setInstanceId(StaticValueProvider.of(partitionMetadataInstanceId))
+              .setDatabaseId(StaticValueProvider.of(partitionMetadataDatabaseId))
+              .build();
+      final String changeStreamName = getChangeStreamName();
+      final Timestamp startTimestamp = getInclusiveStartAt();
+      // Uses (Timestamp.MAX - 1ns) at max for end timestamp, because we add 1ns to transform the
+      // interval into a closed-open in the read change stream restriction (prevents overflow)
+      final Timestamp endTimestamp =
+          getInclusiveEndAt().compareTo(MAX_INCLUSIVE_END_AT) > 0
+              ? MAX_INCLUSIVE_END_AT
+              : getInclusiveEndAt();
+      final MapperFactory mapperFactory = new MapperFactory();
+      final ChangeStreamMetrics metrics = new ChangeStreamMetrics();
+      final ThroughputEstimator throughputEstimator = new ThroughputEstimator();
+      final RpcPriority rpcPriority = MoreObjects.firstNonNull(getRpcPriority(), RpcPriority.HIGH);
+      final DaoFactory daoFactory =
+          new DaoFactory(
+              changeStreamSpannerConfig,
+              changeStreamName,
+              partitionMetadataSpannerConfig,
+              partitionMetadataTableName,
+              rpcPriority,
+              input.getPipeline().getOptions().getJobName());
+      final ActionFactory actionFactory = new ActionFactory();
+
+      final InitializeDoFn initializeDoFn =
+          new InitializeDoFn(daoFactory, mapperFactory, startTimestamp, endTimestamp);
+      final DetectNewPartitionsDoFn detectNewPartitionsDoFn =
+          new DetectNewPartitionsDoFn(daoFactory, mapperFactory, actionFactory, metrics);
+      final ReadChangeStreamPartitionDoFn readChangeStreamPartitionDoFn =
+          new ReadChangeStreamPartitionDoFn(
+              daoFactory, mapperFactory, actionFactory, metrics, throughputEstimator);
+      final PostProcessingMetricsDoFn postProcessingMetricsDoFn =
+          new PostProcessingMetricsDoFn(metrics);
+
+      LOG.info("Partition metadata table that will be used is " + partitionMetadataTableName);
+      input
+          .getPipeline()
+          .getOptions()
+          .as(SpannerChangeStreamOptions.class)
+          .setMetadataTable(partitionMetadataTableName);
+
+      PCollection<byte[]> impulseOut = input.apply(Impulse.create());
+      PCollection<DataChangeRecord> results =
+          impulseOut
+              .apply("Initialize the connector", ParDo.of(initializeDoFn))
+              .apply("Detect new partitions", ParDo.of(detectNewPartitionsDoFn))
+              .apply("Read change stream partition", ParDo.of(readChangeStreamPartitionDoFn))
+              .apply("Gather metrics", ParDo.of(postProcessingMetricsDoFn));
+
+      impulseOut
+          .apply(WithTimestamps.of(e -> GlobalWindow.INSTANCE.maxTimestamp()))
+          .apply(Wait.on(results))
+          .apply(ParDo.of(new CleanUpReadChangeStreamDoFn(daoFactory)));
+      return results;
     }
   }
 
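For pipeline authors, the only user-visible change in SpannerIO.java above is that
withTraceSampleProbability is now deprecated and has no effect; reading change
streams is otherwise configured as before. A short sketch, assuming the usual change
stream setup (the project, instance, database, and stream name below are hypothetical
placeholders):

    import com.google.cloud.Timestamp;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
    import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
    import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.PCollection;

    public class ReadChangeStreamSketch {
      public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        SpannerConfig spannerConfig =
            SpannerConfig.create()
                .withProjectId("my-project")
                .withInstanceId("my-instance")
                .withDatabaseId("my-database");

        PCollection<DataChangeRecord> records =
            pipeline.apply(
                SpannerIO.readChangeStream()
                    .withSpannerConfig(spannerConfig)
                    .withChangeStreamName("my_change_stream")
                    .withInclusiveStartAt(Timestamp.now())
                    // Deprecated by this commit: still compiles, but no longer enables tracing.
                    .withTraceSampleProbability(0.05));

        pipeline.run().waitUntilFinish();
      }
    }
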
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamMetrics.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamMetrics.java
index c6e09425372..f82a881cc0b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamMetrics.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamMetrics.java
@@ -32,12 +32,6 @@ public class ChangeStreamMetrics implements Serializable {
 
   private static final long serialVersionUID = 8187140831756972470L;
 
-  // ----
-  // Tracing Labels
-
-  /** Cloud Tracing label for Partition Tokens. */
-  public static final String PARTITION_ID_ATTRIBUTE_LABEL = "PartitionID";
-
   // ------------------------
   // Partition record metrics
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java
index 1925a9b0a86..e39df3e2d44 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java
@@ -17,14 +17,9 @@
  */
 package org.apache.beam.sdk.io.gcp.spanner.changestreams.action;
 
-import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics.PARTITION_ID_ATTRIBUTE_LABEL;
 import static org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State.CREATED;
 
 import com.google.cloud.Timestamp;
-import io.opencensus.common.Scope;
-import io.opencensus.trace.AttributeValue;
-import io.opencensus.trace.Tracer;
-import io.opencensus.trace.Tracing;
 import java.util.Optional;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
@@ -50,7 +45,6 @@ import org.slf4j.LoggerFactory;
 public class ChildPartitionsRecordAction {
 
   private static final Logger LOG = LoggerFactory.getLogger(ChildPartitionsRecordAction.class);
-  private static final Tracer TRACER = Tracing.getTracer();
   private final PartitionMetadataDao partitionMetadataDao;
   private final ChangeStreamMetrics metrics;
 
@@ -114,30 +108,24 @@ public class ChildPartitionsRecordAction {
       ManualWatermarkEstimator<Instant> watermarkEstimator) {
 
     final String token = partition.getPartitionToken();
-    try (Scope scope =
-        TRACER.spanBuilder("ChildPartitionsRecordAction").setRecordEvents(true).startScopedSpan()) {
-      TRACER
-          .getCurrentSpan()
-          .putAttribute(PARTITION_ID_ATTRIBUTE_LABEL, AttributeValue.stringAttributeValue(token));
-
-      LOG.debug("[" + token + "] Processing child partition record " + record);
-
-      final Timestamp startTimestamp = record.getStartTimestamp();
-      final Instant startInstant = new Instant(startTimestamp.toSqlTimestamp().getTime());
-      if (!tracker.tryClaim(startTimestamp)) {
-        LOG.debug(
-            "[" + token + "] Could not claim queryChangeStream(" + startTimestamp + "), stopping");
-        return Optional.of(ProcessContinuation.stop());
-      }
-      watermarkEstimator.setWatermark(startInstant);
-
-      for (ChildPartition childPartition : record.getChildPartitions()) {
-        processChildPartition(partition, record, childPartition);
-      }
-
-      LOG.debug("[" + token + "] Child partitions action completed successfully");
-      return Optional.empty();
+
+    LOG.debug("[" + token + "] Processing child partition record " + record);
+
+    final Timestamp startTimestamp = record.getStartTimestamp();
+    final Instant startInstant = new Instant(startTimestamp.toSqlTimestamp().getTime());
+    if (!tracker.tryClaim(startTimestamp)) {
+      LOG.debug(
+          "[" + token + "] Could not claim queryChangeStream(" + startTimestamp + "), stopping");
+      return Optional.of(ProcessContinuation.stop());
     }
+    watermarkEstimator.setWatermark(startInstant);
+
+    for (ChildPartition childPartition : record.getChildPartitions()) {
+      processChildPartition(partition, record, childPartition);
+    }
+
+    LOG.debug("[" + token + "] Child partitions action completed successfully");
+    return Optional.empty();
   }
 
   // Unboxing of runInTransaction result will not produce a null value, we can ignore it
@@ -145,58 +133,46 @@ public class ChildPartitionsRecordAction {
   private void processChildPartition(
       PartitionMetadata partition, ChildPartitionsRecord record, ChildPartition childPartition) {
 
-    try (Scope scope =
-        TRACER
-            .spanBuilder("ChildPartitionsRecordAction.processChildPartition")
-            .setRecordEvents(true)
-            .startScopedSpan()) {
-      TRACER
-          .getCurrentSpan()
-          .putAttribute(
-              PARTITION_ID_ATTRIBUTE_LABEL,
-              AttributeValue.stringAttributeValue(partition.getPartitionToken()));
-
-      final String partitionToken = partition.getPartitionToken();
-      final String childPartitionToken = childPartition.getToken();
-      final boolean isSplit = isSplit(childPartition);
+    final String partitionToken = partition.getPartitionToken();
+    final String childPartitionToken = childPartition.getToken();
+    final boolean isSplit = isSplit(childPartition);
+    LOG.debug(
+        "["
+            + partitionToken
+            + "] Processing child partition"
+            + (isSplit ? " split" : " merge")
+            + " event");
+
+    final PartitionMetadata row =
+        toPartitionMetadata(
+            record.getStartTimestamp(),
+            partition.getEndTimestamp(),
+            partition.getHeartbeatMillis(),
+            childPartition);
+    LOG.debug("[" + partitionToken + "] Inserting child partition token " + childPartitionToken);
+    final Boolean insertedRow =
+        partitionMetadataDao
+            .runInTransaction(
+                transaction -> {
+                  if (transaction.getPartition(childPartitionToken) == null) {
+                    transaction.insert(row);
+                    return true;
+                  } else {
+                    return false;
+                  }
+                })
+            .getResult();
+    if (insertedRow && isSplit) {
+      metrics.incPartitionRecordSplitCount();
+    } else if (insertedRow) {
+      metrics.incPartitionRecordMergeCount();
+    } else {
       LOG.debug(
           "["
               + partitionToken
-              + "] Processing child partition"
-              + (isSplit ? " split" : " merge")
-              + " event");
-
-      final PartitionMetadata row =
-          toPartitionMetadata(
-              record.getStartTimestamp(),
-              partition.getEndTimestamp(),
-              partition.getHeartbeatMillis(),
-              childPartition);
-      LOG.debug("[" + partitionToken + "] Inserting child partition token " + childPartitionToken);
-      final Boolean insertedRow =
-          partitionMetadataDao
-              .runInTransaction(
-                  transaction -> {
-                    if (transaction.getPartition(childPartitionToken) == null) {
-                      transaction.insert(row);
-                      return true;
-                    } else {
-                      return false;
-                    }
-                  })
-              .getResult();
-      if (insertedRow && isSplit) {
-        metrics.incPartitionRecordSplitCount();
-      } else if (insertedRow) {
-        metrics.incPartitionRecordMergeCount();
-      } else {
-        LOG.debug(
-            "["
-                + partitionToken
-                + "] Child token "
-                + childPartitionToken
-                + " already exists, skipping...");
-      }
+              + "] Child token "
+              + childPartitionToken
+              + " already exists, skipping...");
     }
   }
 
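The transactional logic kept in ChildPartitionsRecordAction above reads more easily
without the diff markers: the child partition row is inserted only if it is not
already present, and the split or merge counter is bumped only when the insert
actually happened. Reconstructed from the added lines, with names as they appear in
the diff:

    final Boolean insertedRow =
        partitionMetadataDao
            .runInTransaction(
                transaction -> {
                  if (transaction.getPartition(childPartitionToken) == null) {
                    transaction.insert(row);
                    return true;
                  }
                  return false;
                })
            .getResult();
    if (insertedRow && isSplit) {
      metrics.incPartitionRecordSplitCount();
    } else if (insertedRow) {
      metrics.incPartitionRecordMergeCount();
    } else {
      LOG.debug(
          "[" + partitionToken + "] Child token " + childPartitionToken
              + " already exists, skipping...");
    }
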
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java
index cd6280e7115..2446a6c9316 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java
@@ -17,13 +17,7 @@
  */
 package org.apache.beam.sdk.io.gcp.spanner.changestreams.action;
 
-import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics.PARTITION_ID_ATTRIBUTE_LABEL;
-
 import com.google.cloud.Timestamp;
-import io.opencensus.common.Scope;
-import io.opencensus.trace.AttributeValue;
-import io.opencensus.trace.Tracer;
-import io.opencensus.trace.Tracing;
 import java.util.Optional;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
@@ -46,7 +40,6 @@ import org.slf4j.LoggerFactory;
  */
 public class DataChangeRecordAction {
   private static final Logger LOG = LoggerFactory.getLogger(DataChangeRecordAction.class);
-  private static final Tracer TRACER = Tracing.getTracer();
 
   /**
    * This is the main processing function for a {@link DataChangeRecord}. It returns an {@link
@@ -85,29 +78,20 @@ public class DataChangeRecordAction {
       OutputReceiver<DataChangeRecord> outputReceiver,
       ManualWatermarkEstimator<Instant> watermarkEstimator) {
 
-    try (Scope scope =
-        TRACER.spanBuilder("DataChangeRecordAction").setRecordEvents(true).startScopedSpan()) {
-      TRACER
-          .getCurrentSpan()
-          .putAttribute(
-              PARTITION_ID_ATTRIBUTE_LABEL,
-              AttributeValue.stringAttributeValue(partition.getPartitionToken()));
-
-      final String token = partition.getPartitionToken();
-      LOG.debug("[" + token + "] Processing data record " + record.getCommitTimestamp());
+    final String token = partition.getPartitionToken();
+    LOG.debug("[" + token + "] Processing data record " + record.getCommitTimestamp());
 
-      final Timestamp commitTimestamp = record.getCommitTimestamp();
-      final Instant commitInstant = new Instant(commitTimestamp.toSqlTimestamp().getTime());
-      if (!tracker.tryClaim(commitTimestamp)) {
-        LOG.debug(
-            "[" + token + "] Could not claim queryChangeStream(" + commitTimestamp + "), stopping");
-        return Optional.of(ProcessContinuation.stop());
-      }
-      outputReceiver.outputWithTimestamp(record, commitInstant);
-      watermarkEstimator.setWatermark(commitInstant);
-
-      LOG.debug("[" + token + "] Data record action completed successfully");
-      return Optional.empty();
+    final Timestamp commitTimestamp = record.getCommitTimestamp();
+    final Instant commitInstant = new Instant(commitTimestamp.toSqlTimestamp().getTime());
+    if (!tracker.tryClaim(commitTimestamp)) {
+      LOG.debug(
+          "[" + token + "] Could not claim queryChangeStream(" + commitTimestamp + "), stopping");
+      return Optional.of(ProcessContinuation.stop());
     }
+    outputReceiver.outputWithTimestamp(record, commitInstant);
+    watermarkEstimator.setWatermark(commitInstant);
+
+    LOG.debug("[" + token + "] Data record action completed successfully");
+    return Optional.empty();
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java
index 5c480d275ff..a940f57b989 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java
@@ -17,13 +17,7 @@
  */
 package org.apache.beam.sdk.io.gcp.spanner.changestreams.action;
 
-import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics.PARTITION_ID_ATTRIBUTE_LABEL;
-
 import com.google.cloud.Timestamp;
-import io.opencensus.common.Scope;
-import io.opencensus.trace.AttributeValue;
-import io.opencensus.trace.Tracer;
-import io.opencensus.trace.Tracing;
 import java.util.Optional;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
@@ -45,7 +39,6 @@ import org.slf4j.LoggerFactory;
  */
 public class HeartbeatRecordAction {
   private static final Logger LOG = LoggerFactory.getLogger(HeartbeatRecordAction.class);
-  private static final Tracer TRACER = Tracing.getTracer();
   private final ChangeStreamMetrics metrics;
 
   /**
@@ -81,28 +74,19 @@ public class HeartbeatRecordAction {
       RestrictionTracker<TimestampRange, Timestamp> tracker,
       ManualWatermarkEstimator<Instant> watermarkEstimator) {
 
-    try (Scope scope =
-        TRACER.spanBuilder("HeartbeatRecordAction").setRecordEvents(true).startScopedSpan()) {
-      TRACER
-          .getCurrentSpan()
-          .putAttribute(
-              PARTITION_ID_ATTRIBUTE_LABEL,
-              AttributeValue.stringAttributeValue(partition.getPartitionToken()));
-
-      final String token = partition.getPartitionToken();
-      LOG.debug("[" + token + "] Processing heartbeat record " + record);
+    final String token = partition.getPartitionToken();
+    LOG.debug("[" + token + "] Processing heartbeat record " + record);
 
-      final Timestamp timestamp = record.getTimestamp();
-      final Instant timestampInstant = new Instant(timestamp.toSqlTimestamp().getTime());
-      if (!tracker.tryClaim(timestamp)) {
-        LOG.debug("[" + token + "] Could not claim queryChangeStream(" + timestamp + "), stopping");
-        return Optional.of(ProcessContinuation.stop());
-      }
-      metrics.incHeartbeatRecordCount();
-      watermarkEstimator.setWatermark(timestampInstant);
-
-      LOG.debug("[" + token + "] Heartbeat record action completed successfully");
-      return Optional.empty();
+    final Timestamp timestamp = record.getTimestamp();
+    final Instant timestampInstant = new Instant(timestamp.toSqlTimestamp().getTime());
+    if (!tracker.tryClaim(timestamp)) {
+      LOG.debug("[" + token + "] Could not claim queryChangeStream(" + timestamp + "), stopping");
+      return Optional.of(ProcessContinuation.stop());
     }
+    metrics.incHeartbeatRecordCount();
+    watermarkEstimator.setWatermark(timestampInstant);
+
+    LOG.debug("[" + token + "] Heartbeat record action completed successfully");
+    return Optional.empty();
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
index 57e3502fcc2..6afb57c6e94 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
@@ -17,15 +17,9 @@
  */
 package org.apache.beam.sdk.io.gcp.spanner.changestreams.action;
 
-import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics.PARTITION_ID_ATTRIBUTE_LABEL;
-
 import com.google.cloud.Timestamp;
 import com.google.cloud.spanner.ErrorCode;
 import com.google.cloud.spanner.SpannerException;
-import io.opencensus.common.Scope;
-import io.opencensus.trace.AttributeValue;
-import io.opencensus.trace.Tracer;
-import io.opencensus.trace.Tracing;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Optional;
@@ -69,7 +63,6 @@ import org.slf4j.LoggerFactory;
 public class QueryChangeStreamAction {
 
   private static final Logger LOG = LoggerFactory.getLogger(QueryChangeStreamAction.class);
-  private static final Tracer TRACER = Tracing.getTracer();
   private static final Duration BUNDLE_FINALIZER_TIMEOUT = Duration.standardMinutes(5);
   private static final String OUT_OF_RANGE_ERROR_MESSAGE = "Specified start_timestamp is invalid";
 
@@ -167,96 +160,86 @@ public class QueryChangeStreamAction {
     final Timestamp startTimestamp = tracker.currentRestriction().getFrom();
     final Timestamp endTimestamp = partition.getEndTimestamp();
 
-    try (Scope scope =
-        TRACER.spanBuilder("QueryChangeStreamAction").setRecordEvents(true).startScopedSpan()) {
-      TRACER
-          .getCurrentSpan()
-          .putAttribute(PARTITION_ID_ATTRIBUTE_LABEL, AttributeValue.stringAttributeValue(token));
-
-      // TODO: Potentially we can avoid this fetch, by enriching the runningAt timestamp when the
-      // ReadChangeStreamPartitionDoFn#processElement is called
-      final PartitionMetadata updatedPartition =
-          Optional.ofNullable(partitionMetadataDao.getPartition(token))
-              .map(partitionMetadataMapper::from)
-              .orElseThrow(
-                  () ->
-                      new IllegalStateException(
-                          "Partition " + token + " not found in metadata table"));
+    // TODO: Potentially we can avoid this fetch, by enriching the runningAt timestamp when the
+    // ReadChangeStreamPartitionDoFn#processElement is called
+    final PartitionMetadata updatedPartition =
+        Optional.ofNullable(partitionMetadataDao.getPartition(token))
+            .map(partitionMetadataMapper::from)
+            .orElseThrow(
+                () ->
+                    new IllegalStateException(
+                        "Partition " + token + " not found in metadata table"));
 
-      try (ChangeStreamResultSet resultSet =
-          changeStreamDao.changeStreamQuery(
-              token, startTimestamp, endTimestamp, partition.getHeartbeatMillis())) {
+    try (ChangeStreamResultSet resultSet =
+        changeStreamDao.changeStreamQuery(
+            token, startTimestamp, endTimestamp, partition.getHeartbeatMillis())) {
 
-        metrics.incQueryCounter();
-        while (resultSet.next()) {
-          final List<ChangeStreamRecord> records =
-              changeStreamRecordMapper.toChangeStreamRecords(
-                  updatedPartition, resultSet.getCurrentRowAsStruct(), resultSet.getMetadata());
+      metrics.incQueryCounter();
+      while (resultSet.next()) {
+        final List<ChangeStreamRecord> records =
+            changeStreamRecordMapper.toChangeStreamRecords(
+                updatedPartition, resultSet.getCurrentRowAsStruct(), resultSet.getMetadata());
 
-          Optional<ProcessContinuation> maybeContinuation;
-          for (final ChangeStreamRecord record : records) {
-            if (record instanceof DataChangeRecord) {
-              maybeContinuation =
-                  dataChangeRecordAction.run(
-                      updatedPartition,
-                      (DataChangeRecord) record,
-                      tracker,
-                      receiver,
-                      watermarkEstimator);
-            } else if (record instanceof HeartbeatRecord) {
-              maybeContinuation =
-                  heartbeatRecordAction.run(
-                      updatedPartition, (HeartbeatRecord) record, tracker, watermarkEstimator);
-            } else if (record instanceof ChildPartitionsRecord) {
-              maybeContinuation =
-                  childPartitionsRecordAction.run(
-                      updatedPartition,
-                      (ChildPartitionsRecord) record,
-                      tracker,
-                      watermarkEstimator);
-            } else {
-              LOG.error("[" + token + "] Unknown record type " + record.getClass());
-              throw new IllegalArgumentException("Unknown record type " + record.getClass());
-            }
+        Optional<ProcessContinuation> maybeContinuation;
+        for (final ChangeStreamRecord record : records) {
+          if (record instanceof DataChangeRecord) {
+            maybeContinuation =
+                dataChangeRecordAction.run(
+                    updatedPartition,
+                    (DataChangeRecord) record,
+                    tracker,
+                    receiver,
+                    watermarkEstimator);
+          } else if (record instanceof HeartbeatRecord) {
+            maybeContinuation =
+                heartbeatRecordAction.run(
+                    updatedPartition, (HeartbeatRecord) record, tracker, watermarkEstimator);
+          } else if (record instanceof ChildPartitionsRecord) {
+            maybeContinuation =
+                childPartitionsRecordAction.run(
+                    updatedPartition, (ChildPartitionsRecord) record, tracker, watermarkEstimator);
+          } else {
+            LOG.error("[" + token + "] Unknown record type " + record.getClass());
+            throw new IllegalArgumentException("Unknown record type " + record.getClass());
+          }
 
-            // The size of a record is represented by the number of bytes needed for the
-            // string representation of the record. Here, we only try to achieve an estimate
-            // instead of an accurate throughput.
-            this.throughputEstimator.update(
-                Timestamp.now(), record.toString().getBytes(StandardCharsets.UTF_8).length);
+          // The size of a record is represented by the number of bytes needed for the
+          // string representation of the record. Here, we only try to achieve an estimate
+          // instead of an accurate throughput.
+          this.throughputEstimator.update(
+              Timestamp.now(), record.toString().getBytes(StandardCharsets.UTF_8).length);
 
-            if (maybeContinuation.isPresent()) {
-              LOG.debug("[" + token + "] Continuation present, returning " + maybeContinuation);
-              bundleFinalizer.afterBundleCommit(
-                  Instant.now().plus(BUNDLE_FINALIZER_TIMEOUT),
-                  updateWatermarkCallback(token, watermarkEstimator));
-              return maybeContinuation.get();
-            }
+          if (maybeContinuation.isPresent()) {
+            LOG.debug("[" + token + "] Continuation present, returning " + maybeContinuation);
+            bundleFinalizer.afterBundleCommit(
+                Instant.now().plus(BUNDLE_FINALIZER_TIMEOUT),
+                updateWatermarkCallback(token, watermarkEstimator));
+            return maybeContinuation.get();
           }
         }
-        bundleFinalizer.afterBundleCommit(
-            Instant.now().plus(BUNDLE_FINALIZER_TIMEOUT),
-            updateWatermarkCallback(token, watermarkEstimator));
+      }
+      bundleFinalizer.afterBundleCommit(
+          Instant.now().plus(BUNDLE_FINALIZER_TIMEOUT),
+          updateWatermarkCallback(token, watermarkEstimator));
 
-      } catch (SpannerException e) {
-        /*
-        If there is a split when a partition is supposed to be finished, the residual will try
-        to perform a change stream query for an out of range interval. We ignore this error
-        here, and the residual should be able to claim the end of the timestamp range, finishing
-        the partition.
-        */
-        if (isTimestampOutOfRange(e)) {
-          LOG.debug(
-              "["
-                  + token
-                  + "] query change stream is out of range for "
-                  + startTimestamp
-                  + " to "
-                  + endTimestamp
-                  + ", finishing stream");
-        } else {
-          throw e;
-        }
+    } catch (SpannerException e) {
+      /*
+      If there is a split when a partition is supposed to be finished, the residual will try
+      to perform a change stream query for an out of range interval. We ignore this error
+      here, and the residual should be able to claim the end of the timestamp range, finishing
+      the partition.
+      */
+      if (isTimestampOutOfRange(e)) {
+        LOG.debug(
+            "["
+                + token
+                + "] query change stream is out of range for "
+                + startTimestamp
+                + " to "
+                + endTimestamp
+                + ", finishing stream");
+      } else {
+        throw e;
       }
     }
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
index c452ba56305..1bf5af0b321 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.io.gcp.spanner.changestreams.dao;
 
-import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics.PARTITION_ID_ATTRIBUTE_LABEL;
 import static org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_CREATED_AT;
 import static org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_END_TIMESTAMP;
 import static org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_FINISHED_AT;
@@ -39,10 +38,6 @@ import com.google.cloud.spanner.Struct;
 import com.google.cloud.spanner.TransactionContext;
 import com.google.cloud.spanner.TransactionRunner;
 import com.google.cloud.spanner.Value;
-import io.opencensus.common.Scope;
-import io.opencensus.trace.AttributeValue;
-import io.opencensus.trace.Tracer;
-import io.opencensus.trace.Tracing;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -55,7 +50,6 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Immutabl
 
 /** Data access object for the Connector metadata tables. */
 public class PartitionMetadataDao {
-  private static final Tracer TRACER = Tracing.getTracer();
 
   private final String metadataTableName;
   private final DatabaseClient databaseClient;
@@ -100,29 +94,23 @@ public class PartitionMetadataDao {
    *     returns null.
    */
   public @Nullable Struct getPartition(String partitionToken) {
-    try (Scope scope = TRACER.spanBuilder("getPartition").setRecordEvents(true).startScopedSpan()) {
-      TRACER
-          .getCurrentSpan()
-          .putAttribute(
-              PARTITION_ID_ATTRIBUTE_LABEL, AttributeValue.stringAttributeValue(partitionToken));
-      try (ResultSet resultSet =
-          databaseClient
-              .singleUse()
-              .executeQuery(
-                  Statement.newBuilder(
-                          "SELECT * FROM "
-                              + metadataTableName
-                              + " WHERE "
-                              + COLUMN_PARTITION_TOKEN
-                              + " = @partition")
-                      .bind("partition")
-                      .to(partitionToken)
-                      .build())) {
-        if (resultSet.next()) {
-          return resultSet.getCurrentRowAsStruct();
-        }
-        return null;
+    try (ResultSet resultSet =
+        databaseClient
+            .singleUse()
+            .executeQuery(
+                Statement.newBuilder(
+                        "SELECT * FROM "
+                            + metadataTableName
+                            + " WHERE "
+                            + COLUMN_PARTITION_TOKEN
+                            + " = @partition")
+                    .bind("partition")
+                    .to(partitionToken)
+                    .build())) {
+      if (resultSet.next()) {
+        return resultSet.getCurrentRowAsStruct();
       }
+      return null;
     }
   }
 
@@ -148,9 +136,7 @@ public class PartitionMetadataDao {
             .bind("state")
             .to(State.FINISHED.name())
             .build();
-    try (Scope scope =
-            TRACER.spanBuilder("getMinCurrentWatermark").setRecordEvents(true).startScopedSpan();
-        ResultSet resultSet = databaseClient.singleUse().executeQuery(statement)) {
+    try (ResultSet resultSet = databaseClient.singleUse().executeQuery(statement)) {
       if (resultSet.next()) {
         return resultSet.getTimestamp(COLUMN_WATERMARK);
       }
@@ -268,7 +254,6 @@ public class PartitionMetadataDao {
   /** Represents the execution of a read / write transaction in Cloud Spanner. */
   public static class InTransactionContext {
 
-    private static final Tracer TRACER = Tracing.getTracer();
     private final String metadataTableName;
     private final TransactionContext transaction;
     private final Map<State, String> stateToTimestampColumn;
@@ -295,15 +280,8 @@ public class PartitionMetadataDao {
      * @param row the partition metadata to be inserted
      */
     public Void insert(PartitionMetadata row) {
-      try (Scope scope = TRACER.spanBuilder("insert").setRecordEvents(true).startScopedSpan()) {
-        TRACER
-            .getCurrentSpan()
-            .putAttribute(
-                PARTITION_ID_ATTRIBUTE_LABEL,
-                AttributeValue.stringAttributeValue(row.getPartitionToken()));
-        transaction.buffer(ImmutableList.of(createInsertMetadataMutationFrom(row)));
-        return null;
-      }
+      transaction.buffer(ImmutableList.of(createInsertMetadataMutationFrom(row)));
+      return null;
     }
 
     /**
@@ -326,16 +304,9 @@ public class PartitionMetadataDao {
      * @param partitionToken the partition unique identifier
      */
     public Void updateToRunning(String partitionToken) {
-      try (Scope scope =
-          TRACER.spanBuilder("updateToRunning").setRecordEvents(true).startScopedSpan()) {
-        TRACER
-            .getCurrentSpan()
-            .putAttribute(
-                PARTITION_ID_ATTRIBUTE_LABEL, AttributeValue.stringAttributeValue(partitionToken));
-        transaction.buffer(
-            ImmutableList.of(createUpdateMetadataStateMutationFrom(partitionToken, State.RUNNING)));
-        return null;
-      }
+      transaction.buffer(
+          ImmutableList.of(createUpdateMetadataStateMutationFrom(partitionToken, State.RUNNING)));
+      return null;
     }
 
     /**
@@ -344,17 +315,9 @@ public class PartitionMetadataDao {
      * @param partitionToken the partition unique identifier
      */
     public Void updateToFinished(String partitionToken) {
-      try (Scope scope =
-          TRACER.spanBuilder("updateToRunning").setRecordEvents(true).startScopedSpan()) {
-        TRACER
-            .getCurrentSpan()
-            .putAttribute(
-                PARTITION_ID_ATTRIBUTE_LABEL, AttributeValue.stringAttributeValue(partitionToken));
-        transaction.buffer(
-            ImmutableList.of(
-                createUpdateMetadataStateMutationFrom(partitionToken, State.FINISHED)));
-        return null;
-      }
+      transaction.buffer(
+          ImmutableList.of(createUpdateMetadataStateMutationFrom(partitionToken, State.FINISHED)));
+      return null;
     }
 
     /**
@@ -365,15 +328,8 @@ public class PartitionMetadataDao {
      * @return the commit timestamp of the read / write transaction
      */
     public Void updateWatermark(String partitionToken, Timestamp watermark) {
-      try (Scope scope =
-          TRACER.spanBuilder("updateCurrentWatermark").setRecordEvents(true).startScopedSpan()) {
-        TRACER
-            .getCurrentSpan()
-            .putAttribute(
-                PARTITION_ID_ATTRIBUTE_LABEL, AttributeValue.stringAttributeValue(partitionToken));
-        transaction.buffer(createUpdateMetadataWatermarkMutationFrom(partitionToken, watermark));
-        return null;
-      }
+      transaction.buffer(createUpdateMetadataWatermarkMutationFrom(partitionToken, watermark));
+      return null;
     }
 
     /**
@@ -384,28 +340,21 @@ public class PartitionMetadataDao {
      *     returns null.
      */
     public @Nullable Struct getPartition(String partitionToken) {
-      try (Scope scope =
-          TRACER.spanBuilder("getPartition").setRecordEvents(true).startScopedSpan()) {
-        TRACER
-            .getCurrentSpan()
-            .putAttribute(
-                PARTITION_ID_ATTRIBUTE_LABEL, AttributeValue.stringAttributeValue(partitionToken));
-        try (ResultSet resultSet =
-            transaction.executeQuery(
-                Statement.newBuilder(
-                        "SELECT * FROM "
-                            + metadataTableName
-                            + " WHERE "
-                            + COLUMN_PARTITION_TOKEN
-                            + " = @partition")
-                    .bind("partition")
-                    .to(partitionToken)
-                    .build())) {
-          if (resultSet.next()) {
-            return resultSet.getCurrentRowAsStruct();
-          }
-          return null;
+      try (ResultSet resultSet =
+          transaction.executeQuery(
+              Statement.newBuilder(
+                      "SELECT * FROM "
+                          + metadataTableName
+                          + " WHERE "
+                          + COLUMN_PARTITION_TOKEN
+                          + " = @partition")
+                  .bind("partition")
+                  .to(partitionToken)
+                  .build())) {
+        if (resultSet.next()) {
+          return resultSet.getCurrentRowAsStruct();
         }
+        return null;
       }
     }
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/DetectNewPartitionsDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/DetectNewPartitionsDoFn.java
index c3e9d6ec12a..dbc1fbdfdaf 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/DetectNewPartitionsDoFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/DetectNewPartitionsDoFn.java
@@ -17,9 +17,6 @@
  */
 package org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn;
 
-import io.opencensus.common.Scope;
-import io.opencensus.trace.Tracer;
-import io.opencensus.trace.Tracing;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ActionFactory;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.DetectNewPartitionsAction;
@@ -55,7 +52,6 @@ public class DetectNewPartitionsDoFn extends DoFn<PartitionMetadata, PartitionMe
 
   private static final long serialVersionUID = 1523712495885011374L;
   private static final Duration DEFAULT_RESUME_DURATION = Duration.millis(100L);
-  private static final Tracer TRACER = Tracing.getTracer();
 
   private final Duration resumeDuration;
   private final DaoFactory daoFactory;
@@ -139,12 +135,6 @@ public class DetectNewPartitionsDoFn extends DoFn<PartitionMetadata, PartitionMe
       OutputReceiver<PartitionMetadata> receiver,
       ManualWatermarkEstimator<Instant> watermarkEstimator) {
 
-    try (Scope scope =
-        TRACER
-            .spanBuilder("DetectNewPartitionsDoFn.processElement")
-            .setRecordEvents(true)
-            .startScopedSpan()) {
-      return detectNewPartitionsAction.run(tracker, receiver, watermarkEstimator);
-    }
+    return detectNewPartitionsAction.run(tracker, receiver, watermarkEstimator);
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
index 6c1ab8feaf9..f5cf02fb7d3 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
@@ -17,12 +17,6 @@
  */
 package org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn;
 
-import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics.PARTITION_ID_ATTRIBUTE_LABEL;
-
-import io.opencensus.common.Scope;
-import io.opencensus.trace.AttributeValue;
-import io.opencensus.trace.Tracer;
-import io.opencensus.trace.Tracing;
 import java.io.Serializable;
 import java.math.BigDecimal;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
@@ -68,7 +62,6 @@ public class ReadChangeStreamPartitionDoFn extends DoFn<PartitionMetadata, DataC
 
   private static final long serialVersionUID = -7574596218085711975L;
   private static final Logger LOG = LoggerFactory.getLogger(ReadChangeStreamPartitionDoFn.class);
-  private static final Tracer TRACER = Tracing.getTracer();
   private static final double AUTOSCALING_SIZE_MULTIPLIER = 2.0D;
 
   private final DaoFactory daoFactory;
@@ -234,20 +227,11 @@ public class ReadChangeStreamPartitionDoFn extends DoFn<PartitionMetadata, DataC
       BundleFinalizer bundleFinalizer) {
 
     final String token = partition.getPartitionToken();
-    try (Scope scope =
-        TRACER
-            .spanBuilder("ReadChangeStreamPartitionDoFn.processElement")
-            .setRecordEvents(true)
-            .startScopedSpan()) {
-      TRACER
-          .getCurrentSpan()
-          .putAttribute(PARTITION_ID_ATTRIBUTE_LABEL, AttributeValue.stringAttributeValue(token));
-
-      LOG.debug(
-          "[" + token + "] Processing element with restriction " + tracker.currentRestriction());
-
-      return queryChangeStreamAction.run(
-          partition, tracker, receiver, watermarkEstimator, bundleFinalizer);
-    }
+
+    LOG.debug(
+        "[" + token + "] Processing element with restriction " + tracker.currentRestriction());
+
+    return queryChangeStreamAction.run(
+        partition, tracker, receiver, watermarkEstimator, bundleFinalizer);
   }
 }