Posted to issues@paimon.apache.org by "liming30 (via GitHub)" <gi...@apache.org> on 2023/11/13 08:41:42 UTC

[PR] [flink] fix the problem that DynamicFilteringEvent does not exist in version 1.16- of flink. [incubator-paimon]

liming30 opened a new pull request, #2306:
URL: https://github.com/apache/incubator-paimon/pull/2306

   ### Purpose
   
   Linked issue: close #2304 
   
   [flink] Fix the `ClassNotFoundException` that occurs because `DynamicFilteringEvent` does not exist in Flink versions earlier than 1.16.
   
   ### Tests
   
   Existing tests can cover this change.
   
   ### API and Format
   
   No
   
   ### Documentation
   
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [flink] fix the problem that DynamicFilteringEvent does not exist in version 1.16- of flink. [incubator-paimon]

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi commented on PR #2306:
URL: https://github.com/apache/incubator-paimon/pull/2306#issuecomment-1807699026

   Thanks @liming30, I left two comments.




Re: [PR] [flink] fix the problem that DynamicFilteringEvent does not exist in version 1.16- of flink. [incubator-paimon]

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi commented on PR #2306:
URL: https://github.com/apache/incubator-paimon/pull/2306#issuecomment-1809424183

   @liming30 It is OK to merge this.




Re: [PR] [flink] fix the problem that DynamicFilteringEvent does not exist in version 1.16- of flink. [incubator-paimon]

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi merged PR #2306:
URL: https://github.com/apache/incubator-paimon/pull/2306




Re: [PR] [flink] fix the problem that DynamicFilteringEvent does not exist in version 1.16- of flink. [incubator-paimon]

Posted by "liming30 (via GitHub)" <gi...@apache.org>.
liming30 commented on PR #2306:
URL: https://github.com/apache/incubator-paimon/pull/2306#issuecomment-1808202855

   > In #2304 ,
   > 
   > ```
   > Minimal reproduce step
   > Run BatchFileStoreITCase#testOverwriteEmpty in paimon-flink-1.14 and paimon-flink-1.15.
   > ```
   > 
   > What do you mean?
   
   My mistake. In #2221, I introduced `ReaderConsumeProgressEvent`. After a split is consumed, `StaticFileStoreSplitEnumerator` enters the `handleSourceEvent` method, and at that point `java.lang.ClassNotFoundException: org.apache.flink.table.connector.source.DynamicFilteringEvent` is thrown on flink-1.14 and flink-1.15.
   
   Do you think the changes here should be moved to #2221?
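
As an illustration of the failure mode described above, the sketch below probes for a class by name with `Class.forName` instead of referencing it directly, so a missing class yields `false` rather than an error. `ClassProbe` and `isPresent` are hypothetical names chosen for this example, and this is not necessarily how the PR resolves the issue.

```java
// Hypothetical helper, not part of Paimon or Flink.
final class ClassProbe {

    // Returns true if the named class can be loaded by this class's loader.
    // The class is not initialized (second argument is false).
    static boolean isPresent(String className) {
        try {
            Class.forName(className, false, ClassProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // The class, or something it links against, is not on the classpath.
            return false;
        }
    }

    public static void main(String[] args) {
        // A JDK class is always loadable.
        System.out.println(isPresent("java.lang.String"));
        // The result here depends on which Flink version, if any, is on the classpath.
        System.out.println(
                isPresent("org.apache.flink.table.connector.source.DynamicFilteringEvent"));
    }
}
```

Since the class name is passed as a string, the probing code itself compiles and loads whether or not the target class is on the classpath.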
   
   




Re: [PR] [flink] fix the problem that DynamicFilteringEvent does not exist in version 1.16- of flink. [incubator-paimon]

Posted by "liming30 (via GitHub)" <gi...@apache.org>.
liming30 commented on code in PR #2306:
URL: https://github.com/apache/incubator-paimon/pull/2306#discussion_r1390983576


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java:
##########
@@ -65,59 +49,10 @@ public StaticFileStoreSplitEnumerator(
             @Nullable Snapshot snapshot,
             SplitAssigner splitAssigner,
             @Nullable DynamicPartitionFilteringInfo dynamicPartitionFilteringInfo) {
-        this.context = context;
-        this.snapshot = snapshot;
-        this.splitAssigner = splitAssigner;
+        super(context, snapshot, splitAssigner);
         this.dynamicPartitionFilteringInfo = dynamicPartitionFilteringInfo;
     }
 
-    @Override
-    public void start() {
-        // no resources to start
-    }
-
-    @Override
-    public void handleSplitRequest(int subtask, @Nullable String hostname) {
-        if (!context.registeredReaders().containsKey(subtask)) {
-            // reader failed between sending the request and now. skip this request.
-            return;
-        }
-
-        List<FileStoreSourceSplit> assignment = splitAssigner.getNext(subtask, hostname);
-        if (assignment.size() > 0) {
-            context.assignSplits(
-                    new SplitsAssignment<>(Collections.singletonMap(subtask, assignment)));
-        } else {
-            context.signalNoMoreSplits(subtask);
-        }
-    }
-
-    @Override
-    public void addSplitsBack(List<FileStoreSourceSplit> backSplits, int subtaskId) {
-        splitAssigner.addSplitsBack(subtaskId, backSplits);
-    }
-
-    @Override
-    public void addReader(int subtaskId) {
-        // this source is purely lazy-pull-based, nothing to do upon registration
-    }
-
-    @Override
-    public PendingSplitsCheckpoint snapshotState(long checkpointId) {
-        return new PendingSplitsCheckpoint(
-                splitAssigner.remainingSplits(), snapshot == null ? null : snapshot.id());
-    }
-
-    @Override
-    public void close() {
-        // no resources to close
-    }
-
-    @Nullable
-    public Snapshot snapshot() {
-        return snapshot;
-    }
-
     @Override
     public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
         if (sourceEvent instanceof DynamicFilteringEvent) {

Review Comment:
   Sounds great, I have addressed it according to the comment.





Re: [PR] [flink] fix the problem that DynamicFilteringEvent does not exist in version 1.16- of flink. [incubator-paimon]

Posted by "liming30 (via GitHub)" <gi...@apache.org>.
liming30 commented on PR #2306:
URL: https://github.com/apache/incubator-paimon/pull/2306#issuecomment-1807969774

   > Can you add `BatchFileStoreITCase` to 1.14 and 1.15?
   
   `BatchFileStoreITCase` already exists in 1.14 and 1.15.
   




Re: [PR] [flink] fix the problem that DynamicFilteringEvent does not exist in version 1.16- of flink. [incubator-paimon]

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi commented on code in PR #2306:
URL: https://github.com/apache/incubator-paimon/pull/2306#discussion_r1390775446


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java:
##########
@@ -65,59 +49,10 @@ public StaticFileStoreSplitEnumerator(
             @Nullable Snapshot snapshot,
             SplitAssigner splitAssigner,
             @Nullable DynamicPartitionFilteringInfo dynamicPartitionFilteringInfo) {
-        this.context = context;
-        this.snapshot = snapshot;
-        this.splitAssigner = splitAssigner;
+        super(context, snapshot, splitAssigner);
         this.dynamicPartitionFilteringInfo = dynamicPartitionFilteringInfo;
     }
 
-    @Override
-    public void start() {
-        // no resources to start
-    }
-
-    @Override
-    public void handleSplitRequest(int subtask, @Nullable String hostname) {
-        if (!context.registeredReaders().containsKey(subtask)) {
-            // reader failed between sending the request and now. skip this request.
-            return;
-        }
-
-        List<FileStoreSourceSplit> assignment = splitAssigner.getNext(subtask, hostname);
-        if (assignment.size() > 0) {
-            context.assignSplits(
-                    new SplitsAssignment<>(Collections.singletonMap(subtask, assignment)));
-        } else {
-            context.signalNoMoreSplits(subtask);
-        }
-    }
-
-    @Override
-    public void addSplitsBack(List<FileStoreSourceSplit> backSplits, int subtaskId) {
-        splitAssigner.addSplitsBack(subtaskId, backSplits);
-    }
-
-    @Override
-    public void addReader(int subtaskId) {
-        // this source is purely lazy-pull-based, nothing to do upon registration
-    }
-
-    @Override
-    public PendingSplitsCheckpoint snapshotState(long checkpointId) {
-        return new PendingSplitsCheckpoint(
-                splitAssigner.remainingSplits(), snapshot == null ? null : snapshot.id());
-    }
-
-    @Override
-    public void close() {
-        // no resources to close
-    }
-
-    @Nullable
-    public Snapshot snapshot() {
-        return snapshot;
-    }
-
     @Override
     public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
         if (sourceEvent instanceof DynamicFilteringEvent) {

Review Comment:
   Can we modify this to `if (sourceEvent != null && sourceEvent.getClass().getName().endsWith("DynamicFilteringEvent"))`?
   
   And can we extract the DynamicFiltering logic from here into a separate class?
   
   This way, we can avoid introducing the class to 1.14 and 1.15. (A small amount of reflection is better than multiple copies.)
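
The name-based check suggested above can be sketched in isolation as follows. `SourceEvent`, `DynamicFilteringEvent`, and `OtherEvent` are hypothetical stand-ins declared locally, not the Flink classes, and `EventNameCheck` is an illustrative helper name that does not appear in the PR.

```java
// Hypothetical stand-ins for illustration only; these are not Flink classes.
interface SourceEvent {}

final class DynamicFilteringEvent implements SourceEvent {}

final class OtherEvent implements SourceEvent {}

final class EventNameCheck {

    // Identifies the event by class name instead of `instanceof`, so this
    // method carries no direct reference to the event class itself.
    static boolean isDynamicFilteringEvent(Object sourceEvent) {
        return sourceEvent != null
                && sourceEvent.getClass().getName().endsWith("DynamicFilteringEvent");
    }

    public static void main(String[] args) {
        System.out.println(isDynamicFilteringEvent(new DynamicFilteringEvent()));
        System.out.println(isDynamicFilteringEvent(new OtherEvent()));
        System.out.println(isDynamicFilteringEvent(null));
    }
}
```

Because the comparison uses only a string, the calling code needs no compile-time or link-time reference to the event class. One trade-off is that `endsWith` also matches any other class whose name ends with the same suffix.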





Re: [PR] [flink] fix the problem that DynamicFilteringEvent does not exist in version 1.16- of flink. [incubator-paimon]

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi commented on PR #2306:
URL: https://github.com/apache/incubator-paimon/pull/2306#issuecomment-1808156376

   In #2304 , 
   ```
   Minimal reproduce step
   Run BatchFileStoreITCase#testOverwriteEmpty in paimon-flink-1.14 and paimon-flink-1.15.
   ```
   What do you mean?



