You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by "ptupitsyn (via GitHub)" <gi...@apache.org> on 2023/06/19 13:08:22 UTC

[GitHub] [ignite-3] ptupitsyn opened a new pull request, #2215: IGNITE-19617 Embedded basic Data Streamer

ptupitsyn opened a new pull request, #2215:
URL: https://github.com/apache/ignite-3/pull/2215

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] ptupitsyn commented on a diff in pull request #2215: IGNITE-19617 Embedded basic Data Streamer

Posted by "ptupitsyn (via GitHub)" <gi...@apache.org>.
ptupitsyn commented on code in PR #2215:
URL: https://github.com/apache/ignite-3/pull/2215#discussion_r1236880669


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java:
##########
@@ -648,18 +648,26 @@ public CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> rows, InternalT
         return enlistInTx(
                 rows,
                 tx,
-                (commitPart, keyRows0, txo, groupId, term) -> tableMessagesFactory.readWriteMultiRowReplicaRequest()
-                        .groupId(groupId)
-                        .commitPartitionId(commitPart)
-                        .binaryRowsBytes(serializeBinaryRows(keyRows0))
-                        .transactionId(txo.id())
-                        .term(term)
-                        .requestType(RequestType.RW_UPSERT_ALL)
-                        .timestampLong(clock.nowLong())
-                        .build(),
+                this::upsertAllInternal,
                 CompletableFuture::allOf);
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> rows, int partition) {
+        InternalTransaction tx = txManager.begin();
+        TablePartitionId partGroupId = new TablePartitionId(tableId, partition);
+
+        CompletableFuture<Void> fut = enlistWithRetry(
+                tx,
+                partition,
+                (commitPart, term) -> upsertAllInternal(commitPart, rows, tx, partGroupId, term),
+                ATTEMPTS_TO_ENLIST_PARTITION
+        );

Review Comment:
   There is no strong guarantee on the client. But embedded mode is deterministic:
   * We batch the data by partition, not by node
   * Out-of-date partition assignment is not a concern
   * Crashing nodes are not a concern



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] isapego commented on a diff in pull request #2215: IGNITE-19617 Embedded basic Data Streamer

Posted by "isapego (via GitHub)" <gi...@apache.org>.
isapego commented on code in PR #2215:
URL: https://github.com/apache/ignite-3/pull/2215#discussion_r1236832053


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java:
##########
@@ -648,18 +648,26 @@ public CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> rows, InternalT
         return enlistInTx(
                 rows,
                 tx,
-                (commitPart, keyRows0, txo, groupId, term) -> tableMessagesFactory.readWriteMultiRowReplicaRequest()
-                        .groupId(groupId)
-                        .commitPartitionId(commitPart)
-                        .binaryRowsBytes(serializeBinaryRows(keyRows0))
-                        .transactionId(txo.id())
-                        .term(term)
-                        .requestType(RequestType.RW_UPSERT_ALL)
-                        .timestampLong(clock.nowLong())
-                        .build(),
+                this::upsertAllInternal,
                 CompletableFuture::allOf);
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> rows, int partition) {
+        InternalTransaction tx = txManager.begin();
+        TablePartitionId partGroupId = new TablePartitionId(tableId, partition);
+
+        CompletableFuture<Void> fut = enlistWithRetry(
+                tx,
+                partition,
+                (commitPart, term) -> upsertAllInternal(commitPart, rows, tx, partGroupId, term),
+                ATTEMPTS_TO_ENLIST_PARTITION
+        );

Review Comment:
   What is going to happen, if we provide non-matching rows and partition? AFAIR our streamer gives no guarantees for the right partition, only best effort.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] ptupitsyn commented on a diff in pull request #2215: IGNITE-19617 Embedded basic Data Streamer

Posted by "ptupitsyn (via GitHub)" <gi...@apache.org>.
ptupitsyn commented on code in PR #2215:
URL: https://github.com/apache/ignite-3/pull/2215#discussion_r1236882310


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java:
##########
@@ -648,18 +648,26 @@ public CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> rows, InternalT
         return enlistInTx(
                 rows,
                 tx,
-                (commitPart, keyRows0, txo, groupId, term) -> tableMessagesFactory.readWriteMultiRowReplicaRequest()
-                        .groupId(groupId)
-                        .commitPartitionId(commitPart)
-                        .binaryRowsBytes(serializeBinaryRows(keyRows0))
-                        .transactionId(txo.id())
-                        .term(term)
-                        .requestType(RequestType.RW_UPSERT_ALL)
-                        .timestampLong(clock.nowLong())
-                        .build(),
+                this::upsertAllInternal,
                 CompletableFuture::allOf);
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> rows, int partition) {
+        InternalTransaction tx = txManager.begin();
+        TablePartitionId partGroupId = new TablePartitionId(tableId, partition);
+
+        CompletableFuture<Void> fut = enlistWithRetry(
+                tx,
+                partition,
+                (commitPart, term) -> upsertAllInternal(commitPart, rows, tx, partGroupId, term),
+                ATTEMPTS_TO_ENLIST_PARTITION
+        );

Review Comment:
   We use the same logic as `upsertAll` here, only the batching part is in a different place.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] ptupitsyn merged pull request #2215: IGNITE-19617 Embedded basic Data Streamer

Posted by "ptupitsyn (via GitHub)" <gi...@apache.org>.
ptupitsyn merged PR #2215:
URL: https://github.com/apache/ignite-3/pull/2215


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org