Posted to issues@paimon.apache.org by "wxplovecc (via GitHub)" <gi...@apache.org> on 2023/03/22 09:48:29 UTC

[GitHub] [incubator-paimon] wxplovecc opened a new pull request, #687: [flink] Assign splits with fixed batch size in StaticFileStoreSplitEnumerator

wxplovecc opened a new pull request, #687:
URL: https://github.com/apache/incubator-paimon/pull/687

   *(Please specify the module before the PR name: [core] ... or [flink] ...)*
   
   ### Purpose
   
   Assigning all splits at once can produce a message that exceeds `akka.framesize`, which causes exceptions.
   
   ### Tests
   
   StaticFileStoreSplitEnumeratorTest.testSplitBatch
   
   ### API and Format 
   
   *(Does this change affect API or storage format)*
   
   ### Documentation
   
   *(Does this change introduce a new feature)*
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-paimon] JingsongLi merged pull request #687: [flink] Assign splits with fixed batch size in StaticFileStoreSplitEnumerator

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi merged PR #687:
URL: https://github.com/apache/incubator-paimon/pull/687




[GitHub] [incubator-paimon] JingsongLi commented on a diff in pull request #687: [flink] Assign splits with fixed batch size in StaticFileStoreSplitEnumerator

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi commented on code in PR #687:
URL: https://github.com/apache/incubator-paimon/pull/687#discussion_r1144649493


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java:
##########
@@ -79,10 +82,15 @@ public void handleSplitRequest(int subtask, @Nullable String hostname) {
         // The following batch assignment operation is for two purposes:
         // To distribute splits evenly when batch reading to prevent a few tasks from reading all
         // the data (for example, the current resource can only schedule part of the tasks).
-        // TODO: assignment is already created in constructor, here can just assign per batch
-        List<FileStoreSourceSplit> splits = pendingSplitAssignment.remove(subtask);

Review Comment:
   Can we change `pendingSplitAssignment` to a `Map<Integer, Queue<FileStoreSourceSplit>>`
   and poll a batch from it?
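
A minimal sketch of the queue-based batching suggested above, not the code merged in the PR. It assumes plain strings in place of `FileStoreSourceSplit`; the class name `SplitBatchSketch` and the helper `pollBatch` are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical stand-in for the enumerator's batching logic (not Paimon code).
class SplitBatchSketch {

    // Removes and returns up to batchSize pending splits for the given subtask.
    static <T> List<T> pollBatch(Map<Integer, Queue<T>> pending, int subtask, int batchSize) {
        List<T> batch = new ArrayList<>();
        Queue<T> taskSplits = pending.get(subtask);
        if (taskSplits == null) {
            return batch; // nothing pending for this subtask
        }
        while (batch.size() < batchSize && !taskSplits.isEmpty()) {
            batch.add(taskSplits.poll());
        }
        if (taskSplits.isEmpty()) {
            pending.remove(subtask); // drop drained queues
        }
        return batch;
    }

    public static void main(String[] args) {
        Map<Integer, Queue<String>> pending = new HashMap<>();
        pending.put(0, new ArrayDeque<>(Arrays.asList("s1", "s2", "s3")));
        System.out.println(pollBatch(pending, 0, 2)); // [s1, s2]
        System.out.println(pollBatch(pending, 0, 2)); // [s3]
        System.out.println(pollBatch(pending, 0, 2)); // []
    }
}
```

Each split request then hands out at most `batchSize` splits, so a single assignment message stays bounded regardless of how many splits a subtask owns in total.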



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java:
##########
@@ -37,6 +37,9 @@
 public class StaticFileStoreSplitEnumerator
         implements SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> {
 
+    /** Default batch splits size to avoid exceed `akka.framesize`. */
+    private static final int DEFAULT_SPLITS_SIZE = 10;

Review Comment:
   Can we introduce an option for this? It could go in `FlinkConnectorOptions`.





[GitHub] [incubator-paimon] wxplovecc commented on a diff in pull request #687: [flink] Assign splits with fixed batch size in StaticFileStoreSplitEnumerator

Posted by "wxplovecc (via GitHub)" <gi...@apache.org>.
wxplovecc commented on code in PR #687:
URL: https://github.com/apache/incubator-paimon/pull/687#discussion_r1145595987


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java:
##########
@@ -79,10 +82,15 @@ public void handleSplitRequest(int subtask, @Nullable String hostname) {
         // The following batch assignment operation is for two purposes:
         // To distribute splits evenly when batch reading to prevent a few tasks from reading all
         // the data (for example, the current resource can only schedule part of the tasks).
-        // TODO: assignment is already created in constructor, here can just assign per batch
-        List<FileStoreSourceSplit> splits = pendingSplitAssignment.remove(subtask);

Review Comment:
   +1





[GitHub] [incubator-paimon] JingsongLi commented on a diff in pull request #687: [flink] Assign splits with fixed batch size in StaticFileStoreSplitEnumerator

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi commented on code in PR #687:
URL: https://github.com/apache/incubator-paimon/pull/687#discussion_r1145939840


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java:
##########
@@ -94,6 +98,6 @@ public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnu
         }
 
         Snapshot snapshot = snapshotId == null ? null : snapshotManager.snapshot(snapshotId);
-        return new StaticFileStoreSplitEnumerator(context, snapshot, splits);
+        return new StaticFileStoreSplitEnumerator(context, snapshot, splits, splitsSize);

Review Comment:
   Here, we can get `splitBatchSize` from the table (via `table.options()`).
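
A rough sketch of reading the batch size from table options, assuming the options are exposed as a `Map<String, String>`. The key `scan.split-enumerator.batch-size` is one of the names discussed in this thread and the default of 10 mirrors `DEFAULT_SPLITS_SIZE` from the diff; the class `SplitBatchSizeOption` and its method are hypothetical and use plain Java rather than Paimon's option classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; a real option would be declared in the connector's options class.
class SplitBatchSizeOption {

    // Assumed key name, taken from the naming discussion in this review.
    static final String KEY = "scan.split-enumerator.batch-size";
    // Mirrors DEFAULT_SPLITS_SIZE = 10 from the diff under review.
    static final int DEFAULT = 10;

    // Returns the configured batch size, or the default when the key is absent.
    static int splitBatchSize(Map<String, String> options) {
        String raw = options.get(KEY);
        if (raw == null) {
            return DEFAULT;
        }
        int value = Integer.parseInt(raw.trim());
        if (value <= 0) {
            throw new IllegalArgumentException(KEY + " must be positive, got " + value);
        }
        return value;
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        System.out.println(splitBatchSize(options)); // 10
        options.put(KEY, "25");
        System.out.println(splitBatchSize(options)); // 25
    }
}
```

Resolving the value from the table keeps the enumerator constructor free of hard-coded constants and lets users tune the batch size per table.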



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java:
##########
@@ -79,10 +86,16 @@ public void handleSplitRequest(int subtask, @Nullable String hostname) {
         // The following batch assignment operation is for two purposes:
         // To distribute splits evenly when batch reading to prevent a few tasks from reading all
         // the data (for example, the current resource can only schedule part of the tasks).
-        // TODO: assignment is already created in constructor, here can just assign per batch
-        List<FileStoreSourceSplit> splits = pendingSplitAssignment.remove(subtask);
-        if (splits != null && splits.size() > 0) {
-            context.assignSplits(new SplitsAssignment<>(Collections.singletonMap(subtask, splits)));
+        Queue<FileStoreSourceSplit> allSubTaskSplits = pendingSplitAssignment.get(subtask);

Review Comment:
   minor: rename to `taskSplits`



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java:
##########
@@ -79,10 +86,16 @@ public void handleSplitRequest(int subtask, @Nullable String hostname) {
         // The following batch assignment operation is for two purposes:
         // To distribute splits evenly when batch reading to prevent a few tasks from reading all
         // the data (for example, the current resource can only schedule part of the tasks).
-        // TODO: assignment is already created in constructor, here can just assign per batch
-        List<FileStoreSourceSplit> splits = pendingSplitAssignment.remove(subtask);
-        if (splits != null && splits.size() > 0) {
-            context.assignSplits(new SplitsAssignment<>(Collections.singletonMap(subtask, splits)));
+        Queue<FileStoreSourceSplit> allSubTaskSplits = pendingSplitAssignment.get(subtask);
+        List<FileStoreSourceSplit> batchAssignment = new ArrayList<>();

Review Comment:
   minor: rename to `assignment`?



##########
docs/layouts/shortcodes/generated/flink_connector_configuration.html:
##########
@@ -26,6 +26,12 @@
             <td>String</td>
             <td>The log system used to keep changes of the table.<br /><br />Possible values:<br /><ul><li>"none": No log system, the data is written only to file store, and the streaming read will be directly read from the file store.</li></ul><ul><li>"kafka": Kafka log system, the data is double written to file store and kafka, and the streaming read will be read from kafka.</li></ul></td>
         </tr>
+        <tr>
+            <td><h5>scan.bounded.splits.size</h5></td>

Review Comment:
   `scan.split-enumerator.batch-size`?





[GitHub] [incubator-paimon] wxplovecc commented on a diff in pull request #687: [flink] Assign splits with fixed batch size in StaticFileStoreSplitEnumerator

Posted by "wxplovecc (via GitHub)" <gi...@apache.org>.
wxplovecc commented on code in PR #687:
URL: https://github.com/apache/incubator-paimon/pull/687#discussion_r1146029739


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java:
##########
@@ -94,6 +98,6 @@ public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnu
         }
 
         Snapshot snapshot = snapshotId == null ? null : snapshotManager.snapshot(snapshotId);
-        return new StaticFileStoreSplitEnumerator(context, snapshot, splits);
+        return new StaticFileStoreSplitEnumerator(context, snapshot, splits, splitsSize);

Review Comment:
   Should we move the option to `CoreOptions`?


