You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2021/08/10 17:09:13 UTC
[iotdb] 02/02: add lost codes..
This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch cluster-
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2fbbdc9d8c5f8348de38316601d3ad3f14db8256
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Wed Aug 11 01:06:08 2021 +0800
add lost codes..
---
.../server/raft/AbstractDataRaftService.java | 53 ++++++++++++++++
.../server/raft/AbstractMetaRaftService.java | 51 +++++++++++++++
.../cluster/server/raft/AbstractRaftService.java | 74 ++++++++++++++++++++++
.../server/raft/DataRaftHeartBeatService.java | 67 ++++++++++++++++++++
.../iotdb/cluster/server/raft/DataRaftService.java | 64 +++++++++++++++++++
.../server/raft/MetaRaftHeartBeatService.java | 67 ++++++++++++++++++++
.../iotdb/cluster/server/raft/MetaRaftService.java | 64 +++++++++++++++++++
7 files changed, 440 insertions(+)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractDataRaftService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractDataRaftService.java
new file mode 100644
index 0000000..23fcc59
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractDataRaftService.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.server.raft;
+
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.rpc.thrift.TSDataService;
+import org.apache.iotdb.cluster.server.service.DataGroupServiceImpls;
+
+/**
+ * Base class of the data-group Raft Thrift services. It remembers the service implementation it
+ * is given and builds the matching {@link TSDataService} processor from it.
+ */
+public abstract class AbstractDataRaftService extends AbstractRaftService {
+
+  /** Implementation backing the processor; set by both the synced and the asynced init path. */
+  private DataGroupServiceImpls impl;
+
+  /**
+   * Stores the given implementation (when non-null) and forwards it to the parent class.
+   *
+   * @param serviceImpl the implementation to serve; cast to {@link DataGroupServiceImpls}
+   */
+  @Override
+  public void initSyncedServiceImpl(Object serviceImpl) {
+    // Guard on the argument, not on the field: the field starts out null, so testing the field
+    // would never let an implementation be stored and the processor would wrap null.
+    if (serviceImpl != null) {
+      impl = (DataGroupServiceImpls) serviceImpl;
+    }
+    super.initSyncedServiceImpl(serviceImpl);
+  }
+
+  /**
+   * Stores the given implementation (when non-null) and forwards it to the parent class.
+   *
+   * @param serviceImpl the implementation to serve; cast to {@link DataGroupServiceImpls}
+   */
+  @Override
+  public void initAsyncedServiceImpl(Object serviceImpl) {
+    // Same guard as the synced path: check the incoming object rather than the unset field.
+    if (serviceImpl != null) {
+      impl = (DataGroupServiceImpls) serviceImpl;
+    }
+    super.initAsyncedServiceImpl(serviceImpl);
+  }
+
+  /**
+   * Creates the Thrift processor around the stored implementation: an async processor when the
+   * cluster configuration enables the async server, a synchronous processor otherwise.
+   */
+  @Override
+  public void initTProcessor() {
+    if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+      processor = new TSDataService.AsyncProcessor<>(impl);
+    } else {
+      processor = new TSDataService.Processor<>(impl);
+    }
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractMetaRaftService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractMetaRaftService.java
new file mode 100644
index 0000000..905e0fa
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractMetaRaftService.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.server.raft;
+
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.rpc.thrift.TSMetaService;
+import org.apache.iotdb.cluster.server.service.MetaAsyncService;
+import org.apache.iotdb.cluster.server.service.MetaSyncService;
+
+public abstract class AbstractMetaRaftService extends AbstractRaftService {
+
+ private MetaAsyncService asyncServiceImpl;
+ private MetaSyncService syncServiceImpl;
+
+ @Override
+ public void initSyncedServiceImpl(Object serviceImpl) {
+ syncServiceImpl = (MetaSyncService) serviceImpl;
+ super.initSyncedServiceImpl(serviceImpl);
+ }
+
+ @Override
+ public void initAsyncedServiceImpl(Object serviceImpl) {
+ asyncServiceImpl = (MetaAsyncService) serviceImpl;
+ super.initAsyncedServiceImpl(serviceImpl);
+ }
+
+ @Override
+ public void initTProcessor() {
+ if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+ processor = new TSMetaService.AsyncProcessor<>(asyncServiceImpl);
+ } else {
+ processor = new TSMetaService.Processor<>(syncServiceImpl);
+ }
+ }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractRaftService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractRaftService.java
new file mode 100644
index 0000000..d62024b
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractRaftService.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.server.raft;
+
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.rpc.thrift.TSMetaService;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.runtime.RPCServiceException;
+import org.apache.iotdb.db.service.thrift.ThriftService;
+import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
+
+/**
+ * Common base of the cluster Raft Thrift services. It creates the {@link ThriftServiceThread} for
+ * a service and binds every service to the cluster's internal IP.
+ */
+public abstract class AbstractRaftService extends ThriftService {
+
+  /**
+   * Creates the Thrift service thread for this service and names it.
+   *
+   * @param daemonThreadName name assigned to the created service thread
+   * @param clientThreadName name passed through to the {@link ThriftServiceThread} constructor
+   * @param serverType server type; only handed to the thread when the async server is enabled
+   * @throws IllegalAccessException if creating the thread fails with an {@link
+   *     RPCServiceException}; the original exception is attached as the cause
+   */
+  public void initThriftServiceThread(
+      String daemonThreadName, String clientThreadName, ThriftServiceThread.ServerType serverType)
+      throws IllegalAccessException {
+    IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+    try {
+      if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+        // NOTE(review): the processor is cast to TSMetaService.AsyncProcessor, yet
+        // AbstractDataRaftService builds a TSDataService.AsyncProcessor - confirm this cast
+        // cannot fail for the data services.
+        thriftServiceThread =
+            new ThriftServiceThread(
+                (TSMetaService.AsyncProcessor) processor,
+                getID().getName(),
+                clientThreadName,
+                getBindIP(),
+                getBindPort(),
+                config.getRpcMaxConcurrentClientNum(),
+                config.getThriftServerAwaitTimeForStopService(),
+                new RaftServiceHandler(),
+                false,
+                ClusterDescriptor.getInstance().getConfig().getConnectionTimeoutInMS(),
+                config.getThriftMaxFrameSize(),
+                serverType);
+      } else {
+        thriftServiceThread =
+            new ThriftServiceThread(
+                processor,
+                getID().getName(),
+                clientThreadName,
+                getBindIP(),
+                getBindPort(),
+                config.getRpcMaxConcurrentClientNum(),
+                config.getThriftServerAwaitTimeForStopService(),
+                new RaftServiceHandler(),
+                false);
+      }
+    } catch (RPCServiceException e) {
+      // IllegalAccessException has no (message, cause) constructor, so attach the cause
+      // explicitly instead of losing the original stack trace.
+      IllegalAccessException wrapped = new IllegalAccessException(e.getMessage());
+      wrapped.initCause(e);
+      throw wrapped;
+    }
+    thriftServiceThread.setName(daemonThreadName);
+  }
+
+  /** Returns the cluster's configured internal IP as the address to bind to. */
+  @Override
+  public String getBindIP() {
+    return ClusterDescriptor.getInstance().getConfig().getInternalIp();
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/DataRaftHeartBeatService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/DataRaftHeartBeatService.java
new file mode 100644
index 0000000..54a66b0
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/DataRaftHeartBeatService.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.server.raft;
+
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.utils.ClusterUtils;
+import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.service.ServiceType;
+import org.apache.iotdb.db.service.thrift.ThriftService;
+import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
+
+public class DataRaftHeartBeatService extends AbstractDataRaftService
+ implements DataRaftHeartBeatServiceMBean {
+
+ private DataRaftHeartBeatService() {}
+
+ @Override
+ public ThriftService getImplementation() {
+ return DataRaftHeartBeatServiceHolder.INSTANCE;
+ }
+
+ @Override
+ public ServiceType getID() {
+ return ServiceType.CLUSTER_DATA_HEART_BEAT_RPC_SERVICE;
+ }
+
+ @Override
+ public void initThriftServiceThread() throws IllegalAccessException {
+ initThriftServiceThread(
+ ThreadName.CLUSTER_DATA_HEARTBEAT_RPC_SERVICE.getName(),
+ ThreadName.CLUSTER_DATA_HEARTBEAT_RPC_CLIENT.getName(),
+ ThriftServiceThread.ServerType.HSHA);
+ }
+
+ @Override
+ public int getBindPort() {
+ return ClusterDescriptor.getInstance().getConfig().getInternalDataPort()
+ + ClusterUtils.DATA_HEARTBEAT_PORT_OFFSET;
+ }
+
+ public static DataRaftHeartBeatService getInstance() {
+ return DataRaftHeartBeatServiceHolder.INSTANCE;
+ }
+
+ private static class DataRaftHeartBeatServiceHolder {
+
+ private static final DataRaftHeartBeatService INSTANCE = new DataRaftHeartBeatService();
+
+ private DataRaftHeartBeatServiceHolder() {}
+ }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/DataRaftService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/DataRaftService.java
new file mode 100644
index 0000000..01cde6e
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/DataRaftService.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.server.raft;
+
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.service.ServiceType;
+import org.apache.iotdb.db.service.thrift.ThriftService;
+import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
+
+public class DataRaftService extends AbstractDataRaftService implements DataRaftServiceMBean {
+
+ private DataRaftService() {}
+
+ @Override
+ public ThriftService getImplementation() {
+ return DataRaftServiceHolder.INSTANCE;
+ }
+
+ @Override
+ public ServiceType getID() {
+ return ServiceType.CLUSTER_DATA_RPC_SERVICE;
+ }
+
+ @Override
+ public void initThriftServiceThread() throws IllegalAccessException {
+ initThriftServiceThread(
+ ThreadName.CLUSTER_DATA_RPC_SERVICE.getName(),
+ ThreadName.CLUSTER_DATA_RPC_CLIENT.getName(),
+ ThriftServiceThread.ServerType.SELECTOR);
+ }
+
+ @Override
+ public int getBindPort() {
+ return ClusterDescriptor.getInstance().getConfig().getInternalDataPort();
+ }
+
+ public static DataRaftService getInstance() {
+ return DataRaftServiceHolder.INSTANCE;
+ }
+
+ private static class DataRaftServiceHolder {
+
+ private static final DataRaftService INSTANCE = new DataRaftService();
+
+ private DataRaftServiceHolder() {}
+ }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/MetaRaftHeartBeatService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/MetaRaftHeartBeatService.java
new file mode 100644
index 0000000..fc463c4
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/MetaRaftHeartBeatService.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.server.raft;
+
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.utils.ClusterUtils;
+import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.service.ServiceType;
+import org.apache.iotdb.db.service.thrift.ThriftService;
+import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
+
+public class MetaRaftHeartBeatService extends AbstractMetaRaftService
+ implements MetaRaftHeartBeatServiceMBean {
+
+ private MetaRaftHeartBeatService() {}
+
+ @Override
+ public ThriftService getImplementation() {
+ return MetaRaftHeartBeatServiceHolder.INSTANCE;
+ }
+
+ @Override
+ public ServiceType getID() {
+ return ServiceType.CLUSTER_META_HEART_BEAT_RPC_SERVICE;
+ }
+
+ @Override
+ public void initThriftServiceThread() throws IllegalAccessException {
+ initThriftServiceThread(
+ ThreadName.CLUSTER_META_HEARTBEAT_RPC_SERVICE.getName(),
+ ThreadName.CLUSTER_META_HEARTBEAT_RPC_CLIENT.getName(),
+ ThriftServiceThread.ServerType.HSHA);
+ }
+
+ @Override
+ public int getBindPort() {
+ return ClusterDescriptor.getInstance().getConfig().getInternalMetaPort()
+ + ClusterUtils.META_HEARTBEAT_PORT_OFFSET;
+ }
+
+ public static MetaRaftHeartBeatService getInstance() {
+ return MetaRaftHeartBeatServiceHolder.INSTANCE;
+ }
+
+ private static class MetaRaftHeartBeatServiceHolder {
+
+ private static final MetaRaftHeartBeatService INSTANCE = new MetaRaftHeartBeatService();
+
+ private MetaRaftHeartBeatServiceHolder() {}
+ }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/MetaRaftService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/MetaRaftService.java
new file mode 100644
index 0000000..d67f04d
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/MetaRaftService.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.server.raft;
+
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.service.ServiceType;
+import org.apache.iotdb.db.service.thrift.ThriftService;
+import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
+
+public class MetaRaftService extends AbstractMetaRaftService implements MetaRaftServiceMBean {
+
+ private MetaRaftService() {}
+
+ @Override
+ public ThriftService getImplementation() {
+ return MetaRaftServiceHolder.INSTANCE;
+ }
+
+ @Override
+ public ServiceType getID() {
+ return ServiceType.CLUSTER_META_RPC_SERVICE;
+ }
+
+ @Override
+ public void initThriftServiceThread() throws IllegalAccessException {
+ initThriftServiceThread(
+ ThreadName.CLUSTER_META_RPC_SERVICE.getName(),
+ ThreadName.CLUSTER_META_RPC_CLIENT.getName(),
+ ThriftServiceThread.ServerType.SELECTOR);
+ }
+
+ @Override
+ public int getBindPort() {
+ return ClusterDescriptor.getInstance().getConfig().getInternalMetaPort();
+ }
+
+ public static MetaRaftService getInstance() {
+ return MetaRaftServiceHolder.INSTANCE;
+ }
+
+ private static class MetaRaftServiceHolder {
+
+ private static final MetaRaftService INSTANCE = new MetaRaftService();
+
+ private MetaRaftServiceHolder() {}
+ }
+}