Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/09/22 13:18:47 UTC

[GitHub] [ignite-3] korlov42 commented on a change in pull request #348: IGNITE-15434

korlov42 commented on a change in pull request #348:
URL: https://github.com/apache/ignite-3/pull/348#discussion_r713795918



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
##########
@@ -195,5 +196,13 @@
     CompletableFuture<Collection<BinaryRow>> deleteAllExact(Collection<BinaryRow> rows,
         @Nullable Transaction tx);
 
-    //TODO: IGNTIE-14488. Add invoke() methods.
+    /**
+     * Scans given partition, providing {@link Publisher<BinaryRow>} that reflectively notifies about partition rows.
+     * @param p The partition.
+     * @param tx The transaction.
+     * @return {@link Publisher<BinaryRow>} t{@link Publisher<BinaryRow>} that reflectively notifies about partition rows.
+     */
+    @NotNull Publisher<BinaryRow> scan(int p, @Nullable Transaction tx);

Review comment:
       ```suggestion
       /**
        * Scans given partition, providing {@link Publisher<BinaryRow>} that reflectively notifies about partition rows.
        *
        * @param p The partition.
        * @param tx The transaction.
        * @return {@link Publisher<BinaryRow>} that reflectively notifies about partition rows.
        */
       @NotNull Publisher<BinaryRow> scan(int p, @Nullable Transaction tx);
   ```

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
##########
@@ -417,6 +436,80 @@ private void handleGetAndUpsertCommand(CommandClosure<GetAndUpsertCommand> clo)
         clo.result(new SingleRowResponse(response));
     }
 
+    /**
+     * Handler for the {@link ScanInitCommand}.
+     *
+     * @param clo Command closure.
+     */
+    private void handleScanInitCommand(CommandClosure<ScanInitCommand> clo) {
+        ScanInitCommand rangeCmd = clo.command();
+
+        IgniteUuid cursorId = rangeCmd.scanId();
+
+        Cursor<DataRow> cursor = storage.scan(key -> true);
+
+        cursors.put(
+            cursorId,
+            new CursorMeta(
+                cursor,
+                rangeCmd.requesterNodeId()
+            )
+        );
+
+        clo.result(null);
+    }
+
+    /**
+     * Handler for the {@link ScanRetrieveBatchCommand}.
+     *
+     * @param clo Command closure.
+     */
+    private void handleScanRetrieveBatchCommand(CommandClosure<ScanRetrieveBatchCommand> clo) {
+        CursorMeta cursorDesc = cursors.get(clo.command().scanId());
+
+        if (cursorDesc == null) {
+            clo.result(new NoSuchElementException("Corresponding cursor on server side not found."));
+
+            return;
+        }
+
+        List<BinaryRow> res = new ArrayList<>();
+
+        try {
+            for (int i = 0; i < clo.command().itemsToRetrieveCount() && cursorDesc.cursor().hasNext(); i++)
+                res.add(new ByteBufferRow(cursorDesc.cursor().next().valueBytes()));
+        }
+        catch (Exception e) {
+            clo.result(e);
+        }
+
+        clo.result(new MultiRowsResponse(res));
+    }
+
+    /**
+     * Handler for the {@link ScanCloseCommand}.
+     *
+     * @param clo Command closure.
+     */
+    private void handleScanCloseCommand(CommandClosure<ScanCloseCommand> clo) {
+        CursorMeta cursorDesc = cursors.get(clo.command().scanId());

Review comment:
       We probably need to remove the cursor from the map at this point, not just get it.
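   
   A minimal sketch of the idea, assuming the cursors live in a map keyed by scan id. `CursorRegistry`, the `String` key and the `AutoCloseable` value are stand-ins for illustration, not the actual types in `PartitionListener`:
   
   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   /** Hypothetical stand-in for the cursor bookkeeping; not the actual PartitionListener code. */
   class CursorRegistry {
       /** Open cursors keyed by scan id (a String here purely for illustration). */
       private final Map<String, AutoCloseable> cursors = new ConcurrentHashMap<>();

       /** Registers a cursor under the given scan id. */
       void open(String scanId, AutoCloseable cursor) {
           cursors.put(scanId, cursor);
       }

       /**
        * Removes the cursor from the map first and closes it afterwards.
        *
        * @return {@code true} if a cursor was registered under the given id.
        */
       boolean close(String scanId) {
           AutoCloseable cursor = cursors.remove(scanId);

           if (cursor == null)
               return false;

           try {
               cursor.close();
           }
           catch (Exception e) {
               // The entry is already gone from the map at this point.
               throw new IllegalStateException("Failed to close cursor: " + scanId, e);
           }

           return true;
       }

       /** Number of cursors that are still registered. */
       int size() {
           return cursors.size();
       }
   }
   ```
   
   With `remove` the entry is gone even if `close()` fails, so a finished scan cannot leave a stale cursor behind.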

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
##########
@@ -417,6 +436,80 @@ private void handleGetAndUpsertCommand(CommandClosure<GetAndUpsertCommand> clo)
         clo.result(new SingleRowResponse(response));
     }
 
+    /**
+     * Handler for the {@link ScanInitCommand}.
+     *
+     * @param clo Command closure.
+     */
+    private void handleScanInitCommand(CommandClosure<ScanInitCommand> clo) {
+        ScanInitCommand rangeCmd = clo.command();
+
+        IgniteUuid cursorId = rangeCmd.scanId();
+
+        Cursor<DataRow> cursor = storage.scan(key -> true);
+
+        cursors.put(
+            cursorId,
+            new CursorMeta(
+                cursor,
+                rangeCmd.requesterNodeId()
+            )
+        );
+
+        clo.result(null);
+    }
+
+    /**
+     * Handler for the {@link ScanRetrieveBatchCommand}.
+     *
+     * @param clo Command closure.
+     */
+    private void handleScanRetrieveBatchCommand(CommandClosure<ScanRetrieveBatchCommand> clo) {
+        CursorMeta cursorDesc = cursors.get(clo.command().scanId());
+
+        if (cursorDesc == null) {
+            clo.result(new NoSuchElementException("Corresponding cursor on server side not found."));
+
+            return;
+        }
+
+        List<BinaryRow> res = new ArrayList<>();
+
+        try {
+            for (int i = 0; i < clo.command().itemsToRetrieveCount() && cursorDesc.cursor().hasNext(); i++)

Review comment:
       The type of `ScanRetrieveBatchCommand#itemsToRetrieveCount` is `long`, so if someone requests `Integer.MAX_VALUE + 1` rows, the loop will read the cursor to the very end because the `int` counter will overflow.
   
   Another problem here: I doubt an `ArrayList` can hold more than `Integer.MAX_VALUE` rows (leaving the heap size aside).
   
   With that said, I propose changing the type of `ScanRetrieveBatchCommand#itemsToRetrieveCount` to `int`.
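   
   To illustrate the overflow, here is a small standalone sketch. The class and method names are made up for the example and are not part of the PR:
   
   ```java
   /** Hypothetical demo of an int loop counter compared against a long limit. */
   class BatchCounterDemo {
       /** Returns the value an int counter takes after it is incremented past Integer.MAX_VALUE. */
       static int incrementPastMax() {
           int i = Integer.MAX_VALUE;

           i++; // Wraps around to Integer.MIN_VALUE.

           return i;
       }

       /** Mirrors the loop condition {@code i < limit} with an int counter and a long limit. */
       static boolean loopContinues(int i, long limit) {
           return i < limit; // The counter is widened to long before the comparison.
       }
   }
   ```
   
   For a limit of `Integer.MAX_VALUE + 1L` the condition holds for every possible `int` value, so only `hasNext()` can stop the loop.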

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
##########
@@ -494,4 +587,42 @@ private void handleGetAndUpsertCommand(CommandClosure<GetAndUpsertCommand> clo)
     public Storage getStorage() {
         return storage;
     }
+
+    /**
+     * Cursor meta information: origin node id and type.
+     */
+    private class CursorMeta {
+        /** Cursor. */
+        private final Cursor<DataRow> cursor;
+
+        /** Id of the node that creates cursor. */
+        private final String requesterNodeId;
+
+        /**
+         * The constructor.
+         *
+         * @param cursor Cursor.
+         * @param requesterNodeId Id of the node that creates cursor.
+         */
+        CursorMeta(Cursor<DataRow> cursor,

Review comment:
       ```suggestion
           CursorMeta(
               Cursor<DataRow> cursor,
   ```

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
##########
@@ -310,4 +344,117 @@ private int partId(BinaryRow row) {
                     return list;
                 });
     }
+
+    /** Parition scan publisher. */
+    private class PartitionScanPublisher implements Publisher<BinaryRow> {
+        /** List of subscriptions. */
+        private final List<PartitionScanSubscription> subscriptions;
+
+        /** {@link Publisher<BinaryRow>} that relatively notifies about partition rows.  */
+        private final RaftGroupService raftGrpSvc;
+
+        /**
+         * The constructor.
+         *
+         * @param raftGrpSvc {@link Publisher<BinaryRow>} that relatively notifies about partition rows.
+         */
+        PartitionScanPublisher(RaftGroupService raftGrpSvc) {
+            this.subscriptions = Collections.synchronizedList(new ArrayList<>());
+            this.raftGrpSvc = raftGrpSvc;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void subscribe(Subscriber<? super BinaryRow> subscriber) {
+            PartitionScanSubscription subscription = new PartitionScanSubscription(subscriber);
+
+            subscriptions.add(subscription);

Review comment:
       For the first iteration I would prefer to introduce a `one subscriber per publisher` restriction. Otherwise, what should we do in the case of unbalanced consumption? Assume one subscriber requests only one row and another requests 1000 rows. Should we maintain a pending queue for the first subscriber? Currently we push each row to all subscribers regardless of how many rows each of them actually requested, and I doubt that is the desired behaviour.
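   
   One possible shape of such a restriction, as a rough sketch on top of the `java.util.concurrent.Flow` interfaces. `SingleSubscriberPublisher` and the error message are hypothetical, and the subscription body is left empty on purpose:
   
   ```java
   import java.util.concurrent.Flow.Publisher;
   import java.util.concurrent.Flow.Subscriber;
   import java.util.concurrent.Flow.Subscription;
   import java.util.concurrent.atomic.AtomicBoolean;

   /** Hypothetical publisher that accepts a single subscriber; names are illustrative only. */
   class SingleSubscriberPublisher<T> implements Publisher<T> {
       /** Set once the first subscriber has been accepted. */
       private final AtomicBoolean subscribed = new AtomicBoolean();

       /** {@inheritDoc} */
       @Override public void subscribe(Subscriber<? super T> subscriber) {
           boolean first = subscribed.compareAndSet(false, true);

           // Every subscriber receives a subscription before any other signal.
           subscriber.onSubscribe(new Subscription() {
               @Override public void request(long n) {
                   // A real implementation would fetch up to n rows for the accepted subscriber here.
               }

               @Override public void cancel() {
                   // A real implementation would release the server-side cursor here.
               }
           });

           // Any subscriber after the first one is rejected right away.
           if (!first)
               subscriber.onError(new IllegalStateException("Scan publisher does not support multiple subscriptions."));
       }
   }
   ```
   
   This way there is exactly one demand counter to honour, and the question of a pending queue does not come up until we decide to support several subscribers.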

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
##########
@@ -417,6 +436,80 @@ private void handleGetAndUpsertCommand(CommandClosure<GetAndUpsertCommand> clo)
         clo.result(new SingleRowResponse(response));
     }
 
+    /**
+     * Handler for the {@link ScanInitCommand}.
+     *
+     * @param clo Command closure.
+     */
+    private void handleScanInitCommand(CommandClosure<ScanInitCommand> clo) {
+        ScanInitCommand rangeCmd = clo.command();
+
+        IgniteUuid cursorId = rangeCmd.scanId();
+
+        Cursor<DataRow> cursor = storage.scan(key -> true);

Review comment:
       The `scan` method can actually throw exceptions, so let's wrap this invocation in a `try-catch` and add an appropriate test to verify that everything is OK.
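   
   Roughly what I have in mind, as a simplified sketch. `ScanInitSketch`, its `Closure` interface and the `Callable` standing in for `storage.scan(...)` are assumptions made for the example, not the real `PartitionListener` types:
   
   ```java
   import java.util.Iterator;
   import java.util.Map;
   import java.util.concurrent.Callable;

   /** Hypothetical, simplified model of the scan-init handler; not the actual PartitionListener code. */
   class ScanInitSketch {
       /** Stand-in for the command closure: receives either a result or an exception. */
       interface Closure {
           void result(Object res);
       }

       /** Opens a cursor and registers it, or reports the failure through the closure. */
       static void handleScanInit(
           String scanId,
           Callable<Iterator<String>> scan,
           Map<String, Iterator<String>> cursors,
           Closure clo
       ) {
           Iterator<String> cursor;

           try {
               cursor = scan.call();
           }
           catch (Exception e) {
               // Report the failure to the caller and do not register a cursor.
               clo.result(e);

               return;
           }

           cursors.put(scanId, cursor);

           clo.result(null);
       }
   }
   ```
   
   A test could then pass a scan that always throws and check that the closure receives the exception and that no cursor is registered.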

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
##########
@@ -417,6 +436,80 @@ private void handleGetAndUpsertCommand(CommandClosure<GetAndUpsertCommand> clo)
         clo.result(new SingleRowResponse(response));
     }
 
+    /**
+     * Handler for the {@link ScanInitCommand}.
+     *
+     * @param clo Command closure.
+     */
+    private void handleScanInitCommand(CommandClosure<ScanInitCommand> clo) {
+        ScanInitCommand rangeCmd = clo.command();
+
+        IgniteUuid cursorId = rangeCmd.scanId();
+
+        Cursor<DataRow> cursor = storage.scan(key -> true);
+
+        cursors.put(
+            cursorId,
+            new CursorMeta(
+                cursor,
+                rangeCmd.requesterNodeId()
+            )
+        );
+
+        clo.result(null);
+    }
+
+    /**
+     * Handler for the {@link ScanRetrieveBatchCommand}.
+     *
+     * @param clo Command closure.
+     */
+    private void handleScanRetrieveBatchCommand(CommandClosure<ScanRetrieveBatchCommand> clo) {
+        CursorMeta cursorDesc = cursors.get(clo.command().scanId());
+
+        if (cursorDesc == null) {
+            clo.result(new NoSuchElementException("Corresponding cursor on server side not found."));
+
+            return;
+        }
+
+        List<BinaryRow> res = new ArrayList<>();
+
+        try {
+            for (int i = 0; i < clo.command().itemsToRetrieveCount() && cursorDesc.cursor().hasNext(); i++)
+                res.add(new ByteBufferRow(cursorDesc.cursor().next().valueBytes()));
+        }
+        catch (Exception e) {
+            clo.result(e);
+        }
+
+        clo.result(new MultiRowsResponse(res));
+    }
+
+    /**
+     * Handler for the {@link ScanCloseCommand}.
+     *
+     * @param clo Command closure.
+     */
+    private void handleScanCloseCommand(CommandClosure<ScanCloseCommand> clo) {
+        CursorMeta cursorDesc = cursors.get(clo.command().scanId());
+
+        if (cursorDesc == null) {
+            clo.result(null);
+
+            return;
+        }
+
+        try {
+            cursorDesc.cursor().close();
+        }
+        catch (Exception e) {
+            throw new IgniteInternalException(e);

Review comment:
       should we complete the command closure with this exception?

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
##########
@@ -310,4 +344,117 @@ private int partId(BinaryRow row) {
                     return list;
                 });
     }
+
+    /** Parition scan publisher. */

Review comment:
       misspelled `partition`

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
##########
@@ -310,4 +344,117 @@ private int partId(BinaryRow row) {
                     return list;
                 });
     }
+
+    /** Parition scan publisher. */
+    private class PartitionScanPublisher implements Publisher<BinaryRow> {
+        /** List of subscriptions. */
+        private final List<PartitionScanSubscription> subscriptions;
+
+        /** {@link Publisher<BinaryRow>} that relatively notifies about partition rows.  */
+        private final RaftGroupService raftGrpSvc;
+
+        /**
+         * The constructor.
+         *
+         * @param raftGrpSvc {@link Publisher<BinaryRow>} that relatively notifies about partition rows.

Review comment:
       wrong param description

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
##########
@@ -310,4 +344,117 @@ private int partId(BinaryRow row) {
                     return list;
                 });
     }
+
+    /** Parition scan publisher. */
+    private class PartitionScanPublisher implements Publisher<BinaryRow> {
+        /** List of subscriptions. */
+        private final List<PartitionScanSubscription> subscriptions;
+
+        /** {@link Publisher<BinaryRow>} that relatively notifies about partition rows.  */
+        private final RaftGroupService raftGrpSvc;
+
+        /**
+         * The constructor.
+         *
+         * @param raftGrpSvc {@link Publisher<BinaryRow>} that relatively notifies about partition rows.
+         */
+        PartitionScanPublisher(RaftGroupService raftGrpSvc) {
+            this.subscriptions = Collections.synchronizedList(new ArrayList<>());
+            this.raftGrpSvc = raftGrpSvc;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void subscribe(Subscriber<? super BinaryRow> subscriber) {
+            PartitionScanSubscription subscription = new PartitionScanSubscription(subscriber);
+
+            subscriptions.add(subscription);
+
+            subscriber.onSubscribe(subscription);
+        }
+
+        /**
+         * Partition Scan Subscription.
+         */
+        private class PartitionScanSubscription implements Subscription {
+            /** */
+            private final Subscriber<? super BinaryRow> subscriber;
+
+            /** */
+            private final AtomicBoolean isCanceled;
+
+            /** Scan id to uniquely identify it on server side. */
+            private final IgniteUuid scanId;
+
+            /** Scan initial operation that created server cursor. */
+            private final CompletableFuture<Void> scanInitOp;
+
+            /**
+             * The constructor.
+             * @param subscriber The subscriber.
+             */
+            private PartitionScanSubscription(Subscriber<? super BinaryRow> subscriber) {
+                this.subscriber = subscriber;
+                this.isCanceled = new AtomicBoolean(false);
+                this.scanId = UUID_GENERATOR.randomUuid();
+                // TODO: IGNITE-15544 Close partition scans on node left.
+                this.scanInitOp = raftGrpSvc.run(new ScanInitCommand("", scanId));
+            }
+
+            /** {@inheritDoc} */
+            @Override public void request(long n) {
+                if (n < 0) {
+                    cancel();
+
+                    subscriber.onError(new IllegalArgumentException("Requested amount of items is less than 0."));
+                }
+
+                if (isCanceled.get())
+                    return;
+
+                scanInitOp.thenCompose((none) -> raftGrpSvc.<MultiRowsResponse>run(new ScanRetrieveBatchCommand(n, scanId)))
+                    .thenAccept(
+                        res -> {
+                            if (res.getValues() == null) {
+                                raftGrpSvc.run(new ScanCloseCommand(scanId)).exceptionally(closeT -> {
+                                    LOG.warn("Unable to close scan.", closeT);
+
+                                    return null;
+                                });

Review comment:
       ```suggestion
                                   cancel();
   ```

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
##########
@@ -195,5 +196,13 @@
     CompletableFuture<Collection<BinaryRow>> deleteAllExact(Collection<BinaryRow> rows,
         @Nullable Transaction tx);
 
-    //TODO: IGNTIE-14488. Add invoke() methods.
+    /**
+     * Scans given partition, providing {@link Publisher<BinaryRow>} that reflectively notifies about partition rows.
+     * @param p The partition.
+     * @param tx The transaction.
+     * @return {@link Publisher<BinaryRow>} t{@link Publisher<BinaryRow>} that reflectively notifies about partition rows.
+     */
+    @NotNull Publisher<BinaryRow> scan(int p, @Nullable Transaction tx);

Review comment:
       BTW `reflectively` or `reactively`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org