You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@hugegraph.apache.org by GitBox <gi...@apache.org> on 2022/04/25 08:06:21 UTC

[GitHub] [incubator-hugegraph] javeme commented on a diff in pull request #1527: Improve some action for install snapshot and add peer

javeme commented on code in PR #1527:
URL: https://github.com/apache/incubator-hugegraph/pull/1527#discussion_r857337462


##########
hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftContext.java:
##########
@@ -104,26 +106,38 @@ public final class RaftSharedContext {
     private RaftGroupManager raftGroupManager;
     private RpcForwarder rpcForwarder;
 
-    public RaftSharedContext(HugeGraphParams params) {
+    public RaftContext(HugeGraphParams params, RpcServer rpcServer,
+                       PeerId endpoint) {
         this.params = params;
-        HugeConfig config = this.config();
+        this.rpcServer = rpcServer;
+        this.endpoint = endpoint;
 
+        HugeConfig config = params.configuration();
         this.schemaStoreName = config.get(CoreOptions.STORE_SCHEMA);
         this.graphStoreName = config.get(CoreOptions.STORE_GRAPH);
         this.systemStoreName = config.get(CoreOptions.STORE_SYSTEM);
         this.stores = new RaftBackendStore[StoreType.ALL.getNumber()];
-        this.rpcServer = this.initAndStartRpcServer();
+
+//        // TODO: 依赖了ServerOptions的配置项名,需要打通ServerConfig和CoreConfig

Review Comment:
   translate to English?



##########
hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftNode.java:
##########
@@ -98,39 +104,52 @@ public void shutdown() {
         this.node.shutdown();
     }
 
-    public void snapshot() {
-        if (!this.context.useSnapshot()) {
-            return;
-        }
+    public RaftClosure<?> snapshot() {
         RaftClosure<?> future = new RaftClosure<>();
         try {
             this.node().snapshot(future);
-            future.waitFinished();
+            return future;
         } catch (Throwable e) {
             throw new BackendException("Failed to create snapshot", e);
         }
     }
 
-    private Node initRaftNode() throws IOException {
+    private RaftGroupService initRaftNode() throws IOException {
         NodeOptions nodeOptions = this.context.nodeOptions();
         nodeOptions.setFsm(this.stateMachine);
-        // TODO: When support sharding, groupId needs to be bound to shard Id
+        /*
+         * TODO: the groupId is same as graph name now, when support sharding,
+         *  groupId needs to be bound to shard Id
+         */
         String groupId = this.context.group();
         PeerId endpoint = this.context.endpoint();
         /*
          * Start raft node with shared rpc server:
-         * return new RaftGroupService(groupId, endpoint, nodeOptions,
-         *                             this.context.rpcServer(), true)
-         *        .start(false)
          */
-        return RaftServiceFactory.createAndInitRaftNode(groupId, endpoint,
-                                                        nodeOptions);
+        RpcServer rpcServer = this.context.rpcServer();
+        LOG.info("The raft endpoint '{}', initial group peers [{}]",
+                 endpoint, nodeOptions.getInitialConf());
+        // Shared rpc server
+        return new RaftGroupService(groupId, endpoint, nodeOptions,
+                                    rpcServer, true);
+    }
+
+    public void close() {
+        if (this.raftGroupService != null) {
+            this.raftGroupService.shutdown();
+            try {
+                this.raftGroupService.join();
+            } catch (final InterruptedException e) {
+                throw new RaftException("Interrupted while shutdown " +
+                                                "raftGroupService");
+            }
+        }
     }
 
     private void submitCommand(StoreCommand command, RaftStoreClosure closure) {
         // Wait leader elected
         LeaderInfo leaderInfo = this.waitLeaderElected(
-                                RaftSharedContext.NO_TIMEOUT);
+                RaftContext.WAIT_LEADER_TIMEOUT);

Review Comment:
   can align with "this"?



##########
hugegraph-api/src/main/java/com/baidu/hugegraph/core/GraphManager.java:
##########
@@ -257,6 +286,45 @@ public void close() {
         this.unlistenChanges();
     }
 
+    private void loadGraph(HugeConfig serverConfig, String name, String path) {
+        HugeConfig config = new HugeConfig(path);
+        String raftGroupPeers = serverConfig.get(ServerOptions.RAFT_GROUP_PEERS);
+        config.addProperty(ServerOptions.RAFT_GROUP_PEERS.name(), raftGroupPeers);
+
+        final Graph graph = GraphFactory.open(config);
+        this.graphs.put(name, graph);
+        LOG.info("Graph '{}' was successfully configured via '{}'", name, path);
+
+        if (this.requireAuthentication() &&
+            !(graph instanceof HugeGraphAuthProxy)) {
+            LOG.warn("You may need to support access control for '{}' with {}",
+                     path, HugeFactoryAuthProxy.GRAPH_FACTORY);
+        }
+    }
+
+//    private com.alipay.sofa.jraft.rpc.RpcServer startRaftRpcServer(HugeConfig

Review Comment:
   can delete unused code



##########
hugegraph-api/src/main/java/com/baidu/hugegraph/core/GraphManager.java:
##########
@@ -100,30 +106,53 @@ public GraphManager(HugeConfig conf, EventHub hub) {
         this.loadGraphs(ConfigUtil.scanGraphsDir(this.graphsDir));
         // this.installLicense(conf, "");
         // Raft will load snapshot firstly then launch election and replay log
-        this.waitGraphsStarted();
-        this.checkBackendVersionOrExit(conf);
         this.startRpcServer();
+
+        initAllSystemSchema();

Review Comment:
   this.initAllSystemSchema()



##########
hugegraph-api/src/main/java/com/baidu/hugegraph/core/GraphManager.java:
##########
@@ -257,6 +286,45 @@ public void close() {
         this.unlistenChanges();
     }
 
+    private void loadGraph(HugeConfig serverConfig, String name, String path) {

Review Comment:
   can move to line 401 to diff them



##########
hugegraph-api/src/main/java/com/baidu/hugegraph/core/GraphManager.java:
##########
@@ -100,30 +106,53 @@ public GraphManager(HugeConfig conf, EventHub hub) {
         this.loadGraphs(ConfigUtil.scanGraphsDir(this.graphsDir));
         // this.installLicense(conf, "");
         // Raft will load snapshot firstly then launch election and replay log
-        this.waitGraphsStarted();
-        this.checkBackendVersionOrExit(conf);
         this.startRpcServer();
+
+        initAllSystemSchema();
+
+        ServerConfig serverConfig = Whitebox.getInternalState(this.rpcServer,
+                                                              "serverConfig");
+        com.alipay.remoting.rpc.RpcServer remotingRpcServer;
+        remotingRpcServer = Whitebox.getInternalState(serverConfig.getServer(),
+                                                      "remotingServer");

Review Comment:
   add a method remotingRpcServer()



##########
hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStoreProvider.java:
##########
@@ -45,22 +51,51 @@ public class RaftBackendStoreProvider implements BackendStoreProvider {
     private static final Logger LOG = Log.logger(RaftBackendStoreProvider.class);
 
     private final BackendStoreProvider provider;
-    private final RaftSharedContext context;
     private RaftBackendStore schemaStore;
     private RaftBackendStore graphStore;
     private RaftBackendStore systemStore;
+    private RaftContext context;
 
-    public RaftBackendStoreProvider(BackendStoreProvider provider,
-                                    HugeGraphParams params) {
+    public RaftBackendStoreProvider(BackendStoreProvider provider) {
         this.provider = provider;
-        this.context = new RaftSharedContext(params);
         this.schemaStore = null;
         this.graphStore = null;
         this.systemStore = null;
+        this.context = null;
+    }
+
+    public void initRaftContext(HugeGraphParams params, RpcServer rpcServer) {
+        HugeConfig config = params.configuration();
+        Integer lowWaterMark = config.get(
+                               CoreOptions.RAFT_RPC_BUF_LOW_WATER_MARK);
+        System.setProperty("bolt.channel_write_buf_low_water_mark",
+                           String.valueOf(lowWaterMark));
+        Integer highWaterMark = config.get(
+                                CoreOptions.RAFT_RPC_BUF_HIGH_WATER_MARK);
+        System.setProperty("bolt.channel_write_buf_high_water_mark",
+                           String.valueOf(highWaterMark));
+
+//        PeerId endpoint = new PeerId();

Review Comment:
   add "TODO: xx" comment



##########
hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStoreProvider.java:
##########
@@ -45,22 +51,51 @@ public class RaftBackendStoreProvider implements BackendStoreProvider {
     private static final Logger LOG = Log.logger(RaftBackendStoreProvider.class);
 
     private final BackendStoreProvider provider;
-    private final RaftSharedContext context;
     private RaftBackendStore schemaStore;
     private RaftBackendStore graphStore;
     private RaftBackendStore systemStore;
+    private RaftContext context;
 
-    public RaftBackendStoreProvider(BackendStoreProvider provider,
-                                    HugeGraphParams params) {
+    public RaftBackendStoreProvider(BackendStoreProvider provider) {
         this.provider = provider;
-        this.context = new RaftSharedContext(params);
         this.schemaStore = null;
         this.graphStore = null;
         this.systemStore = null;
+        this.context = null;
+    }
+
+    public void initRaftContext(HugeGraphParams params, RpcServer rpcServer) {
+        HugeConfig config = params.configuration();
+        Integer lowWaterMark = config.get(
+                               CoreOptions.RAFT_RPC_BUF_LOW_WATER_MARK);
+        System.setProperty("bolt.channel_write_buf_low_water_mark",
+                           String.valueOf(lowWaterMark));
+        Integer highWaterMark = config.get(
+                                CoreOptions.RAFT_RPC_BUF_HIGH_WATER_MARK);
+        System.setProperty("bolt.channel_write_buf_high_water_mark",
+                           String.valueOf(highWaterMark));
+
+//        PeerId endpoint = new PeerId();
+//        String endpointStr = config.get(ServerOptions.RAFT_ENDPOINT);
+//        if (!endpoint.parse(endpointStr)) {
+//            throw new HugeException("Failed to parse endpoint %s", endpointStr);
+//        }
+
+        // Reference from RaftRpcServerFactory.createAndStartRaftRpcServer
+        com.alipay.sofa.jraft.rpc.RpcServer raftRpcServer =
+                                            new BoltRpcServer(rpcServer);
+        RaftRpcServerFactory.addRaftRequestProcessors(raftRpcServer);
+        raftRpcServer.init(null);
+
+        PeerId endpoint = new PeerId(rpcServer.ip(), rpcServer.port());
+        this.context = new RaftContext(params, raftRpcServer, endpoint);
+
+        //

Review Comment:
   unused?



##########
hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStoreProvider.java:
##########
@@ -197,7 +236,9 @@ public void initSystemInfo(HugeGraph graph) {
         BackendStoreSystemInfo info = graph.backendStoreSystemInfo();
         info.init();
 
-        this.notifyAndWaitEvent(Events.STORE_INITED);
+        // 创建系统schema,保存到pool里面

Review Comment:
   translate to English



##########
hugegraph-api/src/main/java/com/baidu/hugegraph/core/GraphManager.java:
##########
@@ -100,30 +106,53 @@ public GraphManager(HugeConfig conf, EventHub hub) {
         this.loadGraphs(ConfigUtil.scanGraphsDir(this.graphsDir));
         // this.installLicense(conf, "");
         // Raft will load snapshot firstly then launch election and replay log
-        this.waitGraphsStarted();
-        this.checkBackendVersionOrExit(conf);
         this.startRpcServer();
+
+        initAllSystemSchema();
+
+        ServerConfig serverConfig = Whitebox.getInternalState(this.rpcServer,
+                                                              "serverConfig");
+        com.alipay.remoting.rpc.RpcServer remotingRpcServer;
+        remotingRpcServer = Whitebox.getInternalState(serverConfig.getServer(),
+                                                      "remotingServer");
+        this.waitGraphsReady(remotingRpcServer);
+
+        this.checkBackendVersionOrExit(conf);
+
         this.serverStarted(conf);
         this.addMetrics(conf);
     }
 
-    public void loadGraphs(final Map<String, String> graphConfs) {
+    public {
+        //
+        register(-1, ~task, xx, xx, xx,);
+
+
+
+
+
+

Review Comment:
   need to clean code?



##########
hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStoreProvider.java:
##########
@@ -197,7 +236,9 @@ public void initSystemInfo(HugeGraph graph) {
         BackendStoreSystemInfo info = graph.backendStoreSystemInfo();
         info.init();
 
-        this.notifyAndWaitEvent(Events.STORE_INITED);
+        // 创建系统schema,保存到pool里面
+        init();

Review Comment:
   this.init()



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@hugegraph.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org