You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/03/08 03:31:35 UTC
[iotdb] branch new_sync updated: The transport part of new sync (#5162)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch new_sync
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/new_sync by this push:
new 510a9b8 The transport part of new sync (#5162)
510a9b8 is described below
commit 510a9b8247d5dc9a9fc3df7dc2cc04ca3b277285
Author: Irvine <ir...@gmail.com>
AuthorDate: Tue Mar 8 11:28:30 2022 +0800
The transport part of new sync (#5162)
Co-authored-by: wangjunqing <wa...@alibaba-inc.com>
---
.../newsync/transport/client/ITransportClient.java | 3 +
.../newsync/transport/client/TransportClient.java | 494 +++++++++++++++++++++
.../db/newsync/transport/conf/TransportConfig.java | 39 ++
.../newsync/transport/conf/TransportConstant.java | 17 +
.../transport/server/TransportServerManager.java | 115 +++++
.../server/TransportServerManagerMBean.java | 15 +
.../server/TransportServerThriftHandler.java | 33 ++
.../transport/server/TransportServiceImpl.java | 286 ++++++++++++
thrift-sync/src/main/thrift/transport.thrift | 73 +++
9 files changed, 1075 insertions(+)
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/transport/client/ITransportClient.java b/server/src/main/java/org/apache/iotdb/db/newsync/transport/client/ITransportClient.java
new file mode 100644
index 0000000..0499fb1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/transport/client/ITransportClient.java
@@ -0,0 +1,3 @@
+package org.apache.iotdb.db.newsync.transport.client;
+
+/** Marker type for transport clients; it declares no methods yet. */
+public interface ITransportClient {}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/transport/client/TransportClient.java b/server/src/main/java/org/apache/iotdb/db/newsync/transport/client/TransportClient.java
new file mode 100644
index 0000000..037543b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/transport/client/TransportClient.java
@@ -0,0 +1,494 @@
+package org.apache.iotdb.db.newsync.transport.client;
+
+import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.SyncConnectionException;
+import org.apache.iotdb.db.newsync.pipedata.PipeData;
+import org.apache.iotdb.db.newsync.pipedata.TsFilePipeData;
+import org.apache.iotdb.db.newsync.sender.pipe.Pipe;
+import org.apache.iotdb.db.newsync.transport.conf.TransportConstant;
+import org.apache.iotdb.db.sync.conf.SyncConstant;
+import org.apache.iotdb.db.sync.conf.SyncSenderConfig;
+import org.apache.iotdb.db.sync.conf.SyncSenderDescriptor;
+import org.apache.iotdb.db.sync.sender.transfer.SyncClient;
+import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.service.transport.thrift.IdentityInfo;
+import org.apache.iotdb.service.transport.thrift.MetaInfo;
+import org.apache.iotdb.service.transport.thrift.SyncRequest;
+import org.apache.iotdb.service.transport.thrift.SyncResponse;
+import org.apache.iotdb.service.transport.thrift.TransportService;
+import org.apache.iotdb.service.transport.thrift.TransportStatus;
+import org.apache.iotdb.service.transport.thrift.Type;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.UUID;
+
+import static org.apache.iotdb.db.newsync.transport.conf.TransportConfig.isCheckFileDegistAgain;
+import static org.apache.iotdb.db.newsync.transport.conf.TransportConstant.REBASE_CODE;
+import static org.apache.iotdb.db.newsync.transport.conf.TransportConstant.RETRY_CODE;
+import static org.apache.iotdb.db.newsync.transport.conf.TransportConstant.SUCCESS_CODE;
+
+public class TransportClient implements ITransportClient, Runnable {
+
+ // Register the logger under this class so log lines are attributed to TransportClient.
+ private static final Logger logger = LoggerFactory.getLogger(TransportClient.class);
+
+ // TODO: Need to change to transport config
+ private static SyncSenderConfig config = SyncSenderDescriptor.getInstance().getConfig();
+
+ private static final IoTDBConfig ioTDBConfig = IoTDBDescriptor.getInstance().getConfig();
+
+ private static final int TIMEOUT_MS = 2000_000;
+
+ private TTransport transport = null;
+
+ private TransportService.Client serviceClient = null;
+
+ private String ipAddress = null;
+
+ private int port = -1;
+
+ private String uuid = null;
+
+ private IdentityInfo identityInfo = null;
+
+ private Pipe pipe = null;
+
+ @TestOnly
+ private TransportClient() {
+ Thread.currentThread().setName(ThreadName.SYNC_CLIENT.getName());
+ }
+
+ /** Returns the lazily created test-only singleton held by {@code InstanceHolder}. */
+ @TestOnly
+ public static TransportClient getInstance() {
+   return InstanceHolder.INSTANCE;
+ }
+
+ @TestOnly
+ public void setServerConfig(String ipAddress, int port) throws IOException {
+ this.ipAddress = ipAddress;
+ this.port = port;
+ this.uuid = getOrCreateUUID(getUuidFile());
+ }
+
+ public TransportClient(Pipe pipe, String ipAddress, int port) throws IOException {
+ Thread.currentThread().setName(ThreadName.SYNC_CLIENT.getName());
+ RpcTransportFactory.setThriftMaxFrameSize(ioTDBConfig.getThriftMaxFrameSize());
+
+ this.pipe = pipe;
+ this.ipAddress = ipAddress;
+ this.port = port;
+ this.uuid = getOrCreateUUID(getUuidFile());
+
+ handshake();
+ }
+
+ private boolean handshake() {
+ int handshakeCounter = 0;
+ while (!handshakeWithVersion()) {
+ handshakeCounter++;
+ if (handshakeCounter > config.getMaxNumOfSyncFileRetry()) {
+ logger.error(
+ String.format(
+ "Handshake failed %s times! Check network.", config.getMaxNumOfSyncFileRetry()));
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+  * Opens a fresh Thrift connection to the receiver and performs one handshake.
+  *
+  * <p>Any previously open transport is closed first. On success {@code transport},
+  * {@code serviceClient} and {@code identityInfo} are replaced.
+  *
+  * @return true if the receiver answered with {@code SUCCESS_CODE}; false on any connection,
+  *     RPC or rejection error (the error is logged)
+  */
+ private boolean handshakeWithVersion() {
+
+   // Drop the previous connection so every attempt starts from a clean transport.
+   if (transport != null && transport.isOpen()) {
+     transport.close();
+   }
+
+   // The plain socket is only used to learn which local address reaches the receiver.
+   try (Socket socket = new Socket(this.ipAddress, this.port)) {
+     // Connect to the endpoint this client was configured with. The original used the
+     // SyncSenderConfig endpoint here, so the RPC could go to a different host than the
+     // probe socket above.
+     transport = RpcTransportFactory.INSTANCE.getTransport(this.ipAddress, this.port, TIMEOUT_MS);
+     TProtocol protocol;
+     if (ioTDBConfig.isRpcThriftCompressionEnable()) {
+       protocol = new TCompactProtocol(transport);
+     } else {
+       protocol = new TBinaryProtocol(transport);
+     }
+     serviceClient = new TransportService.Client(protocol);
+
+     // Underlay socket open.
+     if (!transport.isOpen()) {
+       transport.open();
+     }
+
+     identityInfo =
+         new IdentityInfo(
+             socket.getLocalAddress().getHostAddress(),
+             this.uuid,
+             ioTDBConfig.getIoTDBMajorVersion());
+     TransportStatus status = serviceClient.handshake(identityInfo);
+     if (status.code != SUCCESS_CODE) {
+       throw new SyncConnectionException(
+           "The receiver rejected the synchronization task because " + status.msg);
+     }
+   } catch (TTransportException e) {
+     logger.error("Cannot connect to the receiver. ", e);
+     // TODO: Do actions with exception.
+     return false;
+   } catch (SyncConnectionException | TException | IOException e) {
+     logger.error("Cannot confirm identity with the receiver. ", e);
+     // TODO: Do actions with exception.
+     return false;
+   }
+   return true;
+ }
+
+ /**
+  * Sends one pipe data item, including its tsfiles when it is a {@code TsFilePipeData}.
+  *
+  * <p>A {@code SyncConnectionException} triggers a new handshake and another attempt; an
+  * {@code IOException} or {@code NoSuchAlgorithmException} aborts immediately.
+  *
+  * @param pipeData the item to send
+  * @return true if the item was transported, false if it was aborted or the retry limit was
+  *     exceeded
+  */
+ private boolean senderTransport(PipeData pipeData) {
+
+   int attempt = 1;
+   for (; attempt <= config.getMaxNumOfSyncFileRetry(); attempt++) {
+     try {
+       if (pipeData instanceof TsFilePipeData) {
+         TsFilePipeData tsFilePipeData = (TsFilePipeData) pipeData;
+         for (File tsFile : tsFilePipeData.getTsFiles()) {
+           transportSingleFile(tsFile);
+         }
+       }
+       transportPipeData(pipeData);
+       logger.info("Finish current pipeData transport!");
+       return true;
+     } catch (SyncConnectionException e) {
+       // handshake and retry
+       handshake();
+     } catch (IOException | NoSuchAlgorithmException e) {
+       logger.error("Transport failed. ", e);
+       return false;
+     }
+   }
+
+   // The loop ran out of attempts; attempt now holds the number that exceeded the limit.
+   logger.error(
+       String.format("After %s tries, stop the transport of current pipeData!", attempt));
+   return false;
+ }
+
+ /**
+  * Transfers one tsfile to the receiver and, when {@code isCheckFileDegistAgain} is set, asks
+  * the receiver to verify the SHA-256 digest of the whole file. If that check is rejected the
+  * file is sent again from the start.
+  *
+  * @param file the file to send
+  * @throws SyncConnectionException if a chunk or the digest check cannot be delivered within
+  *     the configured number of tries
+  * @throws IOException if the file cannot be read while computing the whole-file digest
+  * @throws NoSuchAlgorithmException if SHA-256 is not available
+  */
+ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
+ private void transportSingleFile(File file)
+     throws SyncConnectionException, IOException, NoSuchAlgorithmException {
+
+   MessageDigest messageDigest = MessageDigest.getInstance("SHA-256");
+
+   while (true) {
+     transportSingleFilePieceByPiece(file, messageDigest);
+
+     if (isCheckFileDegistAgain) {
+       // Check file digest as entirety.
+       messageDigest.reset();
+       try (InputStream inputStream = new FileInputStream(file)) {
+         byte[] block = new byte[TransportConstant.DATA_CHUNK_SIZE];
+         int length;
+         while ((length = inputStream.read(block)) > 0) {
+           messageDigest.update(block, 0, length);
+         }
+       }
+       // digest() resets the MessageDigest, so compute it once. Calling it again inside the
+       // retry loop would send the hash of empty input on every retry.
+       byte[] fileDigest = messageDigest.digest();
+       MetaInfo metaInfo = new MetaInfo(Type.FILE, file.getName(), 0);
+
+       TransportStatus status = null;
+
+       int retryCount = 0;
+
+       while (true) {
+         retryCount++;
+         if (retryCount > config.getMaxNumOfSyncFileRetry()) {
+           throw new SyncConnectionException(
+               String.format(
+                   "Can not sync file %s after %s tries.",
+                   file.getAbsoluteFile(), config.getMaxNumOfSyncFileRetry()));
+         }
+         try {
+           status =
+               serviceClient.checkFileDigest(identityInfo, metaInfo, ByteBuffer.wrap(fileDigest));
+         } catch (TException e) {
+           // retry
+           logger.error("TException happens! ", e);
+           continue;
+         }
+         break;
+       }
+
+       if (status.code != SUCCESS_CODE) {
+         logger.error("Digest check of tsfile {} failed, retry", file.getAbsoluteFile());
+         continue;
+       }
+     }
+     break;
+   }
+
+   logger.info("Receiver has received {} successfully.", file.getAbsoluteFile());
+ }
+
+ /**
+  * Streams {@code file} to the receiver chunk by chunk, attaching a SHA-256 digest to each
+  * chunk, and returns once the end of the file has been reached.
+  *
+  * <p>The read buffer starts at 1 MiB and is enlarged to {@code DATA_CHUNK_SIZE} only when the
+  * outer loop restarts at a non-zero position, i.e. after a rebase or retry reply. A
+  * {@code REBASE_CODE} reply moves {@code position} to the offset carried in the status
+  * message; a {@code RETRY_CODE} reply re-reads from the current position.
+  *
+  * @param file the file to send
+  * @param messageDigest digest instance reused for every chunk (reset before each use)
+  * @throws SyncConnectionException if a chunk cannot be delivered, or the file cannot be read,
+  *     within the configured number of tries
+  */
+ private void transportSingleFilePieceByPiece(File file, MessageDigest messageDigest)
+     throws SyncConnectionException {
+
+   // Cut the file into pieces to send
+   long position = 0;
+
+   // Try small piece to rebase the file position.
+   byte[] buffer = new byte[1024 * 1024];
+
+   // Bounds the local read retries so an unreadable file cannot loop forever.
+   int ioRetryCount = 0;
+
+   outer:
+   while (true) {
+
+     // Normal piece.
+     if (position != 0L && buffer.length != TransportConstant.DATA_CHUNK_SIZE) {
+       buffer = new byte[TransportConstant.DATA_CHUNK_SIZE];
+     }
+
+     int dataLength;
+     try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) {
+
+       randomAccessFile.seek(position);
+       while ((dataLength = randomAccessFile.read(buffer)) != -1) {
+         byte[] chunk = new byte[dataLength];
+         System.arraycopy(buffer, 0, chunk, 0, dataLength);
+
+         // digest() resets the MessageDigest, so compute it once per chunk and reuse the
+         // bytes for every delivery attempt.
+         messageDigest.reset();
+         messageDigest.update(chunk);
+         byte[] chunkDigest = messageDigest.digest();
+         MetaInfo metaInfo = new MetaInfo(Type.FILE, file.getName(), position);
+
+         TransportStatus status = null;
+         int retryCount = 0;
+         while (true) {
+           retryCount++;
+           if (retryCount > config.getMaxNumOfSyncFileRetry()) {
+             throw new SyncConnectionException(
+                 String.format(
+                     "Can not sync file %s after %s tries.",
+                     file.getAbsoluteFile(), config.getMaxNumOfSyncFileRetry()));
+           }
+           try {
+             status =
+                 serviceClient.transportData(
+                     identityInfo, metaInfo, ByteBuffer.wrap(chunk), ByteBuffer.wrap(chunkDigest));
+           } catch (TException e) {
+             // retry
+             logger.error("TException happened! ", e);
+             continue;
+           }
+           break;
+         }
+
+         if (status.code == REBASE_CODE) {
+           position = Long.parseLong(status.msg);
+           continue outer;
+         } else if (status.code == RETRY_CODE) {
+           logger.info(
+               "Receiver failed to receive data from {} because {}, retry.",
+               file.getAbsoluteFile(),
+               status.msg);
+           continue outer;
+         } else if (status.code != SUCCESS_CODE) {
+           logger.info(
+               "Receiver failed to receive data from {} because {}, abort.",
+               file.getAbsoluteFile(),
+               status.msg);
+           throw new RuntimeException("Error! Replace this exception!");
+         } else { // Success
+           position += dataLength;
+         }
+       }
+
+       // read() returned -1: every chunk up to the end of the file was accepted. Without
+       // this return the outer loop would reopen the file and spin at EOF forever.
+       return;
+     } catch (IOException e) {
+       // retry, but only a bounded number of times
+       logger.error("IOException happened! ", e);
+       ioRetryCount++;
+       if (ioRetryCount > config.getMaxNumOfSyncFileRetry()) {
+         throw new SyncConnectionException(
+             String.format(
+                 "Can not read file %s after %s tries.",
+                 file.getAbsoluteFile(), config.getMaxNumOfSyncFileRetry()));
+       }
+     } catch (SyncConnectionException e) {
+       logger.error("Cannot sync data with receiver. ", e);
+       throw e;
+     }
+   }
+ }
+
+ /**
+  * Serializes {@code pipeData} and sends it to the receiver together with its SHA-256 digest,
+  * retrying until the receiver answers {@code SUCCESS_CODE}.
+  *
+  * @param pipeData the item to send
+  * @throws SyncConnectionException if the item is not accepted within the configured number
+  *     of tries
+  * @throws NoSuchAlgorithmException if SHA-256 is not available
+  */
+ private void transportPipeData(PipeData pipeData)
+     throws SyncConnectionException, NoSuchAlgorithmException {
+
+   MessageDigest messageDigest = MessageDigest.getInstance("SHA-256");
+
+   int retryCount = 0;
+   while (true) {
+
+     retryCount++;
+     if (retryCount > config.getMaxNumOfSyncFileRetry()) {
+       throw new SyncConnectionException(
+           String.format(
+               "Can not sync pipe data after %s tries.", config.getMaxNumOfSyncFileRetry()));
+     }
+
+     try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+         DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
+       pipeData.serialize(dataOutputStream);
+       dataOutputStream.flush();
+
+       // Hash and send exactly the serialized bytes. The original hashed a zero-filled array
+       // and appended those zeros to the payload, so the receiver's digest never matched.
+       byte[] buffer = byteArrayOutputStream.toByteArray();
+       messageDigest.reset();
+       messageDigest.update(buffer);
+       ByteBuffer buffToSend = ByteBuffer.wrap(buffer);
+
+       MetaInfo metaInfo =
+           new MetaInfo(Type.findByValue(pipeData.getType().ordinal()), "fileName", 0);
+       TransportStatus status =
+           serviceClient.transportData(
+               identityInfo, metaInfo, buffToSend, ByteBuffer.wrap(messageDigest.digest()));
+
+       if (status.code == SUCCESS_CODE) {
+         break;
+       } else {
+         logger.error("Digest check of pipeData failed, retry");
+       }
+     } catch (IOException | TException e) {
+       // retry
+       logger.error("Exception happened!", e);
+     }
+   }
+ }
+
+ /**
+  * Returns the sender UUID stored in {@code uuidFile}, creating the file (and its parent
+  * directory) with a freshly generated UUID when it is missing or empty. The UUID identifies
+  * this sender to the receiver.
+  *
+  * @param uuidFile file holding the UUID on its first line
+  * @return the stored or newly generated UUID
+  * @throws IOException if the file cannot be read or written
+  */
+ private String getOrCreateUUID(File uuidFile) throws IOException {
+   File parentDir = uuidFile.getParentFile();
+   if (!parentDir.exists()) {
+     parentDir.mkdirs();
+   }
+
+   if (uuidFile.exists()) {
+     String stored;
+     try (BufferedReader reader = new BufferedReader((new FileReader(uuidFile)))) {
+       stored = reader.readLine();
+     } catch (IOException e) {
+       logger.error("Cannot read UUID from file {}", uuidFile.getPath());
+       throw new IOException(e);
+     }
+
+     boolean usable = stored != null && stored.length() != 0;
+     if (usable) {
+       return stored;
+     }
+     logger.warn("UUID in file {} is empty.", uuidFile.getPath());
+     uuidFile.delete();
+   }
+
+   // Reached when the file is absent or held no usable UUID.
+   String created;
+   try (FileOutputStream out = new FileOutputStream(uuidFile)) {
+     created = generateUUID();
+     out.write(created.getBytes());
+   } catch (IOException e) {
+     logger.error("Cannot insert UUID to file {}", uuidFile.getPath());
+     throw new IOException(e);
+   }
+
+   return created;
+ }
+
+ /** Produces a random UUID rendered without its dash separators. */
+ private String generateUUID() {
+   String dashed = UUID.randomUUID().toString();
+   return dashed.replace("-", "");
+ }
+
+ /** Locates the UUID file inside the configured sync directory. */
+ private File getUuidFile() {
+   String syncDir = ioTDBConfig.getSyncDir();
+   return new File(syncDir, SyncConstant.UUID_FILE_NAME);
+ }
+
+ /**
+  * Entry point when this client is used as a {@code Runnable}.
+  *
+  * <p>Currently a no-op: the loop that would pull pipe data, transport each item and commit
+  * its serial number is commented out below.
+  *
+  * @see Thread#run()
+  */
+ @Override
+ public void run() {
+
+ // while (true) {
+ // List<PipeData> pipeDataList = this.pipe.pull(Long.MAX_VALUE);
+ // for (PipeData pipeData: pipeDataList) {
+ // boolean sendResult = senderTransport(pipeData);
+ // if (!sendResult) {
+ // // Cannot handle the error.
+ // return;
+ // }
+ // this.pipe.commit(pipeData.getSerialNumber());
+ // }
+ //
+ // try {
+ // Thread.sleep(1000);
+ // } catch (InterruptedException e) {
+ // Thread.currentThread().interrupt();
+ // throw new RuntimeException(e);
+ // }
+ // }
+
+ }
+
+ public SyncResponse heartbeat(SyncRequest syncRequest) throws TException {
+ return serviceClient.heartbeat(identityInfo, syncRequest);
+ }
+
+ @TestOnly
+ public static void main(String[] args) throws IOException {
+
+ TransportClient.getInstance().setServerConfig("127.0.0.1", 5555);
+
+ if (!TransportClient.getInstance().handshake()) {
+ // Deal with the error here.
+ return;
+ }
+
+ // Example 1. Send TSFILE.
+ // List<File> files = new ArrayList<>();
+ // files.add(new File(System.getProperty(IoTDBConstant.IOTDB_HOME) + "/files/test1"));
+ // files.add(new File(System.getProperty(IoTDBConstant.IOTDB_HOME) + "/files/test2"));
+ // files.add(new File(System.getProperty(IoTDBConstant.IOTDB_HOME) + "/files/test3"));
+
+ // if (!TransportClient.getInstance().senderTransport()) {
+ // Deal with the error here.
+ // }
+
+ // Example 2. Send DELETION
+ // TsFilePipeData.Type.DELETION.name();
+
+ // Example 3. Send PHYSICALPLAN
+ // TsFilePipeData.Type.PHYSICALPLAN.name();
+ }
+
+ /** Holds the singleton built with the test-only no-arg constructor; see getInstance(). */
+ private static class InstanceHolder {
+ private static final TransportClient INSTANCE = new TransportClient();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/transport/conf/TransportConfig.java b/server/src/main/java/org/apache/iotdb/db/newsync/transport/conf/TransportConfig.java
new file mode 100644
index 0000000..316f6f5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/transport/conf/TransportConfig.java
@@ -0,0 +1,39 @@
+package org.apache.iotdb.db.newsync.transport.conf;
+
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.sync.conf.SyncConstant;
+
+import java.io.File;
+
+public class TransportConfig {
+ private TransportConfig() {}
+
+ /** default base dir, stores all IoTDB runtime files */
+ private static final String DEFAULT_BASE_DIR = addHomeDir("data");
+
+ private static String addHomeDir(String dir) {
+ String homeDir = System.getProperty(IoTDBConstant.IOTDB_HOME, null);
+ if (!new File(dir).isAbsolute() && homeDir != null && homeDir.length() > 0) {
+ if (!homeDir.endsWith(File.separator)) {
+ dir = homeDir + File.separatorChar + dir;
+ } else {
+ dir = homeDir + dir;
+ }
+ }
+ return dir;
+ }
+
+ public static String getSyncedDir(String ipAddress, String uuid) {
+ return DEFAULT_BASE_DIR
+ + File.separator
+ + SyncConstant.SYNC_RECEIVER
+ + File.separator
+ + ipAddress
+ + SyncConstant.SYNC_DIR_NAME_SEPARATOR
+ + uuid
+ + File.separator
+ + SyncConstant.RECEIVER_DATA_FOLDER_NAME;
+ }
+
+ public static boolean isCheckFileDegistAgain = false;
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/transport/conf/TransportConstant.java b/server/src/main/java/org/apache/iotdb/db/newsync/transport/conf/TransportConstant.java
new file mode 100644
index 0000000..daf1d0d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/transport/conf/TransportConstant.java
@@ -0,0 +1,17 @@
+package org.apache.iotdb.db.newsync.transport.conf;
+
+import org.apache.iotdb.rpc.RpcUtils;
+
+/** Constants shared by the transport sender and receiver; not instantiable. */
+public class TransportConstant {
+
+ private TransportConstant() {}
+
+ // Size in bytes of one file chunk: 16 MiB, capped by the Thrift max frame size.
+ public static final int DATA_CHUNK_SIZE =
+ Math.min(16 * 1024 * 1024, RpcUtils.THRIFT_FRAME_MAX_SIZE);
+
+ // Status codes carried in TransportStatus.code.
+ // SUCCESS_CODE: the request was accepted.
+ public static final int SUCCESS_CODE = 1;
+ // ERROR_CODE: version mismatch or an I/O or digest-algorithm failure on the receiver.
+ public static final int ERROR_CODE = -1;
+ // REBASE_CODE: the start index was rejected; the status message carries the expected index.
+ public static final int REBASE_CODE = -2;
+ // RETRY_CODE: the chunk digest did not match; the sender should send the chunk again.
+ public static final int RETRY_CODE = -3;
+ // CONFLICT_CODE: the whole-file digest did not match on the receiver.
+ public static final int CONFLICT_CODE = -4;
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/transport/server/TransportServerManager.java b/server/src/main/java/org/apache/iotdb/db/newsync/transport/server/TransportServerManager.java
new file mode 100644
index 0000000..7f9a540
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/transport/server/TransportServerManager.java
@@ -0,0 +1,115 @@
+package org.apache.iotdb.db.newsync.transport.server;
+
+import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.service.ServiceType;
+import org.apache.iotdb.db.service.thrift.ThriftService;
+import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
+import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.service.transport.thrift.TransportService;
+
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Thrift service wrapper that exposes {@code TransportServiceImpl} on the configured sync
+ * server port. It is only started and stopped when sync is enabled in the IoTDB config.
+ */
+public class TransportServerManager extends ThriftService
+    implements Runnable, TransportServerManagerMBean {
+
+  private static final Logger logger = LoggerFactory.getLogger(TransportServerManager.class);
+  private TransportServiceImpl serviceImpl;
+
+  /**
+   * Starts a transport server. Note that this creates and starts a new manager instance
+   * rather than this object or the singleton.
+   */
+  @Override
+  public void run() {
+    TransportServerManager serverManager = new TransportServerManager();
+    try {
+      serverManager.start();
+    } catch (StartupException e) {
+      // Report through the class logger instead of printing the stack trace to stderr.
+      logger.error("Failed to start the transport server.", e);
+    }
+  }
+
+  private static class ServiceManagerHolder {
+    private static final TransportServerManager INSTANCE = new TransportServerManager();
+  }
+
+  /** Returns the lazily created singleton. */
+  public static TransportServerManager getInstance() {
+    return TransportServerManager.ServiceManagerHolder.INSTANCE;
+  }
+
+  @Override
+  public ServiceType getID() {
+    return ServiceType.SYNC_SERVICE;
+  }
+
+  @Override
+  public ThriftService getImplementation() {
+    return getInstance();
+  }
+
+  /** Creates the service implementation and wraps it in a Thrift processor. */
+  @Override
+  public void initTProcessor() {
+    initSyncedServiceImpl(null);
+    serviceImpl = new TransportServiceImpl();
+    processor = new TransportService.Processor<>(serviceImpl);
+  }
+
+  /** Builds the Thrift service thread bound to the RPC address and the sync server port. */
+  @Override
+  public void initThriftServiceThread() {
+    IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+    thriftServiceThread =
+        new ThriftServiceThread(
+            processor,
+            getID().getName(),
+            ThreadName.SYNC_CLIENT.getName(),
+            config.getRpcAddress(),
+            config.getSyncServerPort(),
+            Integer.MAX_VALUE,
+            config.getThriftServerAwaitTimeForStopService(),
+            new TransportServerThriftHandler(serviceImpl),
+            config.isRpcThriftCompressionEnable());
+    thriftServiceThread.setName(ThreadName.SYNC_SERVER.getName());
+  }
+
+  @Override
+  public String getBindIP() {
+    // TODO: Whether to change this config here
+    return IoTDBDescriptor.getInstance().getConfig().getRpcAddress();
+  }
+
+  @Override
+  public int getBindPort() {
+    // TODO: Whether to change this config here
+    return IoTDBDescriptor.getInstance().getConfig().getSyncServerPort();
+  }
+
+  // @Override
+  public int getRPCPort() {
+    return getBindPort();
+  }
+
+  /** Starts the service only when sync is enabled; otherwise does nothing. */
+  @Override
+  public void startService() throws StartupException {
+    // TODO: Whether to change this config here
+    if (!IoTDBDescriptor.getInstance().getConfig().isSyncEnable()) {
+      return;
+    }
+    super.startService();
+  }
+
+  /** Stops the service only when sync is enabled; otherwise does nothing. */
+  @Override
+  public void stopService() {
+    // TODO: Whether to change this config here
+    if (IoTDBDescriptor.getInstance().getConfig().isSyncEnable()) {
+      super.stopService();
+    }
+  }
+
+  @TestOnly
+  public static void main(String[] args) throws TTransportException, StartupException {
+    logger.info("Transport server for testing only.");
+    TransportServerManager serverManager = new TransportServerManager();
+    serverManager.start();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/transport/server/TransportServerManagerMBean.java b/server/src/main/java/org/apache/iotdb/db/newsync/transport/server/TransportServerManagerMBean.java
new file mode 100644
index 0000000..fec1947
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/transport/server/TransportServerManagerMBean.java
@@ -0,0 +1,15 @@
+package org.apache.iotdb.db.newsync.transport.server;
+
+import org.apache.iotdb.db.exception.StartupException;
+
+/** Management operations of the transport server; implemented by TransportServerManager. */
+public interface TransportServerManagerMBean {
+ String getRPCServiceStatus();
+
+ int getRPCPort();
+
+ void startService() throws StartupException;
+
+ void restartService() throws StartupException;
+
+ void stopService();
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/transport/server/TransportServerThriftHandler.java b/server/src/main/java/org/apache/iotdb/db/newsync/transport/server/TransportServerThriftHandler.java
new file mode 100644
index 0000000..8a3898c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/transport/server/TransportServerThriftHandler.java
@@ -0,0 +1,33 @@
+package org.apache.iotdb.db.newsync.transport.server;
+
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.server.ServerContext;
+import org.apache.thrift.server.TServerEventHandler;
+import org.apache.thrift.transport.TTransport;
+
+/**
+ * Thrift server event handler for the transport service. Its only active hook notifies the
+ * service implementation when a client connection's context is deleted.
+ */
+public class TransportServerThriftHandler implements TServerEventHandler {
+
+  // Assigned once in the constructor and never replaced.
+  private final TransportServiceImpl serviceImpl;
+
+  TransportServerThriftHandler(TransportServiceImpl serviceImpl) {
+    this.serviceImpl = serviceImpl;
+  }
+
+  @Override
+  public void preServe() {}
+
+  /** No per-connection context is kept, so this returns null. */
+  @Override
+  public ServerContext createContext(TProtocol input, TProtocol output) {
+    return null;
+  }
+
+  @Override
+  public void deleteContext(ServerContext serverContext, TProtocol input, TProtocol output) {
+    // Let the service clean up after the client (sender) that just went away.
+    serviceImpl.handleClientExit();
+  }
+
+  @Override
+  public void processContext(
+      ServerContext serverContext, TTransport inputTransport, TTransport outputTransport) {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/transport/server/TransportServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/newsync/transport/server/TransportServiceImpl.java
new file mode 100644
index 0000000..0160cf3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/transport/server/TransportServiceImpl.java
@@ -0,0 +1,286 @@
+package org.apache.iotdb.db.newsync.transport.server;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.newsync.pipedata.PipeData;
+import org.apache.iotdb.service.transport.thrift.IdentityInfo;
+import org.apache.iotdb.service.transport.thrift.MetaInfo;
+import org.apache.iotdb.service.transport.thrift.SyncRequest;
+import org.apache.iotdb.service.transport.thrift.SyncResponse;
+import org.apache.iotdb.service.transport.thrift.TransportService;
+import org.apache.iotdb.service.transport.thrift.TransportStatus;
+import org.apache.iotdb.service.transport.thrift.Type;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.RandomAccessFile;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Arrays;
+
+import static org.apache.iotdb.db.newsync.transport.conf.TransportConfig.getSyncedDir;
+import static org.apache.iotdb.db.newsync.transport.conf.TransportConstant.CONFLICT_CODE;
+import static org.apache.iotdb.db.newsync.transport.conf.TransportConstant.ERROR_CODE;
+import static org.apache.iotdb.db.newsync.transport.conf.TransportConstant.REBASE_CODE;
+import static org.apache.iotdb.db.newsync.transport.conf.TransportConstant.RETRY_CODE;
+import static org.apache.iotdb.db.newsync.transport.conf.TransportConstant.SUCCESS_CODE;
+import static org.apache.iotdb.db.sync.conf.SyncConstant.DATA_CHUNK_SIZE;
+
+public class TransportServiceImpl implements TransportService.Iface {
+ private static Logger logger = LoggerFactory.getLogger(TransportServiceImpl.class);
+
+ private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+ /**
+  * Immutable outcome of a start-index check: whether the index was accepted and, when it was
+  * not, the index the sender should restart from. Static because it needs no reference to
+  * the enclosing service instance.
+  */
+ private static final class CheckResult {
+   private final boolean result;
+   private final String index;
+
+   public CheckResult(boolean result, String index) {
+     this.result = result;
+     this.index = index;
+   }
+
+   public boolean isResult() {
+     return result;
+   }
+
+   public String getIndex() {
+     return index;
+   }
+ }
+
+ /**
+  * Checks that {@code startIndex} matches the progress recorded in {@code <file>.record}.
+  *
+  * <p>With no record file, or an empty one, only a start index of 0 is valid. Otherwise the
+  * start index must equal the recorded value.
+  *
+  * @param file target file whose ".record" sibling holds the recorded position
+  * @param startIndex offset at which the sender wants to write
+  * @return a valid result, or an invalid one carrying the index the sender should use
+  * @throws IOException if the record file cannot be read or does not contain a number
+  */
+ private CheckResult checkStartIndexValid(File file, long startIndex) throws IOException {
+   File recordFile = new File(file.getAbsolutePath() + ".record");
+
+   if (!recordFile.exists()) {
+     if (startIndex != 0) {
+       logger.error(
+           "The start index {} of data sync is not valid. "
+               + "The file {} is not exist and start index should equal to 0).",
+           startIndex,
+           recordFile.getAbsolutePath());
+       return new CheckResult(false, "0");
+     }
+     return new CheckResult(true, "0");
+   }
+
+   String index;
+   try (InputStream inputStream = new FileInputStream(recordFile);
+       BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream))) {
+     index = bufferedReader.readLine();
+   }
+
+   if ((index == null) || (index.length() == 0)) {
+     // An empty record holds no position. The original fell through to Long.parseLong here
+     // and threw NumberFormatException when the start index was 0.
+     if (startIndex != 0) {
+       logger.error(
+           "The start index {} of data sync is not valid. "
+               + "The file {} is empty and start index should equal to 0.",
+           startIndex,
+           recordFile.getAbsolutePath());
+       return new CheckResult(false, "0");
+     }
+     return new CheckResult(true, "0");
+   }
+
+   long recordedIndex;
+   try {
+     recordedIndex = Long.parseLong(index);
+   } catch (NumberFormatException e) {
+     // Surface a corrupted record through the declared exception type.
+     throw new IOException(
+         "The record file " + recordFile.getAbsolutePath() + " is corrupted: " + index, e);
+   }
+
+   if (recordedIndex != startIndex) {
+     logger.error(
+         "The start index {} of data sync is not valid. "
+             + "The start index of the file {} should equal to {}.",
+         startIndex,
+         recordFile.getAbsolutePath(),
+         index);
+     return new CheckResult(false, index);
+   }
+
+   return new CheckResult(true, "0");
+ }
+
+ @Override
+ public TransportStatus handshake(IdentityInfo identityInfo) throws TException {
+ logger.debug("Invoke handshake method from client ip = {}", identityInfo.address);
+
+ // Version check
+ if (!config.getIoTDBMajorVersion(identityInfo.version).equals(config.getIoTDBMajorVersion())) {
+ return new TransportStatus(
+ ERROR_CODE,
+ String.format(
+ "Version mismatch: the sender <%s>, the receiver <%s>",
+ identityInfo.version, config.getIoTDBVersion()));
+ }
+
+ if (!new File(getSyncedDir(identityInfo.getAddress(), identityInfo.getUuid())).exists()) {
+ new File(getSyncedDir(identityInfo.getAddress(), identityInfo.getUuid())).mkdirs();
+ }
+ return new TransportStatus(SUCCESS_CODE, "");
+ }
+
+ /**
+  * Receives one payload from a sender: a file chunk when {@code metaInfo.type} is
+  * {@code FILE}, otherwise a serialized pipe data item.
+  *
+  * <p>For a file chunk the start index is checked against the record file first, then the
+  * payload digest is verified, then the chunk is written into {@code <fileName>.patch} and
+  * the record file is advanced. All work is done under a lock on the interned sender UUID.
+  *
+  * @param identityInfo address and UUID of the sender
+  * @param metaInfo payload type, file name and start index
+  * @param buff the payload bytes
+  * @param digest SHA-256 digest of the payload computed by the sender
+  * @return {@code SUCCESS_CODE}; {@code REBASE_CODE} with the expected index;
+  *     {@code RETRY_CODE} on digest mismatch; {@code ERROR_CODE} on I/O or decoding failure
+  */
+ @Override
+ public TransportStatus transportData(
+     IdentityInfo identityInfo, MetaInfo metaInfo, ByteBuffer buff, ByteBuffer digest) {
+   logger.debug("Invoke transportData method from client ip = {}", identityInfo.address);
+
+   String ipAddress = identityInfo.address;
+   String uuid = identityInfo.uuid;
+   synchronized (uuid.intern()) {
+     Type type = metaInfo.type;
+     // NOTE(review): fileName comes from the remote sender and is joined to the synced
+     // directory unchecked; consider rejecting names that contain path separators.
+     String fileName = metaInfo.fileName;
+     long startIndex = metaInfo.startIndex;
+
+     // Check file start index valid
+     if (type == Type.FILE) {
+       try {
+         CheckResult result =
+             checkStartIndexValid(new File(getSyncedDir(ipAddress, uuid), fileName), startIndex);
+         if (!result.isResult()) {
+           return new TransportStatus(REBASE_CODE, result.getIndex());
+         }
+       } catch (IOException e) {
+         logger.error(e.getMessage());
+         return new TransportStatus(ERROR_CODE, e.getMessage());
+       }
+     }
+
+     // Copy out exactly the readable bytes. capacity() can exceed remaining() when the
+     // buffer is a slice of a larger backing array, which would yield wrong lengths or a
+     // BufferUnderflowException.
+     byte[] payload = new byte[buff.remaining()];
+     buff.duplicate().get(payload);
+     byte[] expectedDigest = new byte[digest.remaining()];
+     digest.duplicate().get(expectedDigest);
+
+     // Check buff digest
+     MessageDigest messageDigest;
+     try {
+       messageDigest = MessageDigest.getInstance("SHA-256");
+     } catch (NoSuchAlgorithmException e) {
+       logger.error(e.getMessage());
+       return new TransportStatus(ERROR_CODE, e.getMessage());
+     }
+     if (!MessageDigest.isEqual(messageDigest.digest(payload), expectedDigest)) {
+       return new TransportStatus(RETRY_CODE, "Data digest check error, retry.");
+     }
+
+     if (type != Type.FILE) {
+       try (InputStream inputStream = new ByteArrayInputStream(payload);
+           DataInputStream dataInputStream = new DataInputStream(inputStream)) {
+         PipeData.deserialize(dataInputStream);
+         // Do with file
+         // BufferedPipeDataQueue.offer(pipeData);
+       } catch (IOException | IllegalPathException e) {
+         // Report the failure to the sender instead of printing it and answering success.
+         logger.error("Cannot deserialize pipe data.", e);
+         return new TransportStatus(
+             ERROR_CODE, "Cannot deserialize pipe data: " + e.getMessage());
+       }
+     } else {
+       // Write buff to {file}.patch
+       File file = new File(getSyncedDir(ipAddress, uuid), fileName + ".patch");
+       try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw")) {
+         randomAccessFile.seek(startIndex);
+         randomAccessFile.write(payload);
+         long endIndex = startIndex + payload.length;
+         writeRecordFile(new File(getSyncedDir(ipAddress, uuid), fileName + ".record"), endIndex);
+         logger.debug("Sync {} start at {} to {} is done.", fileName, startIndex, endIndex);
+       } catch (IOException e) {
+         logger.error("Cannot write data of file {}.", fileName, e);
+         return new TransportStatus(ERROR_CODE, e.getMessage());
+       }
+     }
+   }
+   return new TransportStatus(SUCCESS_CODE, "");
+ }
+
+ @Override
+ public TransportStatus checkFileDigest(
+ IdentityInfo identityInfo, MetaInfo metaInfo, ByteBuffer digest) throws TException {
+ logger.debug("Invoke checkFileDigest method from client ip = {}", identityInfo.address);
+
+ String ipAddress = identityInfo.getAddress();
+ String uuid = identityInfo.getUuid();
+ synchronized (uuid.intern()) {
+ String fileName = metaInfo.fileName;
+ MessageDigest messageDigest = null;
+ try {
+ messageDigest = MessageDigest.getInstance("SHA-256");
+ } catch (NoSuchAlgorithmException e) {
+ logger.error(e.getMessage());
+ return new TransportStatus(ERROR_CODE, e.getMessage());
+ }
+
+ try (InputStream inputStream =
+ new FileInputStream(new File(getSyncedDir(ipAddress, uuid), fileName + ".patch"))) {
+ byte[] block = new byte[DATA_CHUNK_SIZE];
+ int length;
+ while ((length = inputStream.read(block)) > 0) {
+ messageDigest.update(block, 0, length);
+ }
+
+ String localDigest = (new BigInteger(1, messageDigest.digest())).toString(16);
+ byte[] digestBytes = new byte[digest.capacity()];
+ digest.get(digestBytes);
+ if (!Arrays.equals(messageDigest.digest(), digestBytes)) {
+ logger.error(
+ "The file {} digest check error. "
+ + "The local digest is {} (should be equal to {}).",
+ fileName,
+ localDigest,
+ digest);
+ new File(getSyncedDir(ipAddress, uuid), fileName + ".record").delete();
+ return new TransportStatus(CONFLICT_CODE, "File digest check error.");
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ return new TransportStatus(ERROR_CODE, e.getMessage());
+ }
+
+ return new TransportStatus(SUCCESS_CODE, "");
+ }
+ }
+
+ /**
+  * Heartbeat stub: ignores both arguments and returns null. The delegation to the receiver
+  * service is commented out below.
+  *
+  * <p>NOTE(review): a null result probably surfaces as an error on the Thrift client; confirm
+  * before anything relies on this call.
+  */
+ @Override
+ public SyncResponse heartbeat(IdentityInfo identityInfo, SyncRequest syncRequest)
+ throws TException {
+
+ // return ReceiverService.getInstance().recMsg(syncRequest);
+ return null;
+ }
+
+ /**
+  * Stores {@code position} as text in {@code recordFile}. The value is written to a ".tmp"
+  * sibling first, which then replaces the record file.
+  *
+  * @param recordFile the record file to replace
+  * @param position byte offset to record
+  * @throws IOException if the temporary file cannot be written or moved
+  */
+ private void writeRecordFile(File recordFile, long position) throws IOException {
+   File tmpFile = new File(recordFile.getAbsolutePath() + ".tmp");
+   // try-with-resources closes the writer even when write() throws.
+   try (FileWriter fileWriter = new FileWriter(tmpFile, false)) {
+     fileWriter.write(String.valueOf(position));
+   }
+   Files.move(tmpFile.toPath(), recordFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
+ }
+
+ /**
+  * Hook for releasing resources when a client (a sender) disconnects, normally or abnormally.
+  * Called from TransportServerThriftHandler.deleteContext; currently does nothing.
+  */
+ public void handleClientExit() {
+ // TODO: Handle client exit here.
+ // do nothing now
+ }
+}
diff --git a/thrift-sync/src/main/thrift/transport.thrift b/thrift-sync/src/main/thrift/transport.thrift
new file mode 100644
index 0000000..3eccde3
--- /dev/null
+++ b/thrift-sync/src/main/thrift/transport.thrift
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+namespace java org.apache.iotdb.service.transport.thrift
+namespace py iotdb.thrift.transport
+
+// Result returned by the TransportService RPCs that report a status.
+struct TransportStatus{
+ // Status code of the call.
+ 1:required i32 code
+
+ // Message accompanying the code.
+ 2:required string msg
+}
+
+// Identity information the sender passes with every TransportService call so that the
+// sender and receiver can confirm each other's validity.
+struct IdentityInfo{
+ // Address (IP) of the sender; checked against the white list of the receiver.
+ 1:required string address
+
+ // UUID by which the sender tells the receiver its identity.
+ 2:required string uuid
+
+ // Version of the sender; the versions of the sender and receiver must be the same.
+ 3:required string version
+}
+
+// Kind of data being transferred; carried in MetaInfo.type.
+enum Type {
+ TSFILE,
+ DELETION,
+ PHYSICALPLAN,
+ FILE
+}
+
+// Describes the piece of data referred to by a transportData / checkFileDigest call.
+struct MetaInfo{
+ // The type of the pipe data being sent.
+ 1:required Type type
+
+ // The name of the file being sent.
+ 2:required string fileName
+
+ // The start index of the file slice being sent.
+ // NOTE(review): presumably a byte offset into the file -- confirm against the sender.
+ 3:required i64 startIndex
+}
+
+// Request payload of the heartbeat RPC.
+// NOTE(review): the meaning of code/msg is not defined in this file -- document once settled.
+struct SyncRequest{
+ 1:required i32 code
+ 2:required string msg
+}
+
+// Response payload of the heartbeat RPC.
+// NOTE(review): the meaning of code/msg is not defined in this file -- document once settled.
+struct SyncResponse{
+ 1:required i32 code
+ 2:required string msg
+}
+
+// RPC interface used by a sender to transfer data to a receiver.
+service TransportService{
+ // Presents the sender's identity to the receiver.
+ // NOTE(review): unlike the methods below, the argument has no explicit field id, so the
+ // Thrift compiler will auto-assign one -- consider writing `1:IdentityInfo info`.
+ TransportStatus handshake(IdentityInfo info);
+
+ // Sends one slice of data (buff) described by metaInfo, together with a digest.
+ // NOTE(review): presumably the digest covers buff -- confirm against the implementation.
+ TransportStatus transportData(1:IdentityInfo identityInfo, 2:MetaInfo metaInfo, 3:binary buff, 4:binary digest);
+
+ // Submits a digest for the file named in metaInfo so the receiver can check it.
+ TransportStatus checkFileDigest(1:IdentityInfo identityInfo, 2:MetaInfo metaInfo, 3:binary digest);
+ // TransportStatus finishTransportFile(1:IdentityInfo identityInfo, 2:MetaInfo metaInfo)
+
+ // Exchanges a SyncRequest / SyncResponse pair between sender and receiver.
+ SyncResponse heartbeat(1:IdentityInfo identityInfo, 2:SyncRequest syncRequest)
+}