You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/03/28 09:48:32 UTC
[iotdb] branch new_sync updated: [To new_sync][IOTDB-2770] Extract configurable parameters to iotdb-engine.properties (#5354)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch new_sync
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/new_sync by this push:
new 7005997 [To new_sync][IOTDB-2770] Extract configurable parameters to iotdb-engine.properties (#5354)
7005997 is described below
commit 70059973ed395505006977fbf26c41af19773880
Author: Chen YZ <43...@users.noreply.github.com>
AuthorDate: Mon Mar 28 17:47:47 2022 +0800
[To new_sync][IOTDB-2770] Extract configurable parameters to iotdb-engine.properties (#5354)
---
.../resources/conf/iotdb-engine.properties | 15 ++-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 16 ++-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 25 +++-
.../apache/iotdb/db/newsync/conf/SyncConstant.java | 1 +
.../newsync/transport/client/TransportClient.java | 41 +++----
.../transport/conf/TransportSenderConfig.java | 129 ---------------------
.../transport/server/TransportServerManager.java | 12 +-
.../transport/server/TransportServiceImpl.java | 49 +++++++-
.../db/newsync/transport/TransportServiceTest.java | 5 +-
9 files changed, 123 insertions(+), 170 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index d76605e..f3b4e0d 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -573,7 +573,20 @@ timestamp_precision=ms
####################
# PIPE server port to listen
# Datatype: int
-# local_pipe_server_port=5555
+# pipe_server_port=6670
+
+# IP white list of sync clients.
+# Specify each allowed IP range as a network segment in CIDR form, for example: 192.168.0.0/16
+# If there is more than one IP segment, separate them with commas
+# The default allows all IPs to sync
+# Datatype: String
+# ip_white_list=0.0.0.0/0
+
+####################
+### PIPE Sender Configuration
+####################
+# The maximum number of retries when syncing a file to the receiver fails.
+# max_number_of_sync_file_retry=5
####################
### Sync Server Configuration
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index b0307ac..27a4bbe 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -474,15 +474,17 @@ public class IoTDBConfig {
private int syncServerPort = 5555;
/** If this IoTDB instance is a receiver of sync, set the server port. */
- private int pipeServerPort = 5555;
+ private int pipeServerPort = 6670;
+
+ private String ipWhiteList = "0.0.0.0/0";
+
+ private int maxNumberOfSyncFileRetry = 5;
/**
* Set the language version when loading file including error information, default value is "EN"
*/
private String languageVersion = "EN";
- private String ipWhiteList = "0.0.0.0/0";
-
/** Examining period of cache file reader : 100 seconds. Unit: millisecond */
private long cacheFileReaderClearPeriod = 100000;
@@ -1307,6 +1309,14 @@ public class IoTDBConfig {
this.pipeServerPort = pipeServerPort;
}
+ public int getMaxNumberOfSyncFileRetry() { // retry limit; loaded from max_number_of_sync_file_retry
+ return maxNumberOfSyncFileRetry;
+ }
+
+ public void setMaxNumberOfSyncFileRetry(int maxNumberOfSyncFileRetry) { // stored as-is, no range check
+ this.maxNumberOfSyncFileRetry = maxNumberOfSyncFileRetry;
+ }
+
String getLanguageVersion() {
return languageVersion;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index ae739c6..8279e75 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -381,10 +381,17 @@ public class IoTDBDescriptor {
properties
.getProperty("sync_server_port", Integer.toString(conf.getSyncServerPort()))
.trim()));
- conf.setSyncServerPort(
+ conf.setPipeServerPort(
Integer.parseInt(
properties
- .getProperty("local_pipe_server_port", Integer.toString(conf.getSyncServerPort()))
+ .getProperty("pipe_server_port", Integer.toString(conf.getPipeServerPort()))
+ .trim()));
+ conf.setMaxNumberOfSyncFileRetry(
+ Integer.parseInt(
+ properties
+ .getProperty(
+ "max_number_of_sync_file_retry",
+ Integer.toString(conf.getMaxNumberOfSyncFileRetry()))
.trim()));
conf.setIpWhiteList(properties.getProperty("ip_white_list", conf.getIpWhiteList()));
@@ -1230,6 +1237,20 @@ public class IoTDBDescriptor {
properties.getProperty(
"select_into_insert_tablet_plan_row_limit",
String.valueOf(conf.getSelectIntoInsertTabletPlanRowLimit()))));
+
+ // update sync config
+ conf.setPipeServerPort(
+ Integer.parseInt(
+ properties.getProperty(
+ "pipe_server_port", String.valueOf(conf.getPipeServerPort()))));
+ conf.setMaxNumberOfSyncFileRetry(
+ Integer.parseInt(
+ properties
+ .getProperty(
+ "max_number_of_sync_file_retry",
+ Integer.toString(conf.getMaxNumberOfSyncFileRetry()))
+ .trim()));
+ conf.setIpWhiteList(properties.getProperty("ip_white_list", conf.getIpWhiteList()));
} catch (Exception e) {
throw new QueryProcessException(String.format("Fail to reload configuration because %s", e));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/conf/SyncConstant.java b/server/src/main/java/org/apache/iotdb/db/newsync/conf/SyncConstant.java
index 5dd39f5..5fdd4ff 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/conf/SyncConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/conf/SyncConstant.java
@@ -61,4 +61,5 @@ public class SyncConstant {
public static final String RECEIVER_LOG_NAME = "receiverService.log";
public static final String RECEIVER_MSG_LOG_NAME = "receiverMessage.log";
public static final String FILE_DATA_DIR_NAME = "file-data";
+ public static final String IP_SEPARATOR = "\\.";
}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/transport/client/TransportClient.java b/server/src/main/java/org/apache/iotdb/db/newsync/transport/client/TransportClient.java
index 0c99bec..a4698ea 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/transport/client/TransportClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/transport/client/TransportClient.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.db.newsync.pipedata.TsFilePipeData;
import org.apache.iotdb.db.newsync.sender.pipe.Pipe;
import org.apache.iotdb.db.newsync.sender.service.SenderService;
import org.apache.iotdb.db.newsync.transport.conf.TransportConstant;
-import org.apache.iotdb.db.newsync.transport.conf.TransportSenderConfig;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.service.transport.thrift.IdentityInfo;
@@ -72,10 +71,7 @@ public class TransportClient implements ITransportClient {
private static final Logger logger = LoggerFactory.getLogger(TransportClient.class);
- // TODO: Need to change to transport config
- private static TransportSenderConfig config = new TransportSenderConfig();
-
- private static final IoTDBConfig ioTDBConfig = IoTDBDescriptor.getInstance().getConfig();
+ private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static final int TIMEOUT_MS = 2000_000;
@@ -110,7 +106,7 @@ public class TransportClient implements ITransportClient {
}
public TransportClient(Pipe pipe, String ipAddress, int port) throws IOException {
- RpcTransportFactory.setThriftMaxFrameSize(ioTDBConfig.getThriftMaxFrameSize());
+ RpcTransportFactory.setThriftMaxFrameSize(config.getThriftMaxFrameSize());
this.pipe = pipe;
this.ipAddress = ipAddress;
@@ -122,16 +118,17 @@ public class TransportClient implements ITransportClient {
try {
while (!handshakeWithVersion()) {
handshakeCounter++;
- if (handshakeCounter > config.getMaxNumOfSyncFileRetry()) {
+ if (handshakeCounter > config.getMaxNumberOfSyncFileRetry()) {
logger.error(
String.format(
- "Handshake failed %s times! Check network.", config.getMaxNumOfSyncFileRetry()));
+ "Handshake failed %s times! Check network.",
+ config.getMaxNumberOfSyncFileRetry()));
return false;
}
logger.info(
String.format(
"Handshake error, retry %d/%d.",
- handshakeCounter, config.getMaxNumOfSyncFileRetry()));
+ handshakeCounter, config.getMaxNumberOfSyncFileRetry()));
}
} catch (SyncConnectionException e) {
logger.error(String.format("Handshake failed and can not retry, because %s.", e));
@@ -148,7 +145,7 @@ public class TransportClient implements ITransportClient {
try {
transport = RpcTransportFactory.INSTANCE.getTransport(ipAddress, port, TIMEOUT_MS);
TProtocol protocol;
- if (ioTDBConfig.isRpcThriftCompressionEnable()) {
+ if (config.isRpcThriftCompressionEnable()) {
protocol = new TCompactProtocol(transport);
} else {
protocol = new TBinaryProtocol(transport);
@@ -165,7 +162,7 @@ public class TransportClient implements ITransportClient {
InetAddress.getLocalHost().getHostAddress(),
pipe.getName(),
pipe.getCreateTime(),
- ioTDBConfig.getIoTDBMajorVersion());
+ config.getIoTDBMajorVersion());
TransportStatus status = serviceClient.handshake(identityInfo);
if (status.code != SUCCESS_CODE) {
throw new SyncConnectionException(
@@ -200,7 +197,7 @@ public class TransportClient implements ITransportClient {
while (true) {
retryCount++;
- if (retryCount > config.getMaxNumOfSyncFileRetry()) {
+ if (retryCount > config.getMaxNumberOfSyncFileRetry()) {
logger.error(
String.format("After %s tries, stop the transport of current pipeData!", retryCount));
throw new SyncConnectionException(
@@ -240,7 +237,7 @@ public class TransportClient implements ITransportClient {
int retryCount = 0;
while (true) {
retryCount++;
- if (retryCount > config.getMaxNumOfSyncFileRetry()) {
+ if (retryCount > config.getMaxNumberOfSyncFileRetry()) {
throw new SyncConnectionException(
String.format("Connect to receiver error when transferring file %s.", file.getName()));
}
@@ -311,11 +308,11 @@ public class TransportClient implements ITransportClient {
int retryCount = 0;
while (true) {
retryCount++;
- if (retryCount > config.getMaxNumOfSyncFileRetry()) {
+ if (retryCount > config.getMaxNumberOfSyncFileRetry()) {
throw new SyncConnectionException(
String.format(
"Can not sync file %s after %s tries.",
- file.getAbsoluteFile(), config.getMaxNumOfSyncFileRetry()));
+ file.getAbsoluteFile(), config.getMaxNumberOfSyncFileRetry()));
}
try {
status =
@@ -391,11 +388,11 @@ public class TransportClient implements ITransportClient {
while (true) {
retryCount++;
- if (retryCount > config.getMaxNumOfSyncFileRetry()) {
+ if (retryCount > config.getMaxNumberOfSyncFileRetry()) {
throw new SyncConnectionException(
String.format(
"Can not sync file %s after %s tries.",
- file.getAbsoluteFile(), config.getMaxNumOfSyncFileRetry()));
+ file.getAbsoluteFile(), config.getMaxNumberOfSyncFileRetry()));
}
try {
status =
@@ -425,10 +422,10 @@ public class TransportClient implements ITransportClient {
while (true) {
retryCount++;
- if (retryCount > config.getMaxNumOfSyncFileRetry()) {
+ if (retryCount > config.getMaxNumberOfSyncFileRetry()) {
throw new SyncConnectionException(
String.format(
- "Can not sync pipe data after %s tries.", config.getMaxNumOfSyncFileRetry()));
+ "Can not sync pipe data after %s tries.", config.getMaxNumberOfSyncFileRetry()));
}
try {
@@ -510,7 +507,7 @@ public class TransportClient implements ITransportClient {
int retryCount = 0;
while (true) {
retryCount++;
- if (retryCount > config.getMaxNumOfSyncFileRetry()) {
+ if (retryCount > config.getMaxNumberOfSyncFileRetry()) {
throw new SyncConnectionException(
String.format(
"%s request connects to receiver %s:%d error.",
@@ -520,7 +517,7 @@ public class TransportClient implements ITransportClient {
try (TTransport heartbeatTransport =
RpcTransportFactory.INSTANCE.getTransport(ipAddress, port, TIMEOUT_MS)) {
TProtocol protocol;
- if (ioTDBConfig.isRpcThriftCompressionEnable()) {
+ if (config.isRpcThriftCompressionEnable()) {
protocol = new TCompactProtocol(heartbeatTransport);
} else {
protocol = new TBinaryProtocol(heartbeatTransport);
@@ -536,7 +533,7 @@ public class TransportClient implements ITransportClient {
logger.info(
String.format(
"Heartbeat connect to receiver %s:%d error, retry %d/%d.",
- ipAddress, port, retryCount, config.getMaxNumOfSyncFileRetry()));
+ ipAddress, port, retryCount, config.getMaxNumberOfSyncFileRetry()));
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/transport/conf/TransportSenderConfig.java b/server/src/main/java/org/apache/iotdb/db/newsync/transport/conf/TransportSenderConfig.java
deleted file mode 100644
index 9d5b57e..0000000
--- a/server/src/main/java/org/apache/iotdb/db/newsync/transport/conf/TransportSenderConfig.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.newsync.transport.conf;
-
-import org.apache.iotdb.db.sync.conf.SyncConstant;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-
-public class TransportSenderConfig {
-
- private String serverIp = "127.0.0.1";
-
- private int serverPort = 5555;
-
- private int syncPeriodInSecond = 600;
-
- private String senderFolderPath;
-
- private String lastFileInfoPath;
-
- private String snapshotPath;
-
- /** The maximum number of retry when syncing a file to receiver fails. */
- private int maxNumOfSyncFileRetry = 5;
-
- /** Storage groups which participate in sync process */
- private List<String> storageGroupList = new ArrayList<>();
-
- /** Update paths based on data directory */
- public void update(String dataDirectory) {
- senderFolderPath =
- dataDirectory
- + File.separatorChar
- + SyncConstant.SYNC_SENDER
- + File.separatorChar
- + getSyncReceiverName();
- lastFileInfoPath = senderFolderPath + File.separatorChar + SyncConstant.LAST_LOCAL_FILE_NAME;
- snapshotPath = senderFolderPath + File.separatorChar + SyncConstant.DATA_SNAPSHOT_NAME;
- if (!new File(snapshotPath).exists()) {
- new File(snapshotPath).mkdirs();
- }
- }
-
- public String getServerIp() {
- return serverIp;
- }
-
- public void setServerIp(String serverIp) {
- this.serverIp = serverIp;
- }
-
- public int getServerPort() {
- return serverPort;
- }
-
- public void setServerPort(int serverPort) {
- this.serverPort = serverPort;
- }
-
- public int getSyncPeriodInSecond() {
- return syncPeriodInSecond;
- }
-
- public void setSyncPeriodInSecond(int syncPeriodInSecond) {
- this.syncPeriodInSecond = syncPeriodInSecond;
- }
-
- public String getSenderFolderPath() {
- return senderFolderPath;
- }
-
- public void setSenderFolderPath(String senderFolderPath) {
- this.senderFolderPath = senderFolderPath;
- }
-
- public String getLastFileInfoPath() {
- return lastFileInfoPath;
- }
-
- public void setLastFileInfoPath(String lastFileInfoPath) {
- this.lastFileInfoPath = lastFileInfoPath;
- }
-
- public String getSnapshotPath() {
- return snapshotPath;
- }
-
- public void setSnapshotPath(String snapshotPath) {
- this.snapshotPath = snapshotPath;
- }
-
- public String getSyncReceiverName() {
- return serverIp + SyncConstant.SYNC_DIR_NAME_SEPARATOR + serverPort;
- }
-
- public List<String> getStorageGroupList() {
- return new ArrayList<>(storageGroupList);
- }
-
- public void setStorageGroupList(List<String> storageGroupList) {
- this.storageGroupList = storageGroupList;
- }
-
- public int getMaxNumOfSyncFileRetry() {
- return maxNumOfSyncFileRetry;
- }
-
- public void setMaxNumOfSyncFileRetry(int maxNumOfSyncFileRetry) {
- this.maxNumOfSyncFileRetry = maxNumOfSyncFileRetry;
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/transport/server/TransportServerManager.java b/server/src/main/java/org/apache/iotdb/db/newsync/transport/server/TransportServerManager.java
index 6d00a82..8db7c05 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/transport/server/TransportServerManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/transport/server/TransportServerManager.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StartupException;
-import org.apache.iotdb.db.newsync.conf.SyncConstant;
import org.apache.iotdb.db.service.ServiceType;
import org.apache.iotdb.db.service.thrift.ThriftService;
import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
@@ -84,7 +83,7 @@ public class TransportServerManager extends ThriftService
getID().getName(),
ThreadName.SYNC_CLIENT.getName(),
config.getRpcAddress(),
- SyncConstant.DEFAULT_PIPE_SINK_PORT,
+ config.getPipeServerPort(),
Integer.MAX_VALUE,
config.getThriftServerAwaitTimeForStopService(),
new TransportServerThriftHandler(serviceImpl),
@@ -101,7 +100,7 @@ public class TransportServerManager extends ThriftService
@Override
public int getBindPort() {
// TODO: Whether to change this config here
- return SyncConstant.DEFAULT_PIPE_SINK_PORT;
+ return IoTDBDescriptor.getInstance().getConfig().getPipeServerPort();
}
// @Override
@@ -111,19 +110,12 @@ public class TransportServerManager extends ThriftService
@Override
public void startService() throws StartupException {
- // TODO: Whether to change this config here
- // if (!IoTDBDescriptor.getInstance().getConfig().isSyncEnable()) {
- // return;
- // }
super.startService();
}
@Override
public void stopService() {
- // TODO: Whether to change this config here
- // if (IoTDBDescriptor.getInstance().getConfig().isSyncEnable()) {
super.stopService();
- // }
}
@TestOnly
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/transport/server/TransportServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/newsync/transport/server/TransportServiceImpl.java
index e4998a0..6230eb6 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/transport/server/TransportServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/transport/server/TransportServiceImpl.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.newsync.transport.server;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.newsync.conf.SyncConstant;
import org.apache.iotdb.db.newsync.conf.SyncPathUtil;
import org.apache.iotdb.db.newsync.pipedata.PipeData;
import org.apache.iotdb.db.newsync.pipedata.TsFilePipeData;
@@ -46,6 +47,7 @@ import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
+import java.text.DecimalFormat;
import java.util.Arrays;
import static org.apache.iotdb.db.newsync.transport.conf.TransportConstant.CONFLICT_CODE;
@@ -126,7 +128,12 @@ public class TransportServiceImpl implements TransportService.Iface {
@Override
public TransportStatus handshake(IdentityInfo identityInfo) throws TException {
logger.debug("Invoke handshake method from client ip = {}", identityInfo.address);
-
+ // check ip address
+ if (!verifyIPSegment(config.getIpWhiteList(), identityInfo.address)) {
+ return new TransportStatus(
+ ERROR_CODE,
+ "Sender IP is not in the white list of receiver IP and synchronization tasks are not allowed.");
+ }
// Version check
if (!config.getIoTDBMajorVersion(identityInfo.version).equals(config.getIoTDBMajorVersion())) {
return new TransportStatus(
@@ -142,6 +149,46 @@ public class TransportServiceImpl implements TransportService.Iface {
return new TransportStatus(SUCCESS_CODE, "");
}
+  /**
+   * Checks whether {@code ipAddress} falls inside at least one segment of {@code ipWhiteList}.
+   * Called from {@code handshake} to decide whether the connecting sender is allowed to sync.
+   *
+   * @param ipWhiteList comma-separated IPv4 segments in CIDR form, e.g. {@code
+   *     192.168.0.0/16,10.0.0.0/8}; whitespace around entries is ignored and an entry without a
+   *     {@code /prefix} is treated as a single host ({@code /32})
+   * @param ipAddress dotted-decimal IPv4 address reported by the sender
+   * @return {@code true} if any well-formed segment contains the address; a {@code null} or
+   *     malformed white list entry or address never matches, so bad input is rejected instead of
+   *     failing the handshake with an exception
+   */
+  private boolean verifyIPSegment(String ipWhiteList, String ipAddress) {
+    if (ipWhiteList == null || ipAddress == null) {
+      return false;
+    }
+    for (String entry : ipWhiteList.split(",")) {
+      String segment = entry.trim();
+      if (segment.isEmpty()) {
+        continue;
+      }
+      int slashIndex = segment.indexOf('/');
+      // Without an explicit prefix the entry denotes exactly one host.
+      int subnetMask = 32;
+      if (slashIndex >= 0) {
+        try {
+          subnetMask = Integer.parseInt(segment.substring(slashIndex + 1).trim());
+        } catch (NumberFormatException e) {
+          logger.warn("Ignore illegal IP segment {} in ip_white_list.", segment);
+          continue;
+        }
+        segment = segment.substring(0, slashIndex).trim();
+      }
+      if (verifyIP(segment, ipAddress, subnetMask)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Verify IP address with IP segment by comparing the leading {@code subnetMark} bits of both.
+   *
+   * @param ipSegment dotted-decimal IPv4 network address of the segment
+   * @param ipAddress dotted-decimal IPv4 address to test
+   * @param subnetMark prefix length in bits, valid range 0..32
+   * @return {@code true} if both addresses share the same leading {@code subnetMark} bits; {@code
+   *     false} if they differ, if either address is not a valid IPv4 address, or if the prefix
+   *     length is out of range
+   */
+  private boolean verifyIP(String ipSegment, String ipAddress, int subnetMark) {
+    if (subnetMark < 0 || subnetMark > 32) {
+      return false;
+    }
+    long segmentBits = parseIPv4(ipSegment);
+    long addressBits = parseIPv4(ipAddress);
+    if (segmentBits < 0 || addressBits < 0) {
+      return false;
+    }
+    // Keep only the leading subnetMark bits; a /0 segment keeps none and matches every address.
+    long mask = subnetMark == 0 ? 0L : (0xFFFFFFFFL << (32 - subnetMark)) & 0xFFFFFFFFL;
+    return (segmentBits & mask) == (addressBits & mask);
+  }
+
+  /** Converts a dotted-decimal IPv4 string to its 32-bit value, or returns -1 if it is malformed. */
+  private static long parseIPv4(String ip) {
+    if (ip == null) {
+      return -1;
+    }
+    // Limit -1 keeps trailing empty parts so that "1.2.3." is rejected instead of truncated.
+    String[] octets = ip.trim().split(SyncConstant.IP_SEPARATOR, -1);
+    if (octets.length != 4) {
+      return -1;
+    }
+    long value = 0;
+    for (String octet : octets) {
+      int part;
+      try {
+        part = Integer.parseInt(octet);
+      } catch (NumberFormatException e) {
+        return -1;
+      }
+      if (part < 0 || part > 255) {
+        return -1;
+      }
+      value = (value << 8) | part;
+    }
+    return value;
+  }
+
@Override
public TransportStatus transportData(
IdentityInfo identityInfo, MetaInfo metaInfo, ByteBuffer buff, ByteBuffer digest) {
diff --git a/server/src/test/java/org/apache/iotdb/db/newsync/transport/TransportServiceTest.java b/server/src/test/java/org/apache/iotdb/db/newsync/transport/TransportServiceTest.java
index 298316e..b4ef7d1 100644
--- a/server/src/test/java/org/apache/iotdb/db/newsync/transport/TransportServiceTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/newsync/transport/TransportServiceTest.java
@@ -19,11 +19,11 @@
*/
package org.apache.iotdb.db.newsync.transport;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.newsync.conf.SyncConstant;
import org.apache.iotdb.db.newsync.conf.SyncPathUtil;
import org.apache.iotdb.db.newsync.pipedata.DeletionPipeData;
import org.apache.iotdb.db.newsync.pipedata.PipeData;
@@ -127,7 +127,8 @@ public class TransportServiceTest {
// 4. start client
Pipe pipe = new TsFilePipe(createdTime1, pipeName1, null, 0, false);
TransportClient client =
- new TransportClient(pipe, "127.0.0.1", SyncConstant.DEFAULT_PIPE_SINK_PORT);
+ new TransportClient(
+ pipe, "127.0.0.1", IoTDBDescriptor.getInstance().getConfig().getPipeServerPort());
client.handshake();
for (PipeData pipeData : pipeDataList) {
client.senderTransport(pipeData);