Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/05/13 15:49:53 UTC

[iotdb] branch master updated: [IOTDB-3114] NodeInfo snapshot interface (#5887)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 2145522213 [IOTDB-3114] NodeInfo snapshot interface (#5887)
2145522213 is described below

commit 214552221364ff474bc2ac18e67fe95670cdc7d5
Author: lisijia <44...@users.noreply.github.com>
AuthorDate: Fri May 13 23:49:48 2022 +0800

    [IOTDB-3114] NodeInfo snapshot interface (#5887)
---
 .../iotdb/confignode/persistence/NodeInfo.java     | 137 +++++++++++++++++++--
 .../executor/ConfigRequestExecutor.java            |   1 +
 .../iotdb/confignode/persistence/NodeInfoTest.java | 102 +++++++++++++++
 3 files changed, 231 insertions(+), 9 deletions(-)
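
In short, this commit replaces the old ByteBuffer-based serialize/deserialize stubs in
NodeInfo with the SnapshotProcessor interface: processTakeSnapshot writes nextDataNodeId,
onlineDataNodes and drainingDataNodes to a node_info.bin file through the Thrift binary
protocol, and processLoadSnapshot restores them. A minimal usage sketch (the driver code
below is illustrative only and not part of this commit; it assumes the NodeInfo singleton
shown in the diff, with the declared IOException/TException handling omitted):

    File snapshotDir = new File("target/snapshot");   // any writable directory
    NodeInfo nodeInfo = NodeInfo.getInstance();
    // Writes node_info.bin into snapshotDir; returns false if the file already exists.
    if (nodeInfo.processTakeSnapshot(snapshotDir)) {
      nodeInfo.clear();                               // drop the in-memory state
      nodeInfo.processLoadSnapshot(snapshotDir);      // restore it from the snapshot
    }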

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
index 308090848c..818cf7aded 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
@@ -31,22 +31,29 @@ import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodeReq;
 import org.apache.iotdb.confignode.consensus.response.DataNodeLocationsResp;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TIOStreamTransport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -56,7 +63,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
  * The NodeInfo stores cluster node information. The cluster node information including: 1. DataNode
  * information 2. ConfigNode information
  */
-public class NodeInfo {
+public class NodeInfo implements SnapshotProcessor {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(NodeInfo.class);
 
@@ -78,19 +85,18 @@ public class NodeInfo {
 
   private final ReentrantReadWriteLock dataNodeInfoReadWriteLock;
 
-  // TODO: serialize and deserialize
   private AtomicInteger nextDataNodeId = new AtomicInteger(0);
 
   // Online DataNodes
-  // TODO: serialize and deserialize
   private final ConcurrentNavigableMap<Integer, TDataNodeLocation> onlineDataNodes =
       new ConcurrentSkipListMap();
 
   // For remove or draining DataNode
   // TODO: implement
-  // TODO: serialize and deserialize
   private final Set<TDataNodeLocation> drainingDataNodes = new HashSet<>();
 
+  private final String snapshotFileName = "node_info.bin";
+
   private NodeInfo() {
     this.dataNodeInfoReadWriteLock = new ReentrantReadWriteLock();
     this.configNodeInfoReadWriteLock = new ReentrantReadWriteLock();
@@ -272,15 +278,128 @@ public class NodeInfo {
     return nextDataNodeId.getAndIncrement();
   }
 
-  public void serialize(ByteBuffer buffer) {
-    // TODO: Serialize DataNodeInfo
+  @Override
+  public boolean processTakeSnapshot(File snapshotDir) throws IOException, TException {
+    File snapshotFile = new File(snapshotDir, snapshotFileName);
+    if (snapshotFile.exists() && snapshotFile.isFile()) {
+      LOGGER.error(
+          "Failed to take snapshot, because snapshot file [{}] is already exist.",
+          snapshotFile.getAbsolutePath());
+      return false;
+    }
+
+    File tmpFile = new File(snapshotFile.getAbsolutePath() + "-" + UUID.randomUUID());
+    configNodeInfoReadWriteLock.readLock().lock();
+    dataNodeInfoReadWriteLock.readLock().lock();
+    try (FileOutputStream fileOutputStream = new FileOutputStream(tmpFile);
+        DataOutputStream dataOutputStream = new DataOutputStream(fileOutputStream);
+        TIOStreamTransport tioStreamTransport = new TIOStreamTransport(dataOutputStream)) {
+
+      TProtocol protocol = new TBinaryProtocol(tioStreamTransport);
+
+      dataOutputStream.writeInt(nextDataNodeId.get());
+
+      serializeOnlineDataNode(dataOutputStream, protocol);
+
+      serializeDrainingDataNodes(dataOutputStream, protocol);
+
+      fileOutputStream.flush();
+    } finally {
+      configNodeInfoReadWriteLock.readLock().unlock();
+      dataNodeInfoReadWriteLock.readLock().unlock();
+    }
+
+    return tmpFile.renameTo(snapshotFile);
+  }
+
+  private void serializeOnlineDataNode(DataOutputStream outputStream, TProtocol protocol)
+      throws IOException, TException {
+    outputStream.writeInt(onlineDataNodes.size());
+    for (Entry<Integer, TDataNodeLocation> entry : onlineDataNodes.entrySet()) {
+      outputStream.writeInt(entry.getKey());
+      entry.getValue().write(protocol);
+    }
   }
 
-  public void deserialize(ByteBuffer buffer) {
-    // TODO: Deserialize DataNodeInfo
+  private void serializeDrainingDataNodes(DataOutputStream outputStream, TProtocol protocol)
+      throws IOException, TException {
+    outputStream.writeInt(drainingDataNodes.size());
+    for (TDataNodeLocation tDataNodeLocation : drainingDataNodes) {
+      tDataNodeLocation.write(protocol);
+    }
+  }
+
+  @Override
+  public void processLoadSnapshot(File snapshotDir) throws IOException, TException {
+
+    File snapshotFile = new File(snapshotDir, snapshotFileName);
+    if (!snapshotFile.exists() || !snapshotFile.isFile()) {
+      LOGGER.error(
+          "Failed to load snapshot,snapshot file [{}] is not exist.",
+          snapshotFile.getAbsolutePath());
+      return;
+    }
+
+    configNodeInfoReadWriteLock.writeLock().lock();
+    dataNodeInfoReadWriteLock.writeLock().lock();
+
+    try (FileInputStream fileInputStream = new FileInputStream(snapshotFile);
+        DataInputStream dataInputStream = new DataInputStream(fileInputStream);
+        TIOStreamTransport tioStreamTransport = new TIOStreamTransport(dataInputStream)) {
+      TProtocol protocol = new TBinaryProtocol(tioStreamTransport);
+
+      clear();
+
+      nextDataNodeId.set(dataInputStream.readInt());
+
+      deserializeOnlineDataNode(dataInputStream, protocol);
+
+      deserializeDrainingDataNodes(dataInputStream, protocol);
+
+    } finally {
+      configNodeInfoReadWriteLock.writeLock().unlock();
+      dataNodeInfoReadWriteLock.writeLock().unlock();
+    }
+  }
+
+  private void deserializeOnlineDataNode(DataInputStream inputStream, TProtocol protocol)
+      throws IOException, TException {
+    int size = inputStream.readInt();
+    while (size > 0) {
+      int dataNodeId = inputStream.readInt();
+      TDataNodeLocation tDataNodeLocation = new TDataNodeLocation();
+      tDataNodeLocation.read(protocol);
+      onlineDataNodes.put(dataNodeId, tDataNodeLocation);
+      size--;
+    }
+  }
+
+  private void deserializeDrainingDataNodes(DataInputStream inputStream, TProtocol protocol)
+      throws IOException, TException {
+    int size = inputStream.readInt();
+    while (size > 0) {
+      TDataNodeLocation tDataNodeLocation = new TDataNodeLocation();
+      tDataNodeLocation.read(protocol);
+      drainingDataNodes.add(tDataNodeLocation);
+      size--;
+    }
+  }
+
+  // Since removing/draining DataNodes is not implemented yet, set drainingDataNodes manually so tests can validate it
+  @TestOnly
+  public void setDrainingDataNodes(Set<TDataNodeLocation> tDataNodeLocations) {
+    drainingDataNodes.addAll(tDataNodeLocations);
+  }
+
+  public int getNextDataNodeId() {
+    return nextDataNodeId.get();
   }
 
   @TestOnly
+  public Set<TDataNodeLocation> getDrainingDataNodes() {
+    return drainingDataNodes;
+  }
+
   public void clear() {
     nextDataNodeId = new AtomicInteger(0);
     onlineDataNodes.clear();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
index f69efec570..fadcd07d65 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
@@ -227,6 +227,7 @@ public class ConfigRequestExecutor {
     List<SnapshotProcessor> allAttributes = new ArrayList<>();
     allAttributes.add(clusterSchemaInfo);
     allAttributes.add(partitionInfo);
+    allAttributes.add(nodeInfo);
     return allAttributes;
   }
 }
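
For reference, the layout of node_info.bin produced by processTakeSnapshot above is: an
int for nextDataNodeId, an int count followed by (int dataNodeId, TDataNodeLocation in
Thrift binary encoding) pairs for onlineDataNodes, and an int count followed by
TDataNodeLocation entries for drainingDataNodes. A standalone reader sketch mirroring
processLoadSnapshot (illustrative only; the readSnapshot helper is hypothetical, uses the
same imports as NodeInfo.java, and is not part of this commit):

    private static void readSnapshot(File snapshotDir) throws IOException, TException {
      File snapshotFile = new File(snapshotDir, "node_info.bin");
      try (DataInputStream in = new DataInputStream(new FileInputStream(snapshotFile));
          TIOStreamTransport transport = new TIOStreamTransport(in)) {
        TProtocol protocol = new TBinaryProtocol(transport);
        int nextDataNodeId = in.readInt();   // 1. next DataNode id
        int onlineSize = in.readInt();       // 2. onlineDataNodes: count, then (id, location) pairs
        for (int i = 0; i < onlineSize; i++) {
          int dataNodeId = in.readInt();
          TDataNodeLocation location = new TDataNodeLocation();
          location.read(protocol);
        }
        int drainingSize = in.readInt();     // 3. drainingDataNodes: count, then locations
        for (int i = 0; i < drainingSize; i++) {
          TDataNodeLocation location = new TDataNodeLocation();
          location.read(protocol);
        }
      }
    }
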
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java
new file mode 100644
index 0000000000..2caf683a1b
--- /dev/null
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.persistence;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodeReq;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.thrift.TException;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.iotdb.db.constant.TestConstant.BASE_OUTPUT_PATH;
+
+public class NodeInfoTest {
+
+  private static NodeInfo nodeInfo;
+  private static final File snapshotDir = new File(BASE_OUTPUT_PATH, "snapshot");
+
+  @BeforeClass
+  public static void setup() {
+    nodeInfo = NodeInfo.getInstance();
+    if (!snapshotDir.exists()) {
+      snapshotDir.mkdirs();
+    }
+  }
+
+  @AfterClass
+  public static void cleanup() throws IOException {
+    nodeInfo.clear();
+    if (snapshotDir.exists()) {
+      FileUtils.deleteDirectory(snapshotDir);
+    }
+  }
+
+  @Test
+  public void testSnapshot() throws TException, IOException {
+
+    RegisterDataNodeReq registerDataNodeReq = new RegisterDataNodeReq(generateTDataNodeLocation(1));
+    nodeInfo.registerDataNode(registerDataNodeReq);
+
+    registerDataNodeReq = new RegisterDataNodeReq(generateTDataNodeLocation(2));
+    nodeInfo.registerDataNode(registerDataNodeReq);
+
+    Set<TDataNodeLocation> drainingDataNodes_before = new HashSet<>();
+    // parameter i is used as the flag in generateTDataNodeLocation
+    for (int i = 3; i < 8; i++) {
+      drainingDataNodes_before.add(generateTDataNodeLocation(i));
+    }
+    nodeInfo.setDrainingDataNodes(drainingDataNodes_before);
+
+    int nextId = nodeInfo.getNextDataNodeId();
+    List<TDataNodeLocation> onlineDataNodes_before = nodeInfo.getOnlineDataNodes();
+
+    nodeInfo.processTakeSnapshot(snapshotDir);
+    nodeInfo.clear();
+    nodeInfo.processLoadSnapshot(snapshotDir);
+
+    Assert.assertEquals(nextId, nodeInfo.getNextDataNodeId());
+
+    Set<TDataNodeLocation> drainingDataNodes_after = nodeInfo.getDrainingDataNodes();
+    Assert.assertEquals(drainingDataNodes_before, drainingDataNodes_after);
+
+    List<TDataNodeLocation> onlineDataNodes_after = nodeInfo.getOnlineDataNodes();
+
+    Assert.assertEquals(onlineDataNodes_before, onlineDataNodes_after);
+  }
+
+  private TDataNodeLocation generateTDataNodeLocation(int flag) {
+    return new TDataNodeLocation(
+        10000 + flag,
+        new TEndPoint("127.0.0.1", 6600 + flag),
+        new TEndPoint("127.0.0.1", 7700 + flag),
+        new TEndPoint("127.0.0.1", 8800 + flag),
+        new TEndPoint("127.0.0.1", 9900 + flag));
+  }
+}