You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2022/08/22 14:35:44 UTC

[GitHub] [iotdb] qiaojialin commented on a diff in pull request #7038: [IOTDB-3950] Transport layer optimization for sync modules

qiaojialin commented on code in PR #7038:
URL: https://github.com/apache/iotdb/pull/7038#discussion_r951491154


##########
server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java:
##########
@@ -427,6 +366,22 @@ private TSyncIdentityInfo getCurrentTSyncIdentityInfo() {
     }
   }
 
+  /**
+   * Get current TSyncIdentityInfo
+   *
+   * @return -1

Review Comment:
   Fix the Javadoc: what does returning -1 mean?



##########
server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java:
##########
@@ -218,157 +203,115 @@ private boolean verifyIP(String ipSegment, String ipAddress, int subnetMark) {
     return ipAddressBinary.equals(ipSegmentBinary);
   }
 
-  public TSStatus transportData(TSyncTransportMetaInfo metaInfo, ByteBuffer buff, ByteBuffer digest)
-      throws TException {
+  /**
+   * Receive {@link PipeData} and load it into IoTDB Engine.
+   *
+   * @return {@link TSStatusCode#PIPESERVER_ERROR} if fail to receive or load; {@link
+   *     TSStatusCode#SUCCESS_STATUS} if load successfully.
+   * @throws TException The connection between the sender and the receiver has not been established
+   *     by {@link ReceiverManager#handshake(TSyncIdentityInfo)}
+   */
+  public TSStatus transportPipeData(ByteBuffer buff) throws TException {
+    // step1. check connection
     TSyncIdentityInfo identityInfo = getCurrentTSyncIdentityInfo();
     if (identityInfo == null) {
       throw new TException("Thrift connection is not alive.");
     }
-    logger.debug("Invoke transportData method from client ip = {}", identityInfo.address);
-
+    logger.debug("Invoke transportPipeData method from client ip = {}", identityInfo.address);
     String fileDir = SyncPathUtil.getFileDataDirPath(identityInfo);
-    TSyncTransportType type = metaInfo.type;
-    String fileName = metaInfo.fileName;
-    long startIndex = metaInfo.startIndex;
 
-    // Check file start index valid
-    if (type == TSyncTransportType.FILE) {
-      try {
-        CheckResult result = checkStartIndexValid(new File(fileDir, fileName), startIndex);
-        if (!result.isResult()) {
-          return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_REBASE, result.getIndex());
-        }
-      } catch (IOException e) {
-        logger.error(e.getMessage());
-        return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_ERROR, e.getMessage());
-      }
-    }
-
-    // Check buff digest
-    int pos = buff.position();
-    MessageDigest messageDigest = null;
+    // step2. deserialize PipeData
+    PipeData pipeData;
     try {
-      messageDigest = MessageDigest.getInstance("SHA-256");
-    } catch (NoSuchAlgorithmException e) {
-      logger.error(e.getMessage());
-      return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_ERROR, e.getMessage());
-    }
-    messageDigest.update(buff);
-    byte[] digestBytes = new byte[digest.capacity()];
-    digest.get(digestBytes);
-    if (!Arrays.equals(messageDigest.digest(), digestBytes)) {
-      return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_RETRY, "Data digest check error");
-    }
-
-    if (type != TSyncTransportType.FILE) {
-      buff.position(pos);
       int length = buff.capacity();
       byte[] byteArray = new byte[length];
       buff.get(byteArray);
-      try {
-        PipeData pipeData = PipeData.createPipeData(byteArray);
-        if (type == TSyncTransportType.TSFILE) {
-          // Do with file
-          handleTsFilePipeData((TsFilePipeData) pipeData, fileDir);
-        }
-        logger.info(
-            "Start load pipeData with serialize number {} and type {},value={}",
-            pipeData.getSerialNumber(),
-            pipeData.getType(),
-            pipeData);
-        pipeData.createLoader().load();
-        logger.info(
-            "Load pipeData with serialize number {} successfully.", pipeData.getSerialNumber());
-      } catch (IOException | IllegalPathException e) {
-        logger.error("Pipe data transport error, {}", e.getMessage());
-        return RpcUtils.getStatus(
-            TSStatusCode.SYNC_FILE_RETRY, "Data digest transport error " + e.getMessage());
-      } catch (PipeDataLoadException e) {
-        logger.error("Fail to load pipeData because {}.", e.getMessage());
-        return RpcUtils.getStatus(
-            TSStatusCode.SYNC_FILE_ERROR, "Fail to load pipeData because " + e.getMessage());
-      }
-    } else {
-      // Write buff to {file}.patch
-      buff.position(pos);
-      File file = new File(fileDir, fileName + PATCH_SUFFIX);
-      try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw")) {
-        randomAccessFile.seek(startIndex);
-        int length = buff.capacity();
-        byte[] byteArray = new byte[length];
-        buff.get(byteArray);
-        randomAccessFile.write(byteArray);
-        writeRecordFile(new File(fileDir, fileName + RECORD_SUFFIX), startIndex + length);
-        logger.debug(
-            "Sync "
-                + fileName
-                + " start at "
-                + startIndex
-                + " to "
-                + (startIndex + length)
-                + " is done.");
-      } catch (IOException e) {
-        logger.error(e.getMessage());
-        return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_ERROR, e.getMessage());
+      pipeData = PipeData.createPipeData(byteArray);
+      if (pipeData instanceof TsFilePipeData) {
+        handleTsFilePipeData((TsFilePipeData) pipeData, fileDir);
       }
+    } catch (IOException | IllegalPathException e) {
+      logger.error("Pipe data transport error, {}", e.getMessage());
+      return RpcUtils.getStatus(
+          TSStatusCode.PIPESERVER_ERROR, "Pipe data transport error, " + e.getMessage());
     }
+
+    // step3. load PipeData
+    logger.info(
+        "Start load pipeData with serialize number {} and type {},value={}",
+        pipeData.getSerialNumber(),
+        pipeData.getType(),
+        pipeData);
+    try {
+      pipeData.createLoader().load();
+      logger.info(
+          "Load pipeData with serialize number {} successfully.", pipeData.getSerialNumber());
+    } catch (PipeDataLoadException e) {
+      logger.error("Fail to load pipeData because {}.", e.getMessage());
+      return RpcUtils.getStatus(
+          TSStatusCode.PIPESERVER_ERROR, "Fail to load pipeData because " + e.getMessage());
+    }
+
     return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "");
   }
 
-  public TSStatus checkFileDigest(TSyncTransportMetaInfo metaInfo, ByteBuffer digest)
+  /**
+   * Receive TsFile based on startIndex.
+   *
+   * @return {@link TSStatusCode#SUCCESS_STATUS} if receive successfully; {@link
+   *     TSStatusCode#SYNC_FILE_REBASE} if startIndex needs to rollback because mismatched; {@link
+   *     TSStatusCode#SYNC_FILE_ERROR} if fail to receive file.
+   * @throws TException The connection between the sender and the receiver has not been established
+   *     by {@link ReceiverManager#handshake(TSyncIdentityInfo)}
+   */
+  public TSStatus transportFile(TSyncTransportMetaInfo metaInfo, ByteBuffer buff)
       throws TException {
+    // step1. check connection
     TSyncIdentityInfo identityInfo = getCurrentTSyncIdentityInfo();
     if (identityInfo == null) {
       throw new TException("Thrift connection is not alive.");
     }
-    logger.debug("Invoke checkFileDigest method from client ip = {}", identityInfo.address);
+    logger.debug("Invoke transportData method from client ip = {}", identityInfo.address);
+
     String fileDir = SyncPathUtil.getFileDataDirPath(identityInfo);
-    synchronized (fileDir.intern()) {
-      String fileName = metaInfo.fileName;
-      MessageDigest messageDigest = null;
-      try {
-        messageDigest = MessageDigest.getInstance("SHA-256");
-      } catch (NoSuchAlgorithmException e) {
-        logger.error(e.getMessage());
-        return RpcUtils.getStatus(TSStatusCode.PIPESERVER_ERROR, e.getMessage());
-      }
+    //    TSyncTransportType type = metaInfo.type;

Review Comment:
   Remove this commented-out line.



##########
server/src/main/java/org/apache/iotdb/db/sync/transport/client/IoTDBSinkTransportClient.java:
##########
@@ -84,234 +84,137 @@ public IoTDBSinkTransportClient(Pipe pipe, String ipAddress, int port, String lo
     this.ipAddress = ipAddress;
     this.port = port;
     this.localIP = localIP;
-    serviceClient = new ClientWrapper(pipe, ipAddress, port, localIP);
-    heartbeatClient = new ClientWrapper(pipe, ipAddress, port, localIP);
   }
 
   /**
-   * Create thrift connection to receiver. (1) register pipe message, including pipeName, localIp
-   * and createTime (2) check IoTDB version to make sure compatibility
+   * Create thrift connection to receiver. Check IoTDB version to make sure compatibility
    *
-   * @return true if success; false if failed MaxNumberOfSyncFileRetry times.
+   * @return true if success; false if failed to check IoTDB version.
    * @throws SyncConnectionException cannot create connection to receiver
    */
+  @Override
   public synchronized boolean handshake() throws SyncConnectionException {
-    for (int handshakeCounter = 0;
-        handshakeCounter < config.getMaxNumberOfSyncFileRetry();
-        handshakeCounter++) {
-      try {
-        return serviceClient.handshakeWithVersion();
-      } catch (SyncConnectionException e) {
-        logger.warn(
-            String.format(
-                "Handshake error, retry %d/%d.",
-                handshakeCounter, config.getMaxNumberOfSyncFileRetry()));
-      }
+    if (transport != null && transport.isOpen()) {
+      transport.close();
     }
-    if (!serviceClient.handshakeWithVersion()) {
-      logger.info(
-          String.format("Handshake failed %s times!", config.getMaxNumberOfSyncFileRetry()));
-      return false;
-    } else {
-      return true;
-    }
-  }
 
-  public boolean senderTransport(PipeData pipeData) throws SyncConnectionException {
-    if (pipeData instanceof TsFilePipeData) {
-      try {
-        for (File file : ((TsFilePipeData) pipeData).getTsFiles(true)) {
-          transportSingleFile(file);
-        }
-      } catch (IOException e) {
-        logger.error(String.format("Get tsfiles error, because %s.", e), e);
-        return false;
-      } catch (NoSuchAlgorithmException e) {
-        logger.error(String.format("Wrong message digest, because %s.", e), e);
-        return false;
+    try {
+      transport =
+          RpcTransportFactory.INSTANCE.getTransport(
+              new TSocket(
+                  TConfigurationConst.defaultTConfiguration,
+                  ipAddress,
+                  port,
+                  SyncConstant.SOCKET_TIMEOUT_MILLISECONDS,
+                  SyncConstant.CONNECT_TIMEOUT_MILLISECONDS));
+      TProtocol protocol;
+      if (config.isRpcThriftCompressionEnable()) {
+        protocol = new TCompactProtocol(transport);
+      } else {
+        protocol = new TBinaryProtocol(transport);
       }
-    }
-
-    int retryCount = 0;
+      serviceClient = new IClientRPCService.Client(protocol);
 
-    while (true) {
-      retryCount++;
-      if (retryCount > config.getMaxNumberOfSyncFileRetry()) {
-        logger.error(
-            String.format("After %s tries, stop the transport of current pipeData!", retryCount));
-        throw new SyncConnectionException(
-            String.format("Can not connect to receiver when transferring pipedata %s.", pipeData));
+      // Underlay socket open.
+      if (!transport.isOpen()) {
+        transport.open();
       }
 
-      try {
-        transportPipeData(pipeData);
-        logger.info(String.format("Finish pipeData %s transport.", pipeData));
-        break;
-      } catch (SyncConnectionException e) {
-        // handshake and retry
-        try {
-          if (!handshake()) {
-            logger.error(
-                String.format(
-                    "Handshake to receiver %s:%d error when transfer pipe data %s.",
-                    ipAddress, port, pipeData));
-            return false;
-          }
-        } catch (SyncConnectionException syncConnectionException) {
-          logger.error(
-              String.format(
-                  "Reconnect to receiver %s:%d error when transfer pipe data %s.",
-                  ipAddress, port, pipeData));
-          throw new SyncConnectionException(
-              String.format(
-                  "Reconnect to receiver error when transferring pipedata %s.", pipeData));
-        }
-      } catch (NoSuchAlgorithmException e) {
-        logger.error("Transport failed. ", e);
+      TSyncIdentityInfo identityInfo =
+          new TSyncIdentityInfo(
+              localIP, pipe.getName(), pipe.getCreateTime(), config.getIoTDBMajorVersion());
+      TSStatus status = serviceClient.handshake(identityInfo);
+      if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        logger.error("The receiver rejected the synchronization task because {}", status.message);
         return false;
       }
+    } catch (TException e) {
+      logger.warn("Cannot connect to the receiver because {}", e.getMessage());
+      throw new SyncConnectionException(
+          String.format("Cannot connect to the receiver because %s.", e.getMessage()));
     }
     return true;
   }
 
-  /** Transfer data of a tsfile to the receiver. */
-  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
-  private void transportSingleFile(File file)
-      throws SyncConnectionException, NoSuchAlgorithmException {
-
-    MessageDigest messageDigest = MessageDigest.getInstance("SHA-256");
-
-    int retryCount = 0;
-    while (true) {
-      retryCount++;
-      if (retryCount > config.getMaxNumberOfSyncFileRetry()) {
-        throw new SyncConnectionException(
-            String.format("Connect to receiver error when transferring file %s.", file.getName()));
-      }
-
+  /**
+   * Send {@link PipeData} to receiver and load. If PipeData is TsFilePipeData, The TsFiles will be
+   * transferred before the PipeData transfer.
+   *
+   * @return true if success; false if failed to send or load.
+   * @throws SyncConnectionException cannot create connection to receiver
+   */
+  @Override
+  public boolean sendTransport(PipeData pipeData) throws SyncConnectionException {

Review Comment:
   ```suggestion
     public boolean sendPipeData(PipeData pipeData) throws SyncConnectionException {
   ```



##########
server/src/main/java/org/apache/iotdb/db/sync/transport/client/SenderManager.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.iotdb.db.sync.transport.client;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.sync.SyncConstant;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.exception.SyncConnectionException;
+import org.apache.iotdb.db.sync.SyncService;
+import org.apache.iotdb.db.sync.pipedata.PipeData;
+import org.apache.iotdb.db.sync.sender.pipe.IoTDBPipeSink;
+import org.apache.iotdb.db.sync.sender.pipe.Pipe;
+import org.apache.iotdb.db.sync.sender.pipe.PipeMessage;
+import org.apache.iotdb.db.sync.sender.pipe.PipeSink;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+public class SenderManager {
+  private static final Logger logger = LoggerFactory.getLogger(SenderManager.class);
+  private static SenderManager DEBUG_SENDER_MANAGER = null; // test only
+
+  protected ITransportClient transportClient;
+  private Pipe pipe;
+  private PipeSink pipeSink;
+
+  protected ExecutorService transportExecutorService;
+  private Future transportFuture;
+
+  private SenderManager(Pipe pipe, IoTDBPipeSink pipeSink) {
+    this.pipe = pipe;
+    this.pipeSink = pipeSink;
+    this.transportExecutorService =
+        IoTDBThreadPoolFactory.newSingleThreadExecutor(
+            ThreadName.SYNC_SENDER_PIPE.getName() + "-" + pipe.getName());
+    this.transportClient = TransportClientFactory.createTransportClient(pipe, pipeSink);
+  }
+
+  public void start() {
+    transportFuture = transportExecutorService.submit(this::takePipeDataAndTransport);
+  }
+
+  public void stop() {
+    if (transportFuture != null) {
+      transportFuture.cancel(true);
+    }
+  }
+
+  public boolean close() throws InterruptedException {
+    boolean isClosed;
+    transportExecutorService.shutdownNow();
+    isClosed =
+        transportExecutorService.awaitTermination(
+            SyncConstant.DEFAULT_WAITING_FOR_STOP_MILLISECONDS, TimeUnit.MILLISECONDS);
+    return isClosed;
+  }
+
+  public static SenderManager getTransportHandler(Pipe pipe, IoTDBPipeSink pipeSink) {
+    if (DEBUG_SENDER_MANAGER == null) {
+      return new SenderManager(pipe, pipeSink);
+    }
+    DEBUG_SENDER_MANAGER.resetTransportClient(pipe); // test only
+    return DEBUG_SENDER_MANAGER;
+  }
+
+  private void takePipeDataAndTransport() {
+    try {
+      while (!Thread.currentThread().isInterrupted()) {
+        try {
+          if (!transportClient.handshake()) {
+            SyncService.getInstance()
+                .receiveMsg(
+                    PipeMessage.MsgType.ERROR,
+                    String.format("Can not handshake with %s", pipeSink));
+          }
+          while (!Thread.currentThread().isInterrupted()) {
+            PipeData pipeData = pipe.take();
+            if (!transportClient.sendTransport(pipeData)) {
+              logger.error(String.format("Can not transfer pipedata %s, skip it.", pipeData));
+              // can do something.
+              SyncService.getInstance()
+                  .receiveMsg(
+                      PipeMessage.MsgType.WARN,
+                      String.format(
+                          "Transfer piepdata %s error, skip it.", pipeData.getSerialNumber()));
+              continue;
+            }
+            pipe.commit();
+          }
+        } catch (SyncConnectionException e) {
+          logger.error(String.format("Connect to receiver %s error, because %s.", pipeSink, e));
+          // TODO: wait and retry
+        }
+      }
+    } catch (InterruptedException e) {
+      logger.info("Interrupted by pipe, exit transport.");
+    } finally {
+      transportClient.close();
+    }
+  }
+
+  /** test */
+  @TestOnly
+  public void setTransportClient(ITransportClient transportClient) {
+    this.transportClient = transportClient;
+  }
+
+  @TestOnly
+  public static void setDebugSenderManager(SenderManager senderManager) {

Review Comment:
   ```suggestion
     public static void setFakeSenderManager(SenderManager senderManager) {
   ```



##########
server/src/main/java/org/apache/iotdb/db/sync/transport/client/SenderManager.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.iotdb.db.sync.transport.client;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.sync.SyncConstant;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.exception.SyncConnectionException;
+import org.apache.iotdb.db.sync.SyncService;
+import org.apache.iotdb.db.sync.pipedata.PipeData;
+import org.apache.iotdb.db.sync.sender.pipe.IoTDBPipeSink;
+import org.apache.iotdb.db.sync.sender.pipe.Pipe;
+import org.apache.iotdb.db.sync.sender.pipe.PipeMessage;
+import org.apache.iotdb.db.sync.sender.pipe.PipeSink;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+public class SenderManager {
+  private static final Logger logger = LoggerFactory.getLogger(SenderManager.class);
+  private static SenderManager DEBUG_SENDER_MANAGER = null; // test only

Review Comment:
   Make DEBUG_SENDER_MANAGER an instance of a class that extends SenderManager, and use that fake implementation of SenderManager in tests.



##########
server/src/main/java/org/apache/iotdb/db/sync/transport/client/ITransportClient.java:
##########
@@ -19,4 +19,25 @@
  */
 package org.apache.iotdb.db.sync.transport.client;
 
-public interface ITransportClient extends Runnable {}
+import org.apache.iotdb.db.exception.SyncConnectionException;
+import org.apache.iotdb.db.sync.pipedata.PipeData;
+
+public interface ITransportClient {
+  /**
+   * Create connection and handshake before sending messages
+   *
+   * @return true if success; false if failed to check IoTDB version.
+   * @throws SyncConnectionException cannot create connection to receiver
+   */
+  boolean handshake() throws SyncConnectionException;
+
+  /**
+   * Send {@link PipeData} to receiver and load.
+   *
+   * @return true if success; false if failed to send or load.
+   * @throws SyncConnectionException cannot create connection to receiver
+   */
+  boolean sendTransport(PipeData pipeData) throws SyncConnectionException;

Review Comment:
   What is sendTransport? Rename it to sendPipeData.



##########
server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java:
##########
@@ -2113,15 +2113,14 @@ public TSStatus handshake(TSyncIdentityInfo info) throws TException {
   }
 
   @Override
-  public TSStatus transportData(TSyncTransportMetaInfo metaInfo, ByteBuffer buff, ByteBuffer digest)
-      throws TException {
-    return SyncService.getInstance().transportData(metaInfo, buff, digest);
+  public TSStatus transportPipeData(ByteBuffer buff) throws TException {
+    return SyncService.getInstance().transportPipeData(buff);
   }
 
   @Override
-  public TSStatus checkFileDigest(TSyncTransportMetaInfo metaInfo, ByteBuffer digest)
+  public TSStatus transportFile(TSyncTransportMetaInfo metaInfo, ByteBuffer buff)

Review Comment:
   ```suggestion
     public TSStatus sendFile(TSyncTransportMetaInfo metaInfo, ByteBuffer buff)
   ```



##########
server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java:
##########
@@ -2113,15 +2113,14 @@ public TSStatus handshake(TSyncIdentityInfo info) throws TException {
   }
 
   @Override
-  public TSStatus transportData(TSyncTransportMetaInfo metaInfo, ByteBuffer buff, ByteBuffer digest)
-      throws TException {
-    return SyncService.getInstance().transportData(metaInfo, buff, digest);
+  public TSStatus transportPipeData(ByteBuffer buff) throws TException {

Review Comment:
   ```suggestion
     public TSStatus sendPipeData(ByteBuffer buff) throws TException {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org