You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/05/13 15:49:53 UTC
[iotdb] branch master updated: [IOTDB-3114] NodeInfo snapshot interface (#5887)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 2145522213 [IOTDB-3114] NodeInfo snapshot interface (#5887)
2145522213 is described below
commit 214552221364ff474bc2ac18e67fe95670cdc7d5
Author: lisijia <44...@users.noreply.github.com>
AuthorDate: Fri May 13 23:49:48 2022 +0800
[IOTDB-3114] NodeInfo snapshot interface (#5887)
---
.../iotdb/confignode/persistence/NodeInfo.java | 137 +++++++++++++++++++--
.../executor/ConfigRequestExecutor.java | 1 +
.../iotdb/confignode/persistence/NodeInfoTest.java | 102 +++++++++++++++
3 files changed, 231 insertions(+), 9 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
index 308090848c..818cf7aded 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
@@ -31,22 +31,29 @@ import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodeReq;
import org.apache.iotdb.confignode.consensus.response.DataNodeLocationsResp;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TIOStreamTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -56,7 +63,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
* The NodeInfo stores cluster node information. The cluster node information including: 1. DataNode
* information 2. ConfigNode information
*/
-public class NodeInfo {
+public class NodeInfo implements SnapshotProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(NodeInfo.class);
@@ -78,19 +85,18 @@ public class NodeInfo {
private final ReentrantReadWriteLock dataNodeInfoReadWriteLock;
- // TODO: serialize and deserialize
private AtomicInteger nextDataNodeId = new AtomicInteger(0);
// Online DataNodes
- // TODO: serialize and deserialize
private final ConcurrentNavigableMap<Integer, TDataNodeLocation> onlineDataNodes =
new ConcurrentSkipListMap();
// For remove or draining DataNode
// TODO: implement
- // TODO: serialize and deserialize
private final Set<TDataNodeLocation> drainingDataNodes = new HashSet<>();
+ private final String snapshotFileName = "node_info.bin";
+
private NodeInfo() {
this.dataNodeInfoReadWriteLock = new ReentrantReadWriteLock();
this.configNodeInfoReadWriteLock = new ReentrantReadWriteLock();
@@ -272,15 +278,128 @@ public class NodeInfo {
return nextDataNodeId.getAndIncrement();
}
- public void serialize(ByteBuffer buffer) {
- // TODO: Serialize DataNodeInfo
+  /**
+   * Writes the DataNode bookkeeping of this NodeInfo to the snapshot file inside the given
+   * directory.
+   *
+   * <p>The data is first written to a uniquely named temporary file next to the target and then
+   * renamed onto it, so a half-written snapshot never appears under the final name. File layout:
+   * nextDataNodeId (int), the online DataNodes (count, then id + thrift-encoded location for each
+   * entry), and the draining DataNodes (count, then thrift-encoded location for each entry).
+   *
+   * @param snapshotDir directory that receives the snapshot file
+   * @return true if the snapshot file was created; false if it already exists or the rename failed
+   * @throws IOException if writing the temporary file fails
+   * @throws TException if thrift serialization of a DataNode location fails
+   */
+  @Override
+  public boolean processTakeSnapshot(File snapshotDir) throws IOException, TException {
+    File snapshotFile = new File(snapshotDir, snapshotFileName);
+    if (snapshotFile.exists() && snapshotFile.isFile()) {
+      LOGGER.error(
+          "Failed to take snapshot, because snapshot file [{}] is already exist.",
+          snapshotFile.getAbsolutePath());
+      return false;
+    }
+
+    File tmpFile = new File(snapshotFile.getAbsolutePath() + "-" + UUID.randomUUID());
+    boolean renamed = false;
+    try {
+      configNodeInfoReadWriteLock.readLock().lock();
+      dataNodeInfoReadWriteLock.readLock().lock();
+      try (FileOutputStream fileOutputStream = new FileOutputStream(tmpFile);
+          DataOutputStream dataOutputStream = new DataOutputStream(fileOutputStream);
+          TIOStreamTransport tioStreamTransport = new TIOStreamTransport(dataOutputStream)) {
+
+        TProtocol protocol = new TBinaryProtocol(tioStreamTransport);
+
+        dataOutputStream.writeInt(nextDataNodeId.get());
+
+        serializeOnlineDataNode(dataOutputStream, protocol);
+
+        serializeDrainingDataNodes(dataOutputStream, protocol);
+
+        // Flush through the outermost java.io stream, then force the bytes to disk so the
+        // rename below never publishes a file whose content is still only in OS buffers.
+        dataOutputStream.flush();
+        fileOutputStream.getFD().sync();
+      } finally {
+        // Release in reverse acquisition order.
+        dataNodeInfoReadWriteLock.readLock().unlock();
+        configNodeInfoReadWriteLock.readLock().unlock();
+      }
+
+      renamed = tmpFile.renameTo(snapshotFile);
+      return renamed;
+    } finally {
+      // Whether serialization threw or the rename failed, do not leave the temporary file behind.
+      if (!renamed && tmpFile.exists() && !tmpFile.delete()) {
+        LOGGER.warn("Failed to delete temporary snapshot file [{}].", tmpFile.getAbsolutePath());
+      }
+    }
+  }
+
+ private void serializeOnlineDataNode(DataOutputStream outputStream, TProtocol protocol)
+ throws IOException, TException {
+ outputStream.writeInt(onlineDataNodes.size());
+ for (Entry<Integer, TDataNodeLocation> entry : onlineDataNodes.entrySet()) {
+ outputStream.writeInt(entry.getKey());
+ entry.getValue().write(protocol);
+ }
}
- public void deserialize(ByteBuffer buffer) {
- // TODO: Deserialize DataNodeInfo
+ private void serializeDrainingDataNodes(DataOutputStream outputStream, TProtocol protocol)
+ throws IOException, TException {
+ outputStream.writeInt(drainingDataNodes.size());
+ for (TDataNodeLocation tDataNodeLocation : drainingDataNodes) {
+ tDataNodeLocation.write(protocol);
+ }
+ }
+
+  /**
+   * Replaces the DataNode bookkeeping of this NodeInfo with the content of the snapshot file
+   * inside the given directory.
+   *
+   * <p>The file is decoded completely before any in-memory state is touched, so a truncated or
+   * corrupt snapshot fails with an exception and leaves the current state as it was. If the
+   * snapshot file does not exist, an error is logged and nothing changes.
+   *
+   * @param snapshotDir directory that contains the snapshot file
+   * @throws IOException if the file cannot be read or holds an invalid element count
+   * @throws TException if thrift decoding of a DataNode location fails
+   */
+  @Override
+  public void processLoadSnapshot(File snapshotDir) throws IOException, TException {
+
+    File snapshotFile = new File(snapshotDir, snapshotFileName);
+    if (!snapshotFile.exists() || !snapshotFile.isFile()) {
+      LOGGER.error(
+          "Failed to load snapshot,snapshot file [{}] is not exist.",
+          snapshotFile.getAbsolutePath());
+      return;
+    }
+
+    configNodeInfoReadWriteLock.writeLock().lock();
+    dataNodeInfoReadWriteLock.writeLock().lock();
+
+    try (FileInputStream fileInputStream = new FileInputStream(snapshotFile);
+        DataInputStream dataInputStream = new DataInputStream(fileInputStream);
+        TIOStreamTransport tioStreamTransport = new TIOStreamTransport(dataInputStream)) {
+      TProtocol protocol = new TBinaryProtocol(tioStreamTransport);
+
+      // Decode into temporaries first; the fields are only modified once the whole file parsed.
+      int loadedNextDataNodeId = dataInputStream.readInt();
+      Map<Integer, TDataNodeLocation> loadedOnlineDataNodes =
+          deserializeOnlineDataNode(dataInputStream, protocol);
+      Set<TDataNodeLocation> loadedDrainingDataNodes =
+          deserializeDrainingDataNodes(dataInputStream, protocol);
+
+      clear();
+
+      nextDataNodeId.set(loadedNextDataNodeId);
+      onlineDataNodes.putAll(loadedOnlineDataNodes);
+      drainingDataNodes.addAll(loadedDrainingDataNodes);
+
+    } finally {
+      // Release in reverse acquisition order.
+      dataNodeInfoReadWriteLock.writeLock().unlock();
+      configNodeInfoReadWriteLock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Reads the online DataNode table written by serializeOnlineDataNode: an entry count, then for
+   * every entry a DataNode id (int) followed by a thrift-encoded location.
+   *
+   * @return the decoded entries keyed by DataNode id
+   * @throws IOException if the stream ends early or the entry count is negative
+   * @throws TException if thrift decoding of a location fails
+   */
+  private Map<Integer, TDataNodeLocation> deserializeOnlineDataNode(
+      DataInputStream inputStream, TProtocol protocol) throws IOException, TException {
+    int size = inputStream.readInt();
+    if (size < 0) {
+      throw new IOException("Corrupted snapshot: negative online DataNode count " + size);
+    }
+    Map<Integer, TDataNodeLocation> loaded = new HashMap<>();
+    while (size > 0) {
+      int dataNodeId = inputStream.readInt();
+      TDataNodeLocation tDataNodeLocation = new TDataNodeLocation();
+      tDataNodeLocation.read(protocol);
+      loaded.put(dataNodeId, tDataNodeLocation);
+      size--;
+    }
+    return loaded;
+  }
+
+  /**
+   * Reads the draining DataNode set written by serializeDrainingDataNodes: an element count, then
+   * one thrift-encoded location per element.
+   *
+   * @return the decoded locations
+   * @throws IOException if the stream ends early or the element count is negative
+   * @throws TException if thrift decoding of a location fails
+   */
+  private Set<TDataNodeLocation> deserializeDrainingDataNodes(
+      DataInputStream inputStream, TProtocol protocol) throws IOException, TException {
+    int size = inputStream.readInt();
+    if (size < 0) {
+      throw new IOException("Corrupted snapshot: negative draining DataNode count " + size);
+    }
+    Set<TDataNodeLocation> loaded = new HashSet<>();
+    while (size > 0) {
+      TDataNodeLocation tDataNodeLocation = new TDataNodeLocation();
+      tDataNodeLocation.read(protocol);
+      loaded.add(tDataNodeLocation);
+      size--;
+    }
+    return loaded;
+  }
+
+  /**
+   * Adds the given locations to the draining DataNode set. Draining is not implemented yet, so the
+   * snapshot test uses this hook to populate the set by hand.
+   *
+   * @param tDataNodeLocations locations to add to the draining set
+   */
+  @TestOnly
+  public void setDrainingDataNodes(Set<TDataNodeLocation> tDataNodeLocations) {
+    for (TDataNodeLocation location : tDataNodeLocations) {
+      drainingDataNodes.add(location);
+    }
+  }
+
+  /**
+   * Returns the current value of the DataNode id counter without advancing it.
+   *
+   * @return the id counter's current value
+   */
+  public int getNextDataNodeId() {
+    return nextDataNodeId.intValue();
 }
+  /**
+   * Returns a read-only view of the draining DataNode set. The view compares equal to any set with
+   * the same elements, but callers cannot modify the internal set through it.
+   *
+   * @return unmodifiable view of the draining DataNodes
+   */
 @TestOnly
+  public Set<TDataNodeLocation> getDrainingDataNodes() {
+    return Collections.unmodifiableSet(drainingDataNodes);
+  }
+
public void clear() {
nextDataNodeId = new AtomicInteger(0);
onlineDataNodes.clear();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
index f69efec570..fadcd07d65 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
@@ -227,6 +227,7 @@ public class ConfigRequestExecutor {
List<SnapshotProcessor> allAttributes = new ArrayList<>();
allAttributes.add(clusterSchemaInfo);
allAttributes.add(partitionInfo);
+ allAttributes.add(nodeInfo);
return allAttributes;
}
}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java
new file mode 100644
index 0000000000..2caf683a1b
--- /dev/null
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.persistence;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodeReq;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.thrift.TException;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.iotdb.db.constant.TestConstant.BASE_OUTPUT_PATH;
+
+/**
+ * Round-trip test for the NodeInfo snapshot: registers DataNodes, takes a snapshot, clears the
+ * in-memory state, loads the snapshot and checks that the state matches what was saved.
+ */
+public class NodeInfoTest {
+
+  private static NodeInfo nodeInfo;
+  private static final File snapshotDir = new File(BASE_OUTPUT_PATH, "snapshot");
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    nodeInfo = NodeInfo.getInstance();
+    // A snapshot left behind by an aborted earlier run would make processTakeSnapshot refuse to
+    // write, so always start from an empty directory.
+    if (snapshotDir.exists()) {
+      FileUtils.deleteDirectory(snapshotDir);
+    }
+    Assert.assertTrue("Cannot create snapshot directory", snapshotDir.mkdirs());
+  }
+
+  @AfterClass
+  public static void cleanup() throws IOException {
+    nodeInfo.clear();
+    if (snapshotDir.exists()) {
+      FileUtils.deleteDirectory(snapshotDir);
+    }
+  }
+
+  @Test
+  public void testSnapshot() throws TException, IOException {
+
+    RegisterDataNodeReq registerDataNodeReq = new RegisterDataNodeReq(generateTDataNodeLocation(1));
+    nodeInfo.registerDataNode(registerDataNodeReq);
+
+    registerDataNodeReq = new RegisterDataNodeReq(generateTDataNodeLocation(2));
+    nodeInfo.registerDataNode(registerDataNodeReq);
+
+    Set<TDataNodeLocation> drainingDataNodesBefore = new HashSet<>();
+    // i is only a seed that makes each generated location distinct
+    for (int i = 3; i < 8; i++) {
+      drainingDataNodesBefore.add(generateTDataNodeLocation(i));
+    }
+    nodeInfo.setDrainingDataNodes(drainingDataNodesBefore);
+
+    int nextId = nodeInfo.getNextDataNodeId();
+    List<TDataNodeLocation> onlineDataNodesBefore = nodeInfo.getOnlineDataNodes();
+
+    // Fail here, with a clear message, if no snapshot was written.
+    Assert.assertTrue(
+        "processTakeSnapshot did not create a snapshot", nodeInfo.processTakeSnapshot(snapshotDir));
+    nodeInfo.clear();
+    nodeInfo.processLoadSnapshot(snapshotDir);
+
+    Assert.assertEquals(nextId, nodeInfo.getNextDataNodeId());
+
+    Set<TDataNodeLocation> drainingDataNodesAfter = nodeInfo.getDrainingDataNodes();
+    Assert.assertEquals(drainingDataNodesBefore, drainingDataNodesAfter);
+
+    List<TDataNodeLocation> onlineDataNodesAfter = nodeInfo.getOnlineDataNodes();
+
+    Assert.assertEquals(onlineDataNodesBefore, onlineDataNodesAfter);
+  }
+
+  /** Builds a DataNode location whose id and ports are all offset by {@code flag}. */
+  private TDataNodeLocation generateTDataNodeLocation(int flag) {
+    return new TDataNodeLocation(
+        10000 + flag,
+        new TEndPoint("127.0.0.1", 6600 + flag),
+        new TEndPoint("127.0.0.1", 7700 + flag),
+        new TEndPoint("127.0.0.1", 8800 + flag),
+        new TEndPoint("127.0.0.1", 9900 + flag));
+  }
+}