You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2021/08/10 17:09:13 UTC

[iotdb] 02/02: add lost codes..

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch cluster-
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2fbbdc9d8c5f8348de38316601d3ad3f14db8256
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Wed Aug 11 01:06:08 2021 +0800

    add lost codes..
---
 .../server/raft/AbstractDataRaftService.java       | 53 ++++++++++++++++
 .../server/raft/AbstractMetaRaftService.java       | 51 +++++++++++++++
 .../cluster/server/raft/AbstractRaftService.java   | 74 ++++++++++++++++++++++
 .../server/raft/DataRaftHeartBeatService.java      | 67 ++++++++++++++++++++
 .../iotdb/cluster/server/raft/DataRaftService.java | 64 +++++++++++++++++++
 .../server/raft/MetaRaftHeartBeatService.java      | 67 ++++++++++++++++++++
 .../iotdb/cluster/server/raft/MetaRaftService.java | 64 +++++++++++++++++++
 7 files changed, 440 insertions(+)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractDataRaftService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractDataRaftService.java
new file mode 100644
index 0000000..23fcc59
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractDataRaftService.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.server.raft;
+
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.rpc.thrift.TSDataService;
+import org.apache.iotdb.cluster.server.service.DataGroupServiceImpls;
+
+/**
+ * Base class for the data raft Thrift services. It keeps the {@link DataGroupServiceImpls}
+ * handler passed to the init methods and wraps it in a {@link TSDataService} processor.
+ */
+public abstract class AbstractDataRaftService extends AbstractRaftService {
+
+  /** Handler given to the TSDataService processor; written by both init methods below. */
+  private DataGroupServiceImpls impl;
+
+  /**
+   * Stores the handler and forwards it to the parent class.
+   *
+   * @param serviceImpl the handler, cast to {@link DataGroupServiceImpls}; a {@code null} value
+   *     leaves a previously stored handler untouched
+   */
+  @Override
+  public void initSyncedServiceImpl(Object serviceImpl) {
+    // The guard must test the argument. Testing the field (which starts as null) meant the
+    // assignment never ran, so the processor was always created around a null handler.
+    if (serviceImpl != null) {
+      impl = (DataGroupServiceImpls) serviceImpl;
+    }
+    super.initSyncedServiceImpl(serviceImpl);
+  }
+
+  /**
+   * Stores the handler and forwards it to the parent class.
+   *
+   * @param serviceImpl the handler, cast to {@link DataGroupServiceImpls}; a {@code null} value
+   *     leaves a previously stored handler untouched
+   */
+  @Override
+  public void initAsyncedServiceImpl(Object serviceImpl) {
+    // Same guard as the synced variant: check the argument, not the not-yet-assigned field.
+    if (serviceImpl != null) {
+      impl = (DataGroupServiceImpls) serviceImpl;
+    }
+    super.initAsyncedServiceImpl(serviceImpl);
+  }
+
+  /**
+   * Creates the Thrift processor around the stored handler: an async processor when the cluster
+   * config reports {@code isUseAsyncServer()}, a blocking one otherwise.
+   */
+  @Override
+  public void initTProcessor() {
+    if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+      processor = new TSDataService.AsyncProcessor<>(impl);
+    } else {
+      processor = new TSDataService.Processor<>(impl);
+    }
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractMetaRaftService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractMetaRaftService.java
new file mode 100644
index 0000000..905e0fa
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractMetaRaftService.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.server.raft;
+
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.rpc.thrift.TSMetaService;
+import org.apache.iotdb.cluster.server.service.MetaAsyncService;
+import org.apache.iotdb.cluster.server.service.MetaSyncService;
+
+/**
+ * Base class for the meta raft Thrift services. It keeps one handler for the blocking server and
+ * one for the async server, and wraps the appropriate one in a {@link TSMetaService} processor.
+ */
+public abstract class AbstractMetaRaftService extends AbstractRaftService {
+
+  /** Handler used when the blocking processor is created. */
+  private MetaSyncService syncServiceImpl;
+
+  /** Handler used when the async processor is created. */
+  private MetaAsyncService asyncServiceImpl;
+
+  /**
+   * Stores the blocking handler and forwards it to the parent class.
+   *
+   * @param serviceImpl the handler, cast to {@link MetaSyncService}
+   */
+  @Override
+  public void initSyncedServiceImpl(Object serviceImpl) {
+    final MetaSyncService syncHandler = (MetaSyncService) serviceImpl;
+    this.syncServiceImpl = syncHandler;
+    super.initSyncedServiceImpl(serviceImpl);
+  }
+
+  /**
+   * Stores the async handler and forwards it to the parent class.
+   *
+   * @param serviceImpl the handler, cast to {@link MetaAsyncService}
+   */
+  @Override
+  public void initAsyncedServiceImpl(Object serviceImpl) {
+    final MetaAsyncService asyncHandler = (MetaAsyncService) serviceImpl;
+    this.asyncServiceImpl = asyncHandler;
+    super.initAsyncedServiceImpl(serviceImpl);
+  }
+
+  /**
+   * Creates the Thrift processor: a blocking one around the sync handler unless the cluster
+   * config reports {@code isUseAsyncServer()}, in which case an async one around the async handler.
+   */
+  @Override
+  public void initTProcessor() {
+    final boolean asyncServer = ClusterDescriptor.getInstance().getConfig().isUseAsyncServer();
+    if (!asyncServer) {
+      processor = new TSMetaService.Processor<>(syncServiceImpl);
+    } else {
+      processor = new TSMetaService.AsyncProcessor<>(asyncServiceImpl);
+    }
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractRaftService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractRaftService.java
new file mode 100644
index 0000000..d62024b
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractRaftService.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.server.raft;
+
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.rpc.thrift.TSMetaService;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.runtime.RPCServiceException;
+import org.apache.iotdb.db.service.thrift.ThriftService;
+import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
+
+/**
+ * Common base of the raft Thrift services in this package. It builds the {@link
+ * ThriftServiceThread} for the configured server mode and binds to the cluster's internal IP.
+ */
+public abstract class AbstractRaftService extends ThriftService {
+
+  /**
+   * Creates {@code thriftServiceThread} from the current {@code processor} and names it.
+   *
+   * <p>When the cluster config reports {@code isUseAsyncServer()}, the thread is built with the
+   * connection timeout, max frame size and {@code serverType}; otherwise the shorter constructor
+   * is used and {@code serverType} is ignored.
+   *
+   * @param daemonThreadName name set on the created service thread
+   * @param clientThreadName client thread name handed to the {@link ThriftServiceThread}
+   * @param serverType server type, used only for the async server
+   * @throws IllegalAccessException if constructing the service thread raises an {@link
+   *     RPCServiceException}; the original exception is attached as the cause
+   */
+  public void initThriftServiceThread(
+      String daemonThreadName, String clientThreadName, ThriftServiceThread.ServerType serverType)
+      throws IllegalAccessException {
+    IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+    try {
+      if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+        // NOTE(review): this cast targets TSMetaService.AsyncProcessor, while
+        // AbstractDataRaftService stores a TSDataService.AsyncProcessor in `processor`.
+        // Confirm the cast cannot fail for the data services -- TODO verify.
+        thriftServiceThread =
+            new ThriftServiceThread(
+                (TSMetaService.AsyncProcessor) processor,
+                getID().getName(),
+                clientThreadName,
+                getBindIP(),
+                getBindPort(),
+                config.getRpcMaxConcurrentClientNum(),
+                config.getThriftServerAwaitTimeForStopService(),
+                new RaftServiceHandler(),
+                false,
+                ClusterDescriptor.getInstance().getConfig().getConnectionTimeoutInMS(),
+                config.getThriftMaxFrameSize(),
+                serverType);
+      } else {
+        thriftServiceThread =
+            new ThriftServiceThread(
+                processor,
+                getID().getName(),
+                clientThreadName,
+                getBindIP(),
+                getBindPort(),
+                config.getRpcMaxConcurrentClientNum(),
+                config.getThriftServerAwaitTimeForStopService(),
+                new RaftServiceHandler(),
+                false);
+      }
+    } catch (RPCServiceException e) {
+      // IllegalAccessException has no (message, cause) constructor, so attach the cause
+      // explicitly instead of discarding the original stack trace.
+      IllegalAccessException wrapped = new IllegalAccessException(e.getMessage());
+      wrapped.initCause(e);
+      throw wrapped;
+    }
+    thriftServiceThread.setName(daemonThreadName);
+  }
+
+  /** Returns the internal IP from the cluster configuration as the bind address. */
+  @Override
+  public String getBindIP() {
+    return ClusterDescriptor.getInstance().getConfig().getInternalIp();
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/DataRaftHeartBeatService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/DataRaftHeartBeatService.java
new file mode 100644
index 0000000..54a66b0
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/DataRaftHeartBeatService.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.server.raft;
+
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.utils.ClusterUtils;
+import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.service.ServiceType;
+import org.apache.iotdb.db.service.thrift.ThriftService;
+import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
+
+/**
+ * Data raft heartbeat Thrift service, available as a single shared instance through {@link
+ * #getInstance()}.
+ */
+public class DataRaftHeartBeatService extends AbstractDataRaftService
+    implements DataRaftHeartBeatServiceMBean {
+
+  /** Private so that the only instance is the one owned by the holder class. */
+  private DataRaftHeartBeatService() {}
+
+  /** Returns the shared instance, created when the holder class is initialized. */
+  public static DataRaftHeartBeatService getInstance() {
+    return InstanceHolder.SINGLETON;
+  }
+
+  /** Returns the shared instance as the service implementation. */
+  @Override
+  public ThriftService getImplementation() {
+    return getInstance();
+  }
+
+  @Override
+  public ServiceType getID() {
+    return ServiceType.CLUSTER_DATA_HEART_BEAT_RPC_SERVICE;
+  }
+
+  /**
+   * Builds the service thread with the data heartbeat service/client thread names and the HSHA
+   * server type.
+   */
+  @Override
+  public void initThriftServiceThread() throws IllegalAccessException {
+    final String serviceThread = ThreadName.CLUSTER_DATA_HEARTBEAT_RPC_SERVICE.getName();
+    final String clientThread = ThreadName.CLUSTER_DATA_HEARTBEAT_RPC_CLIENT.getName();
+    initThriftServiceThread(serviceThread, clientThread, ThriftServiceThread.ServerType.HSHA);
+  }
+
+  /** Returns the configured internal data port shifted by the data heartbeat port offset. */
+  @Override
+  public int getBindPort() {
+    final int dataPort = ClusterDescriptor.getInstance().getConfig().getInternalDataPort();
+    return dataPort + ClusterUtils.DATA_HEARTBEAT_PORT_OFFSET;
+  }
+
+  /** Owns the single instance; never instantiated itself. */
+  private static class InstanceHolder {
+
+    private static final DataRaftHeartBeatService SINGLETON = new DataRaftHeartBeatService();
+
+    private InstanceHolder() {}
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/DataRaftService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/DataRaftService.java
new file mode 100644
index 0000000..01cde6e
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/DataRaftService.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.server.raft;
+
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.service.ServiceType;
+import org.apache.iotdb.db.service.thrift.ThriftService;
+import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
+
+/**
+ * Data raft Thrift service, available as a single shared instance through {@link
+ * #getInstance()}.
+ */
+public class DataRaftService extends AbstractDataRaftService implements DataRaftServiceMBean {
+
+  /** Private so that the only instance is the one owned by the holder class. */
+  private DataRaftService() {}
+
+  /** Returns the shared instance, created when the holder class is initialized. */
+  public static DataRaftService getInstance() {
+    return InstanceHolder.SINGLETON;
+  }
+
+  /** Returns the shared instance as the service implementation. */
+  @Override
+  public ThriftService getImplementation() {
+    return getInstance();
+  }
+
+  @Override
+  public ServiceType getID() {
+    return ServiceType.CLUSTER_DATA_RPC_SERVICE;
+  }
+
+  /**
+   * Builds the service thread with the data RPC service/client thread names and the SELECTOR
+   * server type.
+   */
+  @Override
+  public void initThriftServiceThread() throws IllegalAccessException {
+    final String serviceThread = ThreadName.CLUSTER_DATA_RPC_SERVICE.getName();
+    final String clientThread = ThreadName.CLUSTER_DATA_RPC_CLIENT.getName();
+    initThriftServiceThread(serviceThread, clientThread, ThriftServiceThread.ServerType.SELECTOR);
+  }
+
+  /** Returns the internal data port from the cluster configuration. */
+  @Override
+  public int getBindPort() {
+    return ClusterDescriptor.getInstance().getConfig().getInternalDataPort();
+  }
+
+  /** Owns the single instance; never instantiated itself. */
+  private static class InstanceHolder {
+
+    private static final DataRaftService SINGLETON = new DataRaftService();
+
+    private InstanceHolder() {}
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/MetaRaftHeartBeatService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/MetaRaftHeartBeatService.java
new file mode 100644
index 0000000..fc463c4
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/MetaRaftHeartBeatService.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.server.raft;
+
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.utils.ClusterUtils;
+import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.service.ServiceType;
+import org.apache.iotdb.db.service.thrift.ThriftService;
+import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
+
+/**
+ * Meta raft heartbeat Thrift service, available as a single shared instance through {@link
+ * #getInstance()}.
+ */
+public class MetaRaftHeartBeatService extends AbstractMetaRaftService
+    implements MetaRaftHeartBeatServiceMBean {
+
+  /** Private so that the only instance is the one owned by the holder class. */
+  private MetaRaftHeartBeatService() {}
+
+  /** Returns the shared instance, created when the holder class is initialized. */
+  public static MetaRaftHeartBeatService getInstance() {
+    return InstanceHolder.SINGLETON;
+  }
+
+  /** Returns the shared instance as the service implementation. */
+  @Override
+  public ThriftService getImplementation() {
+    return getInstance();
+  }
+
+  @Override
+  public ServiceType getID() {
+    return ServiceType.CLUSTER_META_HEART_BEAT_RPC_SERVICE;
+  }
+
+  /**
+   * Builds the service thread with the meta heartbeat service/client thread names and the HSHA
+   * server type.
+   */
+  @Override
+  public void initThriftServiceThread() throws IllegalAccessException {
+    final String serviceThread = ThreadName.CLUSTER_META_HEARTBEAT_RPC_SERVICE.getName();
+    final String clientThread = ThreadName.CLUSTER_META_HEARTBEAT_RPC_CLIENT.getName();
+    initThriftServiceThread(serviceThread, clientThread, ThriftServiceThread.ServerType.HSHA);
+  }
+
+  /** Returns the configured internal meta port shifted by the meta heartbeat port offset. */
+  @Override
+  public int getBindPort() {
+    final int metaPort = ClusterDescriptor.getInstance().getConfig().getInternalMetaPort();
+    return metaPort + ClusterUtils.META_HEARTBEAT_PORT_OFFSET;
+  }
+
+  /** Owns the single instance; never instantiated itself. */
+  private static class InstanceHolder {
+
+    private static final MetaRaftHeartBeatService SINGLETON = new MetaRaftHeartBeatService();
+
+    private InstanceHolder() {}
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/MetaRaftService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/MetaRaftService.java
new file mode 100644
index 0000000..d67f04d
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/MetaRaftService.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.server.raft;
+
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.service.ServiceType;
+import org.apache.iotdb.db.service.thrift.ThriftService;
+import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
+
+/**
+ * Meta raft Thrift service, available as a single shared instance through {@link
+ * #getInstance()}.
+ */
+public class MetaRaftService extends AbstractMetaRaftService implements MetaRaftServiceMBean {
+
+  /** Private so that the only instance is the one owned by the holder class. */
+  private MetaRaftService() {}
+
+  /** Returns the shared instance, created when the holder class is initialized. */
+  public static MetaRaftService getInstance() {
+    return InstanceHolder.SINGLETON;
+  }
+
+  /** Returns the shared instance as the service implementation. */
+  @Override
+  public ThriftService getImplementation() {
+    return getInstance();
+  }
+
+  @Override
+  public ServiceType getID() {
+    return ServiceType.CLUSTER_META_RPC_SERVICE;
+  }
+
+  /**
+   * Builds the service thread with the meta RPC service/client thread names and the SELECTOR
+   * server type.
+   */
+  @Override
+  public void initThriftServiceThread() throws IllegalAccessException {
+    final String serviceThread = ThreadName.CLUSTER_META_RPC_SERVICE.getName();
+    final String clientThread = ThreadName.CLUSTER_META_RPC_CLIENT.getName();
+    initThriftServiceThread(serviceThread, clientThread, ThriftServiceThread.ServerType.SELECTOR);
+  }
+
+  /** Returns the internal meta port from the cluster configuration. */
+  @Override
+  public int getBindPort() {
+    return ClusterDescriptor.getInstance().getConfig().getInternalMetaPort();
+  }
+
+  /** Owns the single instance; never instantiated itself. */
+  private static class InstanceHolder {
+
+    private static final MetaRaftService SINGLETON = new MetaRaftService();
+
+    private InstanceHolder() {}
+  }
+}