You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2022/09/12 12:41:54 UTC
[beam] branch master updated: pubsublite: Reduce commit logspam (#22762)
This is an automated email from the ASF dual-hosted git repository.
kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5515f7a35d7 pubsublite: Reduce commit logspam (#22762)
5515f7a35d7 is described below
commit 5515f7a35d740932415fa63b9d3f9df37ab54469
Author: dpcollins-google <40...@users.noreply.github.com>
AuthorDate: Mon Sep 12 21:41:46 2022 +0900
pubsublite: Reduce commit logspam (#22762)
* Only close committers after all CheckpointMarks have gone away. This should substantially reduce logspam.
---
.../pubsublite/internal/CheckpointMarkImpl.java | 7 ++-
.../gcp/pubsublite/internal/CloserReference.java | 68 ++++++++++++++++++++++
.../pubsublite/internal/UnboundedReaderImpl.java | 6 +-
.../pubsublite/internal/UnboundedSourceImpl.java | 2 +-
.../internal/CheckpointMarkImplTest.java | 2 +-
.../internal/UnboundedReaderImplTest.java | 5 +-
6 files changed, 79 insertions(+), 11 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/CheckpointMarkImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/CheckpointMarkImpl.java
index acdae3efed8..b66275855b4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/CheckpointMarkImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/CheckpointMarkImpl.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Optional;
+import java.util.function.Supplier;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
@@ -37,9 +38,9 @@ public class CheckpointMarkImpl implements CheckpointMark {
final Offset offset;
- private final Optional<BlockingCommitter> committer;
+ private final Optional<Supplier<BlockingCommitter>> committer;
- CheckpointMarkImpl(Offset offset, BlockingCommitter committer) {
+ CheckpointMarkImpl(Offset offset, Supplier<BlockingCommitter> committer) {
this.offset = offset;
this.committer = Optional.of(committer);
}
@@ -68,7 +69,7 @@ public class CheckpointMarkImpl implements CheckpointMark {
public void finalizeCheckpoint() {
try {
checkState(committer.isPresent());
- committer.get().commitOffset(offset);
+ committer.get().get().commitOffset(offset);
} catch (Exception e) {
logger.warn("Failed to finalize checkpoint.", e);
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/CloserReference.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/CloserReference.java
new file mode 100644
index 00000000000..089f0f2242f
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/CloserReference.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite.internal;
+
+import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
+import java.util.function.Supplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A class that safely ensures an object of type T is cleaned up before it is garbage collected. */
+class CloserReference<T extends AutoCloseable> implements Supplier<T> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CloserReference.class);
+
+ private final T object;
+
+ public static <T extends AutoCloseable> CloserReference<T> of(T object) {
+ return new CloserReference<>(object);
+ }
+
+ @Override
+ public T get() {
+ return object;
+ }
+
+ private CloserReference(T object) {
+ this.object = object;
+ }
+
+ private static class Closer implements Runnable {
+
+ private final AutoCloseable object;
+
+ private Closer(AutoCloseable object) {
+ this.object = object;
+ }
+
+ @Override
+ public void run() {
+ try {
+ object.close();
+ } catch (Exception e) {
+ LOG.warn("Failed to close resource with class: " + object.getClass().getCanonicalName(), e);
+ }
+ }
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ protected void finalize() {
+ SystemExecutors.getFuturesExecutor().execute(new Closer(object));
+ }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/UnboundedReaderImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/UnboundedReaderImpl.java
index adabdfd782b..99190471c9a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/UnboundedReaderImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/UnboundedReaderImpl.java
@@ -30,6 +30,7 @@ import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -39,7 +40,7 @@ public class UnboundedReaderImpl extends UnboundedReader<SequencedMessage> {
private final UnboundedSource<SequencedMessage, CheckpointMarkImpl> source;
private final MemoryBufferedSubscriber subscriber;
private final TopicBacklogReader backlogReader;
- private final BlockingCommitter committer;
+ private final Supplier<BlockingCommitter> committer;
private Offset fetchOffset;
private Optional<Instant> lastMessageTimestamp = Optional.empty();
@@ -49,7 +50,7 @@ public class UnboundedReaderImpl extends UnboundedReader<SequencedMessage> {
UnboundedSource<SequencedMessage, CheckpointMarkImpl> source,
MemoryBufferedSubscriber subscriber,
TopicBacklogReader backlogReader,
- BlockingCommitter committer,
+ Supplier<BlockingCommitter> committer,
Offset initialOffset) {
checkArgument(initialOffset.equals(subscriber.fetchOffset()));
this.source = source;
@@ -79,7 +80,6 @@ public class UnboundedReaderImpl extends UnboundedReader<SequencedMessage> {
@Override
public void close() throws IOException {
try (AutoCloseable c1 = backlogReader;
- AutoCloseable c2 = committer;
AutoCloseable c3 = asCloseable(subscriber)) {
} catch (Exception e) {
throw new IOException("Failed when closing reader.", e);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/UnboundedSourceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/UnboundedSourceImpl.java
index 55fc92a6ed2..a200bf3d86c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/UnboundedSourceImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/UnboundedSourceImpl.java
@@ -105,7 +105,7 @@ public class UnboundedSourceImpl extends UnboundedSource<SequencedMessage, Check
this,
subscriber,
readerFactory.create(subscription),
- assembler.newCommitter(),
+ CloserReference.of(assembler.newCommitter()),
initialOffset);
}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/CheckpointMarkImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/CheckpointMarkImplTest.java
index 740d96928a4..0e1fc570591 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/CheckpointMarkImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/CheckpointMarkImplTest.java
@@ -42,7 +42,7 @@ public class CheckpointMarkImplTest {
@Before
public void setUp() {
- mark = new CheckpointMarkImpl(OFFSET, committer);
+ mark = new CheckpointMarkImpl(OFFSET, () -> committer);
}
@Test
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/UnboundedReaderImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/UnboundedReaderImplTest.java
index 163e4ffb3a5..0dd45549077 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/UnboundedReaderImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/UnboundedReaderImplTest.java
@@ -100,7 +100,8 @@ public class UnboundedReaderImplTest {
@Before
public void setUp() {
doReturn(INITIAL_OFFSET).when(subscriber).fetchOffset();
- reader = new UnboundedReaderImpl(source, subscriber, backlogReader, committer, Offset.of(1));
+ reader =
+ new UnboundedReaderImpl(source, subscriber, backlogReader, () -> committer, Offset.of(1));
}
@Test
@@ -192,11 +193,9 @@ public class UnboundedReaderImplTest {
startSubscriber();
doThrow(new IllegalStateException("abc")).when(subscriber).awaitTerminated(1, TimeUnit.MINUTES);
doThrow(new IllegalStateException("def")).when(backlogReader).close();
- doThrow(new IllegalStateException("ghi")).when(committer).close();
assertThrows(IOException.class, reader::close);
verify(subscriber).stopAsync();
verify(subscriber).awaitTerminated(1, TimeUnit.MINUTES);
- verify(committer).close();
verify(backlogReader).close();
}
}