You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/09/12 20:40:10 UTC

[beam] branch release-2.42.0 updated: [release-2.42.0] Only close committers after all CheckpointMarks have gone away. (#23188)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch release-2.42.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.42.0 by this push:
     new 557020ef2a2 [release-2.42.0] Only close committers after all CheckpointMarks have gone away.  (#23188)
557020ef2a2 is described below

commit 557020ef2a20cbca13e8ae47d88ddd527e997611
Author: tvalentyn <tv...@users.noreply.github.com>
AuthorDate: Mon Sep 12 13:40:00 2022 -0700

    [release-2.42.0] Only close committers after all CheckpointMarks have gone away.  (#23188)
    
    * Only close committers after all CheckpointMarks have gone away. This should substantially reduce logspam.
    
    * Only close committers after all CheckpointMarks have gone away. This should substantially reduce logspam.
    
    * Only close committers after all CheckpointMarks have gone away. This should substantially reduce logspam.
    
    * Only close committers after all CheckpointMarks have gone away. This should substantially reduce logspam.
    
    * Only close committers after all CheckpointMarks have gone away. This should substantially reduce logspam.
    
    * Only close committers after all CheckpointMarks have gone away. This should substantially reduce logspam.
    
    * Only close committers after all CheckpointMarks have gone away. This should substantially reduce logspam.
    
    * Only close committers after all CheckpointMarks have gone away. This should substantially reduce logspam.
    
    Co-authored-by: Daniel Collins <dp...@google.com>
---
 .../pubsublite/internal/CheckpointMarkImpl.java    |  7 ++-
 .../gcp/pubsublite/internal/CloserReference.java   | 68 ++++++++++++++++++++++
 .../pubsublite/internal/UnboundedReaderImpl.java   |  6 +-
 .../pubsublite/internal/UnboundedSourceImpl.java   |  2 +-
 .../internal/CheckpointMarkImplTest.java           |  2 +-
 .../internal/UnboundedReaderImplTest.java          |  5 +-
 6 files changed, 79 insertions(+), 11 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/CheckpointMarkImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/CheckpointMarkImpl.java
index acdae3efed8..b66275855b4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/CheckpointMarkImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/CheckpointMarkImpl.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Optional;
+import java.util.function.Supplier;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.VarLongCoder;
@@ -37,9 +38,9 @@ public class CheckpointMarkImpl implements CheckpointMark {
 
   final Offset offset;
 
-  private final Optional<BlockingCommitter> committer;
+  private final Optional<Supplier<BlockingCommitter>> committer;
 
-  CheckpointMarkImpl(Offset offset, BlockingCommitter committer) {
+  CheckpointMarkImpl(Offset offset, Supplier<BlockingCommitter> committer) {
     this.offset = offset;
     this.committer = Optional.of(committer);
   }
@@ -68,7 +69,7 @@ public class CheckpointMarkImpl implements CheckpointMark {
   public void finalizeCheckpoint() {
     try {
       checkState(committer.isPresent());
-      committer.get().commitOffset(offset);
+      committer.get().get().commitOffset(offset);
     } catch (Exception e) {
       logger.warn("Failed to finalize checkpoint.", e);
     }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/CloserReference.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/CloserReference.java
new file mode 100644
index 00000000000..089f0f2242f
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/CloserReference.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite.internal;
+
+import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
+import java.util.function.Supplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Supplies a wrapped AutoCloseable and closes it asynchronously once this wrapper is finalized. */
+class CloserReference<T extends AutoCloseable> implements Supplier<T> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CloserReference.class);
+
+  private final T object;
+
+  public static <T extends AutoCloseable> CloserReference<T> of(T object) {
+    return new CloserReference<>(object);
+  }
+
+  @Override
+  public T get() {
+    return object;
+  }
+
+  private CloserReference(T object) {
+    this.object = object;
+  }
+
+  private static class Closer implements Runnable {
+
+    private final AutoCloseable object;
+
+    private Closer(AutoCloseable object) {
+      this.object = object;
+    }
+
+    @Override
+    public void run() {
+      try {
+        object.close();
+      } catch (Exception e) {
+        LOG.warn("Failed to close resource with class: " + object.getClass().getCanonicalName(), e);
+      }
+    }
+  }
+
+  @SuppressWarnings("deprecation")
+  @Override
+  protected void finalize() {
+    SystemExecutors.getFuturesExecutor().execute(new Closer(object));
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/UnboundedReaderImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/UnboundedReaderImpl.java
index adabdfd782b..99190471c9a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/UnboundedReaderImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/UnboundedReaderImpl.java
@@ -30,6 +30,7 @@ import java.io.IOException;
 import java.util.NoSuchElementException;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -39,7 +40,7 @@ public class UnboundedReaderImpl extends UnboundedReader<SequencedMessage> {
   private final UnboundedSource<SequencedMessage, CheckpointMarkImpl> source;
   private final MemoryBufferedSubscriber subscriber;
   private final TopicBacklogReader backlogReader;
-  private final BlockingCommitter committer;
+  private final Supplier<BlockingCommitter> committer;
 
   private Offset fetchOffset;
   private Optional<Instant> lastMessageTimestamp = Optional.empty();
@@ -49,7 +50,7 @@ public class UnboundedReaderImpl extends UnboundedReader<SequencedMessage> {
       UnboundedSource<SequencedMessage, CheckpointMarkImpl> source,
       MemoryBufferedSubscriber subscriber,
       TopicBacklogReader backlogReader,
-      BlockingCommitter committer,
+      Supplier<BlockingCommitter> committer,
       Offset initialOffset) {
     checkArgument(initialOffset.equals(subscriber.fetchOffset()));
     this.source = source;
@@ -79,7 +80,6 @@ public class UnboundedReaderImpl extends UnboundedReader<SequencedMessage> {
   @Override
   public void close() throws IOException {
     try (AutoCloseable c1 = backlogReader;
-        AutoCloseable c2 = committer;
         AutoCloseable c3 = asCloseable(subscriber)) {
     } catch (Exception e) {
       throw new IOException("Failed when closing reader.", e);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/UnboundedSourceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/UnboundedSourceImpl.java
index 55fc92a6ed2..a200bf3d86c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/UnboundedSourceImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/UnboundedSourceImpl.java
@@ -105,7 +105,7 @@ public class UnboundedSourceImpl extends UnboundedSource<SequencedMessage, Check
         this,
         subscriber,
         readerFactory.create(subscription),
-        assembler.newCommitter(),
+        CloserReference.of(assembler.newCommitter()),
         initialOffset);
   }
 
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/CheckpointMarkImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/CheckpointMarkImplTest.java
index 740d96928a4..0e1fc570591 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/CheckpointMarkImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/CheckpointMarkImplTest.java
@@ -42,7 +42,7 @@ public class CheckpointMarkImplTest {
 
   @Before
   public void setUp() {
-    mark = new CheckpointMarkImpl(OFFSET, committer);
+    mark = new CheckpointMarkImpl(OFFSET, () -> committer);
   }
 
   @Test
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/UnboundedReaderImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/UnboundedReaderImplTest.java
index 163e4ffb3a5..0dd45549077 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/UnboundedReaderImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/UnboundedReaderImplTest.java
@@ -100,7 +100,8 @@ public class UnboundedReaderImplTest {
   @Before
   public void setUp() {
     doReturn(INITIAL_OFFSET).when(subscriber).fetchOffset();
-    reader = new UnboundedReaderImpl(source, subscriber, backlogReader, committer, Offset.of(1));
+    reader =
+        new UnboundedReaderImpl(source, subscriber, backlogReader, () -> committer, Offset.of(1));
   }
 
   @Test
@@ -192,11 +193,9 @@ public class UnboundedReaderImplTest {
     startSubscriber();
     doThrow(new IllegalStateException("abc")).when(subscriber).awaitTerminated(1, TimeUnit.MINUTES);
     doThrow(new IllegalStateException("def")).when(backlogReader).close();
-    doThrow(new IllegalStateException("ghi")).when(committer).close();
     assertThrows(IOException.class, reader::close);
     verify(subscriber).stopAsync();
     verify(subscriber).awaitTerminated(1, TimeUnit.MINUTES);
-    verify(committer).close();
     verify(backlogReader).close();
   }
 }