You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/06/08 16:20:33 UTC

[GitHub] [ignite-3] alievmirza commented on a change in pull request #167: IGNITE-13885

alievmirza commented on a change in pull request #167:
URL: https://github.com/apache/ignite-3/pull/167#discussion_r647477124



##########
File path: modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftGroupService.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.jraft;
+
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.option.NodeOptions;
+import org.apache.ignite.raft.jraft.option.RpcOptions;
+import org.apache.ignite.raft.jraft.rpc.RpcServer;
+import org.apache.ignite.raft.jraft.util.Endpoint;
+import org.apache.ignite.raft.jraft.util.StringUtils;
+import org.apache.ignite.raft.jraft.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A raft group service.
+ */
+public class RaftGroupService {
+    private static final Logger LOG = LoggerFactory.getLogger(RaftGroupService.class);
+
+    private volatile boolean started = false;

Review comment:
       Please add Javadoc for this flag. That said, it's internal JRaft implementation code, so it's up to you.

##########
File path: modules/raft/src/main/java/org/apache/ignite/raft/jraft/Node.java
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.jraft;
+
+import java.util.List;
+import org.apache.ignite.raft.jraft.closure.ReadIndexClosure;
+import org.apache.ignite.raft.jraft.conf.Configuration;
+import org.apache.ignite.raft.jraft.core.NodeMetrics;
+import org.apache.ignite.raft.jraft.core.Replicator;
+import org.apache.ignite.raft.jraft.entity.NodeId;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.entity.Task;
+import org.apache.ignite.raft.jraft.entity.UserLog;
+import org.apache.ignite.raft.jraft.error.LogIndexOutOfBoundsException;
+import org.apache.ignite.raft.jraft.error.LogNotFoundException;
+import org.apache.ignite.raft.jraft.option.NodeOptions;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.util.Describer;
+
+/**
+ * A raft replica node.
+ */
+public interface Node extends Lifecycle<NodeOptions>, Describer {
+    /**
+     * Get the leader peer id for redirect, null if absent.
+     */
+    PeerId getLeaderId();
+
+    /**
+     * Get current node id.
+     */
+    NodeId getNodeId();
+
+    /**
+     * Get the node metrics, only valid when node option {@link NodeOptions#isEnableMetrics()} is true.
+     */
+    NodeMetrics getNodeMetrics();
+
+    /**
+     * Get the raft group id.
+     */
+    String getGroupId();
+
+    /**
+     * Get the node options.
+     */
+    NodeOptions getOptions();
+
+    /**
+     * Get the raft options
+     */
+    RaftOptions getRaftOptions();
+
+    /**
+     * Returns true when the node is leader.
+     */
+    boolean isLeader();
+
+    /**
+     * Returns true when the node is leader.
+     *
+     * @param blocking if true, blocks until the node finishes its state change
+     */
+    boolean isLeader(final boolean blocking);
+
+    /**
+     * Shutdown local replica node.
+     *
+     * @param done callback
+     */
+    void shutdown(final Closure done);
+
+    /**
+     * Block the thread until the node is successfully stopped.
+     *
+     * @throws InterruptedException if the current thread is interrupted while waiting
+     */
+    void join() throws InterruptedException;
+
+    /**
+     * [Thread-safe and wait-free]
+     *
+     * Apply task to the replicated-state-machine
+     *
+     * About the ownership: |task.data|: for the performance consideration, we will take away the content. If you want
+     * keep the content, copy it before call this function |task.done|: If the data is successfully committed to the
+     * raft group. We will pass the ownership to #{@link StateMachine#onApply(Iterator)}. Otherwise we will specify the
+     * error and call it.
+     *
+     * @param task task to apply
+     */
+    void apply(final Task task);
+
+    /**
+     * [Thread-safe and wait-free]
+     *
+     * Starts a linearizable read-only query request with request context(optional, such as request id etc.) and
+     * closure.  The closure will be called when the request is completed, and user can read data from state machine if
+     * the result status is OK.
+     *
+     * @param requestContext the context of request
+     * @param done callback
+     */
+    void readIndex(final byte[] requestContext, final ReadIndexClosure done);
+
+    /**
+     * List peers of this raft group, only leader returns.
+     *
+     * [NOTE] <strong>when list_peers concurrency with {@link #addPeer(PeerId, Closure)}/{@link #removePeer(PeerId,
+     * Closure)}, maybe return peers is staled.  Because {@link #addPeer(PeerId, Closure)}/{@link #removePeer(PeerId,
+     * Closure)} immediately modify configuration in memory</strong>
+     *
+     * @return the peer list
+     */
+    List<PeerId> listPeers();
+
+    /**
+     * List all alive peers of this raft group, only leader returns.</p>
+     *
+     * [NOTE] <strong>list_alive_peers is just a transient data (snapshot) and a short-term loss of response by the
+     * follower will cause it to temporarily not exist in this list.</strong>
+     *
+     * @return the alive peer list
+     */
+    List<PeerId> listAlivePeers();
+
+    /**
+     * List all learners of this raft group, only leader returns.</p>
+     *
+     * [NOTE] <strong>when listLearners concurrency with {@link #addLearners(List, Closure)}/{@link
+     * #removeLearners(List, Closure)}/{@link #resetLearners(List, Closure)}, maybe return peers is staled.  Because
+     * {@link #addLearners(List, Closure)}/{@link #removeLearners(List, Closure)}/{@link #resetLearners(List, Closure)}
+     * immediately modify configuration in memory</strong>
+     *
+     * @return the learners set
+     */
+    List<PeerId> listLearners();
+
+    /**
+     * List all alive learners of this raft group, only leader returns.</p>
+     *
+     * [NOTE] <strong>when listAliveLearners concurrency with {@link #addLearners(List, Closure)}/{@link
+     * #removeLearners(List, Closure)}/{@link #resetLearners(List, Closure)}, maybe return peers is staled.  Because
+     * {@link #addLearners(List, Closure)}/{@link #removeLearners(List, Closure)}/{@link #resetLearners(List, Closure)}
+     * immediately modify configuration in memory</strong>
+     *
+     * @return the  alive learners set
+     */
+    List<PeerId> listAliveLearners();
+
+    /**
+     * Add a new peer to the raft group. done.run() would be invoked after this operation finishes, describing the
+     * detailed result.
+     *
+     * @param peer peer to add
+     * @param done callback
+     */
+    void addPeer(final PeerId peer, final Closure done);
+
+    /**
+     * Remove the peer from the raft group. done.run() would be invoked after operation finishes, describing the
+     * detailed result.
+     *
+     * @param peer peer to remove
+     * @param done callback
+     */
+    void removePeer(final PeerId peer, final Closure done);
+
+    /**
+     * Change the configuration of the raft group to |newPeers| , done.run() would be invoked after this operation
+     * finishes, describing the detailed result.
+     *
+     * @param newPeers new peers to change
+     * @param done callback
+     */
+    void changePeers(final Configuration newPeers, final Closure done);
+
+    /**
+     * Reset the configuration of this node individually, without any replication to other peers before this node
+     * becomes the leader. This function is supposed to be invoked when the majority of the replication group are dead
+     * and you'd like to revive the service in the consideration of availability. Notice that neither consistency nor
+     * consensus are guaranteed in this case, BE CAREFUL when dealing with this method.
+     *
+     * @param newPeers new peers
+     */
+    Status resetPeers(final Configuration newPeers);
+
+    /**
+     * Add some new learners to the raft group. done.run() will be invoked after this operation finishes, describing the
+     * detailed result.
+     *
+     * @param learners learners to add
+     * @param done callback
+     */
+    void addLearners(final List<PeerId> learners, final Closure done);
+
+    /**
+     * Remove some learners from the raft group. done.run() will be invoked after this operation finishes, describing
+     * the detailed result.
+     *
+     * @param learners learners to remove
+     * @param done callback
+     */
+    void removeLearners(final List<PeerId> learners, final Closure done);
+
+    /**
+     * Reset learners in the raft group. done.run() will be invoked after this operation finishes, describing the
+     * detailed result.
+     *
+     * @param learners learners to set
+     * @param done callback
+     */
+    void resetLearners(final List<PeerId> learners, final Closure done);
+
+    /**
+     * Start a snapshot immediately if possible. done.run() would be invoked when the snapshot finishes, describing the
+     * detailed result.
+     *
+     * @param done callback
+     */
+    void snapshot(final Closure done);
+
+    /**
+     * Reset the election_timeout for the every node.
+     *
+     * @param electionTimeoutMs the timeout millis of election
+     */
+    void resetElectionTimeoutMs(final int electionTimeoutMs);
+
+    /**
+     * Try transferring leadership to |peer|. If peer is ANY_PEER, a proper follower will be chosen as the leader for
+     * the next term. Returns 0 on success, -1 otherwise.
+     *
+     * @param peer the target peer of new leader
+     * @return operation status
+     */
+    Status transferLeadershipTo(final PeerId peer);
+
+    /**
+     * Read the first committed user log from the given index. Return OK on success and user_log is assigned with the
+     * very data. Be aware that the user_log may not be the exact log at the given index, but the first available user
+     * log from the given index to lastCommittedIndex. Otherwise, appropriate errors are returned: - return ELOGDELETED
+     * when the log has been deleted; - return ENOMOREUSERLOG when we can't get a user log even reaching

Review comment:
       What are ENOMOREUSERLOG and ELOGDELETED? The Javadoc mentions these error codes without explaining them.

##########
File path: modules/raft/src/main/java/org/apache/ignite/raft/jraft/Iterator.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.jraft;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Iterator over a batch of committed tasks.
+ *
+ * @see StateMachine#onApply(Iterator)
+ */
+public interface Iterator extends java.util.Iterator<ByteBuffer> {

Review comment:
       Let's add proper Javadoc with @return and @param tags, at least for interfaces. That said, it's internal JRaft implementation code, so it's up to you.

##########
File path: modules/raft/src/main/java/org/apache/ignite/raft/server/impl/RaftServerImpl.java
##########
@@ -46,6 +46,7 @@
 
 /**
  * A single node service implementation.
+ * @deprecated TODO asch Replace with jraft server.

Review comment:
       This TODO needs a ticket.

##########
File path: modules/raft/pom.xml
##########
@@ -77,6 +123,15 @@
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-compiler-plugin</artifactId>
             </plugin>
+            <!-- Disable javadoc validation for forked foreign module. -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <configuration>
+                    <skip>true</skip>

Review comment:
       It's strange to disable the check for the whole module; I would do that only for org.apache.ignite.raft.jraft.

##########
File path: modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/timer/HashedWheelTimer.java
##########
@@ -0,0 +1,756 @@
+/*
+ * Copyright 2014 The Netty Project

Review comment:
       Is it OK to keep the Netty Project license headers?

##########
File path: modules/raft/src/main/java/org/apache/ignite/raft/jraft/Node.java
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.jraft;
+
+import java.util.List;
+import org.apache.ignite.raft.jraft.closure.ReadIndexClosure;
+import org.apache.ignite.raft.jraft.conf.Configuration;
+import org.apache.ignite.raft.jraft.core.NodeMetrics;
+import org.apache.ignite.raft.jraft.core.Replicator;
+import org.apache.ignite.raft.jraft.entity.NodeId;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.entity.Task;
+import org.apache.ignite.raft.jraft.entity.UserLog;
+import org.apache.ignite.raft.jraft.error.LogIndexOutOfBoundsException;
+import org.apache.ignite.raft.jraft.error.LogNotFoundException;
+import org.apache.ignite.raft.jraft.option.NodeOptions;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.util.Describer;
+
+/**
+ * A raft replica node.
+ */
+public interface Node extends Lifecycle<NodeOptions>, Describer {
+    /**
+     * Get the leader peer id for redirect, null if absent.
+     */
+    PeerId getLeaderId();
+
+    /**
+     * Get current node id.
+     */
+    NodeId getNodeId();
+
+    /**
+     * Get the node metrics, only valid when node option {@link NodeOptions#isEnableMetrics()} is true.
+     */
+    NodeMetrics getNodeMetrics();
+
+    /**
+     * Get the raft group id.
+     */
+    String getGroupId();
+
+    /**
+     * Get the node options.
+     */
+    NodeOptions getOptions();
+
+    /**
+     * Get the raft options
+     */
+    RaftOptions getRaftOptions();
+
+    /**
+     * Returns true when the node is leader.
+     */
+    boolean isLeader();
+
+    /**
+     * Returns true when the node is leader.
+     *
+     * @param blocking if true, blocks until the node finishes its state change
+     */
+    boolean isLeader(final boolean blocking);
+
+    /**
+     * Shutdown local replica node.
+     *
+     * @param done callback
+     */
+    void shutdown(final Closure done);
+
+    /**
+     * Block the thread until the node is successfully stopped.
+     *
+     * @throws InterruptedException if the current thread is interrupted while waiting
+     */
+    void join() throws InterruptedException;
+
+    /**
+     * [Thread-safe and wait-free]
+     *
+     * Apply task to the replicated-state-machine
+     *
+     * About the ownership: |task.data|: for the performance consideration, we will take away the content. If you want
+     * keep the content, copy it before call this function |task.done|: If the data is successfully committed to the
+     * raft group. We will pass the ownership to #{@link StateMachine#onApply(Iterator)}. Otherwise we will specify the
+     * error and call it.
+     *
+     * @param task task to apply
+     */
+    void apply(final Task task);
+
+    /**
+     * [Thread-safe and wait-free]
+     *
+     * Starts a linearizable read-only query request with request context(optional, such as request id etc.) and
+     * closure.  The closure will be called when the request is completed, and user can read data from state machine if
+     * the result status is OK.
+     *
+     * @param requestContext the context of request
+     * @param done callback
+     */
+    void readIndex(final byte[] requestContext, final ReadIndexClosure done);
+
+    /**
+     * List peers of this raft group, only leader returns.
+     *
+     * [NOTE] <strong>when list_peers concurrency with {@link #addPeer(PeerId, Closure)}/{@link #removePeer(PeerId,
+     * Closure)}, maybe return peers is staled.  Because {@link #addPeer(PeerId, Closure)}/{@link #removePeer(PeerId,
+     * Closure)} immediately modify configuration in memory</strong>
+     *
+     * @return the peer list
+     */
+    List<PeerId> listPeers();
+
+    /**
+     * List all alive peers of this raft group, only leader returns.</p>
+     *
+     * [NOTE] <strong>list_alive_peers is just a transient data (snapshot) and a short-term loss of response by the
+     * follower will cause it to temporarily not exist in this list.</strong>
+     *
+     * @return the alive peer list
+     */
+    List<PeerId> listAlivePeers();
+
+    /**
+     * List all learners of this raft group, only leader returns.</p>
+     *
+     * [NOTE] <strong>when listLearners concurrency with {@link #addLearners(List, Closure)}/{@link
+     * #removeLearners(List, Closure)}/{@link #resetLearners(List, Closure)}, maybe return peers is staled.  Because
+     * {@link #addLearners(List, Closure)}/{@link #removeLearners(List, Closure)}/{@link #resetLearners(List, Closure)}
+     * immediately modify configuration in memory</strong>
+     *
+     * @return the learners set
+     */
+    List<PeerId> listLearners();
+
+    /**
+     * List all alive learners of this raft group, only leader returns.</p>
+     *
+     * [NOTE] <strong>when listAliveLearners concurrency with {@link #addLearners(List, Closure)}/{@link
+     * #removeLearners(List, Closure)}/{@link #resetLearners(List, Closure)}, maybe return peers is staled.  Because
+     * {@link #addLearners(List, Closure)}/{@link #removeLearners(List, Closure)}/{@link #resetLearners(List, Closure)}
+     * immediately modify configuration in memory</strong>
+     *
+     * @return the  alive learners set
+     */
+    List<PeerId> listAliveLearners();
+
+    /**
+     * Add a new peer to the raft group. done.run() would be invoked after this operation finishes, describing the
+     * detailed result.
+     *
+     * @param peer peer to add
+     * @param done callback
+     */
+    void addPeer(final PeerId peer, final Closure done);
+
+    /**
+     * Remove the peer from the raft group. done.run() would be invoked after operation finishes, describing the
+     * detailed result.
+     *
+     * @param peer peer to remove
+     * @param done callback
+     */
+    void removePeer(final PeerId peer, final Closure done);
+
+    /**
+     * Change the configuration of the raft group to |newPeers| , done.run() would be invoked after this operation
+     * finishes, describing the detailed result.
+     *
+     * @param newPeers new peers to change
+     * @param done callback
+     */
+    void changePeers(final Configuration newPeers, final Closure done);
+
+    /**
+     * Reset the configuration of this node individually, without any replication to other peers before this node
+     * becomes the leader. This function is supposed to be invoked when the majority of the replication group are dead
+     * and you'd like to revive the service in the consideration of availability. Notice that neither consistency nor
+     * consensus are guaranteed in this case, BE CAREFUL when dealing with this method.
+     *
+     * @param newPeers new peers
+     */
+    Status resetPeers(final Configuration newPeers);
+
+    /**
+     * Add some new learners to the raft group. done.run() will be invoked after this operation finishes, describing the
+     * detailed result.
+     *
+     * @param learners learners to add
+     * @param done callback
+     */
+    void addLearners(final List<PeerId> learners, final Closure done);
+
+    /**
+     * Remove some learners from the raft group. done.run() will be invoked after this operation finishes, describing
+     * the detailed result.
+     *
+     * @param learners learners to remove
+     * @param done callback
+     */
+    void removeLearners(final List<PeerId> learners, final Closure done);
+
+    /**
+     * Reset learners in the raft group. done.run() will be invoked after this operation finishes, describing the
+     * detailed result.
+     *
+     * @param learners learners to set
+     * @param done callback
+     */
+    void resetLearners(final List<PeerId> learners, final Closure done);
+
+    /**
+     * Start a snapshot immediately if possible. done.run() would be invoked when the snapshot finishes, describing the
+     * detailed result.
+     *
+     * @param done callback
+     */
+    void snapshot(final Closure done);
+
+    /**
+     * Reset the election_timeout for the every node.
+     *
+     * @param electionTimeoutMs the timeout millis of election
+     */
+    void resetElectionTimeoutMs(final int electionTimeoutMs);
+
+    /**
+     * Try transferring leadership to |peer|. If peer is ANY_PEER, a proper follower will be chosen as the leader for
+     * the next term. Returns 0 on success, -1 otherwise.
+     *
+     * @param peer the target peer of new leader
+     * @return operation status
+     */
+    Status transferLeadershipTo(final PeerId peer);
+
+    /**
+     * Read the first committed user log from the given index. Return OK on success and user_log is assigned with the
+     * very data. Be aware that the user_log may not be the exact log at the given index, but the first available user
+     * log from the given index to lastCommittedIndex. Otherwise, appropriate errors are returned: - return ELOGDELETED
+     * when the log has been deleted; - return ENOMOREUSERLOG when we can't get a user log even reaching
+     * lastCommittedIndex. [NOTE] in consideration of safety, we use lastAppliedIndex instead of lastCommittedIndex in
+     * code implementation.
+     *
+     * @param index log index
+     * @return user log entry
+     * @throws LogNotFoundException the user log is deleted at index.
+     * @throws LogIndexOutOfBoundsException the special index is out of bounds.
+     */
+    UserLog readCommittedUserLog(final long index);
+
+    /**
+     * SOFAJRaft users can implement the ReplicatorStateListener interface by themselves. So users can do their own

Review comment:
       SOFAJRaft? 

##########
File path: modules/raft/src/main/java/org/apache/ignite/raft/server/impl/JRaftServerImpl.java
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.server.impl;
+
+import java.io.File;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.raft.client.ElectionPriority;
+import org.apache.ignite.raft.client.Peer;
+import org.apache.ignite.raft.client.WriteCommand;
+import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
+import org.apache.ignite.raft.client.service.CommandClosure;
+import org.apache.ignite.raft.client.service.RaftGroupListener;
+import org.apache.ignite.raft.jraft.Closure;
+import org.apache.ignite.raft.jraft.Iterator;
+import org.apache.ignite.raft.jraft.JRaftUtils;
+import org.apache.ignite.raft.jraft.NodeManager;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.Status;
+import org.apache.ignite.raft.jraft.conf.Configuration;
+import org.apache.ignite.raft.jraft.core.StateMachineAdapter;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.option.NodeOptions;
+import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcClient;
+import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcServer;
+import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
+import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
+import org.apache.ignite.raft.jraft.util.Endpoint;
+import org.apache.ignite.raft.jraft.util.JDKMarshaller;
+import org.apache.ignite.raft.server.RaftServer;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public class JRaftServerImpl implements RaftServer {
+    private final ClusterService service;
+
+    private final String dataPath;
+
+    private IgniteRpcServer rpcServer;
+
+    private ConcurrentMap<String, RaftGroupService> groups = new ConcurrentHashMap<>();
+
+    private final NodeManager nodeManager;
+
+    private final NodeOptions opts;
+
+    /**
+     * Creates the server with a fresh, default-constructed {@link NodeOptions} instance.
+     *
+     * @param service Cluster service.
+     * @param dataPath Data path.
+     * @param factory The factory.
+     * @param reuse {@code True} to reuse cluster service (do not manage lifecycle)
+     */
+    public JRaftServerImpl(
+        ClusterService service,
+        String dataPath,
+        RaftClientMessagesFactory factory,
+        boolean reuse
+    ) {
+        // Delegate to the full constructor, supplying default node options.
+        this(service, dataPath, factory, reuse, new NodeOptions());
+    }
+
+    /**
+     * Creates the server on top of the given cluster service. Any server name or executor that the supplied
+     * options leave unset is filled in on the passed {@code opts} instance, after which the RPC server is
+     * created and initialized.
+     *
+     * @param service Cluster service.
+     * @param dataPath Data path.
+     * @param factory The factory.
+     * @param reuse {@code True} to reuse cluster service (do not manage lifecycle)
+     * @param opts Default node options.
+     */
+    public JRaftServerImpl(
+        ClusterService service,
+        String dataPath,
+        RaftClientMessagesFactory factory,
+        boolean reuse,
+        NodeOptions opts
+    ) {
+        this.opts = opts;
+        this.nodeManager = new NodeManager();
+        this.dataPath = dataPath;
+        this.service = service;
+
+        // A reused cluster service must already know its local member.
+        assert !reuse || service.topologyService().localMember() != null;
+
+        // The consistent id doubles as the server name unless one was given explicitly.
+        if (opts.getServerName() == null)
+            opts.setServerName(service.localConfiguration().getName());
+
+        // Create only those executors the caller did not provide.
+        if (opts.getCommonExecutor() == null)
+            opts.setCommonExecutor(JRaftUtils.createCommonExecutor(opts));
+
+        if (opts.getStripedExecutor() == null)
+            opts.setStripedExecutor(JRaftUtils.createAppendEntriesExecutor(opts));
+
+        if (opts.getScheduler() == null)
+            opts.setScheduler(JRaftUtils.createScheduler(opts));
+
+        if (opts.getClientExecutor() == null)
+            opts.setClientExecutor(JRaftUtils.createClientExecutor(opts, opts.getServerName()));
+
+        this.rpcServer = new IgniteRpcServer(
+            service,
+            reuse,
+            this.nodeManager,
+            factory,
+            JRaftUtils.createRequestExecutor(opts)
+        );
+
+        this.rpcServer.init(null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterService clusterService() {
+        // The cluster service handed to the constructor.
+        return this.service;
+    }
+
+    /**
+     * Builds the persistence folder path for a raft group on this node: the base data path, a file separator,
+     * then the group id and the local endpoint (with {@code ':'} replaced by {@code '_'}) joined by {@code '_'}.
+     *
+     * @param groupId Group id.
+     * @return The path to persistence folder.
+     */
+    public String getServerDataPath(String groupId) {
+        ClusterNode local = service.topologyService().localMember();
+
+        // Colons from the endpoint's string form are replaced so it can be used in a folder name.
+        String endpointSuffix = new Endpoint(local.host(), local.port()).toString().replace(':', '_');
+
+        return this.dataPath + File.separator + groupId + "_" + endpointSuffix;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized boolean startRaftGroup(String groupId, RaftGroupListener lsnr,
+        @Nullable List<Peer> initialConf) {
+        // A group with this id is already registered on this server.
+        if (groups.containsKey(groupId))
+            return false;
+
+        // Thread pools are shared by all raft groups.
+        final NodeOptions groupOpts = opts.copy();
+
+        ClusterNode local = service.topologyService().localMember();
+        Endpoint localEndpoint = new Endpoint(local.host(), local.port());
+
+        // Every group gets its own folder holding the log, meta and snapshot storages.
+        final String groupDir = getServerDataPath(groupId);
+        new File(groupDir).mkdirs();
+
+        final String groupDirPrefix = groupDir + File.separator;
+
+        groupOpts.setLogUri(groupDirPrefix + "logs");
+        groupOpts.setRaftMetaUri(groupDirPrefix + "meta");
+        groupOpts.setSnapshotUri(groupDirPrefix + "snapshot");
+
+        groupOpts.setFsm(new DelegatingStateMachine(lsnr));
+
+        if (initialConf != null) {
+            List<PeerId> peerIds = initialConf.stream()
+                .map(PeerId::fromPeer)
+                .collect(Collectors.toList());
+
+            groupOpts.setInitialConf(new Configuration(peerIds, null));
+        }
+
+        groupOpts.setRpcClient(new IgniteRpcClient(service, true));
+
+        PeerId localPeerId = new PeerId(localEndpoint, 0, ElectionPriority.DISABLED);
+
+        final RaftGroupService groupService =
+            new RaftGroupService(groupId, localPeerId, groupOpts, rpcServer, nodeManager, true);
+
+        groupService.start(false);
+
+        // Register the group only after it has been started.
+        groups.put(groupId, groupService);
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean stopRaftGroup(String groupId) {
+        RaftGroupService removed = groups.remove(groupId);
+
+        // Nothing was registered under this id, so there is nothing to stop.
+        if (removed == null)
+            return false;
+
+        removed.shutdown();
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Peer localPeer(String groupId) {
+        RaftGroupService groupService = groups.get(groupId);
+
+        // Unknown group: there is no local peer to report.
+        if (groupService == null)
+            return null;
+
+        PeerId localId = groupService.getRaftNode().getNodeId().getPeerId();
+
+        String address = localId.getEndpoint().toString();
+
+        return new Peer(address, localId.getPriority());
+    }
+
+    /**
+     * Looks up the raft group service registered under the given group id.
+     *
+     * @param groupId Group id.
+     * @return Service group, or {@code null} if no group is registered under this id.
+     */
+    public RaftGroupService raftGroupService(String groupId) {
+        return this.groups.get(groupId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void shutdown() throws Exception {
+        // Shut down every registered raft group first, then the RPC server.
+        groups.values().forEach(RaftGroupService::shutdown);
+
+        rpcServer.shutdown();
+    }
+
+    /**
+     *

Review comment:
       Please add javadoc here 

##########
File path: modules/raft/src/main/java/org/apache/ignite/raft/server/impl/JRaftServerImpl.java
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.server.impl;
+
+import java.io.File;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.raft.client.ElectionPriority;
+import org.apache.ignite.raft.client.Peer;
+import org.apache.ignite.raft.client.WriteCommand;
+import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
+import org.apache.ignite.raft.client.service.CommandClosure;
+import org.apache.ignite.raft.client.service.RaftGroupListener;
+import org.apache.ignite.raft.jraft.Closure;
+import org.apache.ignite.raft.jraft.Iterator;
+import org.apache.ignite.raft.jraft.JRaftUtils;
+import org.apache.ignite.raft.jraft.NodeManager;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.Status;
+import org.apache.ignite.raft.jraft.conf.Configuration;
+import org.apache.ignite.raft.jraft.core.StateMachineAdapter;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.option.NodeOptions;
+import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcClient;
+import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcServer;
+import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
+import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
+import org.apache.ignite.raft.jraft.util.Endpoint;
+import org.apache.ignite.raft.jraft.util.JDKMarshaller;
+import org.apache.ignite.raft.server.RaftServer;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *

Review comment:
       We should definitely have proper Javadoc for this file. Please add it here and for the class fields.

##########
File path: modules/raft/pom.xml
##########
@@ -77,6 +123,15 @@
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-compiler-plugin</artifactId>
             </plugin>
+            <!-- Disable javadoc validataion for forked foreign module. -->

Review comment:
       Typo: this should be "validation".

##########
File path: modules/raft/src/main/java/org/apache/ignite/raft/server/impl/JRaftServerImpl.java
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.server.impl;
+
+import java.io.File;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.raft.client.ElectionPriority;
+import org.apache.ignite.raft.client.Peer;
+import org.apache.ignite.raft.client.WriteCommand;
+import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
+import org.apache.ignite.raft.client.service.CommandClosure;
+import org.apache.ignite.raft.client.service.RaftGroupListener;
+import org.apache.ignite.raft.jraft.Closure;
+import org.apache.ignite.raft.jraft.Iterator;
+import org.apache.ignite.raft.jraft.JRaftUtils;
+import org.apache.ignite.raft.jraft.NodeManager;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.Status;
+import org.apache.ignite.raft.jraft.conf.Configuration;
+import org.apache.ignite.raft.jraft.core.StateMachineAdapter;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.option.NodeOptions;
+import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcClient;
+import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcServer;
+import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
+import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
+import org.apache.ignite.raft.jraft.util.Endpoint;
+import org.apache.ignite.raft.jraft.util.JDKMarshaller;
+import org.apache.ignite.raft.server.RaftServer;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public class JRaftServerImpl implements RaftServer {
+    private final ClusterService service;
+
+    private final String dataPath;
+
+    private IgniteRpcServer rpcServer;
+
+    private ConcurrentMap<String, RaftGroupService> groups = new ConcurrentHashMap<>();
+
+    private final NodeManager nodeManager;
+
+    private final NodeOptions opts;
+
+    /**
+     * @param service Cluster service.
+     * @param dataPath Data path.
+     * @param factory The factory.
+     * @param reuse {@code True} to reuse cluster service (do not manage lifecyle)
+     */
+    public JRaftServerImpl(ClusterService service, String dataPath, RaftClientMessagesFactory factory, boolean reuse) {
+        this(service, dataPath, factory, reuse, new NodeOptions());
+    }
+
+    /**
+     * @param service Cluster service.
+     * @param dataPath Data path.
+     * @param factory The factory.
+     * @param reuse {@code True} to reuse cluster service (do not manage lifecyle)
+     * @param opts Default node options.
+     */
+    public JRaftServerImpl(
+        ClusterService service,
+        String dataPath,
+        RaftClientMessagesFactory factory,
+        boolean reuse,
+        NodeOptions opts
+    ) {
+        this.service = service;
+        this.dataPath = dataPath;
+        this.nodeManager = new NodeManager();
+        this.opts = opts;
+
+        assert !reuse || service.topologyService().localMember() != null;
+
+        // Use consistent id as server name.

Review comment:
       I didn't understand this comment. Is this a TODO?

##########
File path: modules/raft/src/main/java/org/apache/ignite/raft/jraft/Status.java
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.jraft;
+
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.util.Copiable;
+
+//A Status encapsulates the result of an operation. It may indicate success,
+
+//or it may indicate an error with an associated error message. It's suitable
+//for passing status of functions with richer information than just error_code
+//in exception-forbidden code. This utility is inspired by leveldb::Status.
+//
+//Multiple threads can invoke const methods on a Status without
+//external synchronization, but if any of the threads may call a
+//non-const method, all threads accessing the same Status must use
+//external synchronization.
+//
+//Since failed status needs to allocate memory, you should be careful when
+//failed status is frequent.

Review comment:
       Please use a different type of Java comment here (e.g. a block or Javadoc comment) instead of `//` line comments.

##########
File path: modules/raft/src/main/java/org/apache/ignite/raft/server/RaftServer.java
##########
@@ -24,12 +24,9 @@
 import org.jetbrains.annotations.Nullable;
 
 /**
- * The RAFT protocol based replication server.
- * <p>
- * Supports multiple RAFT groups.
- * <p>
- * The server listens for client commands, submits them to a replicated log and calls {@link RaftGroupListener}
- * {@code onRead} and {@code onWrite} methods then after the command was committed to the log.
+ * The RAFT protocol based replication server. * Supports multiple RAFT groups. * The server listens for client

Review comment:
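       The Javadoc formatting is broken here: the `<p>` paragraph breaks were removed and the lines were collapsed together, leaving stray ` * ` fragments in the text.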

##########
File path: modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/timer/HashedWheelTimer.java
##########
@@ -0,0 +1,756 @@
+/*
+ * Copyright 2014 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.ignite.raft.jraft.util.timer;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLong;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <h3>Implementation Details</h3>
+ * * {@link HashedWheelTimer} is based on
+ * <a href="http://cseweb.ucsd.edu/users/varghese/">George Varghese</a> and
+ * Tony Lauck's paper,
+ * <a href="http://cseweb.ucsd.edu/users/varghese/PAPERS/twheel.ps.Z">'Hashed
+ * and Hierarchical Timing Wheels: data structures to efficiently implement a timer facility'</a>.  More comprehensive
+ * slides are located
+ * <a href="http://www.cse.wustl.edu/~cdgill/courses/cs6874/TimingWheels.ppt">here</a>.
+ * * Forked from <a href="https://github.com/netty/netty">Netty</a>.
+ */
+public class HashedWheelTimer implements Timer {
+    private static final Logger LOG = LoggerFactory.getLogger(HashedWheelTimer.class);
+
+    private static final int INSTANCE_COUNT_LIMIT = 256;
+    private static final AtomicInteger instanceCounter = new AtomicInteger();
+    private static final AtomicBoolean warnedTooManyInstances = new AtomicBoolean();
+
+    private static final AtomicIntegerFieldUpdater<HashedWheelTimer> workerStateUpdater = AtomicIntegerFieldUpdater
+        .newUpdater(HashedWheelTimer.class, "workerState");
+
+    private final Worker worker = new Worker();
+    private final Thread workerThread;
+
+    public static final int WORKER_STATE_INIT = 0;
+    public static final int WORKER_STATE_STARTED = 1;
+    public static final int WORKER_STATE_SHUTDOWN = 2;
+
+    private volatile int workerState; // 0 - init, 1 - started, 2 - shut down NOPMD
+
+    private final long tickDuration;
+    private final HashedWheelBucket[] wheel;
+    private final int mask;
+    private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
+    private final Queue<HashedWheelTimeout> timeouts = new ConcurrentLinkedQueue<>();
+    private final Queue<HashedWheelTimeout> cancelledTimeouts = new ConcurrentLinkedQueue<>();
+    private final AtomicLong pendingTimeouts = new AtomicLong(0);
+    private final long maxPendingTimeouts;
+
+    private volatile long startTime;
+
+    /**
+     * Creates a new timer with the default thread factory ({@link Executors#defaultThreadFactory()}), default tick
+     * duration, and default number of ticks per wheel.
+     */
+    public HashedWheelTimer() {
+        this(Executors.defaultThreadFactory());
+    }
+
+    /**
+     * Creates a new timer with the default thread factory ({@link Executors#defaultThreadFactory()}) and default number
+     * of ticks per wheel.
+     *
+     * @param tickDuration the duration between tick
+     * @param unit the time unit of the {@code tickDuration}
+     * @throws NullPointerException if {@code unit} is {@code null}
+     * @throws IllegalArgumentException if {@code tickDuration} is &lt;= 0
+     */
+    public HashedWheelTimer(long tickDuration, TimeUnit unit) {
+        this(Executors.defaultThreadFactory(), tickDuration, unit);
+    }
+
+    /**
+     * Creates a new timer with the default thread factory ({@link Executors#defaultThreadFactory()}).
+     *
+     * @param tickDuration the duration between tick
+     * @param unit the time unit of the {@code tickDuration}
+     * @param ticksPerWheel the size of the wheel
+     * @throws NullPointerException if {@code unit} is {@code null}
+     * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
+     */
+    public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
+        this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
+    }
+
+    /**
+     * Creates a new timer with the default tick duration and default number of ticks per wheel.
+     *
+     * @param threadFactory a {@link ThreadFactory} that creates a background {@link Thread} which is dedicated to
+     * {@link TimerTask} execution.
+     * @throws NullPointerException if {@code threadFactory} is {@code null}
+     */
+    public HashedWheelTimer(ThreadFactory threadFactory) {
+        this(threadFactory, 100, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Creates a new timer with the default number of ticks per wheel.
+     *
+     * @param threadFactory a {@link ThreadFactory} that creates a background {@link Thread} which is dedicated to
+     * {@link TimerTask} execution.
+     * @param tickDuration the duration between tick
+     * @param unit the time unit of the {@code tickDuration}
+     * @throws NullPointerException if either of {@code threadFactory} and {@code unit} is {@code null}
+     * @throws IllegalArgumentException if {@code tickDuration} is &lt;= 0
+     */
+    public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
+        this(threadFactory, tickDuration, unit, 512);
+    }
+
+    /**
+     * Creates a new timer.
+     *
+     * @param threadFactory a {@link ThreadFactory} that creates a background {@link Thread} which is dedicated to
+     * {@link TimerTask} execution.
+     * @param tickDuration the duration between tick
+     * @param unit the time unit of the {@code tickDuration}
+     * @param ticksPerWheel the size of the wheel
+     * @throws NullPointerException if either of {@code threadFactory} and {@code unit} is {@code null}
+     * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
+     */
+    public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) {
+        this(threadFactory, tickDuration, unit, ticksPerWheel, -1);
+    }
+
+    /**
+     * Creates a new timer.
+     *
+     * @param threadFactory a {@link ThreadFactory} that creates a background {@link Thread} which is dedicated to
+     * {@link TimerTask} execution.
+     * @param tickDuration the duration between tick
+     * @param unit the time unit of the {@code tickDuration}
+     * @param ticksPerWheel the size of the wheel
+     * @param maxPendingTimeouts The maximum number of pending timeouts after which call to {@code newTimeout} will
+     * result in {@link RejectedExecutionException} being thrown. No maximum pending timeouts limit is assumed if this
+     * value is 0 or negative.
+     * @throws NullPointerException if either of {@code threadFactory} and {@code unit} is {@code null}
+     * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
+     */
+    public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel,
+        long maxPendingTimeouts) {
+
+        if (threadFactory == null) {
+            throw new NullPointerException("threadFactory");
+        }
+        if (unit == null) {
+            throw new NullPointerException("unit");
+        }
+        if (tickDuration <= 0) {
+            throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
+        }
+        if (ticksPerWheel <= 0) {
+            throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
+        }
+
+        // Normalize ticksPerWheel to power of two and initialize the wheel.
+        wheel = createWheel(ticksPerWheel);
+        mask = wheel.length - 1;
+
+        // Convert tickDuration to nanos.
+        this.tickDuration = unit.toNanos(tickDuration);
+
+        // Prevent overflow.
+        if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
+            throw new IllegalArgumentException(String.format(
+                "tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration, Long.MAX_VALUE
+                    / wheel.length));
+        }
+        workerThread = threadFactory.newThread(worker);
+
+        this.maxPendingTimeouts = maxPendingTimeouts;
+
+        if (instanceCounter.incrementAndGet() > INSTANCE_COUNT_LIMIT
+            && warnedTooManyInstances.compareAndSet(false, true)) {
+            reportTooManyInstances();
+        }
+    }
+
+    @Override
+    protected void finalize() throws Throwable {
+        try {
+            super.finalize();
+        }
+        finally {
+            // This object is going to be GCed and it is assumed the ship has sailed to do a proper shutdown. If
+            // we have not yet shutdown then we want to make sure we decrement the active instance count.
+            if (workerStateUpdater.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
+                instanceCounter.decrementAndGet();
+            }
+        }
+    }
+
+    private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
+        if (ticksPerWheel <= 0) {
+            throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
+        }
+        if (ticksPerWheel > 1073741824) {
+            throw new IllegalArgumentException("ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
+        }
+
+        ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
+        HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
+        for (int i = 0; i < wheel.length; i++) {
+            wheel[i] = new HashedWheelBucket();
+        }
+        return wheel;
+    }
+
+    private static int normalizeTicksPerWheel(int ticksPerWheel) {
+        int normalizedTicksPerWheel = 1;
+        while (normalizedTicksPerWheel < ticksPerWheel) {
+            normalizedTicksPerWheel <<= 1;
+        }
+        return normalizedTicksPerWheel;
+    }
+
+    /**
+     * Starts the background thread explicitly.  The background thread will start automatically on demand even if you
+     * did not call this method.
+     *
+     * @throws IllegalStateException if this timer has been {@linkplain #stop() stopped} already
+     */
+    public void start() {
+        switch (workerStateUpdater.get(this)) {
+            case WORKER_STATE_INIT:
+                if (workerStateUpdater.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
+                    workerThread.start();
+                }
+                break;
+            case WORKER_STATE_STARTED:
+                break;
+            case WORKER_STATE_SHUTDOWN:
+                throw new IllegalStateException("cannot be started once stopped");
+            default:
+                throw new Error("Invalid WorkerState");
+        }
+
+        // Wait until the startTime is initialized by the worker.
+        while (startTime == 0) {
+            try {
+                startTimeInitialized.await();
+            }
+            catch (InterruptedException ignore) {
+                // Ignore - it will be ready very soon.
+            }
+        }
+    }
+
+    @Override
+    public Set<Timeout> stop() {
+        if (Thread.currentThread() == workerThread) {
+            throw new IllegalStateException(HashedWheelTimer.class.getSimpleName() + ".stop() cannot be called from "
+                + TimerTask.class.getSimpleName());
+        }
+
+        if (!workerStateUpdater.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
+            // workerState can be 0 or 2 at this moment - let it always be 2.
+            if (workerStateUpdater.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
+                instanceCounter.decrementAndGet();
+            }
+
+            return Collections.emptySet();
+        }
+
+        try {
+            boolean interrupted = false;
+            while (workerThread.isAlive()) {
+                workerThread.interrupt();
+                try {
+                    workerThread.join(100);
+                }
+                catch (InterruptedException ignored) {
+                    interrupted = true;
+                }
+            }
+
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+        finally {
+            instanceCounter.decrementAndGet();
+        }
+        return worker.unprocessedTimeouts();
+    }
+
+    @Override
+    public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
+        if (task == null) {
+            throw new NullPointerException("task");
+        }
+        if (unit == null) {
+            throw new NullPointerException("unit");
+        }
+
+        long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
+
+        if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
+            pendingTimeouts.decrementAndGet();
+            throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount
+                + ") is greater than or equal to maximum allowed pending "
+                + "timeouts (" + maxPendingTimeouts + ")");
+        }
+
+        start();
+
+        // Add the timeout to the timeout queue which will be processed on the next tick.
+        // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
+        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
+
+        // Guard against overflow.
+        if (delay > 0 && deadline < 0) {
+            deadline = Long.MAX_VALUE;
+        }
+        HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
+        timeouts.add(timeout);
+        return timeout;
+    }
+
+    /**
+     * Returns the number of pending timeouts of this {@link Timer}.
+     */
+    public long pendingTimeouts() {
+        return pendingTimeouts.get();
+    }
+
+    private static void reportTooManyInstances() {
+        String resourceType = HashedWheelTimer.class.getSimpleName();
+        LOG.error("You are creating too many {} instances.  {} is a shared resource that must be "
+            + "reused across the JVM, so that only a few instances are created.", resourceType, resourceType);
+    }
+
+    private final class Worker implements Runnable {
+        private final Set<Timeout> unprocessedTimeouts = new HashSet<>();
+
+        private long tick;
+
+        @Override
+        public void run() {
+            // Initialize the startTime.
+            startTime = System.nanoTime();
+            if (startTime == 0) {
+                // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
+                startTime = 1;
+            }
+
+            // Notify the other threads waiting for the initialization at start().
+            startTimeInitialized.countDown();
+
+            do {
+                final long deadline = waitForNextTick();
+                if (deadline > 0) {
+                    int idx = (int) (tick & mask);
+                    processCancelledTasks();
+                    HashedWheelBucket bucket = wheel[idx];
+                    transferTimeoutsToBuckets();
+                    bucket.expireTimeouts(deadline);
+                    tick++;
+                }
+            }
+            while (workerStateUpdater.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
+
+            // Fill the unprocessedTimeouts so we can return them from stop() method.
+            for (HashedWheelBucket bucket : wheel) {
+                bucket.clearTimeouts(unprocessedTimeouts);
+            }
+            for (; ; ) {
+                HashedWheelTimeout timeout = timeouts.poll();
+                if (timeout == null) {
+                    break;
+                }
+                if (!timeout.isCancelled()) {
+                    unprocessedTimeouts.add(timeout);
+                }
+            }
+            processCancelledTasks();
+        }
+
+        private void transferTimeoutsToBuckets() {
+            // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
+            // adds new timeouts in a loop.
+            for (int i = 0; i < 100000; i++) {
+                HashedWheelTimeout timeout = timeouts.poll();
+                if (timeout == null) {
+                    // all processed
+                    break;
+                }
+                if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
+                    // Was cancelled in the meantime.
+                    continue;
+                }
+
+                long calculated = timeout.deadline / tickDuration;
+                timeout.remainingRounds = (calculated - tick) / wheel.length;
+
+                final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
+                int stopIndex = (int) (ticks & mask);
+
+                HashedWheelBucket bucket = wheel[stopIndex];
+                bucket.addTimeout(timeout);
+            }
+        }
+
+        private void processCancelledTasks() {
+            for (; ; ) {
+                HashedWheelTimeout timeout = cancelledTimeouts.poll();
+                if (timeout == null) {
+                    // all processed
+                    break;
+                }
+                try {
+                    timeout.remove();
+                }
+                catch (Throwable t) {
+                    if (LOG.isWarnEnabled()) {
+                        LOG.warn("An exception was thrown while process a cancellation task", t);
+                    }
+                }
+            }
+        }
+
+        /**
+         * Calculate goal nanoTime from startTime and current tick number, then wait until that goal has been reached.
+         *
+         * @return Long.MIN_VALUE if received a shutdown request, current time otherwise (with Long.MIN_VALUE changed by
+         * +1)
+         */
+        private long waitForNextTick() {
+            long deadline = tickDuration * (tick + 1);
+
+            for (; ; ) {
+                final long currentTime = System.nanoTime() - startTime;
+                long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
+
+                if (sleepTimeMs <= 0) {
+                    if (currentTime == Long.MIN_VALUE) {
+                        return -Long.MAX_VALUE;
+                    }
+                    else {
+                        return currentTime;
+                    }
+                }
+
+                // We decide to remove the original approach (as below) which used in netty for
+                // windows platform.
+                // See https://github.com/netty/netty/issues/356
+                //
+                // if (Platform.isWindows()) {
+                //     sleepTimeMs = sleepTimeMs / 10 * 10;
+                // }
+                //
+                // The above approach that make sleepTimes to be a multiple of 10ms will
+                // lead to severe spin in this loop for several milliseconds, which
+                // causes the high CPU usage.
+                // See https://github.com/sofastack/sofa-jraft/issues/311
+                //
+                // According to the regression testing on windows, we haven't reproduced the
+                // Thread.sleep() bug referenced in https://www.javamex.com/tutorials/threads/sleep_issues.shtml
+                // yet.
+                //
+                // The regression testing environment:
+                // - SOFAJRaft version: 1.2.6

Review comment:
       SOFAJRaft? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org