You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2022/05/13 02:01:18 UTC

[GitHub] [iotdb] cigarl commented on a diff in pull request #5887: [IOTDB-3114] NodeInfo snapshot interface

cigarl commented on code in PR #5887:
URL: https://github.com/apache/iotdb/pull/5887#discussion_r871940482


##########
confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java:
##########
@@ -272,15 +278,129 @@ public int generateNextDataNodeId() {
     return nextDataNodeId.getAndIncrement();
   }
 
-  public void serialize(ByteBuffer buffer) {
-    // TODO: Serialize DataNodeInfo
+  @Override
+  public boolean processTakeSnapshot(File snapshotDir) throws IOException, TException {
+    File snapshotFile = new File(snapshotDir, snapshotFileName);
+    if (snapshotFile.exists() && snapshotFile.isFile()) {
+      LOGGER.error(
+          "Failed to take snapshot, because snapshot file [{}] is already exist.",
+          snapshotFile.getAbsolutePath());
+      return false;
+    }
+
+    File tmpFile = new File(snapshotFile.getAbsolutePath() + "-" + UUID.randomUUID());
+    configNodeInfoReadWriteLock.readLock().lock();
+    dataNodeInfoReadWriteLock.readLock().lock();
+    try (FileOutputStream fileOutputStream = new FileOutputStream(tmpFile);
+        DataOutputStream dataOutputStream = new DataOutputStream(fileOutputStream);
+        TIOStreamTransport tioStreamTransport = new TIOStreamTransport(dataOutputStream)) {
+
+      TProtocol protocol = new TBinaryProtocol(tioStreamTransport);
+
+      dataOutputStream.writeInt(nextDataNodeId.get());
+
+      serializeOnlineDataNode(dataOutputStream, protocol);
+
+      serializeDrainingDataNodes(dataOutputStream, protocol);
+
+      fileOutputStream.flush();
+    } finally {
+      configNodeInfoReadWriteLock.readLock().unlock();
+      dataNodeInfoReadWriteLock.readLock().unlock();
+    }
+
+    return tmpFile.renameTo(snapshotFile);
+  }
+
+  private void serializeOnlineDataNode(DataOutputStream outputStream, TProtocol protocol)
+      throws IOException, TException {
+    outputStream.writeInt(onlineDataNodes.size());
+    for (Entry<Integer, TDataNodeLocation> entry : onlineDataNodes.entrySet()) {
+      outputStream.writeInt(entry.getKey());
+      entry.getValue().write(protocol);
+    }
+  }
+
+  private void serializeDrainingDataNodes(DataOutputStream outputStream, TProtocol protocol)
+      throws IOException, TException {
+    outputStream.writeInt(drainingDataNodes.size());
+    for (TDataNodeLocation tDataNodeLocation : drainingDataNodes) {
+      tDataNodeLocation.write(protocol);
+    }
+  }
+
+  @Override
+  public void processLoadSnapshot(File snapshotDir) throws IOException, TException {
+
+    File snapshotFile = new File(snapshotDir, snapshotFileName);
+    if (!snapshotFile.exists() || !snapshotFile.isFile()) {
+      LOGGER.error(
+          "Failed to load snapshot,snapshot file [{}] is not exist.",
+          snapshotFile.getAbsolutePath());
+      return;
+    }
+
+    configNodeInfoReadWriteLock.writeLock().lock();
+    dataNodeInfoReadWriteLock.writeLock().lock();
+
+    try (FileInputStream fileInputStream = new FileInputStream(snapshotFile);
+        DataInputStream dataInputStream = new DataInputStream(fileInputStream);
+        TIOStreamTransport tioStreamTransport = new TIOStreamTransport(dataInputStream)) {
+      TProtocol protocol = new TBinaryProtocol(tioStreamTransport);
+
+      clear();
+
+      nextDataNodeId.set(dataInputStream.readInt());
+
+      deserializeOnlineDataNode(dataInputStream, protocol);
+
+      deserializeDrainingDataNodes(dataInputStream, protocol);
+
+    } finally {
+      configNodeInfoReadWriteLock.writeLock().unlock();
+      dataNodeInfoReadWriteLock.writeLock().unlock();
+    }
+  }
+
+  private void deserializeOnlineDataNode(DataInputStream inputStream, TProtocol protocol)
+      throws IOException, TException {
+    int size = inputStream.readInt();
+    while (size > 0) {
+      int dataNodeId = inputStream.readInt();
+      TDataNodeLocation tDataNodeLocation = new TDataNodeLocation();
+      tDataNodeLocation.read(protocol);
+      onlineDataNodes.put(dataNodeId, tDataNodeLocation);
+      size--;
+    }
+  }
+
+  private void deserializeDrainingDataNodes(DataInputStream inputStream, TProtocol protocol)
+      throws IOException, TException {
+    int size = inputStream.readInt();
+    while (size > 0) {
+      TDataNodeLocation tDataNodeLocation = new TDataNodeLocation();
+      tDataNodeLocation.read(protocol);
+      drainingDataNodes.add(tDataNodeLocation);
+      size--;
+    }
+  }
+
+  // as drainingDataNodes is not currently implemented, manually set it to validate the test
+  @TestOnly
+  public void setDrainingDataNodes(Set<TDataNodeLocation> tDataNodeLocations) {
+    drainingDataNodes.addAll(tDataNodeLocations);
   }
 
-  public void deserialize(ByteBuffer buffer) {
-    // TODO: Deserialize DataNodeInfo
+  @TestOnly
+  public int getNextDataNodeId() {

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org