You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2022/02/17 08:22:59 UTC

[GitHub] [iotdb] yschengzi commented on a change in pull request #4989: [To new_sync][IOTDB-2272] implement customized sync process: collector

yschengzi commented on a change in pull request #4989:
URL: https://github.com/apache/iotdb/pull/4989#discussion_r808781577



##########
File path: server/src/main/java/org/apache/iotdb/db/newsync/receiver/collector/Collector.java
##########
@@ -19,4 +19,155 @@
  */
 package org.apache.iotdb.db.newsync.receiver.collector;
 
-public class Collector {}
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.newsync.pipedata.PipeData;
+import org.apache.iotdb.db.newsync.sender.recovery.TsFilePipeLogAnalyzer;
+import org.apache.iotdb.db.newsync.utils.SyncConstant;
+import org.apache.iotdb.db.newsync.utils.SyncPathUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+
+/** scan sync receiver folder and load pipeData into IoTDB */
+public class Collector {
+
+  private static final Logger logger = LoggerFactory.getLogger(Collector.class);
+  // TODO: multi thread for multi pipe
+  private ExecutorService executorService;
+  private ScanTask task;
+
+  public Collector() {
+    this.executorService =
+        IoTDBThreadPoolFactory.newSingleThreadExecutor(
+            ThreadName.SYNC_RECEIVER_COLLECTOR.getName());
+    this.task = new ScanTask();
+  }
+
+  public void startCollect() {
+    task.start();
+    executorService =
+        IoTDBThreadPoolFactory.newSingleThreadExecutor(
+            ThreadName.SYNC_RECEIVER_COLLECTOR.getName());
+    executorService.submit(task);
+  }
+
+  public void stopCollect() {
+    task.stop();
+    executorService.shutdown();
+  }
+
+  public void startPipe(String pipeName, String remoteIp, long createTime) {
+    task.addScanDir(SyncPathUtil.getReceiverPipeLogDir(pipeName, remoteIp, createTime));
+  }
+
+  public void stopPipe(String pipeName, String remoteIp, long createTime) {
+    task.removeScanDir(SyncPathUtil.getReceiverPipeLogDir(pipeName, remoteIp, createTime));
+  }
+
+  private class ScanTask implements Runnable {
+    private final Set<String> scanPathSet;

Review comment:
       concurrent error risk




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org