You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2022/08/08 09:19:52 UTC

[GitHub] [iotdb] qiaojialin commented on a diff in pull request #6817: [IOTDB-3191][IOTDB-3192] Refactoring sync module to use AbstractSyncInfo to manage pipe metadata

qiaojialin commented on code in PR #6817:
URL: https://github.com/apache/iotdb/pull/6817#discussion_r939964011


##########
server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServiceImpl.java:
##########
@@ -60,12 +56,12 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
-import static org.apache.iotdb.db.sync.conf.SyncConstant.DATA_CHUNK_SIZE;
-import static org.apache.iotdb.db.sync.transport.conf.TransportConstant.CONFLICT_CODE;
-import static org.apache.iotdb.db.sync.transport.conf.TransportConstant.ERROR_CODE;
-import static org.apache.iotdb.db.sync.transport.conf.TransportConstant.REBASE_CODE;
-import static org.apache.iotdb.db.sync.transport.conf.TransportConstant.RETRY_CODE;
-import static org.apache.iotdb.db.sync.transport.conf.TransportConstant.SUCCESS_CODE;
+import static org.apache.iotdb.commons.sync.SyncConstant.CONFLICT_CODE;
+import static org.apache.iotdb.commons.sync.SyncConstant.DATA_CHUNK_SIZE;
+import static org.apache.iotdb.commons.sync.SyncConstant.ERROR_CODE;
+import static org.apache.iotdb.commons.sync.SyncConstant.REBASE_CODE;
+import static org.apache.iotdb.commons.sync.SyncConstant.RETRY_CODE;
+import static org.apache.iotdb.commons.sync.SyncConstant.SUCCESS_CODE;
 
 public class TransportServiceImpl implements TransportService.Iface {

Review Comment:
   Use InternalService later.



##########
server/src/main/java/org/apache/iotdb/db/sync/common/AbstractSyncInfo.java:
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.sync.common;
+
+import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.sync.SyncPathUtil;
+import org.apache.iotdb.db.exception.sync.PipeException;
+import org.apache.iotdb.db.exception.sync.PipeSinkException;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
+import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
+import org.apache.iotdb.db.sync.common.persistence.SyncLogAnalyzer;
+import org.apache.iotdb.db.sync.common.persistence.SyncLogger;
+import org.apache.iotdb.db.sync.receiver.manager.PipeMessage;
+import org.apache.iotdb.db.sync.sender.pipe.Pipe;
+import org.apache.iotdb.db.sync.sender.pipe.PipeInfo;
+import org.apache.iotdb.db.sync.sender.pipe.PipeSink;
+import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public abstract class AbstractSyncInfo {
+
+  protected static final Logger LOGGER = LoggerFactory.getLogger(AbstractSyncInfo.class);
+
+  protected boolean pipeServerEnable;
+  // <pipeFolderName, pipeMsg>
+  protected Map<String, List<PipeMessage>> pipeMessageMap;
+
+  private Map<String, PipeSink> pipeSinks;
+
+  private PipeInfo runningPipe;
+  private List<PipeInfo> pipes;
+
+  protected SyncLogger syncLogger;
+
+  public AbstractSyncInfo() {
+    syncLogger = SyncLogger.getInstance();
+    SyncLogAnalyzer analyzer = new SyncLogAnalyzer();
+    try {
+      analyzer.recover();
+      pipeSinks = analyzer.getAllPipeSinks();
+      pipes = analyzer.getAllPipeInfos();
+      runningPipe = analyzer.getRunningPipeInfo();
+      pipeServerEnable = analyzer.isPipeServerEnable();
+      pipeMessageMap = analyzer.getPipeMessageMap();
+    } catch (StartupException e) {
+      LOGGER.error(
+          "Cannot recover ReceiverInfo because {}. Use default info values.", e.getMessage());
+      pipeSinks = new ConcurrentHashMap<>();
+      pipes = new ArrayList<>();
+      pipeMessageMap = new ConcurrentHashMap<>();
+      pipeServerEnable = false;
+    }
+  }
+
+  protected abstract void afterStartPipe(String pipeName, long createTime);
+
+  protected abstract void afterStopPipe(String pipeName, long createTime);
+
+  protected abstract void afterDropPipe(String pipeName, long createTime);
+
+  public void close() throws IOException {
+    syncLogger.close();
+  }
+
+  // region Implement of PipeServer
+
+  public void startServer() throws IOException {
+    pipeServerEnable = true;
+    syncLogger.startPipeServer();
+  }
+
+  public void stopServer() throws IOException {
+    pipeServerEnable = false;
+    syncLogger.stopPipeServer();
+  }
+
+  // endregion
+
+  // region Implement of PipeSink
+  private boolean isPipeSinkExist(String name) {
+    return pipeSinks.containsKey(name);
+  }
+
+  public void addPipeSink(CreatePipeSinkPlan plan) throws PipeSinkException, IOException {
+    if (isPipeSinkExist(plan.getPipeSinkName())) {
+      throw new PipeSinkException(
+          "There is a pipeSink named " + plan.getPipeSinkName() + " in IoTDB, please drop it.");
+    }
+
+    PipeSink pipeSink = SyncPipeUtil.parseCreatePipeSinkPlan(plan);
+    // should guarantee the adding pipesink is not exist.
+    pipeSinks.put(pipeSink.getPipeSinkName(), pipeSink);
+    syncLogger.addPipeSink(plan);
+  }
+
+  public void dropPipeSink(String name) throws PipeSinkException, IOException {
+    if (!isPipeSinkExist(name)) {
+      throw new PipeSinkException("PipeSink " + name + " is not exist.");
+    }
+    if (runningPipe != null
+        && runningPipe.getStatus() != Pipe.PipeStatus.DROP
+        && runningPipe.getPipeSinkName().equals(name)) {
+      throw new PipeSinkException(
+          String.format(
+              "Can not drop pipeSink %s, because pipe %s is using it.",
+              name, runningPipe.getPipeName()));
+    }
+    pipeSinks.remove(name);
+    syncLogger.dropPipeSink(name);
+  }
+
+  public PipeSink getPipeSink(String name) {
+    return pipeSinks.getOrDefault(name, null);
+  }
+
+  public List<PipeSink> getAllPipeSink() {
+    List<PipeSink> allPipeSinks = new ArrayList<>();
+    for (Map.Entry<String, PipeSink> entry : pipeSinks.entrySet()) {
+      allPipeSinks.add(entry.getValue());
+    }
+    return allPipeSinks;
+  }
+
+  // endregion
+
+  // region Implement of Pipe
+
+  public void addPipe(CreatePipePlan plan, long createTime) throws PipeException, IOException {
+    // common check
+    if (runningPipe != null && runningPipe.getStatus() != Pipe.PipeStatus.DROP) {
+      throw new PipeException(
+          String.format(
+              "Pipe %s is %s, please retry after drop it.",
+              runningPipe.getPipeName(), runningPipe.getStatus().name()));
+    }
+    if (!isPipeSinkExist(plan.getPipeSinkName())) {
+      throw new PipeException(String.format("Can not find pipeSink %s.", plan.getPipeSinkName()));
+    }
+
+    PipeSink runningPipeSink = getPipeSink(plan.getPipeSinkName());
+    runningPipe = SyncPipeUtil.parseCreatePipePlanAsPipeInfo(plan, runningPipeSink, createTime);
+    pipes.add(runningPipe);
+    syncLogger.addPipe(plan, createTime);
+  }
+
+  public void operatePipe(String pipeName, Operator.OperatorType operatorType)
+      throws PipeException, IOException {
+    checkRunningPipeExistAndName(pipeName);
+    switch (operatorType) {
+      case START_PIPE:
+        runningPipe.start();
+        afterStartPipe(runningPipe.getPipeName(), runningPipe.getCreateTime());
+        break;
+      case STOP_PIPE:
+        runningPipe.stop();
+        afterStopPipe(runningPipe.getPipeName(), runningPipe.getCreateTime());
+        break;
+      case DROP_PIPE:
+        runningPipe.drop();
+        afterDropPipe(runningPipe.getPipeName(), runningPipe.getCreateTime());
+        break;
+      default:
+        throw new PipeException("Unknown operatorType " + operatorType);
+    }
+    syncLogger.operatePipe(pipeName, operatorType);
+  }
+
+  public List<PipeInfo> getAllPipeInfos() {
+    return pipes;
+  }
+
+  private void checkRunningPipeExistAndName(String pipeName) throws PipeException {

Review Comment:
   ```suggestion
     private void checkIfPipeExistAndRunning(String pipeName) throws PipeException {
   ```



##########
server/src/main/java/org/apache/iotdb/db/sync/common/AbstractSyncInfo.java:
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.sync.common;
+
+import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.sync.SyncPathUtil;
+import org.apache.iotdb.db.exception.sync.PipeException;
+import org.apache.iotdb.db.exception.sync.PipeSinkException;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
+import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
+import org.apache.iotdb.db.sync.common.persistence.SyncLogAnalyzer;
+import org.apache.iotdb.db.sync.common.persistence.SyncLogger;
+import org.apache.iotdb.db.sync.receiver.manager.PipeMessage;
+import org.apache.iotdb.db.sync.sender.pipe.Pipe;
+import org.apache.iotdb.db.sync.sender.pipe.PipeInfo;
+import org.apache.iotdb.db.sync.sender.pipe.PipeSink;
+import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public abstract class AbstractSyncInfo {
+
+  protected static final Logger LOGGER = LoggerFactory.getLogger(AbstractSyncInfo.class);
+
+  protected boolean pipeServerEnable;
+  // <pipeFolderName, pipeMsg>
+  protected Map<String, List<PipeMessage>> pipeMessageMap;
+
+  private Map<String, PipeSink> pipeSinks;
+
+  private PipeInfo runningPipe;
+  private List<PipeInfo> pipes;
+
+  protected SyncLogger syncLogger;
+
+  public AbstractSyncInfo() {
+    syncLogger = SyncLogger.getInstance();
+    SyncLogAnalyzer analyzer = new SyncLogAnalyzer();
+    try {
+      analyzer.recover();
+      pipeSinks = analyzer.getAllPipeSinks();
+      pipes = analyzer.getAllPipeInfos();
+      runningPipe = analyzer.getRunningPipeInfo();
+      pipeServerEnable = analyzer.isPipeServerEnable();
+      pipeMessageMap = analyzer.getPipeMessageMap();
+    } catch (StartupException e) {
+      LOGGER.error(
+          "Cannot recover ReceiverInfo because {}. Use default info values.", e.getMessage());
+      pipeSinks = new ConcurrentHashMap<>();
+      pipes = new ArrayList<>();
+      pipeMessageMap = new ConcurrentHashMap<>();
+      pipeServerEnable = false;
+    }
+  }
+
+  protected abstract void afterStartPipe(String pipeName, long createTime);
+
+  protected abstract void afterStopPipe(String pipeName, long createTime);
+
+  protected abstract void afterDropPipe(String pipeName, long createTime);
+
+  public void close() throws IOException {
+    syncLogger.close();
+  }
+
+  // region Implement of PipeServer
+
+  public void startServer() throws IOException {
+    pipeServerEnable = true;
+    syncLogger.startPipeServer();
+  }
+
+  public void stopServer() throws IOException {
+    pipeServerEnable = false;
+    syncLogger.stopPipeServer();
+  }
+
+  // endregion
+
+  // region Implement of PipeSink
+  private boolean isPipeSinkExist(String name) {
+    return pipeSinks.containsKey(name);
+  }
+
+  public void addPipeSink(CreatePipeSinkPlan plan) throws PipeSinkException, IOException {
+    if (isPipeSinkExist(plan.getPipeSinkName())) {
+      throw new PipeSinkException(
+          "There is a pipeSink named " + plan.getPipeSinkName() + " in IoTDB, please drop it.");
+    }
+
+    PipeSink pipeSink = SyncPipeUtil.parseCreatePipeSinkPlan(plan);
+    // should guarantee the adding pipesink is not exist.
+    pipeSinks.put(pipeSink.getPipeSinkName(), pipeSink);
+    syncLogger.addPipeSink(plan);
+  }
+
+  public void dropPipeSink(String name) throws PipeSinkException, IOException {
+    if (!isPipeSinkExist(name)) {
+      throw new PipeSinkException("PipeSink " + name + " is not exist.");
+    }
+    if (runningPipe != null
+        && runningPipe.getStatus() != Pipe.PipeStatus.DROP
+        && runningPipe.getPipeSinkName().equals(name)) {
+      throw new PipeSinkException(
+          String.format(
+              "Can not drop pipeSink %s, because pipe %s is using it.",
+              name, runningPipe.getPipeName()));
+    }
+    pipeSinks.remove(name);
+    syncLogger.dropPipeSink(name);
+  }
+
+  public PipeSink getPipeSink(String name) {
+    return pipeSinks.getOrDefault(name, null);
+  }
+
+  public List<PipeSink> getAllPipeSink() {
+    List<PipeSink> allPipeSinks = new ArrayList<>();
+    for (Map.Entry<String, PipeSink> entry : pipeSinks.entrySet()) {
+      allPipeSinks.add(entry.getValue());
+    }
+    return allPipeSinks;
+  }
+
+  // endregion
+
+  // region Implement of Pipe
+
+  public void addPipe(CreatePipePlan plan, long createTime) throws PipeException, IOException {
+    // common check
+    if (runningPipe != null && runningPipe.getStatus() != Pipe.PipeStatus.DROP) {
+      throw new PipeException(
+          String.format(
+              "Pipe %s is %s, please retry after drop it.",
+              runningPipe.getPipeName(), runningPipe.getStatus().name()));
+    }
+    if (!isPipeSinkExist(plan.getPipeSinkName())) {
+      throw new PipeException(String.format("Can not find pipeSink %s.", plan.getPipeSinkName()));
+    }
+
+    PipeSink runningPipeSink = getPipeSink(plan.getPipeSinkName());
+    runningPipe = SyncPipeUtil.parseCreatePipePlanAsPipeInfo(plan, runningPipeSink, createTime);
+    pipes.add(runningPipe);
+    syncLogger.addPipe(plan, createTime);
+  }
+
+  public void operatePipe(String pipeName, Operator.OperatorType operatorType)
+      throws PipeException, IOException {
+    checkRunningPipeExistAndName(pipeName);
+    switch (operatorType) {
+      case START_PIPE:
+        runningPipe.start();
+        afterStartPipe(runningPipe.getPipeName(), runningPipe.getCreateTime());
+        break;
+      case STOP_PIPE:
+        runningPipe.stop();
+        afterStopPipe(runningPipe.getPipeName(), runningPipe.getCreateTime());
+        break;
+      case DROP_PIPE:
+        runningPipe.drop();
+        afterDropPipe(runningPipe.getPipeName(), runningPipe.getCreateTime());
+        break;
+      default:
+        throw new PipeException("Unknown operatorType " + operatorType);
+    }
+    syncLogger.operatePipe(pipeName, operatorType);
+  }
+
+  public List<PipeInfo> getAllPipeInfos() {
+    return pipes;
+  }
+
+  private void checkRunningPipeExistAndName(String pipeName) throws PipeException {
+    if (runningPipe == null || runningPipe.getStatus() == Pipe.PipeStatus.DROP) {
+      throw new PipeException("There is no existing pipe.");
+    }
+    if (!runningPipe.getPipeName().equals(pipeName)) {
+      throw new PipeException(
+          String.format(
+              "Pipe %s is %s, please retry after drop it.",
+              runningPipe.getPipeName(), runningPipe.getStatus()));
+    }
+  }
+
+  // endregion
+
+  /**
+   * write a single message and serialize to disk
+   *
+   * @param pipeName name of pipe
+   * @param createTime createTime of pipe
+   * @param message pipe message
+   */
+  public synchronized void writePipeMessage(String pipeName, long createTime, PipeMessage message) {
+    String pipeIdentifier = SyncPathUtil.getSenderPipeDir(pipeName, createTime);
+    try {
+      syncLogger.writePipeMsg(pipeIdentifier, message);
+    } catch (IOException e) {
+      LOGGER.error(
+          "Can not write pipe message {} from {} to disk because {}",
+          message,
+          pipeIdentifier,
+          e.getMessage());
+    }
+    pipeMessageMap.computeIfAbsent(pipeIdentifier, i -> new ArrayList<>()).add(message);
+  }
+
+  /**
+   * read recent messages about one pipe
+   *
+   * @param pipeName name of pipe
+   * @param createTime createTime of pipe
+   * @param consume if consume is true, these messages will be deleted after being read.
+   *     Otherwise, these messages can be read next time.
+   * @return recent messages
+   */
+  public synchronized List<PipeMessage> getPipeMessages(
+      String pipeName, long createTime, boolean consume) {
+    List<PipeMessage> pipeMessageList = new ArrayList<>();
+    String pipeIdentifier = SyncPathUtil.getSenderPipeDir(pipeName, createTime);
+    if (consume) {
+      try {
+        syncLogger.comsumePipeMsg(pipeIdentifier);
+      } catch (IOException e) {
+        LOGGER.error(
+            "Can not read pipe message about {} from disk because {}",
+            pipeIdentifier,
+            e.getMessage());
+      }
+    }
+    if (pipeMessageMap.containsKey(pipeIdentifier)) {
+      pipeMessageList = pipeMessageMap.get(pipeIdentifier);
+      if (consume) {
+        pipeMessageMap.remove(pipeIdentifier);
+      }
+    }
+    return pipeMessageList;
+  }
+
+  /**
+   * read the most important message about one pipe. ERROR > WARN > INFO.
+   *
+   * @param pipeName name of pipe
+   * @param createTime createTime of pipe
+   * @param consume if consume is true, recent messages will not be deleted. Otherwise, these
+   *     messages can be read next time.
+   * @return the most important message

Review Comment:
   What does "important" mean here? The last message?
   Remove the message-related code.



##########
server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogAnalyzer.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.sync.common.persistence;
+
+import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.service.ServiceType;
+import org.apache.iotdb.commons.sync.SyncConstant;
+import org.apache.iotdb.commons.sync.SyncPathUtil;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
+import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
+import org.apache.iotdb.db.sync.receiver.manager.PipeMessage;
+import org.apache.iotdb.db.sync.sender.pipe.PipeInfo;
+import org.apache.iotdb.db.sync.sender.pipe.PipeSink;
+import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class SyncLogAnalyzer {
+  private static final Logger logger = LoggerFactory.getLogger(SyncLogAnalyzer.class);
+  // record recovery result of receiver server status
+  private boolean pipeServerEnable = false;
+  // <pipeFolderName, pipeMsg>
+  private Map<String, List<PipeMessage>> pipeMessageMap = new ConcurrentHashMap<>();
+  // <pipeSinkName, PipeSink>
+  private Map<String, PipeSink> pipeSinks = new ConcurrentHashMap<>();
+  private List<PipeInfo> pipes = new ArrayList<>();
+  private PipeInfo runningPipe;
+
+  public void recover() throws StartupException {
+    logger.info("Start to recover all sync state for sync.");
+    this.pipeMessageMap = new ConcurrentHashMap<>();
+    this.pipeServerEnable = false;
+    this.pipeSinks = new ConcurrentHashMap<>();
+    this.pipes = new ArrayList<>();
+    File serviceLogFile = new File(SyncPathUtil.getSysDir(), SyncConstant.SYNC_LOG_NAME);
+    try (BufferedReader br = new BufferedReader(new FileReader(serviceLogFile))) {
+      recoverPipe(br);
+    } catch (IOException e) {
+      logger.info("Sync service log file not found");

Review Comment:
   ```suggestion
         logger.warn("Sync service log file not found");
   ```



##########
server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfo.java:
##########
@@ -16,10 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.exception.sync;
+package org.apache.iotdb.db.sync.common;
 
-public class PipeDataLoadBearableException extends PipeDataLoadException {
-  public PipeDataLoadBearableException(String message) {
-    super(message);
-  }
+public class LocalSyncInfo extends AbstractSyncInfo {

Review Comment:
   ```suggestion
   public class LocalSyncInfo extends SyncInfo {
   ```



##########
server/src/main/java/org/apache/iotdb/db/sync/sender/service/TransportHandler.java:
##########
@@ -42,7 +37,6 @@
 import java.net.UnknownHostException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 public class TransportHandler {

Review Comment:
   No more RPC in sync; use InternalService.



##########
server/src/main/java/org/apache/iotdb/db/sync/transport/client/StandaloneTransportClient.java:
##########
@@ -59,14 +48,15 @@
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 
+import static org.apache.iotdb.commons.sync.SyncConstant.DATA_CHUNK_SIZE;
+import static org.apache.iotdb.commons.sync.SyncConstant.REBASE_CODE;
+import static org.apache.iotdb.commons.sync.SyncConstant.RETRY_CODE;
+import static org.apache.iotdb.commons.sync.SyncConstant.SUCCESS_CODE;
 import static org.apache.iotdb.db.sync.transport.conf.TransportConfig.isCheckFileDegistAgain;
-import static org.apache.iotdb.db.sync.transport.conf.TransportConstant.REBASE_CODE;
-import static org.apache.iotdb.db.sync.transport.conf.TransportConstant.RETRY_CODE;
-import static org.apache.iotdb.db.sync.transport.conf.TransportConstant.SUCCESS_CODE;
 
-public class TransportClient implements ITransportClient {
+public class StandaloneTransportClient implements ITransportClient {

Review Comment:
   Merge into one IoTDBSinkClient.



##########
server/src/main/java/org/apache/iotdb/db/sync/common/AbstractSyncInfo.java:
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.sync.common;
+
+import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.sync.SyncPathUtil;
+import org.apache.iotdb.db.exception.sync.PipeException;
+import org.apache.iotdb.db.exception.sync.PipeSinkException;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
+import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
+import org.apache.iotdb.db.sync.common.persistence.SyncLogAnalyzer;
+import org.apache.iotdb.db.sync.common.persistence.SyncLogger;
+import org.apache.iotdb.db.sync.receiver.manager.PipeMessage;
+import org.apache.iotdb.db.sync.sender.pipe.Pipe;
+import org.apache.iotdb.db.sync.sender.pipe.PipeInfo;
+import org.apache.iotdb.db.sync.sender.pipe.PipeSink;
+import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public abstract class AbstractSyncInfo {

Review Comment:
   ```suggestion
   public abstract class SyncInfo {
   ```
   
   Separate SyncManager from SyncInfo.



##########
server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogAnalyzer.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.sync.common.persistence;
+
+import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.service.ServiceType;
+import org.apache.iotdb.commons.sync.SyncConstant;
+import org.apache.iotdb.commons.sync.SyncPathUtil;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
+import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
+import org.apache.iotdb.db.sync.receiver.manager.PipeMessage;
+import org.apache.iotdb.db.sync.sender.pipe.PipeInfo;
+import org.apache.iotdb.db.sync.sender.pipe.PipeSink;
+import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class SyncLogAnalyzer {

Review Comment:
   ```suggestion
   public class SyncLogReader {
   ```



##########
server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogger.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.sync.common.persistence;
+
+import org.apache.iotdb.commons.sync.SyncConstant;
+import org.apache.iotdb.commons.sync.SyncPathUtil;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
+import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
+import org.apache.iotdb.db.sync.receiver.manager.PipeMessage;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+
+/**
+ * SyncLogger is used to manage the persistent information in the sync module. Persistent
+ * information can be recovered on reboot via {@linkplain SyncLogAnalyzer}.
+ */
+public class SyncLogger {
+  // record pipe meta info
+  private BufferedWriter pipeWriter;

Review Comment:
   ```suggestion
     private BufferedWriter pipeInfoWriter;
   ```



##########
server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogger.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.sync.common.persistence;
+
+import org.apache.iotdb.commons.sync.SyncConstant;
+import org.apache.iotdb.commons.sync.SyncPathUtil;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
+import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
+import org.apache.iotdb.db.sync.receiver.manager.PipeMessage;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+
+/**
+ * SyncLogger is used to manage the persistent information in the sync module. Persistent
+ * information can be recovered on reboot via {@linkplain SyncLogAnalyzer}.
+ */
+public class SyncLogger {

Review Comment:
   ```suggestion
   public class SyncLogWriter {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org