Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/12/22 18:36:48 UTC

[GitHub] [beam] boyuanzz commented on a change in pull request #13592: [BEAM-11403] Cache UnboundedReader per CheckpointMark in SDF Wrapper DoFn.

boyuanzz commented on a change in pull request #13592:
URL: https://github.com/apache/beam/pull/13592#discussion_r547440367



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
##########
@@ -439,11 +444,37 @@ public IsBounded isBounded() {
     private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceAsSDFWrapperFn.class);
     private static final int DEFAULT_BUNDLE_FINALIZATION_LIMIT_MINS = 10;
     private final Coder<CheckpointT> checkpointCoder;
+    private Cache<UnboundedSourceRestriction, UnboundedReader> cachedReaders;
 
     private UnboundedSourceAsSDFWrapperFn(Coder<CheckpointT> checkpointCoder) {
       this.checkpointCoder = checkpointCoder;
     }
 
+    private UnboundedSourceRestriction createCacheKey(
+        UnboundedSource<OutputT, CheckpointT> source, CheckpointT checkpoint) {
+      // For caching reader, we don't care about the watermark.
+      return UnboundedSourceRestriction.create(
+          source, checkpoint, BoundedWindow.TIMESTAMP_MIN_VALUE);
+    }
+
+    @Setup
+    public void setUp() throws Exception {
+      cachedReaders =
+          CacheBuilder.newBuilder()
+              .expireAfterWrite(5, TimeUnit.MINUTES)
+              .maximumSize(100)
+              .removalListener(
+                  (RemovalListener<UnboundedSourceRestriction, UnboundedReader>)
+                      removalNotification -> {
+                        try {
+                          removalNotification.getValue().close();

Review comment:
       That's a very good point. Thanks for catching this! I should remove the entry from the cache when I obtain the reader from it, so that the reader cannot be evicted (and closed by the removal listener) while it is still in use. Once processing finishes, we will put the reader back into the cache.
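
       As a rough sketch of the check-out/check-in idea described above (not the code in this PR): the class `IdleReaderPool` and its methods `checkOut`, `checkIn`, and `idleCount` are hypothetical names, and a plain `ConcurrentHashMap` stands in for the Guava `Cache` so the example stays self-contained. It has no expiry or size limit; it only illustrates that a reader in use is never present in the cache.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

/**
 * Hypothetical helper: the map holds idle readers only, so a reader that is
 * currently being used can never be evicted (and closed) from under its user.
 */
final class IdleReaderPool<K, R extends AutoCloseable> {
  // Stand-in for the Guava cache in the PR; no expiry or size limit here.
  private final ConcurrentMap<K, R> idle = new ConcurrentHashMap<>();

  /** Atomically takes the idle reader for the key out of the pool, or creates a new one. */
  R checkOut(K key, Function<K, R> factory) {
    R reader = idle.remove(key);
    return reader != null ? reader : factory.apply(key);
  }

  /** Puts a reader back after processing; closes a different reader it displaces, if any. */
  void checkIn(K key, R reader) {
    R displaced = idle.put(key, reader);
    if (displaced != null && displaced != reader) {
      try {
        displaced.close();
      } catch (Exception e) {
        throw new IllegalStateException("Failed to close displaced reader", e);
      }
    }
  }

  /** Number of readers currently idle in the pool. */
  int idleCount() {
    return idle.size();
  }
}
```

       One thing to watch for if this is done with the Guava `Cache` directly: explicit removal (for example `invalidate(key)`) also notifies the `RemovalListener`, so the listener would probably need to close the reader only when `removalNotification.wasEvicted()` is true.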



