You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/05/11 01:47:46 UTC
[iotdb] branch master updated: [IOTDB-1352][IOTDB-1353] Rafactor
E2E test, add ClusterInfo thrift based API (#3143)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 19cfcd5 [IOTDB-1352][IOTDB-1353] Rafactor E2E test, add ClusterInfo thrift based API (#3143)
19cfcd5 is described below
commit 19cfcd5374a562a495c70b7b18e171a04df0fb41
Author: Xiangdong Huang <hx...@apache.org>
AuthorDate: Tue May 11 09:47:26 2021 +0800
[IOTDB-1352][IOTDB-1353] Rafactor E2E test, add ClusterInfo thrift based API (#3143)
---
.../resources/conf/iotdb-cluster.properties | 3 +
.../java/org/apache/iotdb/cluster/ClusterMain.java | 13 ++
.../apache/iotdb/cluster/config/ClusterConfig.java | 9 ++
.../iotdb/cluster/config/ClusterDescriptor.java | 9 +-
.../iotdb/cluster/server/MetaClusterServer.java | 28 ++--
.../server/clusterinfo/ClusterInfoServer.java | 94 +++++++++++++
.../server/clusterinfo/ClusterInfoServerMBean.java | 22 +++
.../server/clusterinfo/ClusterInfoServiceImpl.java | 71 ++++++++++
.../ClusterInfoServiceThriftHandler.java | 55 ++++++++
.../cluster/utils/nodetool/ClusterMonitor.java | 2 +
.../utils/nodetool/ClusterMonitorMBean.java | 2 +-
.../server/clusterinfo/ClusterInfoServerTest.java | 73 ++++++++++
.../clusterinfo/ClusterInfoServiceImplTest.java | 98 ++++++++++++++
.../cluster/server/member/MetaGroupMemberTest.java | 5 +-
docs/UserGuide/API/Programming-Java-Native-API.md | 98 ++++++++++++++
docs/UserGuide/Cluster/Cluster-Setup.md | 9 ++
.../UserGuide/API/Programming-Java-Native-API.md | 95 +++++++++++++
.../Administration-Management/Administration.md | 2 +-
docs/zh/UserGuide/Cluster/Cluster-Setup.md | 9 ++
.../org/apache/iotdb/db/concurrent/ThreadName.java | 3 +-
.../org/apache/iotdb/db/service/ServiceType.java | 5 +-
.../test/java/org/apache/iotdb/db/sql/Cases.java | 124 +++++++++++++++++
.../java/org/apache/iotdb/db/sql/ClusterIT.java | 150 +++++++--------------
.../java/org/apache/iotdb/db/sql/SingleNodeIT.java | 92 ++-----------
.../iotdb/db/sql/node1/OneNodeClusterIT.java | 54 ++++++++
.../db/sql/nodes3/AbstractThreeNodeClusterIT.java | 61 +++++++++
.../iotdb/db/sql/nodes3/ThreeNodeCluster1IT.java | 23 ++++
.../iotdb/db/sql/nodes3/ThreeNodeCluster2IT.java | 32 +++++
.../db/sql/nodes5/AbstractFiveNodeClusterIT.java | 85 ++++++++++++
.../iotdb/db/sql/nodes5/FiveNodeCluster1IT.java | 23 ++++
.../iotdb/db/sql/nodes5/FiveNodeCluster2IT.java | 32 +++++
.../iotdb/db/sql/nodes5/FiveNodeCluster4IT.java | 32 +++++
.../test/resources/1node/iotdb-cluster.properties | 2 +-
thrift-cluster/src/main/thrift/cluster.thrift | 52 ++++++-
thrift-sync/pom.xml | 5 -
35 files changed, 1264 insertions(+), 208 deletions(-)
diff --git a/cluster/src/assembly/resources/conf/iotdb-cluster.properties b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
index dd0a257..ba626e3 100644
--- a/cluster/src/assembly/resources/conf/iotdb-cluster.properties
+++ b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
@@ -38,6 +38,9 @@ internal_meta_port=9003
# port for data service
internal_data_port=40010
+# port for cluster info API, 6567 by default
+#cluster_info_public_port=6567
+
# whether open port for server module (for debug purpose)
# if true, the rpc_port of the single server will be changed to rpc_port (in iotdb-engines.properties) + 1
# open_server_rpc_port=false
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
index dc3e767..a904838 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
@@ -29,12 +29,14 @@ import org.apache.iotdb.cluster.partition.slot.SlotStrategy;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.MetaClusterServer;
import org.apache.iotdb.cluster.server.Response;
+import org.apache.iotdb.cluster.server.clusterinfo.ClusterInfoServer;
import org.apache.iotdb.cluster.utils.ClusterUtils;
import org.apache.iotdb.db.conf.IoTDBConfigCheck;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.utils.TestOnly;
import org.apache.thrift.TException;
import org.apache.thrift.async.TAsyncClientManager;
@@ -106,6 +108,9 @@ public class ClusterMain {
preStartCustomize();
metaServer.start();
metaServer.buildCluster();
+ // Currently, we do not register ClusterInfoService as a JMX Bean,
+ // so we use startService() rather than start()
+ ClusterInfoServer.getInstance().startService();
} catch (TTransportException
| StartupException
| QueryProcessException
@@ -120,6 +125,9 @@ public class ClusterMain {
preStartCustomize();
metaServer.start();
metaServer.joinCluster();
+ // Currently, we do not register ClusterInfoService as a JMX Bean,
+ // so we use startService() rather than start()
+ ClusterInfoServer.getInstance().startService();
} catch (TTransportException
| StartupException
| QueryProcessException
@@ -300,4 +308,9 @@ public class ClusterMain {
}
});
}
+
+ @TestOnly
+ public static void setMetaClusterServer(MetaClusterServer metaClusterServer) {
+ metaServer = metaClusterServer;
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index a2bb2de..b9b0db4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -38,6 +38,7 @@ public class ClusterConfig {
private int internalMetaPort = 9003;
private int internalDataPort = 40010;
private int clusterRpcPort = IoTDBDescriptor.getInstance().getConfig().getRpcPort();
+ private int clusterInfoRpcPort = 6567;
/** each one is a {internalIp | domain name}:{meta port} string tuple. */
private List<String> seedNodeUrls;
@@ -486,4 +487,12 @@ public class ClusterConfig {
public void setWaitClientTimeoutMS(long waitClientTimeoutMS) {
this.waitClientTimeoutMS = waitClientTimeoutMS;
}
+
+ public int getClusterInfoRpcPort() {
+ return clusterInfoRpcPort;
+ }
+
+ public void setClusterInfoRpcPort(int clusterInfoRpcPort) {
+ this.clusterInfoRpcPort = clusterInfoRpcPort;
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index 25f85e0..801bbf1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -142,9 +142,14 @@ public class ClusterDescriptor {
properties.getProperty(
"internal_data_port", Integer.toString(config.getInternalDataPort()))));
- config.setClusterRpcPort(
+ // rpc port and rpc address are defined in iotdb-engine.properties.
+ // To avoid inconsistency, we do not read "rpc_port" in iotdb-cluster.properties
+ // even if users set the property there.
+
+ config.setClusterInfoRpcPort(
Integer.parseInt(
- properties.getProperty("rpc_port", Integer.toString(config.getClusterRpcPort()))));
+ properties.getProperty(
+ "cluster_info_public_port", Integer.toString(config.getClusterInfoRpcPort()))));
config.setMaxConcurrentClientNum(
Integer.parseInt(
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
index 12e286f..2ef9c34 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
@@ -24,20 +24,7 @@ import org.apache.iotdb.cluster.exception.ConfigInconsistentException;
import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
import org.apache.iotdb.cluster.metadata.CMManager;
import org.apache.iotdb.cluster.metadata.MetaPuller;
-import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse;
-import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
-import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
-import org.apache.iotdb.cluster.rpc.thrift.CheckStatusResponse;
-import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
-import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
-import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest;
-import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.RequestCommitIndexResponse;
-import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
-import org.apache.iotdb.cluster.rpc.thrift.StartUpStatus;
-import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
-import org.apache.iotdb.cluster.rpc.thrift.TSMetaService;
+import org.apache.iotdb.cluster.rpc.thrift.*;
import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.AsyncProcessor;
import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.Processor;
import org.apache.iotdb.cluster.server.heartbeat.MetaHeartbeatServer;
@@ -49,6 +36,7 @@ import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.service.RegisterManager;
+import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.thrift.TException;
@@ -112,7 +100,9 @@ public class MetaClusterServer extends RaftServer
((CMManager) IoTDB.metaManager).setCoordinator(coordinator);
ioTDB.active();
member.start();
+ // JMX based DBA API
registerManager.register(ClusterMonitor.INSTANCE);
+
}
/** Also stops the IoTDB instance, the MetaGroupMember and the ClusterMonitor. */
@@ -366,4 +356,14 @@ public class MetaClusterServer extends RaftServer
public void handshake(Node sender, AsyncMethodCallback<Void> resultHandler) {
asyncService.handshake(sender, resultHandler);
}
+
+ @TestOnly
+ public void setMetaGroupMember(MetaGroupMember metaGroupMember) {
+ this.member = metaGroupMember;
+ }
+
+ @TestOnly
+ public IoTDB getIoTDB() {
+ return ioTDB;
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServer.java
new file mode 100644
index 0000000..bf08e7d
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServer.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.server.clusterinfo;
+
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.rpc.thrift.ClusterInfoService.Processor;
+import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.runtime.RPCServiceException;
+import org.apache.iotdb.db.service.ServiceType;
+import org.apache.iotdb.db.service.thrift.ThriftService;
+import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
+
+public class ClusterInfoServer extends ThriftService implements ClusterInfoServerMBean {
+ private ClusterInfoServiceImpl serviceImpl;
+
+ public static ClusterInfoServer getInstance() {
+ return ClusterMonitorServiceHolder.INSTANCE;
+ }
+
+ @Override
+ public ServiceType getID() {
+ return ServiceType.CLUSTER_INFO_SERVICE;
+ }
+
+ @Override
+ public ThriftService getImplementation() {
+ return getInstance();
+ }
+
+ @Override
+ public void initTProcessor() {
+ serviceImpl = new ClusterInfoServiceImpl();
+ processor = new Processor<>(serviceImpl);
+ }
+
+ @Override
+ public void initThriftServiceThread() throws IllegalAccessException {
+ ClusterConfig clusterConfig = ClusterDescriptor.getInstance().getConfig();
+ IoTDBConfig nodeConfig = IoTDBDescriptor.getInstance().getConfig();
+ try {
+ thriftServiceThread =
+ new ThriftServiceThread(
+ processor,
+ getID().getName(),
+ ThreadName.CLUSTER_INFO_SERVICE.getName(),
+ nodeConfig.getRpcAddress(),
+ clusterConfig.getClusterInfoRpcPort(),
+ nodeConfig.getRpcMaxConcurrentClientNum(),
+ nodeConfig.getThriftServerAwaitTimeForStopService(),
+ new ClusterInfoServiceThriftHandler(serviceImpl),
+ IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable());
+ } catch (RPCServiceException e) {
+ throw new IllegalAccessException(e.getMessage());
+ }
+ thriftServiceThread.setName(ThreadName.CLUSTER_INFO_SERVICE.getName() + "Service");
+ }
+
+ @Override
+ public String getBindIP() {
+ return IoTDBDescriptor.getInstance().getConfig().getRpcAddress();
+ }
+
+ @Override
+ public int getBindPort() {
+ return ClusterDescriptor.getInstance().getConfig().getClusterInfoRpcPort();
+ }
+
+ private static class ClusterMonitorServiceHolder {
+
+ private static final ClusterInfoServer INSTANCE = new ClusterInfoServer();
+
+ private ClusterMonitorServiceHolder() {}
+ }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServerMBean.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServerMBean.java
new file mode 100644
index 0000000..1003d46
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServerMBean.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.server.clusterinfo;
+
+public interface ClusterInfoServerMBean {}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServiceImpl.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServiceImpl.java
new file mode 100644
index 0000000..82cb1b0
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServiceImpl.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.server.clusterinfo;
+
+import org.apache.iotdb.cluster.partition.PartitionGroup;
+import org.apache.iotdb.cluster.rpc.thrift.ClusterInfoService;
+import org.apache.iotdb.cluster.rpc.thrift.DataPartitionEntry;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.utils.nodetool.ClusterMonitor;
+
+import org.apache.commons.collections4.map.MultiKeyMap;
+import org.apache.thrift.TException;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class ClusterInfoServiceImpl implements ClusterInfoService.Iface {
+
+ @Override
+ public List<Node> getRing() throws TException {
+ return ClusterMonitor.INSTANCE.getRing();
+ }
+
+ @Override
+ public List<DataPartitionEntry> getDataPartition(String path, long startTime, long endTime) {
+ MultiKeyMap<Long, PartitionGroup> partitions =
+ ClusterMonitor.INSTANCE.getDataPartition(path, startTime, endTime);
+ List<DataPartitionEntry> result = new ArrayList<>(partitions.size());
+ partitions.forEach(
+ (multikey, nodes) ->
+ result.add(new DataPartitionEntry(multikey.getKey(0), multikey.getKey(1), nodes)));
+ return result;
+ }
+
+ @Override
+ public List<Node> getMetaPartition(String path) throws TException {
+ return ClusterMonitor.INSTANCE.getMetaPartition(path);
+ }
+
+ @Override
+ public Map<Node, Boolean> getAllNodeStatus() throws TException {
+ return ClusterMonitor.INSTANCE.getAllNodeStatus();
+ }
+
+ @Override
+ public String getInstrumentingInfo() throws TException {
+ return ClusterMonitor.INSTANCE.getInstrumentingInfo();
+ }
+
+ public void handleClientExit() {
+ // do something when a client connection exits.
+ }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServiceThriftHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServiceThriftHandler.java
new file mode 100644
index 0000000..8fc8783
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServiceThriftHandler.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.server.clusterinfo;
+
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.server.ServerContext;
+import org.apache.thrift.server.TServerEventHandler;
+import org.apache.thrift.transport.TTransport;
+
+public class ClusterInfoServiceThriftHandler implements TServerEventHandler {
+ private ClusterInfoServiceImpl serviceImpl;
+
+ ClusterInfoServiceThriftHandler(ClusterInfoServiceImpl serviceImpl) {
+ this.serviceImpl = serviceImpl;
+ }
+
+ @Override
+ public ServerContext createContext(TProtocol arg0, TProtocol arg1) {
+ // nothing
+ return null;
+ }
+
+ @Override
+ public void deleteContext(ServerContext arg0, TProtocol arg1, TProtocol arg2) {
+ // notify the service implementation that the client connection has been closed.
+ serviceImpl.handleClientExit();
+ }
+
+ @Override
+ public void preServe() {
+ // nothing
+ }
+
+ @Override
+ public void processContext(ServerContext arg0, TTransport arg1, TTransport arg2) {
+ // nothing
+ }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
index e72a33a..42d28a4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
@@ -52,6 +52,8 @@ public class ClusterMonitor implements ClusterMonitorMBean, IService {
String.format(
"%s:%s=%s", IoTDBConstant.IOTDB_PACKAGE, IoTDBConstant.JMX_TYPE, getID().getJmxName());
+ private ClusterMonitor() {}
+
@Override
public void start() throws StartupException {
try {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitorMBean.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitorMBean.java
index 410f462..1ba18d1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitorMBean.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitorMBean.java
@@ -39,7 +39,7 @@ public interface ClusterMonitorMBean {
* Get data partition information of input path and time range.
*
* @param path input path
- * @return data partition information
+ * @return data partition information: ((start time, end time), PartitionGroup)
*/
MultiKeyMap<Long, PartitionGroup> getDataPartition(String path, long startTime, long endTime);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServerTest.java
new file mode 100644
index 0000000..bdaf55c
--- /dev/null
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServerTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.server.clusterinfo;
+
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.rpc.thrift.ClusterInfoService;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.rpc.RpcTransportFactory;
+
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ClusterInfoServerTest {
+ ClusterInfoServiceImplTest test;
+ ClusterInfoServer service;
+
+ @Before
+ public void setUp() throws Exception {
+ test = new ClusterInfoServiceImplTest();
+ test.setUp();
+ service = new ClusterInfoServer();
+ service.start();
+ }
+
+ @After
+ public void tearDown() throws MetadataException {
+ test.tearDown();
+ service.stop();
+ }
+
+ @Test
+ public void testConnect() {
+ TTransport transport =
+ RpcTransportFactory.INSTANCE.getTransport(
+ new TSocket(
+ IoTDBDescriptor.getInstance().getConfig().getRpcAddress(),
+ ClusterDescriptor.getInstance().getConfig().getClusterInfoRpcPort()));
+ try {
+ transport.open();
+ } catch (TTransportException e) {
+ Assert.fail(e.getMessage());
+ }
+ // connection success means OK.
+ ClusterInfoService.Client client =
+ new ClusterInfoService.Client(new TBinaryProtocol(transport));
+ Assert.assertNotNull(client);
+ // client's methods have been tested on ClusterInfoServiceImplTest
+ transport.close();
+ }
+}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServiceImplTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServiceImplTest.java
new file mode 100644
index 0000000..5dde20f
--- /dev/null
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServiceImplTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.server.clusterinfo;
+
+import org.apache.iotdb.cluster.ClusterMain;
+import org.apache.iotdb.cluster.rpc.thrift.DataPartitionEntry;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.server.MetaClusterServer;
+import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.cluster.server.member.MetaGroupMemberTest;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.PartialPath;
+
+import org.apache.thrift.TException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+public class ClusterInfoServiceImplTest {
+
+ ClusterInfoServiceImpl impl;
+
+ @Before
+ public void setUp() throws Exception {
+ MetaGroupMemberTest metaGroupMemberTest = new MetaGroupMemberTest();
+ // will create a cluster with 10 nodes, ip: 0,10,20,...,90
+ metaGroupMemberTest.setUp();
+ MetaGroupMember metaGroupMember = metaGroupMemberTest.getTestMetaGroupMember();
+
+ MetaClusterServer metaClusterServer = new MetaClusterServer();
+ metaClusterServer.getMember().stop();
+ metaClusterServer.setMetaGroupMember(metaGroupMember);
+
+ ClusterMain.setMetaClusterServer(metaClusterServer);
+
+ metaClusterServer.getIoTDB().metaManager.setStorageGroup(new PartialPath("root", "sg"));
+ // metaClusterServer.getMember()
+ impl = new ClusterInfoServiceImpl();
+ }
+
+ @After
+ public void tearDown() throws MetadataException {
+ ClusterMain.getMetaServer()
+ .getIoTDB()
+ .metaManager
+ .deleteStorageGroups(Collections.singletonList(new PartialPath("root", "sg")));
+ ClusterMain.getMetaServer().stop();
+ }
+
+ @Test
+ public void getRing() throws TException {
+ List<Node> nodes = impl.getRing();
+ Assert.assertEquals(10, nodes.size());
+ }
+
+ @Test
+ public void getDataPartition() {
+ List<DataPartitionEntry> entries = impl.getDataPartition("root.sg", 0, 100);
+ Assert.assertEquals(1, entries.size());
+ List<Node> nodes = entries.get(0).getNodes();
+ Assert.assertEquals(50, nodes.get(0).getNodeIdentifier());
+ Assert.assertEquals(60, nodes.get(1).getNodeIdentifier());
+ }
+
+ @Test
+ public void getMetaPartition() throws TException {
+ List<Node> nodes = impl.getMetaPartition("root.sg");
+ Assert.assertEquals(50, nodes.get(0).getNodeIdentifier());
+ Assert.assertEquals(60, nodes.get(1).getNodeIdentifier());
+ }
+
+ @Test
+ public void getInstrumentingInfo() throws TException {
+ // hard to test the content of the instrumentInfo.
+ Assert.assertNotNull(impl.getInstrumentingInfo());
+ }
+}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 90cfeab..87cc093 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -187,7 +187,6 @@ public class MetaGroupMemberTest extends BaseMember {
mockDataClusterServer = false;
NodeStatusManager.getINSTANCE().setMetaGroupMember(testMetaMember);
exiledNode = null;
- System.out.println("Init term of metaGroupMember: " + testMetaMember.getTerm().get());
}
private DataGroupMember getDataGroupMember(PartitionGroup group, Node node) {
@@ -1370,4 +1369,8 @@ public class MetaGroupMemberTest extends BaseMember {
});
while (resultRef.get() == null) {}
}
+
+ public MetaGroupMember getTestMetaGroupMember() {
+ return testMetaMember;
+ }
}
diff --git a/docs/UserGuide/API/Programming-Java-Native-API.md b/docs/UserGuide/API/Programming-Java-Native-API.md
index dcc6958..2e7b465 100644
--- a/docs/UserGuide/API/Programming-Java-Native-API.md
+++ b/docs/UserGuide/API/Programming-Java-Native-API.md
@@ -406,3 +406,101 @@ void createDeviceTemplate
```
+### Cluster information related APIs (only works in the cluster mode)
+
+Cluster information related APIs allow users to get cluster information, such as where a storage group
+will be partitioned to and the status of each node in the cluster.
+
+To use the APIs, add the following dependency to your pom file:
+
+```xml
+<dependencies>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-thrift-cluster</artifactId>
+ <version>0.13.0-SNAPSHOT</version>
+ </dependency>
+</dependencies>
+```
+
+How to open a connection:
+
+```java
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.iotdb.rpc.RpcTransportFactory;
+
+ public class ClusterInfoClient {
+ TTransport transport;
+ ClusterInfoService.Client client;
+ public void connect() {
+ transport =
+ RpcTransportFactory.INSTANCE.getTransport(
+ new TSocket(
+ // the RPC address
+ IoTDBDescriptor.getInstance().getConfig().getRpcAddress(),
+ // the cluster info RPC port (cluster_info_public_port, 6567 by default)
+ ClusterDescriptor.getInstance().getConfig().getClusterInfoRpcPort()));
+ try {
+ transport.open();
+ } catch (TTransportException e) {
+ Assert.fail(e.getMessage());
+ }
+ //get the client
+ client = new ClusterInfoService.Client(new TBinaryProtocol(transport));
+ }
+ public void close() {
+ transport.close();
+ }
+ }
+```
+
+APIs in `ClusterInfoService.Client`:
+
+
+* Get the physical hash ring of the cluster:
+
+```java
+list<Node> getRing();
+```
+
+* Get data partition information of input path and time range:
+
+```java
+ /**
+ * @param path input path (should contain a storage group name as its prefix)
+ * @return the data partition info. If the time range only covers one data partition, the size
+ * of the list is one.
+ */
+ list<DataPartitionEntry> getDataPartition(1:string path, 2:long startTime, 3:long endTime);
+```
+
+* Get metadata partition information of input path:
+```java
+ /**
+ * @param path input path (should contain a storage group name as its prefix)
+ * @return metadata partition information
+ */
+ list<Node> getMetaPartition(1:string path);
+```
+
+* Get the status (alive or not) of all nodes:
+```java
+ /**
+ * @return key: node, value: whether the node is alive
+ */
+ map<Node, bool> getAllNodeStatus();
+```
+
+* Get the raft group info (voteFor, term, etc.) of the connected node
+ (note that this API is rarely used by users):
+```java
+ /**
+ * @return A multi-line string with each line representing the total time consumption, invocation
+ * number, and average time consumption.
+ */
+ string getInstrumentingInfo();
+```
+
diff --git a/docs/UserGuide/Cluster/Cluster-Setup.md b/docs/UserGuide/Cluster/Cluster-Setup.md
index a0751fc..85dfccc 100644
--- a/docs/UserGuide/Cluster/Cluster-Setup.md
+++ b/docs/UserGuide/Cluster/Cluster-Setup.md
@@ -100,6 +100,15 @@ The configuration items described below are in the `iotdb-cluster.properties` fi
|Default|40010|
|Effective| After restart system, shall NOT change after cluster is up|
+* cluster\_info\_public\_port
+
+|Name|cluster\_info\_public\_port|
+|:---:|:---|
+|Description|The port of the RPC service used for getting cluster information (e.g., data partitions)|
+|Type|Int32|
+|Default|6567|
+|Effective| After restart system|
+
* open\_server\_rpc\_port
|Name|open\_server\_rpc\_port|
diff --git a/docs/zh/UserGuide/API/Programming-Java-Native-API.md b/docs/zh/UserGuide/API/Programming-Java-Native-API.md
index fec6cfe..7a6471b 100644
--- a/docs/zh/UserGuide/API/Programming-Java-Native-API.md
+++ b/docs/zh/UserGuide/API/Programming-Java-Native-API.md
@@ -286,3 +286,98 @@ void testInsertTablet(Tablet tablet)
使用上述接口的示例代码在 ```example/session/src/main/java/org/apache/iotdb/SessionExample.java```
+
+
+### 集群信息相关的接口 (仅在集群模式下可用)
+
+集群信息相关的接口允许用户获取如数据分区情况、节点是否宕机等信息。
+要使用该API,需要增加依赖:
+
+```xml
+<dependencies>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-thrift-cluster</artifactId>
+ <version>0.13.0-SNAPSHOT</version>
+ </dependency>
+</dependencies>
+```
+
+建立连接与关闭连接的示例:
+
+```java
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.iotdb.rpc.RpcTransportFactory;
+
+ public class CluserInfoClient {
+ TTransport transport;
+ ClusterInfoService.Client client;
+ public void connect() {
+ transport =
+ RpcTransportFactory.INSTANCE.getTransport(
+ new TSocket(
+ // the RPC address
+ IoTDBDescriptor.getInstance().getConfig().getRpcAddress(),
+ // the RPC port
+ ClusterDescriptor.getInstance().getConfig().getClusterRpcPort()));
+ try {
+ transport.open();
+ } catch (TTransportException e) {
+ Assert.fail(e.getMessage());
+ }
+ //get the client
+ client = new ClusterInfoService.Client(new TBinaryProtocol(transport));
+ }
+ public void close() {
+ transport.close();
+ }
+ }
+```
+
+API列表:
+
+* 获取集群中的各个节点的信息(构成哈希环)
+
+```java
+list<Node> getRing();
+```
+
+* 给定一个路径(应包括一个SG作为前缀)和起止时间,获取其覆盖的数据分区情况:
+
+```java
+ /**
+ * @param path input path (should contain a storage group name as its prefix)
+ * @return the data partition info. If the time range only covers one data partition, the size
+ * of the list is one.
+ */
+ list<DataPartitionEntry> getDataPartition(1:string path, 2:long startTime, 3:long endTime);
+```
+
+* 给定一个路径(应包括一个SG作为前缀),获取其被分到了哪个节点上:
+```java
+ /**
+ * @param path input path (should contain a storage group name as its prefix)
+ * @return metadata partition information
+ */
+ list<Node> getMetaPartition(1:string path);
+```
+
+* 获取所有节点的死活状态:
+```java
+ /**
+ * @return key: node, value: alive or not
+ */
+ map<Node, bool> getAllNodeStatus();
+```
+
+* 获取当前连接节点的Raft组信息(投票编号等)(一般用户无需使用该接口):
+```java
+ /**
+ * @return A multi-line string with each line representing the total time consumption, invocation
+ * number, and average time consumption.
+ */
+ string getInstrumentingInfo();
+```
diff --git a/docs/zh/UserGuide/Administration-Management/Administration.md b/docs/zh/UserGuide/Administration-Management/Administration.md
index 0c3f00c..fb0e6c4 100644
--- a/docs/zh/UserGuide/Administration-Management/Administration.md
+++ b/docs/zh/UserGuide/Administration-Management/Administration.md
@@ -33,7 +33,7 @@ IoTDB为用户提供了权限管理操作,从而为用户提供对于数据的
### 权限
-数据库提供多种操作,并不是所有的用户都能执行所有操作。如果一个用户可以执行某项操作,则称该用户有执行该操作的权限。权限可分为数据管理权限(如对数据进行增删改查)以及权限管理权限(用户、角色的创建与删除,权限的赋予与撤销等)。数据管理权限往往需要一个路径来限定其生效范围,它的生效范围是以该路径对应的节点为根的一颗子树(具体请参考IoTDB的数据组织)。
+数据库提供多种操作,并不是所有的用户都能执行所有操作。如果一个用户可以执行某项操作,则称该用户有执行该操作的权限。权限可分为数据管理权限(如对数据进行增删改查)以及权限管理权限(用户、角色的创建与删除,权限的赋予与撤销等)。数据管理权限往往需要一个路径来限定其生效范围,它的生效范围是以该路径对应的节点为根的一棵子树(具体请参考IoTDB的数据组织)。
### 角色
diff --git a/docs/zh/UserGuide/Cluster/Cluster-Setup.md b/docs/zh/UserGuide/Cluster/Cluster-Setup.md
index bcceb89..420ee89 100644
--- a/docs/zh/UserGuide/Cluster/Cluster-Setup.md
+++ b/docs/zh/UserGuide/Cluster/Cluster-Setup.md
@@ -95,6 +95,15 @@ iotdb-engines.properties配置文件中的部分内容会不再生效:
|默认值|40010|
|改后生效方式|重启服务生效,集群建立后不可再修改|
+* cluster\_info\_public\_port
+
+|名字|cluster\_info\_public\_port|
+|:---:|:---|
+|描述|用于查看集群信息(如数据分区)的RPC服务的端口|
+|类型|Int32|
+|默认值|6567|
+|改后生效方式| 重启服务生效|
+
* open\_server\_rpc\_port
|名字|open\_server\_rpc\_port|
diff --git a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
index 5f5c093..87a21c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
+++ b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
@@ -44,7 +44,8 @@ public enum ThreadName {
LOAD_TSFILE("Load-TsFile"),
TIME_COST_STATISTIC("TIME_COST_STATISTIC"),
QUERY_SERVICE("Query"),
- WINDOW_EVALUATION_SERVICE("WindowEvaluationTaskPoolManager");
+ WINDOW_EVALUATION_SERVICE("WindowEvaluationTaskPoolManager"),
+ CLUSTER_INFO_SERVICE("ClusterInfoClient");
private final String name;
diff --git a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
index 48baf39..dc652cc 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
@@ -52,7 +52,10 @@ public enum ServiceType {
FLUSH_SERVICE(
"Flush ServerService", generateJmxName("org.apache.iotdb.db.engine.pool", "Flush Manager")),
CLUSTER_MONITOR_SERVICE("Cluster Monitor ServerService", "Cluster Monitor"),
- SYSTEMINFO_SERVICE("MemTable Monitor Service", "MemTable, Monitor");
+ SYSTEMINFO_SERVICE("MemTable Monitor Service", "MemTable, Monitor"),
+
+ CLUSTER_INFO_SERVICE("Cluster Monitor Service (thrift-based)", "Cluster Monitor-Thrift"),
+ ;
private final String name;
private final String jmxName;
diff --git a/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java b/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java
new file mode 100644
index 0000000..4131df3
--- /dev/null
+++ b/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.sql;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashSet;
+import java.util.Set;
+
+public abstract class Cases {
+
+ protected Statement writeStatement;
+ protected Connection writeConnection;
+ protected Statement[] readStatements;
+ protected Connection[] readConnections;
+
+ /** Initializes writeStatement, writeConnection, readStatements and readConnections. */
+ public abstract void init() throws Exception;
+
+ public void clean() throws Exception {
+ writeStatement.close();
+ writeConnection.close();
+ for (Statement statement : readStatements) {
+ statement.close();
+ }
+ for (Connection connection : readConnections) {
+ connection.close();
+ }
+ }
+
+ // If we separate the tests into multiple test() methods, the docker container has to be
+ // built several times.
+ // So, if the test cases do not conflict, we can put them into one method.
+ // But if you want to avoid the impact of other cases, use a separate test() method.
+ @Test
+ public void multiCasesTest() throws SQLException {
+
+ String[] timeSeriesArray = {"root.sg1.aa.bb", "root.sg1.aa.bb.cc", "root.sg1.aa"};
+
+ for (String timeSeries : timeSeriesArray) {
+ writeStatement.execute(
+ String.format(
+ "create timeseries %s with datatype=INT64, encoding=PLAIN, compression=SNAPPY",
+ timeSeries));
+ }
+ ResultSet resultSet = null;
+ // try to read data on each node.
+ for (Statement readStatement : readStatements) {
+ resultSet = readStatement.executeQuery("show timeseries");
+ Set<String> result = new HashSet<>();
+ while (resultSet.next()) {
+ result.add(resultSet.getString(1));
+ }
+ Assert.assertEquals(3, result.size());
+ for (String timeseries : timeSeriesArray) {
+ Assert.assertTrue(result.contains(timeseries));
+ }
+ resultSet.close();
+ }
+
+ // test https://issues.apache.org/jira/browse/IOTDB-1331
+ writeStatement.execute(
+ "create timeseries root.ln.wf01.wt01.temperature WITH DATATYPE=FLOAT, ENCODING=RLE");
+ String[] initDataArray = {
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature) values(200,20.71)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature) values(220,50.71)"
+ };
+ for (String initData : initDataArray) {
+ writeStatement.execute(initData);
+ }
+ // try to read data on each node.
+ for (Statement readStatement : readStatements) {
+ resultSet = readStatement.executeQuery("select avg(temperature) from root.ln.wf01.wt01");
+ if (resultSet.next()) {
+ Assert.assertEquals(35.71, resultSet.getDouble(1), 0.01);
+ } else {
+ Assert.fail("expect 1 result, but get an empty resultSet.");
+ }
+ Assert.assertFalse(resultSet.next());
+ resultSet.close();
+ }
+
+ // test https://issues.apache.org/jira/browse/IOTDB-1348
+ initDataArray =
+ new String[] {
+ "INSERT INTO root.ln.wf01.wt01(timestamp, temperature) values(250, 10.0)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp, temperature) values(300, 20.0)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp, temperature) values(350, 25.0)"
+ };
+
+ for (String initData : initDataArray) {
+ writeStatement.execute(initData);
+ }
+ // try to read data on each node.
+ for (Statement readStatement : readStatements) {
+ resultSet = readStatement.executeQuery("select last * from root.ln.wf01.wt01;");
+ Assert.assertTrue(resultSet.next());
+ double last = Double.parseDouble(resultSet.getString(3));
+ Assert.assertEquals(25.0, last, 0.1);
+ resultSet.close();
+ }
+ }
+}
diff --git a/testcontainer/src/test/java/org/apache/iotdb/db/sql/ClusterIT.java b/testcontainer/src/test/java/org/apache/iotdb/db/sql/ClusterIT.java
index 79b1292..b8b07b1 100644
--- a/testcontainer/src/test/java/org/apache/iotdb/db/sql/ClusterIT.java
+++ b/testcontainer/src/test/java/org/apache/iotdb/db/sql/ClusterIT.java
@@ -16,130 +16,78 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.sql;
import org.apache.iotdb.jdbc.Config;
-import org.junit.*;
+import org.junit.After;
+import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.DockerComposeContainer;
-import org.testcontainers.containers.NoProjectNameDockerComposeContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.containers.wait.strategy.Wait;
-import java.io.File;
-import java.sql.*;
-import java.util.HashSet;
-import java.util.Set;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
-public class ClusterIT {
- private static Logger logger = LoggerFactory.getLogger(ClusterIT.class);
- private static Logger node1Logger = LoggerFactory.getLogger("iotdb-server_1");
- private static Logger node2Logger = LoggerFactory.getLogger("iotdb-server_2");
- private static Logger node3Logger = LoggerFactory.getLogger("iotdb-server_3");
-
- private Statement statement;
- private Connection connection;
-
- // in TestContainer's document, it is @ClassRule, and the environment is `public static`
- // I am not sure the difference now.
- @Rule
- public DockerComposeContainer environment =
- new NoProjectNameDockerComposeContainer(
- "3nodes", new File("src/test/resources/3nodes/docker-compose.yaml"))
- .withExposedService("iotdb-server_1", 6667, Wait.forListeningPort())
- .withLogConsumer("iotdb-server_1", new Slf4jLogConsumer(node1Logger))
- .withExposedService("iotdb-server_2", 6667, Wait.forListeningPort())
- .withLogConsumer("iotdb-server_2", new Slf4jLogConsumer(node2Logger))
- .withExposedService("iotdb-server_3", 6667, Wait.forListeningPort())
- .withLogConsumer("iotdb-server_3", new Slf4jLogConsumer(node3Logger))
- .withLocalCompose(true);
-
- int rpcPort = 6667;
+// do not add tests here.
+// add tests into Cases.java instead.
+public abstract class ClusterIT extends Cases {
- @Before
- public void setUp() throws Exception {
+ private static Logger logger = LoggerFactory.getLogger(ClusterIT.class);
- String ip = environment.getServiceHost("iotdb-server_1", 6667);
- rpcPort = environment.getServicePort("iotdb-server_1", 6667);
+ // "root.sg1" is a special storage group for testing whether the read and write operations can
+ // be run correctly if the data is not on the connected node.
+ public String defaultSG = "root.sg1";
- Class.forName(Config.JDBC_DRIVER_NAME);
- connection = DriverManager.getConnection("jdbc:iotdb://" + ip + ":" + rpcPort, "root", "root");
- statement = connection.createStatement();
+ protected int getWriteRpcPort() {
+ return getContainer().getServicePort("iotdb-server_1", 6667);
}
- @After
- public void tearDown() throws Exception {
- statement.close();
- connection.close();
+ protected String getWriteRpcIp() {
+ return getContainer().getServiceHost("iotdb-server_1", 6667);
}
- @Test
- public void testSimplePutAndGet() throws SQLException {
-
- String[] timeSeriesArray = {"root.sg1.aa.bb", "root.sg1.aa.bb.cc", "root.sg1.aa"};
+ protected int[] getReadRpcPorts() {
+ return new int[] {getContainer().getServicePort("iotdb-server_1", 6667)};
+ }
- for (String timeSeries : timeSeriesArray) {
- statement.execute(
- String.format(
- "create timeseries %s with datatype=INT64, encoding=PLAIN, compression=SNAPPY",
- timeSeries));
- }
- ResultSet resultSet = null;
- resultSet = statement.executeQuery("show timeseries");
- Set<String> result = new HashSet<>();
- while (resultSet.next()) {
- result.add(resultSet.getString(1));
- }
- Assert.assertEquals(3, result.size());
- for (String timeseries : timeSeriesArray) {
- Assert.assertTrue(result.contains(timeseries));
- }
+ protected String[] getReadRpcIps() {
+ return new String[] {getContainer().getServiceHost("iotdb-server_1", 6667)};
}
- @Test
- public void testAgg() throws SQLException {
+ protected void startCluster() {}
- String[] timeSeriesArray = {"root.ln.wf01.wt01.temperature WITH DATATYPE=FLOAT, ENCODING=RLE"};
- String[] initDataArray = {
- "INSERT INTO root.ln.wf01.wt01(timestamp,temperature) values(200,20.71)",
- "INSERT INTO root.ln.wf01.wt01(timestamp,temperature) values(220,50.71)"
- };
+ protected abstract DockerComposeContainer getContainer();
- for (String timeSeries : timeSeriesArray) {
- statement.execute(String.format("create timeseries %s ", timeSeries));
- }
- for (String initData : initDataArray) {
- statement.execute(initData);
+ @Before
+ public void init() throws Exception {
+ startCluster();
+
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ writeConnection =
+ DriverManager.getConnection(
+ "jdbc:iotdb://" + getWriteRpcIp() + ":" + getWriteRpcPort(), "root", "root");
+ writeStatement = writeConnection.createStatement();
+
+ int[] readPorts = getReadRpcPorts();
+ String[] readIps = getReadRpcIps();
+ readConnections = new Connection[readPorts.length];
+ readStatements = new Statement[readPorts.length];
+ for (int i = 0; i < readPorts.length; i++) {
+ readConnections[i] =
+ DriverManager.getConnection(
+ "jdbc:iotdb://" + readIps[i] + ":" + readPorts[i], "root", "root");
+ readStatements[i] = readConnections[i].createStatement();
}
- ResultSet resultSet = statement.executeQuery("select avg(temperature) from root.ln.wf01.wt01;");
- Assert.assertTrue(resultSet.next());
- double avg = resultSet.getDouble(1);
- Assert.assertEquals(35.71, avg, 0.1);
- resultSet.close();
}
- @Test
- public void testLast() throws SQLException {
-
- String[] timeSeriesArray = {"root.ln.wf01.wt01.temperature WITH DATATYPE=DOUBLE, ENCODING=RLE"};
- String[] initDataArray = {
- "INSERT INTO root.ln.wf01.wt01(timestamp, temperature) values(100, 10.0)",
- "INSERT INTO root.ln.wf01.wt01(timestamp, temperature) values(200, 20.0)",
- "INSERT INTO root.ln.wf01.wt01(timestamp, temperature) values(150, 15.0)"
- };
-
- for (String timeSeries : timeSeriesArray) {
- statement.execute(String.format("create timeseries %s ", timeSeries));
- }
- for (String initData : initDataArray) {
- statement.execute(initData);
- }
- ResultSet resultSet = statement.executeQuery("select last * from root.ln.wf01.wt01;");
- Assert.assertTrue(resultSet.next());
- double last = Double.parseDouble(resultSet.getString(3));
- Assert.assertEquals(20.0, last, 0.1);
- resultSet.close();
+ @After
+ public void clean() throws Exception {
+ super.clean();
}
+
+ // do not add tests here.
+ // add tests into Cases.java instead.
}
diff --git a/testcontainer/src/test/java/org/apache/iotdb/db/sql/SingleNodeIT.java b/testcontainer/src/test/java/org/apache/iotdb/db/sql/SingleNodeIT.java
index 9a3ba1b..8e7bc4c 100644
--- a/testcontainer/src/test/java/org/apache/iotdb/db/sql/SingleNodeIT.java
+++ b/testcontainer/src/test/java/org/apache/iotdb/db/sql/SingleNodeIT.java
@@ -32,13 +32,11 @@ import org.testcontainers.utility.DockerImageName;
import java.io.File;
import java.sql.*;
-import java.util.HashSet;
-import java.util.Set;
-public class SingleNodeIT {
+// do not add tests here.
+// add tests into Cases.java instead.
+public class SingleNodeIT extends Cases {
private static Logger logger = LoggerFactory.getLogger(SingleNodeIT.class);
- private Statement statement;
- private Connection connection;
@Rule
public GenericContainer dslContainer =
@@ -61,85 +59,23 @@ public class SingleNodeIT {
int syncPort = 5555;
@Before
- public void setUp() throws Exception {
+ public void init() throws Exception {
rpcPort = dslContainer.getMappedPort(6667);
-
syncPort = dslContainer.getMappedPort(5555);
Class.forName(Config.JDBC_DRIVER_NAME);
- connection = DriverManager.getConnection("jdbc:iotdb://127.0.0.1:" + rpcPort, "root", "root");
- statement = connection.createStatement();
+ readConnections = new Connection[1];
+ readStatements = new Statement[1];
+ writeConnection =
+ readConnections[0] =
+ DriverManager.getConnection("jdbc:iotdb://127.0.0.1:" + rpcPort, "root", "root");
+ writeStatement = readStatements[0] = writeConnection.createStatement();
}
@After
- public void tearDown() throws Exception {
- statement.close();
- connection.close();
- }
-
- @Test
- public void testSimplePutAndGet() throws SQLException {
- String[] timeSeriesArray = {"root.sg1.aa.bb", "root.sg1.aa.bb.cc", "root.sg1.aa"};
-
- for (String timeSeries : timeSeriesArray) {
- statement.execute(
- String.format(
- "create timeseries %s with datatype=INT64, encoding=PLAIN, compression=SNAPPY",
- timeSeries));
- }
- ResultSet resultSet = null;
- resultSet = statement.executeQuery("show timeseries");
- Set<String> result = new HashSet<>();
- while (resultSet.next()) {
- result.add(resultSet.getString(1));
- }
- Assert.assertEquals(3, result.size());
- for (String timeseries : timeSeriesArray) {
- Assert.assertTrue(result.contains(timeseries));
- }
+ public void clean() throws Exception {
+ super.clean();
}
- @Test
- public void testAgg() throws SQLException {
-
- String[] timeSeriesArray = {"root.ln.wf01.wt01.temperature WITH DATATYPE=FLOAT, ENCODING=RLE"};
- String[] initDataArray = {
- "INSERT INTO root.ln.wf01.wt01(timestamp,temperature) values(200,20.71)",
- "INSERT INTO root.ln.wf01.wt01(timestamp,temperature) values(220,50.71)"
- };
-
- for (String timeSeries : timeSeriesArray) {
- statement.execute(String.format("create timeseries %s ", timeSeries));
- }
- for (String initData : initDataArray) {
- statement.execute(initData);
- }
- ResultSet resultSet = statement.executeQuery("select avg(temperature) from root.ln.wf01.wt01;");
- Assert.assertTrue(resultSet.next());
- double avg = resultSet.getDouble(1);
- Assert.assertEquals(35.71, avg, 0.1);
- resultSet.close();
- }
-
- @Test
- public void testLast() throws SQLException {
-
- String[] timeSeriesArray = {"root.ln.wf01.wt01.temperature WITH DATATYPE=DOUBLE, ENCODING=RLE"};
- String[] initDataArray = {
- "INSERT INTO root.ln.wf01.wt01(timestamp, temperature) values(100, 10.0)",
- "INSERT INTO root.ln.wf01.wt01(timestamp, temperature) values(200, 20.0)",
- "INSERT INTO root.ln.wf01.wt01(timestamp, temperature) values(150, 15.0)"
- };
-
- for (String timeSeries : timeSeriesArray) {
- statement.execute(String.format("create timeseries %s ", timeSeries));
- }
- for (String initData : initDataArray) {
- statement.execute(initData);
- }
- ResultSet resultSet = statement.executeQuery("select last * from root.ln.wf01.wt01;");
- Assert.assertTrue(resultSet.next());
- double last = Double.parseDouble(resultSet.getString(3));
- Assert.assertEquals(20.0, last, 0.1);
- resultSet.close();
- }
+ // do not add tests here.
+ // add tests into Cases.java instead.
}
diff --git a/testcontainer/src/test/java/org/apache/iotdb/db/sql/node1/OneNodeClusterIT.java b/testcontainer/src/test/java/org/apache/iotdb/db/sql/node1/OneNodeClusterIT.java
new file mode 100644
index 0000000..daa7161
--- /dev/null
+++ b/testcontainer/src/test/java/org/apache/iotdb/db/sql/node1/OneNodeClusterIT.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.sql.node1;
+
+import org.apache.iotdb.db.sql.ClusterIT;
+
+import org.junit.Rule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.DockerComposeContainer;
+import org.testcontainers.containers.NoProjectNameDockerComposeContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+import java.io.File;
+
+// A cluster with only one node.
+// In this case, the read and write connections must be on the same node,
+// and the data is also on that node.
+public class OneNodeClusterIT extends ClusterIT {
+ private static Logger node1Logger = LoggerFactory.getLogger("iotdb-server_1");
+
+ // in TestContainer's document, it is @ClassRule, and the environment is `public static`
+ // I am not sure the difference now.
+ @Rule
+ public DockerComposeContainer environment =
+ new NoProjectNameDockerComposeContainer(
+ "1node", new File("src/test/resources/1node/docker-compose.yaml"))
+ .withExposedService("iotdb-server_1", 6667, Wait.forListeningPort())
+ .withLogConsumer("iotdb-server_1", new Slf4jLogConsumer(node1Logger))
+ .withLocalCompose(true);
+
+ @Override
+ protected DockerComposeContainer getContainer() {
+ return environment;
+ }
+}
diff --git a/testcontainer/src/test/java/org/apache/iotdb/db/sql/nodes3/AbstractThreeNodeClusterIT.java b/testcontainer/src/test/java/org/apache/iotdb/db/sql/nodes3/AbstractThreeNodeClusterIT.java
new file mode 100644
index 0000000..2729581
--- /dev/null
+++ b/testcontainer/src/test/java/org/apache/iotdb/db/sql/nodes3/AbstractThreeNodeClusterIT.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.sql.nodes3;
+
+import org.apache.iotdb.db.sql.ClusterIT;
+
+import org.junit.Rule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.DockerComposeContainer;
+import org.testcontainers.containers.NoProjectNameDockerComposeContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+import java.io.File;
+
+// Do not add tests here; add real ITs into Cases.java instead.
+// In this case, the data must be on all nodes.
+// We just simulate writing data on node A and reading data on either node A or B.
+public abstract class AbstractThreeNodeClusterIT extends ClusterIT {
+
+ private static Logger node1Logger = LoggerFactory.getLogger("iotdb-server_1");
+ private static Logger node2Logger = LoggerFactory.getLogger("iotdb-server_2");
+ private static Logger node3Logger = LoggerFactory.getLogger("iotdb-server_3");
+
+ // in TestContainer's document, it is @ClassRule, and the environment is `public static`
+ // I am not sure the difference now.
+ @Rule
+ public DockerComposeContainer environment =
+ new NoProjectNameDockerComposeContainer(
+ "3nodes", new File("src/test/resources/3nodes/docker-compose.yaml"))
+ .withExposedService("iotdb-server_1", 6667, Wait.forListeningPort())
+ .withLogConsumer("iotdb-server_1", new Slf4jLogConsumer(node1Logger))
+ .withExposedService("iotdb-server_2", 6667, Wait.forListeningPort())
+ .withLogConsumer("iotdb-server_2", new Slf4jLogConsumer(node2Logger))
+ .withExposedService("iotdb-server_3", 6667, Wait.forListeningPort())
+ .withLogConsumer("iotdb-server_3", new Slf4jLogConsumer(node3Logger))
+ .withLocalCompose(true);
+
+ @Override
+ protected DockerComposeContainer getContainer() {
+ return environment;
+ }
+}
diff --git a/testcontainer/src/test/java/org/apache/iotdb/db/sql/nodes3/ThreeNodeCluster1IT.java b/testcontainer/src/test/java/org/apache/iotdb/db/sql/nodes3/ThreeNodeCluster1IT.java
new file mode 100644
index 0000000..4802100
--- /dev/null
+++ b/testcontainer/src/test/java/org/apache/iotdb/db/sql/nodes3/ThreeNodeCluster1IT.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.sql.nodes3;
+
+// read and the write statements are on the same node.
+public class ThreeNodeCluster1IT extends AbstractThreeNodeClusterIT {}
diff --git a/testcontainer/src/test/java/org/apache/iotdb/db/sql/nodes3/ThreeNodeCluster2IT.java b/testcontainer/src/test/java/org/apache/iotdb/db/sql/nodes3/ThreeNodeCluster2IT.java
new file mode 100644
index 0000000..69ee3a5
--- /dev/null
+++ b/testcontainer/src/test/java/org/apache/iotdb/db/sql/nodes3/ThreeNodeCluster2IT.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.sql.nodes3;
+
+// the read and write statements are on different nodes.
+public class ThreeNodeCluster2IT extends AbstractThreeNodeClusterIT {
+
+ protected String getWriteRpcIp() {
+ return getContainer().getServiceHost("iotdb-server_2", 6667);
+ }
+
+ protected int getWriteRpcPort() {
+ return getContainer().getServicePort("iotdb-server_2", 6667);
+ }
+}
diff --git a/testcontainer/src/test/java/org/apache/iotdb/db/sql/nodes5/AbstractFiveNodeClusterIT.java b/testcontainer/src/test/java/org/apache/iotdb/db/sql/nodes5/AbstractFiveNodeClusterIT.java
new file mode 100644
index 0000000..4535fdb
--- /dev/null
+++ b/testcontainer/src/test/java/org/apache/iotdb/db/sql/nodes5/AbstractFiveNodeClusterIT.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.sql.nodes5;
+
+import org.apache.iotdb.db.sql.ClusterIT;
+
+import org.junit.Rule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.DockerComposeContainer;
+import org.testcontainers.containers.NoProjectNameDockerComposeContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+import java.io.File;
+
+// Do not add tests here; add real ITs into Cases.java instead.
+// In this case, the data may not be on the node that a read client connects to.
+// So, we write data on node A and read data on node A, B, or C.
+public abstract class AbstractFiveNodeClusterIT extends ClusterIT {
+
+ private static Logger node1Logger = LoggerFactory.getLogger("iotdb-server_1");
+ private static Logger node2Logger = LoggerFactory.getLogger("iotdb-server_2");
+ private static Logger node3Logger = LoggerFactory.getLogger("iotdb-server_3");
+ private static Logger node4Logger = LoggerFactory.getLogger("iotdb-server_4");
+ private static Logger node5Logger = LoggerFactory.getLogger("iotdb-server_5");
+
+ // in TestContainer's document, it is @ClassRule, and the environment is `public static`
+ // I am not sure the difference now.
+ @Rule
+ public DockerComposeContainer environment =
+ new NoProjectNameDockerComposeContainer(
+ "5nodes", new File("src/test/resources/5nodes/docker-compose.yaml"))
+ .withExposedService("iotdb-server_1", 6667, Wait.forListeningPort())
+ .withLogConsumer("iotdb-server_1", new Slf4jLogConsumer(node1Logger))
+ .withExposedService("iotdb-server_2", 6667, Wait.forListeningPort())
+ .withLogConsumer("iotdb-server_2", new Slf4jLogConsumer(node2Logger))
+ .withExposedService("iotdb-server_3", 6667, Wait.forListeningPort())
+ .withLogConsumer("iotdb-server_3", new Slf4jLogConsumer(node3Logger))
+ .withExposedService("iotdb-server_4", 6667, Wait.forListeningPort())
+ .withLogConsumer("iotdb-server_4", new Slf4jLogConsumer(node4Logger))
+ .withExposedService("iotdb-server_5", 6667, Wait.forListeningPort())
+ .withLogConsumer("iotdb-server_5", new Slf4jLogConsumer(node5Logger))
+ .withLocalCompose(true);
+
+ @Override
+ protected DockerComposeContainer getContainer() {
+ return environment;
+ }
+}
diff --git a/testcontainer/src/test/java/org/apache/iotdb/db/sql/nodes5/FiveNodeCluster1IT.java b/testcontainer/src/test/java/org/apache/iotdb/db/sql/nodes5/FiveNodeCluster1IT.java
new file mode 100644
index 0000000..328de27
--- /dev/null
+++ b/testcontainer/src/test/java/org/apache/iotdb/db/sql/nodes5/FiveNodeCluster1IT.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.sql.nodes5;
+
+// The read and the write statements are on different nodes.
+// NOTE(review): this class overrides nothing, so the endpoints actually used are decided by its
+// superclasses -- confirm that the statement above holds for those defaults.
+public class FiveNodeCluster1IT extends AbstractFiveNodeClusterIT {}
diff --git a/testcontainer/src/test/java/org/apache/iotdb/db/sql/nodes5/FiveNodeCluster2IT.java b/testcontainer/src/test/java/org/apache/iotdb/db/sql/nodes5/FiveNodeCluster2IT.java
new file mode 100644
index 0000000..6a455bc
--- /dev/null
+++ b/testcontainer/src/test/java/org/apache/iotdb/db/sql/nodes5/FiveNodeCluster2IT.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.sql.nodes5;
+
+// The read and the write statements are on different nodes, which may be in the same raft group.
+public class FiveNodeCluster2IT extends AbstractFiveNodeClusterIT {
+
+  /**
+   * Returns the host under which port 6667 of the {@code iotdb-server_2} service is reachable,
+   * as reported by the compose environment.
+   */
+  @Override
+  protected String getWriteRpcIp() {
+    return getContainer().getServiceHost("iotdb-server_2", 6667);
+  }
+
+  /**
+   * Returns the port to which port 6667 of the {@code iotdb-server_2} service is mapped, as
+   * reported by the compose environment.
+   */
+  @Override
+  protected int getWriteRpcPort() {
+    return getContainer().getServicePort("iotdb-server_2", 6667);
+  }
+}
diff --git a/testcontainer/src/test/java/org/apache/iotdb/db/sql/nodes5/FiveNodeCluster4IT.java b/testcontainer/src/test/java/org/apache/iotdb/db/sql/nodes5/FiveNodeCluster4IT.java
new file mode 100644
index 0000000..2b8491f
--- /dev/null
+++ b/testcontainer/src/test/java/org/apache/iotdb/db/sql/nodes5/FiveNodeCluster4IT.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.sql.nodes5;
+
+// The read and the write statements are on different nodes, which may be in different raft
+// groups.
+public class FiveNodeCluster4IT extends AbstractFiveNodeClusterIT {
+
+  /**
+   * Returns the host under which port 6667 of the {@code iotdb-server_4} service is reachable,
+   * as reported by the compose environment.
+   */
+  @Override
+  protected String getWriteRpcIp() {
+    return getContainer().getServiceHost("iotdb-server_4", 6667);
+  }
+
+  /**
+   * Returns the port to which port 6667 of the {@code iotdb-server_4} service is mapped, as
+   * reported by the compose environment.
+   */
+  @Override
+  protected int getWriteRpcPort() {
+    return getContainer().getServicePort("iotdb-server_4", 6667);
+  }
+}
diff --git a/testcontainer/src/test/resources/1node/iotdb-cluster.properties b/testcontainer/src/test/resources/1node/iotdb-cluster.properties
index 2df52cc..e60395a 100644
--- a/testcontainer/src/test/resources/1node/iotdb-cluster.properties
+++ b/testcontainer/src/test/resources/1node/iotdb-cluster.properties
@@ -18,7 +18,7 @@
internal_meta_port=9003
internal_data_port=40010
-seed_nodes=3nodes_iotdb-server_1:9003
+seed_nodes=1node_iotdb-server_1:9003
default_replica_num=1
consistency_level=mid
connection_timeout_ms=20000
diff --git a/thrift-cluster/src/main/thrift/cluster.thrift b/thrift-cluster/src/main/thrift/cluster.thrift
index f23130e..2bcf9bf 100644
--- a/thrift-cluster/src/main/thrift/cluster.thrift
+++ b/thrift-cluster/src/main/thrift/cluster.thrift
@@ -368,7 +368,7 @@ service TSDataService extends RaftService {
* @return a map containing key-value,the serialized time-value pairs or an empty buffer if there
* are not more results.
**/
- map<string,binary> fetchMultSeries(1:Node header, 2:long readerId, list<string> paths)
+ map<string,binary> fetchMultSeries(1:Node header, 2:long readerId, 3:list<string> paths)
/**
* Query a time series and generate an IReaderByTimestamp.
@@ -508,5 +508,53 @@ service TSMetaService extends RaftService {
* cannot know when another node resumes, and handshakes are mainly used to update node status
* on coordinator side.
**/
- void handshake(Node sender);
+ void handshake(1:Node sender);
}
+
+
+/**
+* One element of the list returned by ClusterInfoService.getDataPartition: a time interval
+* [startTime, endTime] paired with a list of nodes.
+* NOTE(review): presumably `nodes` are the members of the data group that holds the interval,
+* and whether the bounds are inclusive is not stated here -- confirm against the implementation.
+**/
+struct DataPartitionEntry {
+  1: required long startTime
+  2: required long endTime
+  3: required list<Node> nodes
+}
+
+/**
+* Cluster information service for cluster maintainers.
+* This interface will replace the JMX based NodeTool APIs.
+**/
+service ClusterInfoService {
+
+ /**
+ * Get the physical hash ring.
+ *
+ * @return the nodes on the ring
+ */
+ list<Node> getRing();
+
+ /**
+ * Get data partition information of the input path and time range.
+ *
+ * @param path input path
+ * @param startTime start of the time range
+ * @param endTime end of the time range
+ * @return data partition information
+ */
+ list<DataPartitionEntry> getDataPartition(1:string path, 2:long startTime, 3:long endTime);
+
+ /**
+ * Get metadata partition information of the input path.
+ *
+ * @param path input path
+ * @return metadata partition information
+ */
+ list<Node> getMetaPartition(1:string path);
+
+ /**
+ * Get the status of all nodes.
+ *
+ * @return key: node, value: live or not
+ */
+ map<Node, bool> getAllNodeStatus();
+
+ /**
+ * Get instrumenting information.
+ *
+ * @return a multi-line string with each line representing the total time consumption, the
+ * number of invocations, and the average time consumption.
+ */
+ string getInstrumentingInfo();
+
+}
\ No newline at end of file
diff --git a/thrift-sync/pom.xml b/thrift-sync/pom.xml
index 5c1a696..66015e1 100644
--- a/thrift-sync/pom.xml
+++ b/thrift-sync/pom.xml
@@ -35,11 +35,6 @@
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.iotdb</groupId>
- <artifactId>iotdb-thrift</artifactId>
- <version>${project.version}</version>
- </dependency>
</dependencies>
<build>
<plugins>