You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/12/22 09:25:56 UTC

[GitHub] [beam] je-ik commented on a change in pull request #13592: [BEAM-11403] Cache UnboundedReader per CheckpointMark in SDF Wrapper DoFn.

je-ik commented on a change in pull request #13592:
URL: https://github.com/apache/beam/pull/13592#discussion_r547153671



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
##########
@@ -488,7 +519,20 @@ public void splitRestriction(
         restrictionTracker(
             @Restriction UnboundedSourceRestriction<OutputT, CheckpointT> restriction,
             PipelineOptions pipelineOptions) {
-      return new UnboundedSourceAsSDFRestrictionTracker(restriction, pipelineOptions);
+      try {
+        UnboundedReader reader =
+            cachedReaders.getIfPresent(
+                createCacheKey(restriction.getSource(), restriction.getCheckpoint()));
+        if (reader == null) {
+          reader =
+              restriction.getSource().createReader(pipelineOptions, restriction.getCheckpoint());
+          cachedReaders.put(
+              createCacheKey(restriction.getSource(), restriction.getCheckpoint()), reader);
+        }
+        return new UnboundedSourceAsSDFRestrictionTracker(restriction, pipelineOptions, reader);

Review comment:
       ```suggestion
           return new UnboundedSourceAsSDFRestrictionTracker<>(restriction, pipelineOptions, reader);
   ```

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
##########
@@ -439,11 +444,37 @@ public IsBounded isBounded() {
     private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceAsSDFWrapperFn.class);
     private static final int DEFAULT_BUNDLE_FINALIZATION_LIMIT_MINS = 10;
     private final Coder<CheckpointT> checkpointCoder;
+    private Cache<UnboundedSourceRestriction, UnboundedReader> cachedReaders;

Review comment:
       ```suggestion
       private Cache<UnboundedSourceRestriction<OutputT, CheckpointT>, UnboundedReader> cachedReaders;
   ```

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
##########
@@ -756,23 +810,20 @@ public CheckpointMark getCheckpointMark() {
       private final PipelineOptions pipelineOptions;
       private UnboundedSource.UnboundedReader<OutputT> currentReader;
       private boolean readerHasBeenStarted;
+      private boolean isDone;

Review comment:
       :+1:

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
##########
@@ -439,11 +444,37 @@ public IsBounded isBounded() {
     private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceAsSDFWrapperFn.class);
     private static final int DEFAULT_BUNDLE_FINALIZATION_LIMIT_MINS = 10;
     private final Coder<CheckpointT> checkpointCoder;
+    private Cache<UnboundedSourceRestriction, UnboundedReader> cachedReaders;
 
     private UnboundedSourceAsSDFWrapperFn(Coder<CheckpointT> checkpointCoder) {
       this.checkpointCoder = checkpointCoder;
     }
 
+    private UnboundedSourceRestriction createCacheKey(
+        UnboundedSource<OutputT, CheckpointT> source, CheckpointT checkpoint) {
+      // For caching reader, we don't care about the watermark.
+      return UnboundedSourceRestriction.create(
+          source, checkpoint, BoundedWindow.TIMESTAMP_MIN_VALUE);
+    }
+
+    @Setup
+    public void setUp() throws Exception {
+      cachedReaders =
+          CacheBuilder.newBuilder()
+              .expireAfterWrite(5, TimeUnit.MINUTES)

Review comment:
       I think this could be way lower. We insert the reader back into the cache after a checkpoint, so if it is not reused after a few seconds, it is likely not to be reused at all. Maybe 10 seconds might be enough?

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
##########
@@ -756,23 +810,20 @@ public CheckpointMark getCheckpointMark() {
       private final PipelineOptions pipelineOptions;
       private UnboundedSource.UnboundedReader<OutputT> currentReader;
       private boolean readerHasBeenStarted;
+      private boolean isDone;
 
       UnboundedSourceAsSDFRestrictionTracker(
           UnboundedSourceRestriction<OutputT, CheckpointT> initialRestriction,
-          PipelineOptions pipelineOptions) {
+          PipelineOptions pipelineOptions,
+          UnboundedReader cachedUnboundedReader) {

Review comment:
       ```suggestion
             UnboundedReader<OutputT> cachedUnboundedReader) {
   ```

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
##########
@@ -501,6 +545,10 @@ public ProcessContinuation processElement(
         throws IOException {
       UnboundedSourceRestriction<OutputT, CheckpointT> initialRestriction =
           tracker.currentRestriction();
+      UnboundedReader currentReader =

Review comment:
       ```suggestion
         UnboundedReader<OutputT> currentReader =
   ```

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
##########
@@ -527,6 +575,12 @@ public ProcessContinuation processElement(
             currentRestriction.getCheckpoint()::finalizeCheckpoint);
       }
 
+      // Update the readers cache with latest CheckpointMark.
+      cachedReaders.put(
+          createCacheKey(
+              currentReader.getCurrentSource(), (CheckpointT) currentReader.getCheckpointMark()),

Review comment:
       ```suggestion
                 (UnboundedSource<OutputT, CheckpointT>) currentReader.getCurrentSource(),
                 (CheckpointT) currentReader.getCheckpointMark()),
   ```

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
##########
@@ -439,11 +444,37 @@ public IsBounded isBounded() {
     private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceAsSDFWrapperFn.class);
     private static final int DEFAULT_BUNDLE_FINALIZATION_LIMIT_MINS = 10;
     private final Coder<CheckpointT> checkpointCoder;
+    private Cache<UnboundedSourceRestriction, UnboundedReader> cachedReaders;
 
     private UnboundedSourceAsSDFWrapperFn(Coder<CheckpointT> checkpointCoder) {
       this.checkpointCoder = checkpointCoder;
     }
 
+    private UnboundedSourceRestriction createCacheKey(
+        UnboundedSource<OutputT, CheckpointT> source, CheckpointT checkpoint) {
+      // For caching reader, we don't care about the watermark.
+      return UnboundedSourceRestriction.create(
+          source, checkpoint, BoundedWindow.TIMESTAMP_MIN_VALUE);
+    }
+
+    @Setup
+    public void setUp() throws Exception {
+      cachedReaders =
+          CacheBuilder.newBuilder()
+              .expireAfterWrite(5, TimeUnit.MINUTES)
+              .maximumSize(100)
+              .removalListener(
+                  (RemovalListener<UnboundedSourceRestriction, UnboundedReader>)
+                      removalNotification -> {
+                        try {
+                          removalNotification.getValue().close();

Review comment:
       This looks dangerous. What if we are still using the reader inside `processElement`? This is likely to be called from a different thread, so we need synchronization there and must not close the reader while it is being used.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org