Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/05/25 06:59:51 UTC

[GitHub] [ignite-3] ascherbakoff opened a new pull request #147: IGNITE-14567

ascherbakoff opened a new pull request #147:
URL: https://github.com/apache/ignite-3/pull/147


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] ascherbakoff commented on a change in pull request #147: IGNITE-14567

Posted by GitBox <gi...@apache.org>.
ascherbakoff commented on a change in pull request #147:
URL: https://github.com/apache/ignite-3/pull/147#discussion_r641592583



##########
File path: modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java
##########
@@ -287,67 +324,124 @@ public RaftGroupServiceImpl(
         if (leader == null)
             return refreshLeader().thenCompose(res -> run(cmd));
 
-        ActionRequest<R> req = factory.actionRequest().command(cmd).groupId(groupId).build();
+        ActionRequest req = factory.actionRequest().command(cmd).groupId(groupId).readOnlySafe(true).build();
+
+        CompletableFuture<ActionResponse<R>> fut = new CompletableFuture<>();
 
-        CompletableFuture<ActionResponse<R>> fut = sendWithRetry(leader.getNode(), req, currentTimeMillis() + timeout);
+        sendWithRetry(leader, req, currentTimeMillis() + timeout, fut);
 
         return fut.thenApply(resp -> resp.result());
     }
 
-    /** {@inheritDoc} */
+    /**
+     * {@inheritDoc}
+     */
     @Override public <R> CompletableFuture<R> run(Peer peer, ReadCommand cmd) {
-        ActionRequest req = factory.actionRequest().command(cmd).groupId(groupId).build();
+        ActionRequest req = factory.actionRequest().command(cmd).groupId(groupId).readOnlySafe(false).build();
+
+        CompletableFuture fut = cluster.messagingService().invoke(peer.address(), req, timeout);
 
-        CompletableFuture<?> fut = cluster.messagingService().invoke(peer.getNode(), req, timeout);
+        return fut.thenApply(resp -> ((ActionResponse) resp).result());
+    }
 
-        return fut.thenApply(resp -> ((ActionResponse<R>) resp).result());
+    /** {@inheritDoc} */
+    @Override public void shutdown() {
+        if (!reuse)
+            cluster.shutdown();
     }
 
-    private <R> CompletableFuture<R> sendWithRetry(ClusterNode node, NetworkMessage req, long stopTime) {
-        if (currentTimeMillis() >= stopTime)
-            return CompletableFuture.failedFuture(new TimeoutException());
-        return cluster.messagingService().invoke(node, req, timeout)
-            .thenCompose(resp -> {
-                if (resp instanceof RaftErrorResponse) {
-                    RaftErrorResponse resp0 = (RaftErrorResponse)resp;
-                    switch (resp0.errorCode()) {
-                        case NO_LEADER:
-                            return composeWithDelay(() -> sendWithRetry(randomNode(), req, stopTime));
-                        case LEADER_CHANGED:
-                            leader = resp0.newLeader();
-                            return composeWithDelay(() -> sendWithRetry(resp0.newLeader().getNode(), req, stopTime));
-                        case SUCCESS:
-                            return CompletableFuture.completedFuture(null);
-                        default:
-                            return CompletableFuture.failedFuture(new RaftException(resp0.errorCode()));
+    /**
+     * Retries request until success or timeout.
+     *
+     * @param addr Target address.
+     * @param req The request.
+     * @param stopTime Stop time.
+     * @param fut The future.
+     * @param <R> Response type.
+     */
+    private <R> void sendWithRetry(Peer peer, Object req, long stopTime, CompletableFuture<R> fut) {

Review comment:
       Because it doesn't work as intended. I've added a test that reproduces the problem: testUserRequestLeaderElectedAfterDelayWithFailedNode.
   
   Actually, I'd prefer not to change the current approach, because: 1) it works for me; 2) it looks quite readable to me; 3) it's a bit more efficient (it creates fewer futures); 4) I think your solution is not perfect either and can be improved.
   
   I'm OK with addressing this later in a separate ticket, after merging jraft.
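
For illustration only, here is a minimal sketch of the "fewer futures" idea: every attempt completes one caller-supplied future instead of composing a new future per retry. The class name, the `Supplier`-based attempt, the fixed retry delay and the "every error is retryable" rule are assumptions made for this sketch; it is not the code from the pull request.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical sketch of retrying into one caller-supplied future; not the PR's code. */
final class RetrySketch {
    /** Assumed delay between attempts, in milliseconds. */
    private static final long RETRY_DELAY_MS = 10;

    /** Runs attempts until one succeeds or stopTime passes; only fut is exposed to the caller. */
    static <R> void sendWithRetry(
        Supplier<CompletableFuture<R>> attempt,
        long stopTime,
        CompletableFuture<R> fut,
        ScheduledExecutorService executor
    ) {
        if (System.currentTimeMillis() >= stopTime) {
            fut.completeExceptionally(new TimeoutException());

            return;
        }

        attempt.get().whenComplete((resp, err) -> {
            if (err == null)
                fut.complete(resp);
            else // Every error is treated as recoverable here; real code would inspect it.
                executor.schedule(() -> sendWithRetry(attempt, stopTime, fut, executor),
                    RETRY_DELAY_MS, TimeUnit.MILLISECONDS);
        });
    }

    /** Fails twice with a simulated "no leader" error, then succeeds. */
    static String demo() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger calls = new AtomicInteger();
        CompletableFuture<String> fut = new CompletableFuture<>();

        try {
            sendWithRetry(
                () -> calls.incrementAndGet() < 3
                    ? CompletableFuture.<String>failedFuture(new IllegalStateException("no leader"))
                    : CompletableFuture.completedFuture("ok after " + calls.get() + " attempts"),
                System.currentTimeMillis() + 5_000, fut, executor);

            return fut.join();
        }
        finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The trade-off discussed in this thread is visible here: the retry loop allocates no intermediate composed futures, at the cost of threading `fut` through every call.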







[GitHub] [ignite-3] ascherbakoff commented on a change in pull request #147: IGNITE-14567

Posted by GitBox <gi...@apache.org>.
ascherbakoff commented on a change in pull request #147:
URL: https://github.com/apache/ignite-3/pull/147#discussion_r640437210



##########
File path: modules/network/src/main/java/org/apache/ignite/network/MessagingService.java
##########
@@ -61,12 +61,23 @@
      * returns a future that will be completed successfully upon receiving a response.
      *
      * @param recipient Recipient of the message.
-     * @param msg A message.
+     * @param msg The message.
      * @param timeout Waiting for response timeout in milliseconds.
      * @return A future holding the response or error if the expected response was not received.
      */
     CompletableFuture<NetworkMessage> invoke(ClusterNode recipient, NetworkMessage msg, long timeout);
 
+    /**
+     * Sends a message asynchronously with same guarantees as {@link #send(ClusterNode, NetworkMessage)} and
+     * returns a future that will be completed successfully upon receiving a response.
+     *
+     * @param addr Recipient network address in host:port format.
+     * @param msg A message.
+     * @param timeout Waiting for response timeout in milliseconds.
+     * @return A future holding the response or error if the expected response was not received.
+     */
+    CompletableFuture<NetworkMessage> invoke(String addr, NetworkMessage msg, long timeout);

Review comment:
       To clearly separate messaging from discovery.
   You don't need a node to be discovered to send a message to it, and this fits the raft communication model better; the code is much cleaner. Moreover, ClusterNode has the address [1].
   [1] org.apache.ignite.network.ClusterNode#address
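
As a rough sketch of that separation, the address-based call can be the primitive, with the node-based overload merely delegating to it. `Node`, `Messaging` and the echo transport below are simplified, hypothetical stand-ins, with plain strings in place of NetworkMessage; they are not Ignite's types.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch; the types below are simplified stand-ins, not Ignite's. */
final class AddressInvokeSketch {
    /** Stand-in for a discovered node that also knows its host:port address. */
    static final class Node {
        private final String name;
        private final String address;

        Node(String name, String address) {
            this.name = name;
            this.address = address;
        }

        String name() {
            return name;
        }

        String address() {
            return address;
        }
    }

    /** Messaging keyed by address; the node-based overload simply delegates. */
    interface Messaging {
        CompletableFuture<String> invoke(String addr, String msg, long timeout);

        default CompletableFuture<String> invoke(Node recipient, String msg, long timeout) {
            return invoke(recipient.address(), msg, timeout);
        }
    }

    /** Echo transport used only for the demonstration. */
    static Messaging echo() {
        return (addr, msg, timeout) -> CompletableFuture.completedFuture(addr + " <- " + msg);
    }

    public static void main(String[] args) {
        Messaging messaging = echo();

        // No discovery step is needed when the address is already known.
        System.out.println(messaging.invoke("localhost:3344", "ping", 1_000).join());
        System.out.println(messaging.invoke(new Node("node-1", "localhost:3345"), "ping", 1_000).join());
    }
}
```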







[GitHub] [ignite-3] sanpwc commented on a change in pull request #147: IGNITE-14567

Posted by GitBox <gi...@apache.org>.
sanpwc commented on a change in pull request #147:
URL: https://github.com/apache/ignite-3/pull/147#discussion_r639464080



##########
File path: modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ActionRequest.java
##########
@@ -35,19 +35,30 @@
      */
     Command command();
 
+    /**
+     * @return {@code True} for linearizable reading.
+     */
+    boolean readOnlySafe();

Review comment:
       Currently there are no usages of readOnlySafe(). Why do we need it? In which cases do you expect to use non-linearizable reads? Besides that, it's a bit confusing because readOnlySafe only makes sense for read operations, but ActionRequest covers both reads and writes. In other words, within the given abstraction it's possible to create a write request with readOnlySafe set to false. Do we expect non-linearizable writes?
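
To make the concern concrete, here is one possible way (an assumption for illustration, not something proposed in the pull request) to keep the flag out of reach of writes: separate factory methods, so that only a read request accepts `readOnlySafe`. All names below are hypothetical and simplified.

```java
/** Hypothetical sketch; a simplified model, not the ActionRequest from the PR. */
final class ReadOnlySafeSketch {
    interface Command {}

    interface ReadCommand extends Command {}

    interface WriteCommand extends Command {}

    static final class ActionRequest {
        private final Command command;
        private final boolean readOnlySafe;

        private ActionRequest(Command command, boolean readOnlySafe) {
            this.command = command;
            this.readOnlySafe = readOnlySafe;
        }

        /** Reads choose between a linearizable (safe) read and a possibly stale one. */
        static ActionRequest read(ReadCommand cmd, boolean readOnlySafe) {
            return new ActionRequest(cmd, readOnlySafe);
        }

        /** Writes take no flag, so a "non-linearizable write" cannot be expressed. */
        static ActionRequest write(WriteCommand cmd) {
            return new ActionRequest(cmd, true);
        }

        Command command() {
            return command;
        }

        boolean readOnlySafe() {
            return readOnlySafe;
        }
    }

    public static void main(String[] args) {
        ActionRequest staleRead = ActionRequest.read(new ReadCommand() {}, false);
        ActionRequest write = ActionRequest.write(new WriteCommand() {});

        System.out.println(staleRead.readOnlySafe());
        System.out.println(write.readOnlySafe());
    }
}
```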







[GitHub] [ignite-3] ascherbakoff commented on a change in pull request #147: IGNITE-14567

Posted by GitBox <gi...@apache.org>.
ascherbakoff commented on a change in pull request #147:
URL: https://github.com/apache/ignite-3/pull/147#discussion_r640434972



##########
File path: modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
##########
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.raft;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.metastorage.common.ConditionType;
+import org.apache.ignite.internal.metastorage.common.command.ConditionInfo;
+import org.apache.ignite.internal.metastorage.common.command.GetAllCommand;
+import org.apache.ignite.internal.metastorage.common.command.GetAndPutAllCommand;
+import org.apache.ignite.internal.metastorage.common.command.GetAndPutCommand;
+import org.apache.ignite.internal.metastorage.common.command.GetAndRemoveAllCommand;
+import org.apache.ignite.internal.metastorage.common.command.GetAndRemoveCommand;
+import org.apache.ignite.internal.metastorage.common.command.GetCommand;
+import org.apache.ignite.internal.metastorage.common.command.InvokeCommand;
+import org.apache.ignite.internal.metastorage.common.command.MultipleEntryResponse;
+import org.apache.ignite.internal.metastorage.common.command.OperationInfo;
+import org.apache.ignite.internal.metastorage.common.command.PutAllCommand;
+import org.apache.ignite.internal.metastorage.common.command.PutCommand;
+import org.apache.ignite.internal.metastorage.common.command.RangeCommand;
+import org.apache.ignite.internal.metastorage.common.command.RemoveAllCommand;
+import org.apache.ignite.internal.metastorage.common.command.RemoveCommand;
+import org.apache.ignite.internal.metastorage.common.command.SingleEntryResponse;
+import org.apache.ignite.internal.metastorage.common.command.WatchExactKeysCommand;
+import org.apache.ignite.internal.metastorage.common.command.WatchRangeKeysCommand;
+import org.apache.ignite.internal.metastorage.common.command.cursor.CursorCloseCommand;
+import org.apache.ignite.internal.metastorage.common.command.cursor.CursorHasNextCommand;
+import org.apache.ignite.internal.metastorage.common.command.cursor.CursorNextCommand;
+import org.apache.ignite.internal.metastorage.server.Condition;
+import org.apache.ignite.internal.metastorage.server.Entry;
+import org.apache.ignite.internal.metastorage.server.EntryEvent;
+import org.apache.ignite.internal.metastorage.server.ExistenceCondition;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.Operation;
+import org.apache.ignite.internal.metastorage.server.RevisionCondition;
+import org.apache.ignite.internal.metastorage.server.ValueCondition;
+import org.apache.ignite.internal.metastorage.server.WatchEvent;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.raft.client.ReadCommand;
+import org.apache.ignite.raft.client.WriteCommand;
+import org.apache.ignite.raft.client.service.CommandClosure;
+import org.apache.ignite.raft.client.service.RaftGroupListener;
+
+/**
+ * Meta storage listener.
+ * TODO: IGNITE-14693 Implement Meta storage exception handling logic.
+ */
+public class MetaStorageListener implements RaftGroupListener {
+    /** Storage. */
+    private final KeyValueStorage storage;
+
+    /** Cursors map. */
+    private final Map<IgniteUuid, IgniteBiTuple<Cursor<?>, CursorType>> cursors;
+
+    /**
+     * @param storage Storage.
+     */
+    public MetaStorageListener(KeyValueStorage storage) {
+        this.storage = storage;
+        this.cursors = new ConcurrentHashMap<>();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onRead(Iterator<CommandClosure<ReadCommand>> iter) {

Review comment:
       This is true, but as soon as jraft is merged I'm going to remove RaftServerImpl.
   This can be fixed by adding an internal error callback to CommandClosureEx, but I think it's a waste of time to fix it.







[GitHub] [ignite-3] sanpwc commented on a change in pull request #147: IGNITE-14567

Posted by GitBox <gi...@apache.org>.
sanpwc commented on a change in pull request #147:
URL: https://github.com/apache/ignite-3/pull/147#discussion_r639456546



##########
File path: modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
##########
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.raft;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.metastorage.common.ConditionType;
+import org.apache.ignite.internal.metastorage.common.command.ConditionInfo;
+import org.apache.ignite.internal.metastorage.common.command.GetAllCommand;
+import org.apache.ignite.internal.metastorage.common.command.GetAndPutAllCommand;
+import org.apache.ignite.internal.metastorage.common.command.GetAndPutCommand;
+import org.apache.ignite.internal.metastorage.common.command.GetAndRemoveAllCommand;
+import org.apache.ignite.internal.metastorage.common.command.GetAndRemoveCommand;
+import org.apache.ignite.internal.metastorage.common.command.GetCommand;
+import org.apache.ignite.internal.metastorage.common.command.InvokeCommand;
+import org.apache.ignite.internal.metastorage.common.command.MultipleEntryResponse;
+import org.apache.ignite.internal.metastorage.common.command.OperationInfo;
+import org.apache.ignite.internal.metastorage.common.command.PutAllCommand;
+import org.apache.ignite.internal.metastorage.common.command.PutCommand;
+import org.apache.ignite.internal.metastorage.common.command.RangeCommand;
+import org.apache.ignite.internal.metastorage.common.command.RemoveAllCommand;
+import org.apache.ignite.internal.metastorage.common.command.RemoveCommand;
+import org.apache.ignite.internal.metastorage.common.command.SingleEntryResponse;
+import org.apache.ignite.internal.metastorage.common.command.WatchExactKeysCommand;
+import org.apache.ignite.internal.metastorage.common.command.WatchRangeKeysCommand;
+import org.apache.ignite.internal.metastorage.common.command.cursor.CursorCloseCommand;
+import org.apache.ignite.internal.metastorage.common.command.cursor.CursorHasNextCommand;
+import org.apache.ignite.internal.metastorage.common.command.cursor.CursorNextCommand;
+import org.apache.ignite.internal.metastorage.server.Condition;
+import org.apache.ignite.internal.metastorage.server.Entry;
+import org.apache.ignite.internal.metastorage.server.EntryEvent;
+import org.apache.ignite.internal.metastorage.server.ExistenceCondition;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.Operation;
+import org.apache.ignite.internal.metastorage.server.RevisionCondition;
+import org.apache.ignite.internal.metastorage.server.ValueCondition;
+import org.apache.ignite.internal.metastorage.server.WatchEvent;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.raft.client.ReadCommand;
+import org.apache.ignite.raft.client.WriteCommand;
+import org.apache.ignite.raft.client.service.CommandClosure;
+import org.apache.ignite.raft.client.service.RaftGroupListener;
+
+/**
+ * Meta storage listener.
+ * TODO: IGNITE-14693 Implement Meta storage exception handling logic.
+ */
+public class MetaStorageListener implements RaftGroupListener {
+    /** Storage. */
+    private final KeyValueStorage storage;
+
+    /** Cursors map. */
+    private final Map<IgniteUuid, IgniteBiTuple<Cursor<?>, CursorType>> cursors;
+
+    /**
+     * @param storage Storage.
+     */
+    public MetaStorageListener(KeyValueStorage storage) {
+        this.storage = storage;
+        this.cursors = new ConcurrentHashMap<>();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onRead(Iterator<CommandClosure<ReadCommand>> iter) {

Review comment:
       This applies to both onRead() and onWrite(). It seems that the current implementation doesn't propagate exceptions to the client side.
   Say a CompactedException is thrown on storage.get(). With the given implementation we will log it with
   ```
               catch (Exception e) {
                   LOG.error("Failed to process the command", e);
               }
   ```
   in RaftServerImpl#processQueue.
   On the client side we'll hang on metaStorageSvc.get(EXPECTED_RESULT_ENTRY.key()).get() until a TimeoutException is thrown.
   I believe that we still need a catch block that will pass the exception to the user via clo.result(businessLogicException).
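
A minimal sketch of such a catch block, under simplifying assumptions: `CommandClosure` is reduced to two methods, the handler is a plain `Function`, and an `IllegalStateException` stands in for CompactedException. None of this is the listener code from the pull request.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

/** Hypothetical sketch; CommandClosure here is a simplified stand-in. */
final class ClosureErrorSketch {
    interface CommandClosure<C> {
        C command();

        void result(Object res);
    }

    /** Processes every closure and reports either the value or the exception as its result. */
    static <C> void process(Iterator<CommandClosure<C>> iter, Function<C, Object> handler) {
        while (iter.hasNext()) {
            CommandClosure<C> clo = iter.next();
            Object res;

            try {
                res = handler.apply(clo.command());
            }
            catch (RuntimeException e) {
                res = e; // Hand the failure back instead of only logging it.
            }

            // Called outside the try block so that result() runs exactly once per closure.
            clo.result(res);
        }
    }

    /** Runs one successful and one failing command and collects what the closures received. */
    static List<Object> demo() {
        List<Object> results = new ArrayList<>();
        List<CommandClosure<String>> closures = new ArrayList<>();

        for (String key : new String[] {"a", "compacted"}) {
            closures.add(new CommandClosure<String>() {
                @Override public String command() {
                    return key;
                }

                @Override public void result(Object res) {
                    results.add(res);
                }
            });
        }

        process(closures.iterator(), key -> {
            if (key.equals("compacted"))
                throw new IllegalStateException("revision was compacted");

            return "value:" + key;
        });

        return results;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With this shape the client-side future can be completed with the error right away rather than waiting for the request timeout.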







[GitHub] [ignite-3] ascherbakoff commented on a change in pull request #147: IGNITE-14567

Posted by GitBox <gi...@apache.org>.
ascherbakoff commented on a change in pull request #147:
URL: https://github.com/apache/ignite-3/pull/147#discussion_r640444227



##########
File path: modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java
##########
@@ -88,6 +92,9 @@
     /** */
     private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
 
+    /** */
+    private final boolean reuse;

Review comment:
       It's used for tests currently, see [1]
   Helps to reduce boilerplate code.
   
   [1] org.apache.ignite.raft.server.ITRaftCounterServerTest







[GitHub] [ignite-3] sanpwc commented on a change in pull request #147: IGNITE-14567

Posted by GitBox <gi...@apache.org>.
sanpwc commented on a change in pull request #147:
URL: https://github.com/apache/ignite-3/pull/147#discussion_r639461428



##########
File path: modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/impl/ActionRequestImpl.java
##########
@@ -22,20 +22,27 @@
 
 /** */
 class ActionRequestImpl<T> implements ActionRequest, ActionRequest.Builder {
+    /** */
+    private String groupId;
+
     /** */
     private Command cmd;
 
     /** */
-    private String groupId;
+    private boolean readOnlySafe;

Review comment:
       /** {@inheritDoc} */







[GitHub] [ignite-3] sanpwc commented on a change in pull request #147: IGNITE-14567

Posted by GitBox <gi...@apache.org>.
sanpwc commented on a change in pull request #147:
URL: https://github.com/apache/ignite-3/pull/147#discussion_r639467213



##########
File path: modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/CommandClosure.java
##########
@@ -30,14 +32,8 @@
     R command();
 
     /**
-     * Success outcome.
-     * @param res The result.
-     */
-    void success(Object res);
-
-    /**
-     * Failure outcome.
-     * @param err The error.
+     * Must be called after a command has been processed normally.
+     * @param res Execution result.
      */
-    void failure(Throwable err);
+    void result(Object res);

Review comment:
       Should we make it more obvious that res is expected to be marshallable?







[GitHub] [ignite-3] sanpwc commented on a change in pull request #147: IGNITE-14567

Posted by GitBox <gi...@apache.org>.
sanpwc commented on a change in pull request #147:
URL: https://github.com/apache/ignite-3/pull/147#discussion_r639479614



##########
File path: modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
##########
@@ -99,6 +100,19 @@ public void stopRaftGroup(String groupId, List<ClusterNode> peers) {
         //Now we are using only one node in a raft group.
         //TODO: IGNITE-13885 Investigate jraft implementation for replication framework based on RAFT protocol.
         if (peers.get(0).name().equals(clusterNetSvc.topologyService().localMember().name()))
-            raftServer.clearListener(groupId);
+            raftServer.stopRaftGroup(groupId);
+    }
+
+    public RaftGroupService startRaftService(String groupId, List<ClusterNode> peers) {

Review comment:
       javadoc







[GitHub] [ignite-3] asfgit closed pull request #147: IGNITE-14567

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #147:
URL: https://github.com/apache/ignite-3/pull/147


   








[GitHub] [ignite-3] sashapolo commented on a change in pull request #147: IGNITE-14567

Posted by GitBox <gi...@apache.org>.
sashapolo commented on a change in pull request #147:
URL: https://github.com/apache/ignite-3/pull/147#discussion_r640817603



##########
File path: modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java
##########
@@ -287,67 +324,124 @@ public RaftGroupServiceImpl(
         if (leader == null)
             return refreshLeader().thenCompose(res -> run(cmd));
 
-        ActionRequest<R> req = factory.actionRequest().command(cmd).groupId(groupId).build();
+        ActionRequest req = factory.actionRequest().command(cmd).groupId(groupId).readOnlySafe(true).build();
+
+        CompletableFuture<ActionResponse<R>> fut = new CompletableFuture<>();
 
-        CompletableFuture<ActionResponse<R>> fut = sendWithRetry(leader.getNode(), req, currentTimeMillis() + timeout);
+        sendWithRetry(leader, req, currentTimeMillis() + timeout, fut);
 
         return fut.thenApply(resp -> resp.result());
     }
 
-    /** {@inheritDoc} */
+    /**
+     * {@inheritDoc}
+     */
     @Override public <R> CompletableFuture<R> run(Peer peer, ReadCommand cmd) {
-        ActionRequest req = factory.actionRequest().command(cmd).groupId(groupId).build();
+        ActionRequest req = factory.actionRequest().command(cmd).groupId(groupId).readOnlySafe(false).build();
+
+        CompletableFuture fut = cluster.messagingService().invoke(peer.address(), req, timeout);
 
-        CompletableFuture<?> fut = cluster.messagingService().invoke(peer.getNode(), req, timeout);
+        return fut.thenApply(resp -> ((ActionResponse) resp).result());
+    }
 
-        return fut.thenApply(resp -> ((ActionResponse<R>) resp).result());
+    /** {@inheritDoc} */
+    @Override public void shutdown() {
+        if (!reuse)
+            cluster.shutdown();
     }
 
-    private <R> CompletableFuture<R> sendWithRetry(ClusterNode node, NetworkMessage req, long stopTime) {
-        if (currentTimeMillis() >= stopTime)
-            return CompletableFuture.failedFuture(new TimeoutException());
-        return cluster.messagingService().invoke(node, req, timeout)
-            .thenCompose(resp -> {
-                if (resp instanceof RaftErrorResponse) {
-                    RaftErrorResponse resp0 = (RaftErrorResponse)resp;
-                    switch (resp0.errorCode()) {
-                        case NO_LEADER:
-                            return composeWithDelay(() -> sendWithRetry(randomNode(), req, stopTime));
-                        case LEADER_CHANGED:
-                            leader = resp0.newLeader();
-                            return composeWithDelay(() -> sendWithRetry(resp0.newLeader().getNode(), req, stopTime));
-                        case SUCCESS:
-                            return CompletableFuture.completedFuture(null);
-                        default:
-                            return CompletableFuture.failedFuture(new RaftException(resp0.errorCode()));
+    /**
+     * Retries request until success or timeout.
+     *
+     * @param addr Target address.
+     * @param req The request.
+     * @param stopTime Stop time.
+     * @param fut The future.
+     * @param <R> Response type.
+     */
+    private <R> void sendWithRetry(Peer peer, Object req, long stopTime, CompletableFuture<R> fut) {

Review comment:
       Why did you change the previous approach for implementing this method? I think it looked more elegant and readable. It is possible to implement the current logic in a similar way. Here's what I was able to come up with:
   
   ```
   private <R> CompletableFuture<R> sendWithRetry(Peer peer, Object req, long stopTime) {
       if (currentTimeMillis() >= stopTime) {
           return CompletableFuture.failedFuture(new TimeoutException());
       }
   
       return cluster.messagingService().invoke(peer.address(), (NetworkMessage)req, timeout)
           .handle((resp, err) -> {
               if (err != null) {
                   if (recoverable(err)) {
                       return composeWithDelay(() -> this.<R>sendWithRetry(randomNode(), req, stopTime));
                   }
                   else {
                       return CompletableFuture.<R>failedFuture(err);
                   }
               }
               else if (resp instanceof RaftErrorResponse) {
                   RaftErrorResponse resp0 = (RaftErrorResponse)resp;
                   RaftErrorCode errorCode = resp0.errorCode();
   
                   if (errorCode == null) {
                       leader = peer;
   
                       return CompletableFuture.<R>completedFuture(null);
                   }
   
                   switch (errorCode) {
                       case NO_LEADER:
                           return composeWithDelay(() -> this.<R>sendWithRetry(randomNode(), req, stopTime));
                       case LEADER_CHANGED:
                           leader = resp0.newLeader(); // Update a leader.
   
                           return composeWithDelay(() -> this.<R>sendWithRetry(resp0.newLeader(), req, stopTime));
                       default:
                           return CompletableFuture.<R>failedFuture(new RaftException(errorCode));
                   }
               }
               else {
                   leader = peer;
   
                   return CompletableFuture.completedFuture((R)resp);
               }
           })
           .thenCompose(Function.identity());
   }
   ```
   
   What do you think?
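
The shape proposed above (map both outcomes to a future inside `handle`, then flatten with `thenCompose(Function.identity())`) can be tried in isolation. The following is only a minimal sketch under stated assumptions: `RetrySketch`, `RecoverableException` and the `Supplier`-based call are hypothetical stand-ins for the messaging service, `recoverable(err)` and the Raft error codes, and the retry delay (`composeWithDelay` in the PR) is omitted.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical stand-in for the retry loop; none of these names come from the PR. */
final class RetrySketch {
    /** Marks a failure that is worth retrying (an assumption of this sketch). */
    static final class RecoverableException extends RuntimeException {
        RecoverableException(String msg) {
            super(msg);
        }
    }

    /** Repeats {@code call} until it succeeds, fails with a non-recoverable error, or {@code stopTime} passes. */
    static <R> CompletableFuture<R> sendWithRetry(Supplier<CompletableFuture<R>> call, long stopTime) {
        if (System.currentTimeMillis() >= stopTime)
            return CompletableFuture.failedFuture(new TimeoutException());

        return call.get()
            // handle() sees both outcomes and maps each one to a future, producing a nested future.
            .handle((resp, err) -> {
                if (err == null)
                    return CompletableFuture.<R>completedFuture(resp);

                // Dependent stages wrap failures in CompletionException, so unwrap before inspecting.
                Throwable cause = err instanceof CompletionException && err.getCause() != null ? err.getCause() : err;

                if (cause instanceof RecoverableException)
                    return RetrySketch.<R>sendWithRetry(call, stopTime); // Real code would retry after a delay.

                return CompletableFuture.<R>failedFuture(cause);
            })
            // Flatten CompletableFuture<CompletableFuture<R>> into CompletableFuture<R>.
            .thenCompose(Function.identity());
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();

        // Fails twice with a recoverable error, then succeeds.
        Supplier<CompletableFuture<String>> flaky = () -> attempts.incrementAndGet() < 3
            ? CompletableFuture.<String>failedFuture(new RecoverableException("no leader"))
            : CompletableFuture.completedFuture("ok");

        String res = sendWithRetry(flaky, System.currentTimeMillis() + 1_000).join();

        System.out.println(res + " after " + attempts.get() + " attempts");
    }
}
```

The method keeps returning a future instead of completing one passed in by the caller, which is the readability point raised in the comment. Note that the retry here recurses synchronously when the call completes immediately, so a real implementation would still need the scheduled delay.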







[GitHub] [ignite-3] ascherbakoff commented on a change in pull request #147: IGNITE-14567

Posted by GitBox <gi...@apache.org>.
ascherbakoff commented on a change in pull request #147:
URL: https://github.com/apache/ignite-3/pull/147#discussion_r640430762



##########
File path: modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
##########
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.raft;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.metastorage.common.ConditionType;
+import org.apache.ignite.internal.metastorage.common.command.ConditionInfo;
+import org.apache.ignite.internal.metastorage.common.command.GetAllCommand;
+import org.apache.ignite.internal.metastorage.common.command.GetAndPutAllCommand;
+import org.apache.ignite.internal.metastorage.common.command.GetAndPutCommand;
+import org.apache.ignite.internal.metastorage.common.command.GetAndRemoveAllCommand;
+import org.apache.ignite.internal.metastorage.common.command.GetAndRemoveCommand;
+import org.apache.ignite.internal.metastorage.common.command.GetCommand;
+import org.apache.ignite.internal.metastorage.common.command.InvokeCommand;
+import org.apache.ignite.internal.metastorage.common.command.MultipleEntryResponse;
+import org.apache.ignite.internal.metastorage.common.command.OperationInfo;
+import org.apache.ignite.internal.metastorage.common.command.PutAllCommand;
+import org.apache.ignite.internal.metastorage.common.command.PutCommand;
+import org.apache.ignite.internal.metastorage.common.command.RangeCommand;
+import org.apache.ignite.internal.metastorage.common.command.RemoveAllCommand;
+import org.apache.ignite.internal.metastorage.common.command.RemoveCommand;
+import org.apache.ignite.internal.metastorage.common.command.SingleEntryResponse;
+import org.apache.ignite.internal.metastorage.common.command.WatchExactKeysCommand;
+import org.apache.ignite.internal.metastorage.common.command.WatchRangeKeysCommand;
+import org.apache.ignite.internal.metastorage.common.command.cursor.CursorCloseCommand;
+import org.apache.ignite.internal.metastorage.common.command.cursor.CursorHasNextCommand;
+import org.apache.ignite.internal.metastorage.common.command.cursor.CursorNextCommand;
+import org.apache.ignite.internal.metastorage.server.Condition;
+import org.apache.ignite.internal.metastorage.server.Entry;
+import org.apache.ignite.internal.metastorage.server.EntryEvent;
+import org.apache.ignite.internal.metastorage.server.ExistenceCondition;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.Operation;
+import org.apache.ignite.internal.metastorage.server.RevisionCondition;
+import org.apache.ignite.internal.metastorage.server.ValueCondition;
+import org.apache.ignite.internal.metastorage.server.WatchEvent;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.raft.client.ReadCommand;
+import org.apache.ignite.raft.client.WriteCommand;
+import org.apache.ignite.raft.client.service.CommandClosure;
+import org.apache.ignite.raft.client.service.RaftGroupListener;
+
+/**
+ * Meta storage listener.
+ * TODO: IGNITE-14693 Implement Meta storage exception handling logic.
+ */
+public class MetaStorageListener implements RaftGroupListener {
+    /** Storage. */
+    private final KeyValueStorage storage;
+
+    /** Cursors map. */
+    private final Map<IgniteUuid, IgniteBiTuple<Cursor<?>, CursorType>> cursors;
+
+    /**
+     * @param storage Storage.
+     */
+    public MetaStorageListener(KeyValueStorage storage) {
+        this.storage = storage;
+        this.cursors = new ConcurrentHashMap<>();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onRead(Iterator<CommandClosure<ReadCommand>> iter) {
+        while (iter.hasNext()) {
+            CommandClosure<ReadCommand> clo = iter.next();
+
+            if (clo.command() instanceof GetCommand) {
+                GetCommand getCmd = (GetCommand) clo.command();
+
+                Entry e;
+
+                if (getCmd.revision() != 0)
+                    e = storage.get(getCmd.key(), getCmd.revision());
+                else
+                    e = storage.get(getCmd.key());
+
+                SingleEntryResponse resp = new SingleEntryResponse(
+                    e.key(), e.value(), e.revision(), e.updateCounter()
+                );
+
+                clo.result(resp);
+            }
+            else if (clo.command() instanceof GetAllCommand) {
+                GetAllCommand getAllCmd = (GetAllCommand) clo.command();
+
+                Collection<Entry> entries;
+
+                if (getAllCmd.revision() != 0)
+                    entries = storage.getAll(getAllCmd.keys(), getAllCmd.revision());
+                else
+                    entries = storage.getAll(getAllCmd.keys());
+
+                List<SingleEntryResponse> res = new ArrayList<>(entries.size());
+
+                for (Entry e : entries)
+                    res.add(new SingleEntryResponse(e.key(), e.value(), e.revision(), e.updateCounter()));
+
+                clo.result(new MultipleEntryResponse(res));
+            }
+            else if (clo.command() instanceof CursorHasNextCommand) {
+                CursorHasNextCommand cursorHasNextCmd = (CursorHasNextCommand) clo.command();
+
+                assert cursors.containsKey(cursorHasNextCmd.cursorId());
+
+                clo.result(cursors.get(cursorHasNextCmd.cursorId()).getKey().hasNext());
+            }
+            else
+                assert false : "Command was not found [cmd=" + clo.command() + ']';
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onWrite(Iterator<CommandClosure<WriteCommand>> iter) {
+        while (iter.hasNext()) {
+            CommandClosure<WriteCommand> clo = iter.next();
+
+            if (clo.command() instanceof PutCommand) {
+                PutCommand putCmd = (PutCommand) clo.command();
+
+                storage.put(putCmd.key(), putCmd.value());
+
+                clo.result(null);
+            }
+            else if (clo.command() instanceof GetAndPutCommand) {
+                GetAndPutCommand getAndPutCmd = (GetAndPutCommand) clo.command();
+
+                Entry e = storage.getAndPut(getAndPutCmd.key(), getAndPutCmd.value());
+
+                clo.result(new SingleEntryResponse(e.key(), e.value(), e.revision(), e.updateCounter()));
+            }
+            else if (clo.command() instanceof PutAllCommand) {
+                PutAllCommand putAllCmd = (PutAllCommand) clo.command();
+
+                storage.putAll(putAllCmd.keys(), putAllCmd.values());
+
+                clo.result(null);
+            }
+            else if (clo.command() instanceof GetAndPutAllCommand) {
+                GetAndPutAllCommand getAndPutAllCmd = (GetAndPutAllCommand) clo.command();
+
+                Collection<Entry> entries = storage.getAndPutAll(getAndPutAllCmd.keys(), getAndPutAllCmd.vals());
+
+                List<SingleEntryResponse> resp = new ArrayList<>(entries.size());
+
+                for (Entry e : entries)
+                    resp.add(new SingleEntryResponse(e.key(), e.value(), e.revision(), e.updateCounter()));
+
+                clo.result(new MultipleEntryResponse(resp));
+            }
+            else if (clo.command() instanceof RemoveCommand) {
+                RemoveCommand rmvCmd = (RemoveCommand) clo.command();
+
+                storage.remove(rmvCmd.key());
+
+                clo.result(null);
+            }
+            else if (clo.command() instanceof GetAndRemoveCommand) {
+                GetAndRemoveCommand getAndRmvCmd = (GetAndRemoveCommand) clo.command();
+
+                Entry e = storage.getAndRemove(getAndRmvCmd.key());
+
+                clo.result(new SingleEntryResponse(e.key(), e.value(), e.revision(), e.updateCounter()));
+            }
+            else if (clo.command() instanceof RemoveAllCommand) {
+                RemoveAllCommand rmvAllCmd = (RemoveAllCommand) clo.command();
+
+                storage.removeAll(rmvAllCmd.keys());
+
+                clo.result(null);
+            }
+            else if (clo.command() instanceof GetAndRemoveAllCommand) {
+                GetAndRemoveAllCommand getAndRmvAllCmd = (GetAndRemoveAllCommand) clo.command();
+
+                Collection<Entry> entries = storage.getAndRemoveAll(getAndRmvAllCmd.keys());
+
+                List<SingleEntryResponse> resp = new ArrayList<>(entries.size());
+
+                for (Entry e : entries)
+                    resp.add(new SingleEntryResponse(e.key(), e.value(), e.revision(), e.updateCounter()));
+
+                clo.result(new MultipleEntryResponse(resp));
+            }
+            else if (clo.command() instanceof InvokeCommand) {
+                InvokeCommand cmd = (InvokeCommand) clo.command();
+
+                boolean res = storage.invoke(
+                    toCondition(cmd.condition()),
+                    toOperations(cmd.success()),
+                    toOperations(cmd.failure())
+                );
+
+                clo.result(res);
+            }
+            else if (clo.command() instanceof RangeCommand) {
+                RangeCommand rangeCmd = (RangeCommand) clo.command();
+
+                IgniteUuid cursorId = new IgniteUuid(UUID.randomUUID(), 0L);
+
+                Cursor<Entry> cursor = storage.range(
+                    rangeCmd.keyFrom(),
+                    rangeCmd.keyTo(),
+                    rangeCmd.revUpperBound()
+                );
+
+                cursors.put(cursorId, new IgniteBiTuple<>(cursor, CursorType.RANGE));
+
+                clo.result(cursorId);
+            }
+            else if (clo.command() instanceof CursorNextCommand) {
+                CursorNextCommand cursorNextCmd = (CursorNextCommand) clo.command();
+
+                assert cursors.containsKey(cursorNextCmd.cursorId());
+
+                IgniteBiTuple<Cursor<?>, CursorType> cursorDesc = cursors.get(cursorNextCmd.cursorId());
+
+                if (cursorDesc.getValue() == CursorType.RANGE) {
+                    Entry e = (Entry) cursorDesc.getKey().next();
+
+                    clo.result(new SingleEntryResponse(e.key(), e.value(), e.revision(), e.updateCounter()));
+                }
+                else if (cursorDesc.getValue() == CursorType.WATCH) {
+                    WatchEvent evt = (WatchEvent) cursorDesc.getKey().next();
+
+                    List<SingleEntryResponse> resp = new ArrayList<>(evt.entryEvents().size() * 2);
+
+                    for (EntryEvent e : evt.entryEvents()) {
+                        Entry o = e.oldEntry();
+
+                        Entry n = e.entry();
+
+                        resp.add(new SingleEntryResponse(o.key(), o.value(), o.revision(), o.updateCounter()));
+
+                        resp.add(new SingleEntryResponse(n.key(), n.value(), n.revision(), n.updateCounter()));
+                    }
+
+                    clo.result(new MultipleEntryResponse(resp));
+                }
+            }
+            else if (clo.command() instanceof CursorCloseCommand) {
+                CursorCloseCommand cursorCloseCmd = (CursorCloseCommand) clo.command();
+
+                IgniteBiTuple<Cursor<?>, CursorType> val = cursors.get(cursorCloseCmd.cursorId());
+
+                if (val == null) {
+                    clo.result(null);
+
+                    return;
+                }
+
+                try {
+                    val.getKey().close();
+                }
+                catch (Exception e) {
+                    throw new IgniteInternalException(e);
+                }
+
+                clo.result(null);
+            }
+            else if (clo.command() instanceof WatchRangeKeysCommand) {
+                WatchRangeKeysCommand watchCmd = (WatchRangeKeysCommand) clo.command();
+
+                IgniteUuid cursorId = new IgniteUuid(UUID.randomUUID(), 0L);
+
+                Cursor<WatchEvent> cursor =
+                    storage.watch(watchCmd.keyFrom(), watchCmd.keyTo(), watchCmd.revision());
+
+                cursors.put(cursorId, new IgniteBiTuple<>(cursor, CursorType.WATCH));
+
+                clo.result(cursorId);
+            }
+            else if (clo.command() instanceof WatchExactKeysCommand) {
+                WatchExactKeysCommand watchCmd = (WatchExactKeysCommand) clo.command();
+
+                IgniteUuid cursorId = new IgniteUuid(UUID.randomUUID(), 0L);
+
+                Cursor<WatchEvent> cursor = storage.watch(watchCmd.keys(), watchCmd.revision());
+
+                cursors.put(cursorId, new IgniteBiTuple<>(cursor, CursorType.WATCH));
+
+                clo.result(cursorId);
+            }
+            else
+                assert false : "Command was not found [cmd=" + clo.command() + ']';
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onSnapshotSave(String path, Consumer<Throwable> doneClo) {
+        // Not implemented yet.
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onSnapshotLoad(String path) {
+        // Not implemented yet.
+        return false;
+    }
+
+    /** */
+    private static Condition toCondition(ConditionInfo info) {
+        byte[] key = info.key();
+
+        ConditionType type = info.type();
+
+        if (type == ConditionType.KEY_EXISTS)

Review comment:
       Agreed, but this is actually not my code; I only renamed the class.
   Let's fix it in a separate ticket.







[GitHub] [ignite-3] ascherbakoff commented on a change in pull request #147: IGNITE-14567

Posted by GitBox <gi...@apache.org>.
ascherbakoff commented on a change in pull request #147:
URL: https://github.com/apache/ignite-3/pull/147#discussion_r640445881



##########
File path: modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupListener.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.client.service;
+
+import java.util.Iterator;
+import java.util.function.Consumer;
+import org.apache.ignite.raft.client.ReadCommand;
+import org.apache.ignite.raft.client.WriteCommand;
+
+/**
+ * A listener for replication group events.
+ */
+public interface RaftGroupListener {
+    /**
+     * The callback to apply read commands.
+     * <p>
+     * If the runtime exception is thrown during iteration all unprocessed read requests will be aborted with the STM

Review comment:
       That's true. The Javadoc explicitly refers to runtime exceptions:
   If the **runtime** exception ...







[GitHub] [ignite-3] ascherbakoff commented on a change in pull request #147: IGNITE-14567

Posted by GitBox <gi...@apache.org>.
ascherbakoff commented on a change in pull request #147:
URL: https://github.com/apache/ignite-3/pull/147#discussion_r640438443



##########
File path: modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ActionRequest.java
##########
@@ -35,19 +35,30 @@
      */
     Command command();
 
+    /**
+     * @return {@code True} for linearizable reading.
+     */
+    boolean readOnlySafe();

Review comment:
       The usages are in the jraft branch.







[GitHub] [ignite-3] sanpwc commented on a change in pull request #147: IGNITE-14567

Posted by GitBox <gi...@apache.org>.
sanpwc commented on a change in pull request #147:
URL: https://github.com/apache/ignite-3/pull/147#discussion_r639458813



##########
File path: modules/network/src/main/java/org/apache/ignite/network/MessagingService.java
##########
@@ -61,12 +61,23 @@
      * returns a future that will be completed successfully upon receiving a response.
      *
      * @param recipient Recipient of the message.
-     * @param msg A message.
+     * @param msg The message.
      * @param timeout Waiting for response timeout in milliseconds.
      * @return A future holding the response or error if the expected response was not received.
      */
     CompletableFuture<NetworkMessage> invoke(ClusterNode recipient, NetworkMessage msg, long timeout);
 
+    /**
+     * Sends a message asynchronously with same guarantees as {@link #send(ClusterNode, NetworkMessage)} and
+     * returns a future that will be completed successfully upon receiving a response.
+     *
+     * @param addr Recipient network address in host:port format.
+     * @param msg A message.
+     * @param timeout Waiting for response timeout in milliseconds.
+     * @return A future holding the response or error if the expected response was not received.
+     */
+    CompletableFuture<NetworkMessage> invoke(String addr, NetworkMessage msg, long timeout);

Review comment:
       Why do we need it?







[GitHub] [ignite-3] sanpwc commented on a change in pull request #147: IGNITE-14567

Posted by GitBox <gi...@apache.org>.
sanpwc commented on a change in pull request #147:
URL: https://github.com/apache/ignite-3/pull/147#discussion_r639471515



##########
File path: modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupListener.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.client.service;
+
+import java.util.Iterator;
+import java.util.function.Consumer;
+import org.apache.ignite.raft.client.ReadCommand;
+import org.apache.ignite.raft.client.WriteCommand;
+
+/**
+ * A listener for replication group events.
+ */
+public interface RaftGroupListener {
+    /**
+     * The callback to apply read commands.
+     * <p>
+     * If the runtime exception is thrown during iteration all unprocessed read requests will be aborted with the STM

Review comment:
       I believe this relates only to unhandled runtime exceptions, because nothing is wrong with continuing to process reads and writes when a business-logic runtime exception has been properly handled and propagated to the client side via clo.result(). For example, as mentioned above, a CompactionException on metaStorage.get() shouldn't prevent further processing but should be propagated to the client service.
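
To make the distinction concrete, here is a minimal sketch of a listener loop that handles a business-logic exception per command and reports it through the closure, so the rest of the batch is still processed. Everything in it is a hypothetical stand-in: `Closure` is not the real `CommandClosure`, `CompactedException` only imitates the compaction case mentioned above, and whether the real `clo.result()` is meant to carry an exception object is an assumption.

```java
import java.util.Iterator;
import java.util.List;

/** Hypothetical illustration; these types are not the ones from the PR. */
final class ClosureErrorSketch {
    /** Simplified closure: a command (here just a key) plus a sink for its result. */
    static final class Closure {
        final String cmd;

        Object result;

        Closure(String cmd) {
            this.cmd = cmd;
        }

        void result(Object res) {
            this.result = res;
        }
    }

    /** Stand-in for a business-logic failure such as reading a compacted revision. */
    static final class CompactedException extends RuntimeException {
        CompactedException(String msg) {
            super(msg);
        }
    }

    /** Pretend storage read that fails for keys starting with "compacted". */
    static String read(String key) {
        if (key.startsWith("compacted"))
            throw new CompactedException("Revision was compacted: " + key);

        return "value-of-" + key;
    }

    /** Processes the whole batch; a handled failure affects only its own closure. */
    static void onRead(Iterator<Closure> iter) {
        while (iter.hasNext()) {
            Closure clo = iter.next();

            try {
                clo.result(read(clo.cmd));
            }
            catch (CompactedException e) {
                // Handled: report the error to the caller and keep iterating.
                clo.result(e);
            }
        }
    }

    public static void main(String[] args) {
        List<Closure> batch = List.of(new Closure("a"), new Closure("compacted-b"), new Closure("c"));

        onRead(batch.iterator());

        for (Closure c : batch)
            System.out.println(c.cmd + " -> " + c.result);
    }
}
```

Any other runtime exception thrown by `read` would still escape `onRead`, which is the unhandled case the Javadoc under discussion describes.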







[GitHub] [ignite-3] sanpwc commented on a change in pull request #147: IGNITE-14567

Posted by GitBox <gi...@apache.org>.
sanpwc commented on a change in pull request #147:
URL: https://github.com/apache/ignite-3/pull/147#discussion_r639478402



##########
File path: modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java
##########
@@ -88,6 +92,9 @@
     /** */
     private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
 
+    /** */
+    private final boolean reuse;

Review comment:
       Could you please clarify why we need it?







[GitHub] [ignite-3] sashapolo commented on a change in pull request #147: IGNITE-14567

Posted by GitBox <gi...@apache.org>.
sashapolo commented on a change in pull request #147:
URL: https://github.com/apache/ignite-3/pull/147#discussion_r640795892



##########
File path: modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
##########
@@ -90,22 +91,31 @@ void fireEvent(Message message) {
             .withData(msg)
             .correlationId(correlationId)
             .build();
+
         return cluster
             .send(clusterNodeAddress(recipient), message)
             .toFuture();
     }
 
     /** {@inheritDoc} */
-    @Override public CompletableFuture<NetworkMessage> invoke(ClusterNode recipient, NetworkMessage msg, long timeout) {
+    @Override public CompletableFuture<NetworkMessage> invoke(ClusterNode recipient, final NetworkMessage msg, long timeout) {

Review comment:
       ```suggestion
       @Override public CompletableFuture<NetworkMessage> invoke(ClusterNode recipient, NetworkMessage msg, long timeout) {
   ```

##########
File path: modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java
##########
@@ -287,67 +324,124 @@ public RaftGroupServiceImpl(
         if (leader == null)
             return refreshLeader().thenCompose(res -> run(cmd));
 
-        ActionRequest<R> req = factory.actionRequest().command(cmd).groupId(groupId).build();
+        ActionRequest req = factory.actionRequest().command(cmd).groupId(groupId).readOnlySafe(true).build();
+
+        CompletableFuture<ActionResponse<R>> fut = new CompletableFuture<>();
 
-        CompletableFuture<ActionResponse<R>> fut = sendWithRetry(leader.getNode(), req, currentTimeMillis() + timeout);
+        sendWithRetry(leader, req, currentTimeMillis() + timeout, fut);
 
         return fut.thenApply(resp -> resp.result());
     }
 
-    /** {@inheritDoc} */
+    /**
+     * {@inheritDoc}
+     */
     @Override public <R> CompletableFuture<R> run(Peer peer, ReadCommand cmd) {
-        ActionRequest req = factory.actionRequest().command(cmd).groupId(groupId).build();
+        ActionRequest req = factory.actionRequest().command(cmd).groupId(groupId).readOnlySafe(false).build();
+
+        CompletableFuture fut = cluster.messagingService().invoke(peer.address(), req, timeout);
 
-        CompletableFuture<?> fut = cluster.messagingService().invoke(peer.getNode(), req, timeout);
+        return fut.thenApply(resp -> ((ActionResponse) resp).result());
+    }
 
-        return fut.thenApply(resp -> ((ActionResponse<R>) resp).result());
+    /** {@inheritDoc} */
+    @Override public void shutdown() {
+        if (!reuse)
+            cluster.shutdown();
     }
 
-    private <R> CompletableFuture<R> sendWithRetry(ClusterNode node, NetworkMessage req, long stopTime) {
-        if (currentTimeMillis() >= stopTime)
-            return CompletableFuture.failedFuture(new TimeoutException());
-        return cluster.messagingService().invoke(node, req, timeout)
-            .thenCompose(resp -> {
-                if (resp instanceof RaftErrorResponse) {
-                    RaftErrorResponse resp0 = (RaftErrorResponse)resp;
-                    switch (resp0.errorCode()) {
-                        case NO_LEADER:
-                            return composeWithDelay(() -> sendWithRetry(randomNode(), req, stopTime));
-                        case LEADER_CHANGED:
-                            leader = resp0.newLeader();
-                            return composeWithDelay(() -> sendWithRetry(resp0.newLeader().getNode(), req, stopTime));
-                        case SUCCESS:
-                            return CompletableFuture.completedFuture(null);
-                        default:
-                            return CompletableFuture.failedFuture(new RaftException(resp0.errorCode()));
+    /**
+     * Retries request until success or timeout.
+     *
+     * @param addr Target address.
+     * @param req The request.
+     * @param stopTime Stop time.
+     * @param fut The future.
+     * @param <R> Response type.
+     */
+    private <R> void sendWithRetry(Peer peer, Object req, long stopTime, CompletableFuture<R> fut) {

Review comment:
       Why did you change the previous approach for implementing this method? I think it looked more elegant and readable. It is possible to implement the current logic in a similar way. Here's what I was able to come up with:
   
   ```
   private <R> CompletableFuture<R> sendWithRetry(Peer peer, Object req, long stopTime) {
       if (currentTimeMillis() >= stopTime) {
           return CompletableFuture.failedFuture(new TimeoutException());
       }
   
       return cluster.messagingService().invoke(peer.address(), (NetworkMessage)req, timeout)
           .handle((resp, err) -> {
               if (err != null) {
                   if (recoverable(err)) {
                       return composeWithDelay(() -> this.<R>sendWithRetry(randomNode(), req, stopTime));
                   }
                   else {
                       return CompletableFuture.<R>failedFuture(err);
                   }
               }
               else if (resp instanceof RaftErrorResponse) {
                   RaftErrorResponse resp0 = (RaftErrorResponse)resp;
                   RaftErrorCode errorCode = resp0.errorCode();
   
                   if (errorCode == null) {
                       leader = peer;
   
                       return CompletableFuture.<R>completedFuture(null);
                   }
   
                   switch (errorCode) {
                       case NO_LEADER:
                           return composeWithDelay(() -> this.<R>sendWithRetry(randomNode(), req, stopTime));
                       case LEADER_CHANGED:
                           leader = resp0.newLeader(); // Update a leader.
   
                           return composeWithDelay(() -> this.<R>sendWithRetry(resp0.newLeader(), req, stopTime));
                       default:
                           return CompletableFuture.<R>failedFuture(new RaftException(errorCode));
                   }
               }
               else {
                   leader = peer;
   
                   return CompletableFuture.completedFuture((R)resp);
               }
           })
           .thenCompose(Function.identity());
   }
   ```
   
   What do you think?
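
   For reference, the `handle(...).thenCompose(Function.identity())` shape can be tried in isolation. This is a minimal sketch, not Ignite code: `RetrySketch`, `retry`, and the `Supplier`-based attempt are hypothetical names, and the delay and the `recoverable(err)` check from the suggestion above are left out.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.TimeoutException;
   import java.util.concurrent.atomic.AtomicInteger;
   import java.util.function.Function;
   import java.util.function.Supplier;

   final class RetrySketch {
       /** Hypothetical helper: re-runs the attempt until it succeeds or the deadline passes. */
       static <R> CompletableFuture<R> retry(Supplier<CompletableFuture<R>> attempt, long stopTime) {
           if (System.currentTimeMillis() >= stopTime)
               return CompletableFuture.failedFuture(new TimeoutException());

           return attempt.get()
               // handle() sees both outcomes and maps each one to a new future...
               .<CompletableFuture<R>>handle((res, err) -> {
                   if (err != null)
                       return retry(attempt, stopTime); // Real code would retry after a delay.

                   return CompletableFuture.completedFuture(res);
               })
               // ...and thenCompose() flattens the nested future.
               .thenCompose(Function.identity());
       }

       /** Fails twice, then succeeds; returns the result and the number of attempts. */
       static String demo() {
           AtomicInteger attempts = new AtomicInteger();

           Supplier<CompletableFuture<String>> flaky = () -> attempts.incrementAndGet() < 3
               ? CompletableFuture.<String>failedFuture(new IllegalStateException("no leader"))
               : CompletableFuture.completedFuture("ok");

           String res = retry(flaky, System.currentTimeMillis() + 5_000).join();

           return res + " after " + attempts.get() + " attempts";
       }

       public static void main(String[] args) {
           System.out.println(demo()); // prints "ok after 3 attempts"
       }
   }
   ```

   The method keeps returning a future, so callers can chain on it directly instead of passing one in.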

##########
File path: modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ActionRequest.java
##########
@@ -35,19 +35,30 @@
      */
     Command command();
 
+    /**
+     * @return {@code True} for linearizable reading.
+     */
+    boolean readOnlySafe();

Review comment:
       This getter is actually never used, only the setter. Can you also please elaborate on where the `readOnlySafe` concept comes from? I don't understand how it is going to be used for linearization.

##########
File path: modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/CommandClosure.java
##########
@@ -17,10 +17,13 @@
 
 package org.apache.ignite.raft.client.service;
 
+import java.io.Serializable;
 import org.apache.ignite.raft.client.Command;
 
 /**
- * A closure to notify abbout command processing outcome.
+ * A closure to notify about command processing outcome.

Review comment:
       ```suggestion
    * A closure to notify about a command processing outcome.
   ```

##########
File path: modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java
##########
@@ -261,9 +286,21 @@ public RaftGroupServiceImpl(
     @Override public CompletableFuture<Void> snapshot(Peer peer) {
         SnapshotRequest req = factory.snapshotRequest().groupId(groupId).build();
 
-        CompletableFuture<?> fut = cluster.messagingService().invoke(peer.getNode(), req, timeout);
+        // Disable the timeout for a snapshot request.
+        CompletableFuture<NetworkMessage> fut = cluster.messagingService().invoke(peer.address(), req, Integer.MAX_VALUE);
 
-        return fut.thenApply(resp -> null);
+        return fut.handle(new BiFunction<NetworkMessage, Throwable, Void>() {
+            @Override public Void apply(NetworkMessage resp, Throwable throwable) {

Review comment:
       Why do you ignore the `throwable` that might be passed to this method? Should we use `thenApply` instead?
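
   To illustrate the difference, here is a minimal standalone sketch, assuming the `handle` callback never looks at its `Throwable` argument. `HandleVsThenApply` and both method names are hypothetical and not part of the PR.

   ```java
   import java.util.concurrent.CompletableFuture;

   final class HandleVsThenApply {
       /** The Throwable argument is never inspected, so a failure is silently dropped. */
       static CompletableFuture<Void> ignoreErrorWithHandle(CompletableFuture<String> fut) {
           return fut.handle((resp, err) -> null);
       }

       /** thenApply() runs only on success; a failure propagates to the returned future. */
       static CompletableFuture<Void> propagateWithThenApply(CompletableFuture<String> fut) {
           return fut.thenApply(resp -> null);
       }

       public static void main(String[] args) {
           CompletableFuture<String> failed =
               CompletableFuture.failedFuture(new RuntimeException("request failed"));

           System.out.println(ignoreErrorWithHandle(failed).isCompletedExceptionally());   // false
           System.out.println(propagateWithThenApply(failed).isCompletedExceptionally()); // true
       }
   }
   ```

   With `handle`, a caller of `snapshot` would see a normally completed future even when the request failed.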







[GitHub] [ignite-3] ascherbakoff commented on a change in pull request #147: IGNITE-14567

Posted by GitBox <gi...@apache.org>.
ascherbakoff commented on a change in pull request #147:
URL: https://github.com/apache/ignite-3/pull/147#discussion_r640444227



##########
File path: modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java
##########
@@ -88,6 +92,9 @@
     /** */
     private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
 
+    /** */
+    private final boolean reuse;

Review comment:
       It's currently used for tests and allows the cluster and the raft server to share the same lifecycle, see [1].
   It helps to reduce boilerplate code.
   
   [1] org.apache.ignite.raft.server.ITRaftCounterServerTest







[GitHub] [ignite-3] ascherbakoff commented on a change in pull request #147: IGNITE-14567

Posted by GitBox <gi...@apache.org>.
ascherbakoff commented on a change in pull request #147:
URL: https://github.com/apache/ignite-3/pull/147#discussion_r640445881



##########
File path: modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupListener.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.client.service;
+
+import java.util.Iterator;
+import java.util.function.Consumer;
+import org.apache.ignite.raft.client.ReadCommand;
+import org.apache.ignite.raft.client.WriteCommand;
+
+/**
+ * A listener for replication group events.
+ */
+public interface RaftGroupListener {
+    /**
+     * The callback to apply read commands.
+     * <p>
+     * If the runtime exception is thrown during iteration all unprocessed read requests will be aborted with the STM

Review comment:
       The Javadoc explicitly mentions runtime exceptions:
   If the **runtime** exception ...







[GitHub] [ignite-3] ascherbakoff commented on a change in pull request #147: IGNITE-14567

Posted by GitBox <gi...@apache.org>.
ascherbakoff commented on a change in pull request #147:
URL: https://github.com/apache/ignite-3/pull/147#discussion_r641471086



##########
File path: modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ActionRequest.java
##########
@@ -35,19 +35,30 @@
      */
     Command command();
 
+    /**
+     * @return {@code True} for linearizable reading.
+     */
+    boolean readOnlySafe();

Review comment:
       This concept comes from RAFT linearizable read guarantees, which require a stable leader before reading.
   It makes no sense for a single-node RAFT cluster, so the getter is not used here.
   You can find the actual usage in ignite-13885







[GitHub] [ignite-3] ascherbakoff commented on a change in pull request #147: IGNITE-14567

Posted by GitBox <gi...@apache.org>.
ascherbakoff commented on a change in pull request #147:
URL: https://github.com/apache/ignite-3/pull/147#discussion_r640438443



##########
File path: modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/ActionRequest.java
##########
@@ -35,19 +35,30 @@
      */
     Command command();
 
+    /**
+     * @return {@code True} for linearizable reading.
+     */
+    boolean readOnlySafe();

Review comment:
       You are wrong, usages are in [1] and [2]
   [1] org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl#run(org.apache.ignite.raft.client.Peer, org.apache.ignite.raft.client.ReadCommand)
   [2] org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl#run(org.apache.ignite.raft.client.Command)







[GitHub] [ignite-3] AMashenkov commented on a change in pull request #147: IGNITE-14567

Posted by GitBox <gi...@apache.org>.
AMashenkov commented on a change in pull request #147:
URL: https://github.com/apache/ignite-3/pull/147#discussion_r638919832



##########
File path: modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
##########
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.raft;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.metastorage.common.ConditionType;
+import org.apache.ignite.internal.metastorage.common.command.ConditionInfo;
+import org.apache.ignite.internal.metastorage.common.command.GetAllCommand;
+import org.apache.ignite.internal.metastorage.common.command.GetAndPutAllCommand;
+import org.apache.ignite.internal.metastorage.common.command.GetAndPutCommand;
+import org.apache.ignite.internal.metastorage.common.command.GetAndRemoveAllCommand;
+import org.apache.ignite.internal.metastorage.common.command.GetAndRemoveCommand;
+import org.apache.ignite.internal.metastorage.common.command.GetCommand;
+import org.apache.ignite.internal.metastorage.common.command.InvokeCommand;
+import org.apache.ignite.internal.metastorage.common.command.MultipleEntryResponse;
+import org.apache.ignite.internal.metastorage.common.command.OperationInfo;
+import org.apache.ignite.internal.metastorage.common.command.PutAllCommand;
+import org.apache.ignite.internal.metastorage.common.command.PutCommand;
+import org.apache.ignite.internal.metastorage.common.command.RangeCommand;
+import org.apache.ignite.internal.metastorage.common.command.RemoveAllCommand;
+import org.apache.ignite.internal.metastorage.common.command.RemoveCommand;
+import org.apache.ignite.internal.metastorage.common.command.SingleEntryResponse;
+import org.apache.ignite.internal.metastorage.common.command.WatchExactKeysCommand;
+import org.apache.ignite.internal.metastorage.common.command.WatchRangeKeysCommand;
+import org.apache.ignite.internal.metastorage.common.command.cursor.CursorCloseCommand;
+import org.apache.ignite.internal.metastorage.common.command.cursor.CursorHasNextCommand;
+import org.apache.ignite.internal.metastorage.common.command.cursor.CursorNextCommand;
+import org.apache.ignite.internal.metastorage.server.Condition;
+import org.apache.ignite.internal.metastorage.server.Entry;
+import org.apache.ignite.internal.metastorage.server.EntryEvent;
+import org.apache.ignite.internal.metastorage.server.ExistenceCondition;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.Operation;
+import org.apache.ignite.internal.metastorage.server.RevisionCondition;
+import org.apache.ignite.internal.metastorage.server.ValueCondition;
+import org.apache.ignite.internal.metastorage.server.WatchEvent;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.raft.client.ReadCommand;
+import org.apache.ignite.raft.client.WriteCommand;
+import org.apache.ignite.raft.client.service.CommandClosure;
+import org.apache.ignite.raft.client.service.RaftGroupListener;
+
+/**
+ * Meta storage listener.
+ * TODO: IGNITE-14693 Implement Meta storage exception handling logic.
+ */
+public class MetaStorageListener implements RaftGroupListener {
+    /** Storage. */
+    private final KeyValueStorage storage;
+
+    /** Cursors map. */
+    private final Map<IgniteUuid, IgniteBiTuple<Cursor<?>, CursorType>> cursors;
+
+    /**
+     * @param storage Storage.
+     */
+    public MetaStorageListener(KeyValueStorage storage) {
+        this.storage = storage;
+        this.cursors = new ConcurrentHashMap<>();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onRead(Iterator<CommandClosure<ReadCommand>> iter) {
+        while (iter.hasNext()) {
+            CommandClosure<ReadCommand> clo = iter.next();
+
+            if (clo.command() instanceof GetCommand) {
+                GetCommand getCmd = (GetCommand) clo.command();
+
+                Entry e;
+
+                if (getCmd.revision() != 0)
+                    e = storage.get(getCmd.key(), getCmd.revision());
+                else
+                    e = storage.get(getCmd.key());
+
+                SingleEntryResponse resp = new SingleEntryResponse(
+                    e.key(), e.value(), e.revision(), e.updateCounter()
+                );
+
+                clo.result(resp);
+            }
+            else if (clo.command() instanceof GetAllCommand) {
+                GetAllCommand getAllCmd = (GetAllCommand) clo.command();
+
+                Collection<Entry> entries;
+
+                if (getAllCmd.revision() != 0)
+                    entries = storage.getAll(getAllCmd.keys(), getAllCmd.revision());
+                else
+                    entries = storage.getAll(getAllCmd.keys());
+
+                List<SingleEntryResponse> res = new ArrayList<>(entries.size());
+
+                for (Entry e : entries)
+                    res.add(new SingleEntryResponse(e.key(), e.value(), e.revision(), e.updateCounter()));
+
+                clo.result(new MultipleEntryResponse(res));
+            }
+            else if (clo.command() instanceof CursorHasNextCommand) {
+                CursorHasNextCommand cursorHasNextCmd = (CursorHasNextCommand) clo.command();
+
+                assert cursors.containsKey(cursorHasNextCmd.cursorId());
+
+                clo.result(cursors.get(cursorHasNextCmd.cursorId()).getKey().hasNext());
+            }
+            else
+                assert false : "Command was not found [cmd=" + clo.command() + ']';
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onWrite(Iterator<CommandClosure<WriteCommand>> iter) {
+        while (iter.hasNext()) {
+            CommandClosure<WriteCommand> clo = iter.next();
+
+            if (clo.command() instanceof PutCommand) {
+                PutCommand putCmd = (PutCommand) clo.command();
+
+                storage.put(putCmd.key(), putCmd.value());
+
+                clo.result(null);
+            }
+            else if (clo.command() instanceof GetAndPutCommand) {
+                GetAndPutCommand getAndPutCmd = (GetAndPutCommand) clo.command();
+
+                Entry e = storage.getAndPut(getAndPutCmd.key(), getAndPutCmd.value());
+
+                clo.result(new SingleEntryResponse(e.key(), e.value(), e.revision(), e.updateCounter()));
+            }
+            else if (clo.command() instanceof PutAllCommand) {
+                PutAllCommand putAllCmd = (PutAllCommand) clo.command();
+
+                storage.putAll(putAllCmd.keys(), putAllCmd.values());
+
+                clo.result(null);
+            }
+            else if (clo.command() instanceof GetAndPutAllCommand) {
+                GetAndPutAllCommand getAndPutAllCmd = (GetAndPutAllCommand) clo.command();
+
+                Collection<Entry> entries = storage.getAndPutAll(getAndPutAllCmd.keys(), getAndPutAllCmd.vals());
+
+                List<SingleEntryResponse> resp = new ArrayList<>(entries.size());
+
+                for (Entry e : entries)
+                    resp.add(new SingleEntryResponse(e.key(), e.value(), e.revision(), e.updateCounter()));
+
+                clo.result(new MultipleEntryResponse(resp));
+            }
+            else if (clo.command() instanceof RemoveCommand) {
+                RemoveCommand rmvCmd = (RemoveCommand) clo.command();
+
+                storage.remove(rmvCmd.key());
+
+                clo.result(null);
+            }
+            else if (clo.command() instanceof GetAndRemoveCommand) {
+                GetAndRemoveCommand getAndRmvCmd = (GetAndRemoveCommand) clo.command();
+
+                Entry e = storage.getAndRemove(getAndRmvCmd.key());
+
+                clo.result(new SingleEntryResponse(e.key(), e.value(), e.revision(), e.updateCounter()));
+            }
+            else if (clo.command() instanceof RemoveAllCommand) {
+                RemoveAllCommand rmvAllCmd = (RemoveAllCommand) clo.command();
+
+                storage.removeAll(rmvAllCmd.keys());
+
+                clo.result(null);
+            }
+            else if (clo.command() instanceof GetAndRemoveAllCommand) {
+                GetAndRemoveAllCommand getAndRmvAllCmd = (GetAndRemoveAllCommand) clo.command();
+
+                Collection<Entry> entries = storage.getAndRemoveAll(getAndRmvAllCmd.keys());
+
+                List<SingleEntryResponse> resp = new ArrayList<>(entries.size());
+
+                for (Entry e : entries)
+                    resp.add(new SingleEntryResponse(e.key(), e.value(), e.revision(), e.updateCounter()));
+
+                clo.result(new MultipleEntryResponse(resp));
+            }
+            else if (clo.command() instanceof InvokeCommand) {
+                InvokeCommand cmd = (InvokeCommand) clo.command();
+
+                boolean res = storage.invoke(
+                    toCondition(cmd.condition()),
+                    toOperations(cmd.success()),
+                    toOperations(cmd.failure())
+                );
+
+                clo.result(res);
+            }
+            else if (clo.command() instanceof RangeCommand) {
+                RangeCommand rangeCmd = (RangeCommand) clo.command();
+
+                IgniteUuid cursorId = new IgniteUuid(UUID.randomUUID(), 0L);
+
+                Cursor<Entry> cursor = storage.range(
+                    rangeCmd.keyFrom(),
+                    rangeCmd.keyTo(),
+                    rangeCmd.revUpperBound()
+                );
+
+                cursors.put(cursorId, new IgniteBiTuple<>(cursor, CursorType.RANGE));
+
+                clo.result(cursorId);
+            }
+            else if (clo.command() instanceof CursorNextCommand) {
+                CursorNextCommand cursorNextCmd = (CursorNextCommand) clo.command();
+
+                assert cursors.containsKey(cursorNextCmd.cursorId());
+
+                IgniteBiTuple<Cursor<?>, CursorType> cursorDesc = cursors.get(cursorNextCmd.cursorId());
+
+                if (cursorDesc.getValue() == CursorType.RANGE) {
+                    Entry e = (Entry) cursorDesc.getKey().next();
+
+                    clo.result(new SingleEntryResponse(e.key(), e.value(), e.revision(), e.updateCounter()));
+                }
+                else if (cursorDesc.getValue() == CursorType.WATCH) {
+                    WatchEvent evt = (WatchEvent) cursorDesc.getKey().next();
+
+                    List<SingleEntryResponse> resp = new ArrayList<>(evt.entryEvents().size() * 2);
+
+                    for (EntryEvent e : evt.entryEvents()) {
+                        Entry o = e.oldEntry();
+
+                        Entry n = e.entry();
+
+                        resp.add(new SingleEntryResponse(o.key(), o.value(), o.revision(), o.updateCounter()));
+
+                        resp.add(new SingleEntryResponse(n.key(), n.value(), n.revision(), n.updateCounter()));
+                    }
+
+                    clo.result(new MultipleEntryResponse(resp));
+                }
+            }
+            else if (clo.command() instanceof CursorCloseCommand) {
+                CursorCloseCommand cursorCloseCmd = (CursorCloseCommand) clo.command();
+
+                IgniteBiTuple<Cursor<?>, CursorType> val = cursors.get(cursorCloseCmd.cursorId());
+
+                if (val == null) {
+                    clo.result(null);
+
+                    return;
+                }
+
+                try {
+                    val.getKey().close();
+                }
+                catch (Exception e) {
+                    throw new IgniteInternalException(e);
+                }
+
+                clo.result(null);
+            }
+            else if (clo.command() instanceof WatchRangeKeysCommand) {
+                WatchRangeKeysCommand watchCmd = (WatchRangeKeysCommand) clo.command();
+
+                IgniteUuid cursorId = new IgniteUuid(UUID.randomUUID(), 0L);
+
+                Cursor<WatchEvent> cursor =
+                    storage.watch(watchCmd.keyFrom(), watchCmd.keyTo(), watchCmd.revision());
+
+                cursors.put(cursorId, new IgniteBiTuple<>(cursor, CursorType.WATCH));
+
+                clo.result(cursorId);
+            }
+            else if (clo.command() instanceof WatchExactKeysCommand) {
+                WatchExactKeysCommand watchCmd = (WatchExactKeysCommand) clo.command();
+
+                IgniteUuid cursorId = new IgniteUuid(UUID.randomUUID(), 0L);
+
+                Cursor<WatchEvent> cursor = storage.watch(watchCmd.keys(), watchCmd.revision());
+
+                cursors.put(cursorId, new IgniteBiTuple<>(cursor, CursorType.WATCH));
+
+                clo.result(cursorId);
+            }
+            else
+                assert false : "Command was not found [cmd=" + clo.command() + ']';
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onSnapshotSave(String path, Consumer<Throwable> doneClo) {
+        // Not implemented yet.
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onSnapshotLoad(String path) {
+        // Not implemented yet.
+        return false;
+    }
+
+    /** */
+    private static Condition toCondition(ConditionInfo info) {
+        byte[] key = info.key();
+
+        ConditionType type = info.type();
+
+        if (type == ConditionType.KEY_EXISTS)

Review comment:
       A `switch (type)` statement would be more natural and more efficient here.
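
   A rough sketch of the shape I have in mind. The enum below is a stand-in for `ConditionType`: only `KEY_EXISTS` appears in this diff, so the other constants and the returned strings are assumptions made for illustration.

   ```java
   final class SwitchSketch {
       /** Stand-in for ConditionType; constants other than KEY_EXISTS are hypothetical. */
       enum ConditionType { KEY_EXISTS, KEY_NOT_EXISTS, REV_EQUAL }

       /** Maps a condition type to a description with a single switch instead of an if-chain. */
       static String describe(ConditionType type) {
           switch (type) {
               case KEY_EXISTS:
                   return "key exists";

               case KEY_NOT_EXISTS:
                   return "key does not exist";

               case REV_EQUAL:
                   return "revision equals";

               default:
                   throw new IllegalArgumentException("Unknown condition type: " + type);
           }
       }

       public static void main(String[] args) {
           System.out.println(describe(ConditionType.KEY_EXISTS)); // prints "key exists"
       }
   }
   ```

   The `default` branch also makes an unhandled type fail loudly instead of falling through the chain.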



