You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by "tkalkirill (via GitHub)" <gi...@apache.org> on 2023/06/22 09:26:17 UTC

[GitHub] [ignite-3] tkalkirill opened a new pull request, #2236: IGNITE-16004 Preserve key order in InternalTableImpl#collectMultiRowsResponses

tkalkirill opened a new pull request, #2236:
URL: https://github.com/apache/ignite-3/pull/2236

   https://issues.apache.org/jira/browse/I[IGNITE-16004


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] ibessonov commented on a diff in pull request #2236: IGNITE-16004 Preserve key order in InternalTableImpl#collectMultiRowsResponses

Posted by "ibessonov (via GitHub)" <gi...@apache.org>.
ibessonov commented on code in PR #2236:
URL: https://github.com/apache/ignite-3/pull/2236#discussion_r1239517816


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java:
##########
@@ -646,7 +647,7 @@ public CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> rows, InternalT
                 rows,
                 tx,
                 this::upsertAllInternal,
-                InternalTableImpl::allOfResultFuture
+                RowBatch::allResultFuture

Review Comment:
   It's either `allResultsFuture`, or `allResultFutures`, one of them must be plural



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #2236: IGNITE-16004 Preserve key order in InternalTableImpl#collectMultiRowsResponses

Posted by "tkalkirill (via GitHub)" <gi...@apache.org>.
tkalkirill commented on code in PR #2236:
URL: https://github.com/apache/ignite-3/pull/2236#discussion_r1239502262


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java:
##########
@@ -1206,28 +1208,87 @@ public int partitionId(BinaryRowEx row) {
     }
 
     /**
-     * TODO asch keep the same order as for keys Collects multirow responses from multiple futures into a single collection IGNITE-16004.
+     * Gathers the result of batch processing into a single resulting collection of rows.
      *
-     * @param futs Futures.
-     * @return Row collection.
+     * @param rowBatchByPartitionId Row batches by partition ID.
+     * @return Future of collecting results.
      */
-    private CompletableFuture<Collection<BinaryRow>> collectMultiRowsResponses(CompletableFuture<Object>[] futs) {
-        return CompletableFuture.allOf(futs)
+    static CompletableFuture<Collection<BinaryRow>> collectMultiRowsResponsesWithoutRestoreOrder(
+            Int2ObjectMap<RowBatch> rowBatchByPartitionId
+    ) {
+        return allOfResultFuture(rowBatchByPartitionId)
                 .thenApply(response -> {
-                    Collection<BinaryRow> list = new ArrayList<>(futs.length);
+                    var result = new ArrayList<BinaryRow>(rowBatchByPartitionId.size());
 
-                    for (CompletableFuture<Object> future : futs) {
-                        Collection<BinaryRow> values = (Collection<BinaryRow>) future.join();
+                    for (Int2ObjectMap.Entry<RowBatch> partitionRowBatch : rowBatchByPartitionId.int2ObjectEntrySet()) {
+                        Collection<BinaryRow> batchResult = getBatchProcessingResult(partitionRowBatch);

Review Comment:
   fix it



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java:
##########
@@ -283,7 +284,7 @@ private <T> CompletableFuture<T> enlistInTx(
             Collection<BinaryRowEx> keyRows,
             @Nullable InternalTransaction tx,
             IgniteFiveFunction<TablePartitionId, Collection<BinaryRow>, InternalTransaction, ReplicationGroupId, Long, ReplicaRequest> op,
-            Function<CompletableFuture<Object>[], CompletableFuture<T>> reducer
+            Function<Int2ObjectMap<RowBatch>, CompletableFuture<T>> reducer

Review Comment:
   Fix it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #2236: IGNITE-16004 Preserve key order in InternalTableImpl#collectMultiRowsResponses

Posted by "tkalkirill (via GitHub)" <gi...@apache.org>.
tkalkirill commented on code in PR #2236:
URL: https://github.com/apache/ignite-3/pull/2236#discussion_r1239508269


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java:
##########
@@ -1206,28 +1208,87 @@ public int partitionId(BinaryRowEx row) {
     }
 
     /**
-     * TODO asch keep the same order as for keys Collects multirow responses from multiple futures into a single collection IGNITE-16004.
+     * Gathers the result of batch processing into a single resulting collection of rows.
      *
-     * @param futs Futures.
-     * @return Row collection.
+     * @param rowBatchByPartitionId Row batches by partition ID.
+     * @return Future of collecting results.
      */
-    private CompletableFuture<Collection<BinaryRow>> collectMultiRowsResponses(CompletableFuture<Object>[] futs) {
-        return CompletableFuture.allOf(futs)
+    static CompletableFuture<Collection<BinaryRow>> collectMultiRowsResponsesWithoutRestoreOrder(
+            Int2ObjectMap<RowBatch> rowBatchByPartitionId
+    ) {
+        return allOfResultFuture(rowBatchByPartitionId)
                 .thenApply(response -> {
-                    Collection<BinaryRow> list = new ArrayList<>(futs.length);
+                    var result = new ArrayList<BinaryRow>(rowBatchByPartitionId.size());
 
-                    for (CompletableFuture<Object> future : futs) {
-                        Collection<BinaryRow> values = (Collection<BinaryRow>) future.join();
+                    for (Int2ObjectMap.Entry<RowBatch> partitionRowBatch : rowBatchByPartitionId.int2ObjectEntrySet()) {
+                        Collection<BinaryRow> batchResult = getBatchProcessingResult(partitionRowBatch);
 
-                        if (values != null) {
-                            list.addAll(values);
+                        if (batchResult == null) {
+                            continue;
                         }
+
+                        result.addAll(batchResult);
                     }
 
-                    return list;
+                    return result;
                 });
     }
 
+    /**
+     * Gathers the result of batch processing into a single resulting collection of rows, restoring order as in the requested collection of
+     * rows.
+     *
+     * @param rowBatchByPartitionId Row batches by partition ID.
+     * @return Future of collecting results.
+     */
+    static CompletableFuture<Collection<BinaryRow>> collectMultiRowsResponsesWithRestoreOrder(
+            Int2ObjectMap<RowBatch> rowBatchByPartitionId
+    ) {
+        return allOfResultFuture(rowBatchByPartitionId)
+                .thenApply(response -> {
+                    int totalRowSize = rowBatchByPartitionId.values().stream().mapToInt(value -> value.rows.size()).sum();
+
+                    var result = new BinaryRow[totalRowSize];
+
+                    for (Int2ObjectMap.Entry<RowBatch> partitionRowBatch : rowBatchByPartitionId.int2ObjectEntrySet()) {
+                        Collection<BinaryRow> batchResult = getBatchProcessingResult(partitionRowBatch);
+
+                        if (batchResult == null) {
+                            continue;
+                        }
+
+                        RowBatch rowBatch = partitionRowBatch.getValue();
+
+                        assert batchResult.size() == rowBatch.rows.size() :
+                                "batchResult=" + batchResult.size() + ", rowBatch=" + rowBatch.rows.size();
+
+                        int i = 0;
+
+                        for (BinaryRow resultRow : batchResult) {
+                            result[rowBatch.originalRowOrder.getInt(i++)] = resultRow;

Review Comment:
   fix it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #2236: IGNITE-16004 Preserve key order in InternalTableImpl#collectMultiRowsResponses

Posted by "tkalkirill (via GitHub)" <gi...@apache.org>.
tkalkirill commented on code in PR #2236:
URL: https://github.com/apache/ignite-3/pull/2236#discussion_r1239487027


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java:
##########
@@ -1077,19 +1078,20 @@ private void validatePartitionIndex(int p) {
     }
 
     /**
-     * Map rows to partitions.
+     * Creates batches of rows for processing, separated by partition ID.

Review Comment:
   fix it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill merged pull request #2236: IGNITE-16004 Preserve key order in InternalTableImpl#collectMultiRowsResponses

Posted by "tkalkirill (via GitHub)" <gi...@apache.org>.
tkalkirill merged PR #2236:
URL: https://github.com/apache/ignite-3/pull/2236


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #2236: IGNITE-16004 Preserve key order in InternalTableImpl#collectMultiRowsResponses

Posted by "tkalkirill (via GitHub)" <gi...@apache.org>.
tkalkirill commented on code in PR #2236:
URL: https://github.com/apache/ignite-3/pull/2236#discussion_r1239494792


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java:
##########
@@ -1206,28 +1208,87 @@ public int partitionId(BinaryRowEx row) {
     }
 
     /**
-     * TODO asch keep the same order as for keys Collects multirow responses from multiple futures into a single collection IGNITE-16004.
+     * Gathers the result of batch processing into a single resulting collection of rows.
      *
-     * @param futs Futures.
-     * @return Row collection.
+     * @param rowBatchByPartitionId Row batches by partition ID.
+     * @return Future of collecting results.
      */
-    private CompletableFuture<Collection<BinaryRow>> collectMultiRowsResponses(CompletableFuture<Object>[] futs) {
-        return CompletableFuture.allOf(futs)
+    static CompletableFuture<Collection<BinaryRow>> collectMultiRowsResponsesWithoutRestoreOrder(
+            Int2ObjectMap<RowBatch> rowBatchByPartitionId
+    ) {
+        return allOfResultFuture(rowBatchByPartitionId)
                 .thenApply(response -> {
-                    Collection<BinaryRow> list = new ArrayList<>(futs.length);
+                    var result = new ArrayList<BinaryRow>(rowBatchByPartitionId.size());
 
-                    for (CompletableFuture<Object> future : futs) {
-                        Collection<BinaryRow> values = (Collection<BinaryRow>) future.join();
+                    for (Int2ObjectMap.Entry<RowBatch> partitionRowBatch : rowBatchByPartitionId.int2ObjectEntrySet()) {
+                        Collection<BinaryRow> batchResult = getBatchProcessingResult(partitionRowBatch);
 
-                        if (values != null) {
-                            list.addAll(values);
+                        if (batchResult == null) {
+                            continue;
                         }
+
+                        result.addAll(batchResult);
                     }
 
-                    return list;
+                    return result;
                 });
     }
 
+    /**
+     * Gathers the result of batch processing into a single resulting collection of rows, restoring order as in the requested collection of
+     * rows.
+     *
+     * @param rowBatchByPartitionId Row batches by partition ID.
+     * @return Future of collecting results.
+     */
+    static CompletableFuture<Collection<BinaryRow>> collectMultiRowsResponsesWithRestoreOrder(
+            Int2ObjectMap<RowBatch> rowBatchByPartitionId
+    ) {
+        return allOfResultFuture(rowBatchByPartitionId)
+                .thenApply(response -> {
+                    int totalRowSize = rowBatchByPartitionId.values().stream().mapToInt(value -> value.rows.size()).sum();
+
+                    var result = new BinaryRow[totalRowSize];
+
+                    for (Int2ObjectMap.Entry<RowBatch> partitionRowBatch : rowBatchByPartitionId.int2ObjectEntrySet()) {
+                        Collection<BinaryRow> batchResult = getBatchProcessingResult(partitionRowBatch);
+
+                        if (batchResult == null) {
+                            continue;
+                        }
+
+                        RowBatch rowBatch = partitionRowBatch.getValue();
+
+                        assert batchResult.size() == rowBatch.rows.size() :
+                                "batchResult=" + batchResult.size() + ", rowBatch=" + rowBatch.rows.size();
+
+                        int i = 0;
+
+                        for (BinaryRow resultRow : batchResult) {
+                            result[rowBatch.originalRowOrder.getInt(i++)] = resultRow;
+                        }
+                    }
+
+                    // Use Arrays#asList to avoid copying the array.
+                    return Arrays.asList(result);
+                });
+    }
+
+    private static CompletableFuture<Void> allOfResultFuture(Int2ObjectMap<RowBatch> rowBatchByPartitionId) {
+        return CompletableFuture.allOf(
+                rowBatchByPartitionId.values().stream().map(rowBatch -> rowBatch.resultFuture).toArray(CompletableFuture[]::new)
+        );
+    }
+
+    private static Collection<BinaryRow> getBatchProcessingResult(Int2ObjectMap.Entry<RowBatch> partitionRowBatch) {
+        RowBatch rowBatch = partitionRowBatch.getValue();
+
+        assert rowBatch.resultFuture != null : "partitionId=" + partitionRowBatch.getIntKey();
+
+        // It is safe to call join(), we have already waited for all the resultFutures.
+        return (Collection<BinaryRow>) rowBatch.resultFuture.join();

Review Comment:
   Fix it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] ibessonov commented on a diff in pull request #2236: IGNITE-16004 Preserve key order in InternalTableImpl#collectMultiRowsResponses

Posted by "ibessonov (via GitHub)" <gi...@apache.org>.
ibessonov commented on code in PR #2236:
URL: https://github.com/apache/ignite-3/pull/2236#discussion_r1239381529


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java:
##########
@@ -649,7 +646,8 @@ public CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> rows, InternalT
                 rows,
                 tx,
                 this::upsertAllInternal,
-                CompletableFuture::allOf);
+                InternalTableImpl::allOfResultFuture

Review Comment:
   `allResultFutures` would look more natural, when it comes to naming, but it's up to you to decide. I prefer grammatically correct names when it's possible



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java:
##########
@@ -283,7 +284,7 @@ private <T> CompletableFuture<T> enlistInTx(
             Collection<BinaryRowEx> keyRows,
             @Nullable InternalTransaction tx,
             IgniteFiveFunction<TablePartitionId, Collection<BinaryRow>, InternalTransaction, ReplicationGroupId, Long, ReplicaRequest> op,
-            Function<CompletableFuture<Object>[], CompletableFuture<T>> reducer
+            Function<Int2ObjectMap<RowBatch>, CompletableFuture<T>> reducer

Review Comment:
   I believe that parameter of this function doesn't need partition IDs, passing `RowBatch[]` or `Collection<RowBatch>` would be enough. Could you please change the signature?
   If I'm missing something, then please correct me.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java:
##########
@@ -1206,28 +1208,87 @@ public int partitionId(BinaryRowEx row) {
     }
 
     /**
-     * TODO asch keep the same order as for keys Collects multirow responses from multiple futures into a single collection IGNITE-16004.
+     * Gathers the result of batch processing into a single resulting collection of rows.
      *
-     * @param futs Futures.
-     * @return Row collection.
+     * @param rowBatchByPartitionId Row batches by partition ID.
+     * @return Future of collecting results.
      */
-    private CompletableFuture<Collection<BinaryRow>> collectMultiRowsResponses(CompletableFuture<Object>[] futs) {
-        return CompletableFuture.allOf(futs)
+    static CompletableFuture<Collection<BinaryRow>> collectMultiRowsResponsesWithoutRestoreOrder(
+            Int2ObjectMap<RowBatch> rowBatchByPartitionId
+    ) {
+        return allOfResultFuture(rowBatchByPartitionId)
                 .thenApply(response -> {
-                    Collection<BinaryRow> list = new ArrayList<>(futs.length);
+                    var result = new ArrayList<BinaryRow>(rowBatchByPartitionId.size());
 
-                    for (CompletableFuture<Object> future : futs) {
-                        Collection<BinaryRow> values = (Collection<BinaryRow>) future.join();
+                    for (Int2ObjectMap.Entry<RowBatch> partitionRowBatch : rowBatchByPartitionId.int2ObjectEntrySet()) {
+                        Collection<BinaryRow> batchResult = getBatchProcessingResult(partitionRowBatch);
 
-                        if (values != null) {
-                            list.addAll(values);
+                        if (batchResult == null) {
+                            continue;
                         }
+
+                        result.addAll(batchResult);
                     }
 
-                    return list;
+                    return result;
                 });
     }
 
+    /**
+     * Gathers the result of batch processing into a single resulting collection of rows, restoring order as in the requested collection of
+     * rows.
+     *
+     * @param rowBatchByPartitionId Row batches by partition ID.
+     * @return Future of collecting results.
+     */
+    static CompletableFuture<Collection<BinaryRow>> collectMultiRowsResponsesWithRestoreOrder(
+            Int2ObjectMap<RowBatch> rowBatchByPartitionId
+    ) {
+        return allOfResultFuture(rowBatchByPartitionId)
+                .thenApply(response -> {
+                    int totalRowSize = rowBatchByPartitionId.values().stream().mapToInt(value -> value.rows.size()).sum();
+
+                    var result = new BinaryRow[totalRowSize];
+
+                    for (Int2ObjectMap.Entry<RowBatch> partitionRowBatch : rowBatchByPartitionId.int2ObjectEntrySet()) {
+                        Collection<BinaryRow> batchResult = getBatchProcessingResult(partitionRowBatch);
+
+                        if (batchResult == null) {
+                            continue;
+                        }
+
+                        RowBatch rowBatch = partitionRowBatch.getValue();
+
+                        assert batchResult.size() == rowBatch.rows.size() :
+                                "batchResult=" + batchResult.size() + ", rowBatch=" + rowBatch.rows.size();
+
+                        int i = 0;
+
+                        for (BinaryRow resultRow : batchResult) {
+                            result[rowBatch.originalRowOrder.getInt(i++)] = resultRow;
+                        }
+                    }
+
+                    // Use Arrays#asList to avoid copying the array.
+                    return Arrays.asList(result);
+                });
+    }
+
+    private static CompletableFuture<Void> allOfResultFuture(Int2ObjectMap<RowBatch> rowBatchByPartitionId) {
+        return CompletableFuture.allOf(
+                rowBatchByPartitionId.values().stream().map(rowBatch -> rowBatch.resultFuture).toArray(CompletableFuture[]::new)
+        );
+    }
+
+    private static Collection<BinaryRow> getBatchProcessingResult(Int2ObjectMap.Entry<RowBatch> partitionRowBatch) {
+        RowBatch rowBatch = partitionRowBatch.getValue();
+
+        assert rowBatch.resultFuture != null : "partitionId=" + partitionRowBatch.getIntKey();
+
+        // It is safe to call join(), we have already waited for all the resultFutures.
+        return (Collection<BinaryRow>) rowBatch.resultFuture.join();

Review Comment:
   There's no assertion that the future is completed. Comment is not enough.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java:
##########
@@ -1206,28 +1208,87 @@ public int partitionId(BinaryRowEx row) {
     }
 
     /**
-     * TODO asch keep the same order as for keys Collects multirow responses from multiple futures into a single collection IGNITE-16004.
+     * Gathers the result of batch processing into a single resulting collection of rows.
      *
-     * @param futs Futures.
-     * @return Row collection.
+     * @param rowBatchByPartitionId Row batches by partition ID.
+     * @return Future of collecting results.
      */
-    private CompletableFuture<Collection<BinaryRow>> collectMultiRowsResponses(CompletableFuture<Object>[] futs) {
-        return CompletableFuture.allOf(futs)
+    static CompletableFuture<Collection<BinaryRow>> collectMultiRowsResponsesWithoutRestoreOrder(
+            Int2ObjectMap<RowBatch> rowBatchByPartitionId
+    ) {
+        return allOfResultFuture(rowBatchByPartitionId)
                 .thenApply(response -> {
-                    Collection<BinaryRow> list = new ArrayList<>(futs.length);
+                    var result = new ArrayList<BinaryRow>(rowBatchByPartitionId.size());
 
-                    for (CompletableFuture<Object> future : futs) {
-                        Collection<BinaryRow> values = (Collection<BinaryRow>) future.join();
+                    for (Int2ObjectMap.Entry<RowBatch> partitionRowBatch : rowBatchByPartitionId.int2ObjectEntrySet()) {
+                        Collection<BinaryRow> batchResult = getBatchProcessingResult(partitionRowBatch);
 
-                        if (values != null) {
-                            list.addAll(values);
+                        if (batchResult == null) {
+                            continue;
                         }
+
+                        result.addAll(batchResult);
                     }
 
-                    return list;
+                    return result;
                 });
     }
 
+    /**
+     * Gathers the result of batch processing into a single resulting collection of rows, restoring order as in the requested collection of
+     * rows.
+     *
+     * @param rowBatchByPartitionId Row batches by partition ID.
+     * @return Future of collecting results.
+     */
+    static CompletableFuture<Collection<BinaryRow>> collectMultiRowsResponsesWithRestoreOrder(
+            Int2ObjectMap<RowBatch> rowBatchByPartitionId
+    ) {
+        return allOfResultFuture(rowBatchByPartitionId)
+                .thenApply(response -> {
+                    int totalRowSize = rowBatchByPartitionId.values().stream().mapToInt(value -> value.rows.size()).sum();
+
+                    var result = new BinaryRow[totalRowSize];
+
+                    for (Int2ObjectMap.Entry<RowBatch> partitionRowBatch : rowBatchByPartitionId.int2ObjectEntrySet()) {
+                        Collection<BinaryRow> batchResult = getBatchProcessingResult(partitionRowBatch);
+
+                        if (batchResult == null) {
+                            continue;
+                        }
+
+                        RowBatch rowBatch = partitionRowBatch.getValue();
+
+                        assert batchResult.size() == rowBatch.rows.size() :
+                                "batchResult=" + batchResult.size() + ", rowBatch=" + rowBatch.rows.size();
+
+                        int i = 0;
+
+                        for (BinaryRow resultRow : batchResult) {
+                            result[rowBatch.originalRowOrder.getInt(i++)] = resultRow;

Review Comment:
   Name `getInt` says nothing about what int it returns, please rename it



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java:
##########
@@ -1077,19 +1078,20 @@ private void validatePartitionIndex(int p) {
     }
 
     /**
-     * Map rows to partitions.
+     * Creates batches of rows for processing, separated by partition ID.

Review Comment:
   ```suggestion
        * Creates batches of rows for processing, grouped by partition ID.
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java:
##########
@@ -1206,28 +1208,87 @@ public int partitionId(BinaryRowEx row) {
     }
 
     /**
-     * TODO asch keep the same order as for keys Collects multirow responses from multiple futures into a single collection IGNITE-16004.
+     * Gathers the result of batch processing into a single resulting collection of rows.
      *
-     * @param futs Futures.
-     * @return Row collection.
+     * @param rowBatchByPartitionId Row batches by partition ID.
+     * @return Future of collecting results.
      */
-    private CompletableFuture<Collection<BinaryRow>> collectMultiRowsResponses(CompletableFuture<Object>[] futs) {
-        return CompletableFuture.allOf(futs)
+    static CompletableFuture<Collection<BinaryRow>> collectMultiRowsResponsesWithoutRestoreOrder(
+            Int2ObjectMap<RowBatch> rowBatchByPartitionId
+    ) {
+        return allOfResultFuture(rowBatchByPartitionId)
                 .thenApply(response -> {
-                    Collection<BinaryRow> list = new ArrayList<>(futs.length);
+                    var result = new ArrayList<BinaryRow>(rowBatchByPartitionId.size());
 
-                    for (CompletableFuture<Object> future : futs) {
-                        Collection<BinaryRow> values = (Collection<BinaryRow>) future.join();
+                    for (Int2ObjectMap.Entry<RowBatch> partitionRowBatch : rowBatchByPartitionId.int2ObjectEntrySet()) {
+                        Collection<BinaryRow> batchResult = getBatchProcessingResult(partitionRowBatch);
 
-                        if (values != null) {
-                            list.addAll(values);
+                        if (batchResult == null) {
+                            continue;
                         }
+
+                        result.addAll(batchResult);
                     }
 
-                    return list;
+                    return result;
                 });
     }
 
+    /**
+     * Gathers the result of batch processing into a single resulting collection of rows, restoring order as in the requested collection of
+     * rows.
+     *
+     * @param rowBatchByPartitionId Row batches by partition ID.
+     * @return Future of collecting results.
+     */
+    static CompletableFuture<Collection<BinaryRow>> collectMultiRowsResponsesWithRestoreOrder(
+            Int2ObjectMap<RowBatch> rowBatchByPartitionId
+    ) {
+        return allOfResultFuture(rowBatchByPartitionId)
+                .thenApply(response -> {
+                    int totalRowSize = rowBatchByPartitionId.values().stream().mapToInt(value -> value.rows.size()).sum();

Review Comment:
   Why do you use streams? It's on the user's hot path, right? I think there was an agreement to only use streams in "rare" operations



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java:
##########
@@ -1206,28 +1208,87 @@ public int partitionId(BinaryRowEx row) {
     }
 
     /**
-     * TODO asch keep the same order as for keys Collects multirow responses from multiple futures into a single collection IGNITE-16004.
+     * Gathers the result of batch processing into a single resulting collection of rows.
      *
-     * @param futs Futures.
-     * @return Row collection.
+     * @param rowBatchByPartitionId Row batches by partition ID.
+     * @return Future of collecting results.
      */
-    private CompletableFuture<Collection<BinaryRow>> collectMultiRowsResponses(CompletableFuture<Object>[] futs) {
-        return CompletableFuture.allOf(futs)
+    static CompletableFuture<Collection<BinaryRow>> collectMultiRowsResponsesWithoutRestoreOrder(
+            Int2ObjectMap<RowBatch> rowBatchByPartitionId
+    ) {
+        return allOfResultFuture(rowBatchByPartitionId)
                 .thenApply(response -> {
-                    Collection<BinaryRow> list = new ArrayList<>(futs.length);
+                    var result = new ArrayList<BinaryRow>(rowBatchByPartitionId.size());
 
-                    for (CompletableFuture<Object> future : futs) {
-                        Collection<BinaryRow> values = (Collection<BinaryRow>) future.join();
+                    for (Int2ObjectMap.Entry<RowBatch> partitionRowBatch : rowBatchByPartitionId.int2ObjectEntrySet()) {
+                        Collection<BinaryRow> batchResult = getBatchProcessingResult(partitionRowBatch);

Review Comment:
   What's the reason of passing entry into this method, instead of `RowBatch`? Does this method require partition ID?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java:
##########
@@ -1206,28 +1208,87 @@ public int partitionId(BinaryRowEx row) {
     }
 
     /**
-     * TODO asch keep the same order as for keys Collects multirow responses from multiple futures into a single collection IGNITE-16004.
+     * Gathers the result of batch processing into a single resulting collection of rows.
      *
-     * @param futs Futures.
-     * @return Row collection.
+     * @param rowBatchByPartitionId Row batches by partition ID.
+     * @return Future of collecting results.
      */
-    private CompletableFuture<Collection<BinaryRow>> collectMultiRowsResponses(CompletableFuture<Object>[] futs) {
-        return CompletableFuture.allOf(futs)
+    static CompletableFuture<Collection<BinaryRow>> collectMultiRowsResponsesWithoutRestoreOrder(
+            Int2ObjectMap<RowBatch> rowBatchByPartitionId
+    ) {
+        return allOfResultFuture(rowBatchByPartitionId)
                 .thenApply(response -> {
-                    Collection<BinaryRow> list = new ArrayList<>(futs.length);
+                    var result = new ArrayList<BinaryRow>(rowBatchByPartitionId.size());
 
-                    for (CompletableFuture<Object> future : futs) {
-                        Collection<BinaryRow> values = (Collection<BinaryRow>) future.join();
+                    for (Int2ObjectMap.Entry<RowBatch> partitionRowBatch : rowBatchByPartitionId.int2ObjectEntrySet()) {
+                        Collection<BinaryRow> batchResult = getBatchProcessingResult(partitionRowBatch);
 
-                        if (values != null) {
-                            list.addAll(values);
+                        if (batchResult == null) {
+                            continue;
                         }
+
+                        result.addAll(batchResult);
                     }
 
-                    return list;
+                    return result;
                 });
     }
 
+    /**
+     * Gathers the result of batch processing into a single resulting collection of rows, restoring order as in the requested collection of
+     * rows.
+     *
+     * @param rowBatchByPartitionId Row batches by partition ID.
+     * @return Future of collecting results.
+     */
+    static CompletableFuture<Collection<BinaryRow>> collectMultiRowsResponsesWithRestoreOrder(
+            Int2ObjectMap<RowBatch> rowBatchByPartitionId
+    ) {
+        return allOfResultFuture(rowBatchByPartitionId)
+                .thenApply(response -> {
+                    int totalRowSize = rowBatchByPartitionId.values().stream().mapToInt(value -> value.rows.size()).sum();
+
+                    var result = new BinaryRow[totalRowSize];
+
+                    for (Int2ObjectMap.Entry<RowBatch> partitionRowBatch : rowBatchByPartitionId.int2ObjectEntrySet()) {
+                        Collection<BinaryRow> batchResult = getBatchProcessingResult(partitionRowBatch);
+
+                        if (batchResult == null) {
+                            continue;
+                        }
+
+                        RowBatch rowBatch = partitionRowBatch.getValue();
+
+                        assert batchResult.size() == rowBatch.rows.size() :

Review Comment:
   Maybe `rows` should be renamed to `requestedRows`, otherwise the code is hard to read.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #2236: IGNITE-16004 Preserve key order in InternalTableImpl#collectMultiRowsResponses

Posted by "tkalkirill (via GitHub)" <gi...@apache.org>.
tkalkirill commented on code in PR #2236:
URL: https://github.com/apache/ignite-3/pull/2236#discussion_r1239520522


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java:
##########
@@ -646,7 +647,7 @@ public CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> rows, InternalT
                 rows,
                 tx,
                 this::upsertAllInternal,
-                InternalTableImpl::allOfResultFuture
+                RowBatch::allResultFuture

Review Comment:
   Shit, my attentiveness is suffering!
   Fix it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #2236: IGNITE-16004 Preserve key order in InternalTableImpl#collectMultiRowsResponses

Posted by "tkalkirill (via GitHub)" <gi...@apache.org>.
tkalkirill commented on code in PR #2236:
URL: https://github.com/apache/ignite-3/pull/2236#discussion_r1239486721


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java:
##########
@@ -649,7 +646,8 @@ public CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> rows, InternalT
                 rows,
                 tx,
                 this::upsertAllInternal,
-                CompletableFuture::allOf);
+                InternalTableImpl::allOfResultFuture

Review Comment:
   Fix it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #2236: IGNITE-16004 Preserve key order in InternalTableImpl#collectMultiRowsResponses

Posted by "tkalkirill (via GitHub)" <gi...@apache.org>.
tkalkirill commented on code in PR #2236:
URL: https://github.com/apache/ignite-3/pull/2236#discussion_r1239494442


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java:
##########
@@ -1206,28 +1208,87 @@ public int partitionId(BinaryRowEx row) {
     }
 
     /**
-     * TODO asch keep the same order as for keys Collects multirow responses from multiple futures into a single collection IGNITE-16004.
+     * Gathers the result of batch processing into a single resulting collection of rows.
      *
-     * @param futs Futures.
-     * @return Row collection.
+     * @param rowBatchByPartitionId Row batches by partition ID.
+     * @return Future of collecting results.
      */
-    private CompletableFuture<Collection<BinaryRow>> collectMultiRowsResponses(CompletableFuture<Object>[] futs) {
-        return CompletableFuture.allOf(futs)
+    static CompletableFuture<Collection<BinaryRow>> collectMultiRowsResponsesWithoutRestoreOrder(
+            Int2ObjectMap<RowBatch> rowBatchByPartitionId
+    ) {
+        return allOfResultFuture(rowBatchByPartitionId)
                 .thenApply(response -> {
-                    Collection<BinaryRow> list = new ArrayList<>(futs.length);
+                    var result = new ArrayList<BinaryRow>(rowBatchByPartitionId.size());
 
-                    for (CompletableFuture<Object> future : futs) {
-                        Collection<BinaryRow> values = (Collection<BinaryRow>) future.join();
+                    for (Int2ObjectMap.Entry<RowBatch> partitionRowBatch : rowBatchByPartitionId.int2ObjectEntrySet()) {
+                        Collection<BinaryRow> batchResult = getBatchProcessingResult(partitionRowBatch);
 
-                        if (values != null) {
-                            list.addAll(values);
+                        if (batchResult == null) {
+                            continue;
                         }
+
+                        result.addAll(batchResult);
                     }
 
-                    return list;
+                    return result;
                 });
     }
 
+    /**
+     * Gathers the result of batch processing into a single resulting collection of rows, restoring order as in the requested collection of
+     * rows.
+     *
+     * @param rowBatchByPartitionId Row batches by partition ID.
+     * @return Future of collecting results.
+     */
+    static CompletableFuture<Collection<BinaryRow>> collectMultiRowsResponsesWithRestoreOrder(
+            Int2ObjectMap<RowBatch> rowBatchByPartitionId
+    ) {
+        return allOfResultFuture(rowBatchByPartitionId)
+                .thenApply(response -> {
+                    int totalRowSize = rowBatchByPartitionId.values().stream().mapToInt(value -> value.rows.size()).sum();

Review Comment:
   Fix it



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java:
##########
@@ -1206,28 +1208,87 @@ public int partitionId(BinaryRowEx row) {
     }
 
     /**
-     * TODO asch keep the same order as for keys Collects multirow responses from multiple futures into a single collection IGNITE-16004.
+     * Gathers the result of batch processing into a single resulting collection of rows.
      *
-     * @param futs Futures.
-     * @return Row collection.
+     * @param rowBatchByPartitionId Row batches by partition ID.
+     * @return Future of collecting results.
      */
-    private CompletableFuture<Collection<BinaryRow>> collectMultiRowsResponses(CompletableFuture<Object>[] futs) {
-        return CompletableFuture.allOf(futs)
+    static CompletableFuture<Collection<BinaryRow>> collectMultiRowsResponsesWithoutRestoreOrder(
+            Int2ObjectMap<RowBatch> rowBatchByPartitionId
+    ) {
+        return allOfResultFuture(rowBatchByPartitionId)
                 .thenApply(response -> {
-                    Collection<BinaryRow> list = new ArrayList<>(futs.length);
+                    var result = new ArrayList<BinaryRow>(rowBatchByPartitionId.size());
 
-                    for (CompletableFuture<Object> future : futs) {
-                        Collection<BinaryRow> values = (Collection<BinaryRow>) future.join();
+                    for (Int2ObjectMap.Entry<RowBatch> partitionRowBatch : rowBatchByPartitionId.int2ObjectEntrySet()) {
+                        Collection<BinaryRow> batchResult = getBatchProcessingResult(partitionRowBatch);
 
-                        if (values != null) {
-                            list.addAll(values);
+                        if (batchResult == null) {
+                            continue;
                         }
+
+                        result.addAll(batchResult);
                     }
 
-                    return list;
+                    return result;
                 });
     }
 
+    /**
+     * Gathers the result of batch processing into a single resulting collection of rows, restoring order as in the requested collection of
+     * rows.
+     *
+     * @param rowBatchByPartitionId Row batches by partition ID.
+     * @return Future of collecting results.
+     */
+    static CompletableFuture<Collection<BinaryRow>> collectMultiRowsResponsesWithRestoreOrder(
+            Int2ObjectMap<RowBatch> rowBatchByPartitionId
+    ) {
+        return allOfResultFuture(rowBatchByPartitionId)
+                .thenApply(response -> {
+                    int totalRowSize = rowBatchByPartitionId.values().stream().mapToInt(value -> value.rows.size()).sum();
+
+                    var result = new BinaryRow[totalRowSize];
+
+                    for (Int2ObjectMap.Entry<RowBatch> partitionRowBatch : rowBatchByPartitionId.int2ObjectEntrySet()) {
+                        Collection<BinaryRow> batchResult = getBatchProcessingResult(partitionRowBatch);
+
+                        if (batchResult == null) {
+                            continue;
+                        }
+
+                        RowBatch rowBatch = partitionRowBatch.getValue();
+
+                        assert batchResult.size() == rowBatch.rows.size() :

Review Comment:
   Fix it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org