You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/03/28 09:48:32 UTC

[iotdb] branch new_sync updated: [To new_sync][IOTDB-2770] Extract configurable parameters to iotdb-engine.properties (#5354)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch new_sync
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/new_sync by this push:
     new 7005997  [To new_sync][IOTDB-2770] Extract configurable parameters to iotdb-engine.properties (#5354)
7005997 is described below

commit 70059973ed395505006977fbf26c41af19773880
Author: Chen YZ <43...@users.noreply.github.com>
AuthorDate: Mon Mar 28 17:47:47 2022 +0800

    [To new_sync][IOTDB-2770] Extract configurable parameters to iotdb-engine.properties (#5354)
---
 .../resources/conf/iotdb-engine.properties         |  15 ++-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  16 ++-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  25 +++-
 .../apache/iotdb/db/newsync/conf/SyncConstant.java |   1 +
 .../newsync/transport/client/TransportClient.java  |  41 +++----
 .../transport/conf/TransportSenderConfig.java      | 129 ---------------------
 .../transport/server/TransportServerManager.java   |  12 +-
 .../transport/server/TransportServiceImpl.java     |  49 +++++++-
 .../db/newsync/transport/TransportServiceTest.java |   5 +-
 9 files changed, 123 insertions(+), 170 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index d76605e..f3b4e0d 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -573,7 +573,20 @@ timestamp_precision=ms
 ####################
 # PIPE server port to listen
 # Datatype: int
-# local_pipe_server_port=5555
+# pipe_server_port=6670
+
+# Whitelist of IP addresses that Sync clients may connect from.
+# Specify each IP range as a network segment in CIDR notation, for example: 192.168.0.0/16
+# If there is more than one IP segment, separate them with commas.
+# By default, all IPs are allowed to sync.
+# Datatype: String
+# ip_white_list=0.0.0.0/0
+
+####################
+### PIPE Sender Configuration
+####################
+# The maximum number of retries when syncing a file to the receiver fails.
+# max_number_of_sync_file_retry=5
 
 ####################
 ### Sync Server Configuration
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index b0307ac..27a4bbe 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -474,15 +474,17 @@ public class IoTDBConfig {
   private int syncServerPort = 5555;
 
   /** If this IoTDB instance is a receiver of sync, set the server port. */
-  private int pipeServerPort = 5555;
+  private int pipeServerPort = 6670;
+
+  private String ipWhiteList = "0.0.0.0/0";
+
+  private int maxNumberOfSyncFileRetry = 5;
 
   /**
    * Set the language version when loading file including error information, default value is "EN"
    */
   private String languageVersion = "EN";
 
-  private String ipWhiteList = "0.0.0.0/0";
-
   /** Examining period of cache file reader : 100 seconds. Unit: millisecond */
   private long cacheFileReaderClearPeriod = 100000;
 
@@ -1307,6 +1309,14 @@ public class IoTDBConfig {
     this.pipeServerPort = pipeServerPort;
   }
 
+  public int getMaxNumberOfSyncFileRetry() {
+    return maxNumberOfSyncFileRetry;
+  }
+
+  public void setMaxNumberOfSyncFileRetry(int maxNumberOfSyncFileRetry) {
+    this.maxNumberOfSyncFileRetry = maxNumberOfSyncFileRetry;
+  }
+
   String getLanguageVersion() {
     return languageVersion;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index ae739c6..8279e75 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -381,10 +381,17 @@ public class IoTDBDescriptor {
               properties
                   .getProperty("sync_server_port", Integer.toString(conf.getSyncServerPort()))
                   .trim()));
-      conf.setSyncServerPort(
+      conf.setPipeServerPort(
           Integer.parseInt(
               properties
-                  .getProperty("local_pipe_server_port", Integer.toString(conf.getSyncServerPort()))
+                  .getProperty("pipe_server_port", Integer.toString(conf.getPipeServerPort()))
+                  .trim()));
+      conf.setMaxNumberOfSyncFileRetry(
+          Integer.parseInt(
+              properties
+                  .getProperty(
+                      "max_number_of_sync_file_retry",
+                      Integer.toString(conf.getMaxNumberOfSyncFileRetry()))
                   .trim()));
 
       conf.setIpWhiteList(properties.getProperty("ip_white_list", conf.getIpWhiteList()));
@@ -1230,6 +1237,20 @@ public class IoTDBDescriptor {
               properties.getProperty(
                   "select_into_insert_tablet_plan_row_limit",
                   String.valueOf(conf.getSelectIntoInsertTabletPlanRowLimit()))));
+
+      // update sync config
+      conf.setPipeServerPort(
+          Integer.parseInt(
+              properties.getProperty(
+                  "pipe_server_port", String.valueOf(conf.getPipeServerPort()))));
+      conf.setMaxNumberOfSyncFileRetry(
+          Integer.parseInt(
+              properties
+                  .getProperty(
+                      "max_number_of_sync_file_retry",
+                      Integer.toString(conf.getMaxNumberOfSyncFileRetry()))
+                  .trim()));
+      conf.setIpWhiteList(properties.getProperty("ip_white_list", conf.getIpWhiteList()));
     } catch (Exception e) {
       throw new QueryProcessException(String.format("Fail to reload configuration because %s", e));
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/conf/SyncConstant.java b/server/src/main/java/org/apache/iotdb/db/newsync/conf/SyncConstant.java
index 5dd39f5..5fdd4ff 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/conf/SyncConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/conf/SyncConstant.java
@@ -61,4 +61,5 @@ public class SyncConstant {
   public static final String RECEIVER_LOG_NAME = "receiverService.log";
   public static final String RECEIVER_MSG_LOG_NAME = "receiverMessage.log";
   public static final String FILE_DATA_DIR_NAME = "file-data";
+  public static final String IP_SEPARATOR = "\\.";
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/transport/client/TransportClient.java b/server/src/main/java/org/apache/iotdb/db/newsync/transport/client/TransportClient.java
index 0c99bec..a4698ea 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/transport/client/TransportClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/transport/client/TransportClient.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.db.newsync.pipedata.TsFilePipeData;
 import org.apache.iotdb.db.newsync.sender.pipe.Pipe;
 import org.apache.iotdb.db.newsync.sender.service.SenderService;
 import org.apache.iotdb.db.newsync.transport.conf.TransportConstant;
-import org.apache.iotdb.db.newsync.transport.conf.TransportSenderConfig;
 import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.rpc.RpcTransportFactory;
 import org.apache.iotdb.service.transport.thrift.IdentityInfo;
@@ -72,10 +71,7 @@ public class TransportClient implements ITransportClient {
 
   private static final Logger logger = LoggerFactory.getLogger(TransportClient.class);
 
-  // TODO: Need to change to transport config
-  private static TransportSenderConfig config = new TransportSenderConfig();
-
-  private static final IoTDBConfig ioTDBConfig = IoTDBDescriptor.getInstance().getConfig();
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
   private static final int TIMEOUT_MS = 2000_000;
 
@@ -110,7 +106,7 @@ public class TransportClient implements ITransportClient {
   }
 
   public TransportClient(Pipe pipe, String ipAddress, int port) throws IOException {
-    RpcTransportFactory.setThriftMaxFrameSize(ioTDBConfig.getThriftMaxFrameSize());
+    RpcTransportFactory.setThriftMaxFrameSize(config.getThriftMaxFrameSize());
 
     this.pipe = pipe;
     this.ipAddress = ipAddress;
@@ -122,16 +118,17 @@ public class TransportClient implements ITransportClient {
     try {
       while (!handshakeWithVersion()) {
         handshakeCounter++;
-        if (handshakeCounter > config.getMaxNumOfSyncFileRetry()) {
+        if (handshakeCounter > config.getMaxNumberOfSyncFileRetry()) {
           logger.error(
               String.format(
-                  "Handshake failed %s times! Check network.", config.getMaxNumOfSyncFileRetry()));
+                  "Handshake failed %s times! Check network.",
+                  config.getMaxNumberOfSyncFileRetry()));
           return false;
         }
         logger.info(
             String.format(
                 "Handshake error, retry %d/%d.",
-                handshakeCounter, config.getMaxNumOfSyncFileRetry()));
+                handshakeCounter, config.getMaxNumberOfSyncFileRetry()));
       }
     } catch (SyncConnectionException e) {
       logger.error(String.format("Handshake failed and can not retry, because %s.", e));
@@ -148,7 +145,7 @@ public class TransportClient implements ITransportClient {
     try {
       transport = RpcTransportFactory.INSTANCE.getTransport(ipAddress, port, TIMEOUT_MS);
       TProtocol protocol;
-      if (ioTDBConfig.isRpcThriftCompressionEnable()) {
+      if (config.isRpcThriftCompressionEnable()) {
         protocol = new TCompactProtocol(transport);
       } else {
         protocol = new TBinaryProtocol(transport);
@@ -165,7 +162,7 @@ public class TransportClient implements ITransportClient {
               InetAddress.getLocalHost().getHostAddress(),
               pipe.getName(),
               pipe.getCreateTime(),
-              ioTDBConfig.getIoTDBMajorVersion());
+              config.getIoTDBMajorVersion());
       TransportStatus status = serviceClient.handshake(identityInfo);
       if (status.code != SUCCESS_CODE) {
         throw new SyncConnectionException(
@@ -200,7 +197,7 @@ public class TransportClient implements ITransportClient {
 
     while (true) {
       retryCount++;
-      if (retryCount > config.getMaxNumOfSyncFileRetry()) {
+      if (retryCount > config.getMaxNumberOfSyncFileRetry()) {
         logger.error(
             String.format("After %s tries, stop the transport of current pipeData!", retryCount));
         throw new SyncConnectionException(
@@ -240,7 +237,7 @@ public class TransportClient implements ITransportClient {
     int retryCount = 0;
     while (true) {
       retryCount++;
-      if (retryCount > config.getMaxNumOfSyncFileRetry()) {
+      if (retryCount > config.getMaxNumberOfSyncFileRetry()) {
         throw new SyncConnectionException(
             String.format("Connect to receiver error when transferring file %s.", file.getName()));
       }
@@ -311,11 +308,11 @@ public class TransportClient implements ITransportClient {
           int retryCount = 0;
           while (true) {
             retryCount++;
-            if (retryCount > config.getMaxNumOfSyncFileRetry()) {
+            if (retryCount > config.getMaxNumberOfSyncFileRetry()) {
               throw new SyncConnectionException(
                   String.format(
                       "Can not sync file %s after %s tries.",
-                      file.getAbsoluteFile(), config.getMaxNumOfSyncFileRetry()));
+                      file.getAbsoluteFile(), config.getMaxNumberOfSyncFileRetry()));
             }
             try {
               status =
@@ -391,11 +388,11 @@ public class TransportClient implements ITransportClient {
 
     while (true) {
       retryCount++;
-      if (retryCount > config.getMaxNumOfSyncFileRetry()) {
+      if (retryCount > config.getMaxNumberOfSyncFileRetry()) {
         throw new SyncConnectionException(
             String.format(
                 "Can not sync file %s after %s tries.",
-                file.getAbsoluteFile(), config.getMaxNumOfSyncFileRetry()));
+                file.getAbsoluteFile(), config.getMaxNumberOfSyncFileRetry()));
       }
       try {
         status =
@@ -425,10 +422,10 @@ public class TransportClient implements ITransportClient {
     while (true) {
 
       retryCount++;
-      if (retryCount > config.getMaxNumOfSyncFileRetry()) {
+      if (retryCount > config.getMaxNumberOfSyncFileRetry()) {
         throw new SyncConnectionException(
             String.format(
-                "Can not sync pipe data after %s tries.", config.getMaxNumOfSyncFileRetry()));
+                "Can not sync pipe data after %s tries.", config.getMaxNumberOfSyncFileRetry()));
       }
 
       try {
@@ -510,7 +507,7 @@ public class TransportClient implements ITransportClient {
     int retryCount = 0;
     while (true) {
       retryCount++;
-      if (retryCount > config.getMaxNumOfSyncFileRetry()) {
+      if (retryCount > config.getMaxNumberOfSyncFileRetry()) {
         throw new SyncConnectionException(
             String.format(
                 "%s request connects to receiver %s:%d error.",
@@ -520,7 +517,7 @@ public class TransportClient implements ITransportClient {
       try (TTransport heartbeatTransport =
           RpcTransportFactory.INSTANCE.getTransport(ipAddress, port, TIMEOUT_MS)) {
         TProtocol protocol;
-        if (ioTDBConfig.isRpcThriftCompressionEnable()) {
+        if (config.isRpcThriftCompressionEnable()) {
           protocol = new TCompactProtocol(heartbeatTransport);
         } else {
           protocol = new TBinaryProtocol(heartbeatTransport);
@@ -536,7 +533,7 @@ public class TransportClient implements ITransportClient {
         logger.info(
             String.format(
                 "Heartbeat connect to receiver %s:%d error, retry %d/%d.",
-                ipAddress, port, retryCount, config.getMaxNumOfSyncFileRetry()));
+                ipAddress, port, retryCount, config.getMaxNumberOfSyncFileRetry()));
       }
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/transport/conf/TransportSenderConfig.java b/server/src/main/java/org/apache/iotdb/db/newsync/transport/conf/TransportSenderConfig.java
deleted file mode 100644
index 9d5b57e..0000000
--- a/server/src/main/java/org/apache/iotdb/db/newsync/transport/conf/TransportSenderConfig.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.newsync.transport.conf;
-
-import org.apache.iotdb.db.sync.conf.SyncConstant;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-
-public class TransportSenderConfig {
-
-  private String serverIp = "127.0.0.1";
-
-  private int serverPort = 5555;
-
-  private int syncPeriodInSecond = 600;
-
-  private String senderFolderPath;
-
-  private String lastFileInfoPath;
-
-  private String snapshotPath;
-
-  /** The maximum number of retry when syncing a file to receiver fails. */
-  private int maxNumOfSyncFileRetry = 5;
-
-  /** Storage groups which participate in sync process */
-  private List<String> storageGroupList = new ArrayList<>();
-
-  /** Update paths based on data directory */
-  public void update(String dataDirectory) {
-    senderFolderPath =
-        dataDirectory
-            + File.separatorChar
-            + SyncConstant.SYNC_SENDER
-            + File.separatorChar
-            + getSyncReceiverName();
-    lastFileInfoPath = senderFolderPath + File.separatorChar + SyncConstant.LAST_LOCAL_FILE_NAME;
-    snapshotPath = senderFolderPath + File.separatorChar + SyncConstant.DATA_SNAPSHOT_NAME;
-    if (!new File(snapshotPath).exists()) {
-      new File(snapshotPath).mkdirs();
-    }
-  }
-
-  public String getServerIp() {
-    return serverIp;
-  }
-
-  public void setServerIp(String serverIp) {
-    this.serverIp = serverIp;
-  }
-
-  public int getServerPort() {
-    return serverPort;
-  }
-
-  public void setServerPort(int serverPort) {
-    this.serverPort = serverPort;
-  }
-
-  public int getSyncPeriodInSecond() {
-    return syncPeriodInSecond;
-  }
-
-  public void setSyncPeriodInSecond(int syncPeriodInSecond) {
-    this.syncPeriodInSecond = syncPeriodInSecond;
-  }
-
-  public String getSenderFolderPath() {
-    return senderFolderPath;
-  }
-
-  public void setSenderFolderPath(String senderFolderPath) {
-    this.senderFolderPath = senderFolderPath;
-  }
-
-  public String getLastFileInfoPath() {
-    return lastFileInfoPath;
-  }
-
-  public void setLastFileInfoPath(String lastFileInfoPath) {
-    this.lastFileInfoPath = lastFileInfoPath;
-  }
-
-  public String getSnapshotPath() {
-    return snapshotPath;
-  }
-
-  public void setSnapshotPath(String snapshotPath) {
-    this.snapshotPath = snapshotPath;
-  }
-
-  public String getSyncReceiverName() {
-    return serverIp + SyncConstant.SYNC_DIR_NAME_SEPARATOR + serverPort;
-  }
-
-  public List<String> getStorageGroupList() {
-    return new ArrayList<>(storageGroupList);
-  }
-
-  public void setStorageGroupList(List<String> storageGroupList) {
-    this.storageGroupList = storageGroupList;
-  }
-
-  public int getMaxNumOfSyncFileRetry() {
-    return maxNumOfSyncFileRetry;
-  }
-
-  public void setMaxNumOfSyncFileRetry(int maxNumOfSyncFileRetry) {
-    this.maxNumOfSyncFileRetry = maxNumOfSyncFileRetry;
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/transport/server/TransportServerManager.java b/server/src/main/java/org/apache/iotdb/db/newsync/transport/server/TransportServerManager.java
index 6d00a82..8db7c05 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/transport/server/TransportServerManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/transport/server/TransportServerManager.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.db.concurrent.ThreadName;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.StartupException;
-import org.apache.iotdb.db.newsync.conf.SyncConstant;
 import org.apache.iotdb.db.service.ServiceType;
 import org.apache.iotdb.db.service.thrift.ThriftService;
 import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
@@ -84,7 +83,7 @@ public class TransportServerManager extends ThriftService
             getID().getName(),
             ThreadName.SYNC_CLIENT.getName(),
             config.getRpcAddress(),
-            SyncConstant.DEFAULT_PIPE_SINK_PORT,
+            config.getPipeServerPort(),
             Integer.MAX_VALUE,
             config.getThriftServerAwaitTimeForStopService(),
             new TransportServerThriftHandler(serviceImpl),
@@ -101,7 +100,7 @@ public class TransportServerManager extends ThriftService
   @Override
   public int getBindPort() {
     // TODO: Whether to change this config here
-    return SyncConstant.DEFAULT_PIPE_SINK_PORT;
+    return IoTDBDescriptor.getInstance().getConfig().getPipeServerPort();
   }
 
   //  @Override
@@ -111,19 +110,12 @@ public class TransportServerManager extends ThriftService
 
   @Override
   public void startService() throws StartupException {
-    // TODO: Whether to change this config here
-    //    if (!IoTDBDescriptor.getInstance().getConfig().isSyncEnable()) {
-    //      return;
-    //    }
     super.startService();
   }
 
   @Override
   public void stopService() {
-    // TODO: Whether to change this config here
-    //    if (IoTDBDescriptor.getInstance().getConfig().isSyncEnable()) {
     super.stopService();
-    //    }
   }
 
   @TestOnly
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/transport/server/TransportServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/newsync/transport/server/TransportServiceImpl.java
index e4998a0..6230eb6 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/transport/server/TransportServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/transport/server/TransportServiceImpl.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.newsync.transport.server;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.newsync.conf.SyncConstant;
 import org.apache.iotdb.db.newsync.conf.SyncPathUtil;
 import org.apache.iotdb.db.newsync.pipedata.PipeData;
 import org.apache.iotdb.db.newsync.pipedata.TsFilePipeData;
@@ -46,6 +47,7 @@ import java.nio.file.Files;
 import java.nio.file.StandardCopyOption;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
+import java.text.DecimalFormat;
 import java.util.Arrays;
 
 import static org.apache.iotdb.db.newsync.transport.conf.TransportConstant.CONFLICT_CODE;
@@ -126,7 +128,12 @@ public class TransportServiceImpl implements TransportService.Iface {
   @Override
   public TransportStatus handshake(IdentityInfo identityInfo) throws TException {
     logger.debug("Invoke handshake method from client ip = {}", identityInfo.address);
-
+    // check ip address
+    if (!verifyIPSegment(config.getIpWhiteList(), identityInfo.address)) {
+      return new TransportStatus(
+          ERROR_CODE,
+          "Sender IP is not in the white list of receiver IP and synchronization tasks are not allowed.");
+    }
     // Version check
     if (!config.getIoTDBMajorVersion(identityInfo.version).equals(config.getIoTDBMajorVersion())) {
       return new TransportStatus(
@@ -142,6 +149,46 @@ public class TransportServiceImpl implements TransportService.Iface {
     return new TransportStatus(SUCCESS_CODE, "");
   }
 
+  /**
+   * Verify an IP address against the IP white list, which may contain one or more comma-separated
+   * IP segments. It is used by the receiver to check the address reported by the sync sender.
+   */
+  private boolean verifyIPSegment(String ipWhiteList, String ipAddress) {
+    String[] ipSegments = ipWhiteList.split(",");
+    for (String IPsegment : ipSegments) {
+      int subnetMask = Integer.parseInt(IPsegment.substring(IPsegment.indexOf('/') + 1));
+      IPsegment = IPsegment.substring(0, IPsegment.indexOf('/'));
+      if (verifyIP(IPsegment, ipAddress, subnetMask)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /** Verify that the IP address matches the IP segment on its first subnetMark bits. */
+  private boolean verifyIP(String ipSegment, String ipAddress, int subnetMark) {
+    String ipSegmentBinary;
+    String ipAddressBinary;
+    String[] ipSplits = ipSegment.split(SyncConstant.IP_SEPARATOR);
+    DecimalFormat df = new DecimalFormat("00000000");
+    StringBuilder ipSegmentBuilder = new StringBuilder();
+    for (String IPsplit : ipSplits) {
+      ipSegmentBuilder.append(
+          df.format(Integer.parseInt(Integer.toBinaryString(Integer.parseInt(IPsplit)))));
+    }
+    ipSegmentBinary = ipSegmentBuilder.toString();
+    ipSegmentBinary = ipSegmentBinary.substring(0, subnetMark);
+    ipSplits = ipAddress.split(SyncConstant.IP_SEPARATOR);
+    StringBuilder ipAddressBuilder = new StringBuilder();
+    for (String IPsplit : ipSplits) {
+      ipAddressBuilder.append(
+          df.format(Integer.parseInt(Integer.toBinaryString(Integer.parseInt(IPsplit)))));
+    }
+    ipAddressBinary = ipAddressBuilder.toString();
+    ipAddressBinary = ipAddressBinary.substring(0, subnetMark);
+    return ipAddressBinary.equals(ipSegmentBinary);
+  }
+
   @Override
   public TransportStatus transportData(
       IdentityInfo identityInfo, MetaInfo metaInfo, ByteBuffer buff, ByteBuffer digest) {
diff --git a/server/src/test/java/org/apache/iotdb/db/newsync/transport/TransportServiceTest.java b/server/src/test/java/org/apache/iotdb/db/newsync/transport/TransportServiceTest.java
index 298316e..b4ef7d1 100644
--- a/server/src/test/java/org/apache/iotdb/db/newsync/transport/TransportServiceTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/newsync/transport/TransportServiceTest.java
@@ -19,11 +19,11 @@
  */
 package org.apache.iotdb.db.newsync.transport;
 
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.newsync.conf.SyncConstant;
 import org.apache.iotdb.db.newsync.conf.SyncPathUtil;
 import org.apache.iotdb.db.newsync.pipedata.DeletionPipeData;
 import org.apache.iotdb.db.newsync.pipedata.PipeData;
@@ -127,7 +127,8 @@ public class TransportServiceTest {
     // 4. start client
     Pipe pipe = new TsFilePipe(createdTime1, pipeName1, null, 0, false);
     TransportClient client =
-        new TransportClient(pipe, "127.0.0.1", SyncConstant.DEFAULT_PIPE_SINK_PORT);
+        new TransportClient(
+            pipe, "127.0.0.1", IoTDBDescriptor.getInstance().getConfig().getPipeServerPort());
     client.handshake();
     for (PipeData pipeData : pipeDataList) {
       client.senderTransport(pipeData);