You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2022/08/15 10:53:00 UTC

[GitHub] [iotdb] Cpaulyz commented on a diff in pull request #6947: [IOTDB-3465] ext pipe support DeletionPipeData

Cpaulyz commented on code in PR #6947:
URL: https://github.com/apache/iotdb/pull/6947#discussion_r945588373


##########
example/ext-pipe-plugin-example/src/main/java/org/apache/iotdb/extpipe/ExtPipeSinkWriterImpl.java:
##########
@@ -49,88 +49,85 @@ public void open() {
   }
 
   @Override
-  public synchronized void insertBoolean(String[] path, long timestamp, boolean value)
+  public void insertBoolean(String sgName, String[] path, long timestamp, boolean value)
       throws IOException {
     //== Here, handle inserted Boolean type data from IoTDB.
     //extSession.insertBoolean(...);
     //...
   }
 
   @Override
-  public synchronized void insertInt32(String[] path, long timestamp, int value)
+  public void insertInt32(String sgName, String[] path, long timestamp, int value)
       throws IOException {
     //== Here, handle inserted Int32 type data from IoTDB.
     //extSession.insertInt32(...);
     //...
   }
 
   @Override
-  public synchronized void insertInt64(String[] path, long timestamp, long value)
-      throws IOException {
+  public void insertInt64(String sgName, String[] path, long time, long value) throws IOException {
     //== Here, handle inserted Int64 type data from IoTDB.
     //...
   }
 
   @Override
-  public synchronized void insertFloat(String[] path, long timestamp, float value)
-      throws IOException {
+  public void insertFloat(String sgName, String[] path, long time, float value) throws IOException {
     //== Here, handle inserted float type data from IoTDB.
     //extSession.insertFloat(...);
     //...
   }
 
   @Override
-  public synchronized void insertDouble(String[] path, long timestamp, double value)
-      throws IOException {
+  public void insertDouble(String sgName, String[] path, long time, double value) throws IOException {
     //== Here, handle inserted double type data from IoTDB.
     //extSession.insertDouble(...);
     //...
   }
 
   @Override
-  public synchronized void insertText(String[] path, long timestamp, String value)
-      throws IOException {
+  public void insertText(String sgName, String[] path, long time, String value) throws IOException {
     //== Here, handle inserted Text type data from IoTDB.
     //extSession.insertText(...);
     //..
   }
 
-  @Override
-  public synchronized void insertVector(String[] path, DataType[] dataTypes, long timestamp,
-                                        Object[] values)
-      throws IOException {
-    //== Here, handle inserted Vector type data from IoTDB.
-    //extSession.insertVector(...);
-    //...
-  }
+//  @Override
+//  public void insertVector(String[] path, DataType[] dataTypes, long timestamp,
+//                                        Object[] values)
+//      throws IOException {
+//    //== Here, handle inserted Vector type data from IoTDB.
+//    //extSession.insertVector(...);
+//    //...
+//  }

Review Comment:
   Uncomment



##########
example/ext-pipe-plugin-example/pom.xml:
##########
@@ -21,16 +21,18 @@
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
+    <groupId>org.apache.iotdb</groupId>
     <artifactId>iotdb-ext-pipe-example</artifactId>
+    <version>0.14.0-SNAPSHOT</version>

Review Comment:
   We should make it easy for users to package and register this example. Some changed is needed:
   1. Change name to ext-pipe-example
   2. Add to modules list in parent pom.xml
   3. `mvn spotless:apply`
   4. Add the necessary files in resources/META-INFO/services to enabled SPI.
   ```suggestion
   
       <parent>
           <groupId>org.apache.iotdb</groupId>
           <artifactId>iotdb-examples</artifactId>
           <version>0.14.0-SNAPSHOT</version>
           <relativePath>../pom.xml</relativePath>
       </parent>
   ```



##########
server/src/main/java/org/apache/iotdb/db/sync/externalpipe/operation/Operation.java:
##########
@@ -19,22 +19,37 @@
 
 package org.apache.iotdb.db.sync.externalpipe.operation;
 
-/**
- * Operations represents changes of the server's state, including metadata changes and data changes.
- */
-public class Operation {
+/** Operation represents the data changes of the server. */
+public abstract class Operation {
+  public enum OperationType {
+    INSERT,
+    DELETE;
+  }
+
+  private OperationType operationType;
+
   private final String storageGroup;
   // The data rang is [startIndex, endIndex),
   // i.e. valid data include startIndex and not include endIndex
   private long startIndex;
   private long endIndex;
 
-  public Operation(String storageGroup, long startIndex, long endIndex) {
+  public Operation(
+      OperationType operationType, String storageGroup, long startIndex, long endIndex) {
+    this.operationType = operationType;
     this.storageGroup = storageGroup;
     this.startIndex = startIndex;
     this.endIndex = endIndex;
   }
 
+  public OperationType getOperationType() {
+    return operationType;
+  }
+
+  public String getOperationTypeStr() {

Review Comment:
   ```suggestion
     public String getOperationTypeName() {
   ```



##########
server/src/main/java/org/apache/iotdb/db/sync/externalpipe/ExtPipePluginManager.java:
##########
@@ -176,46 +194,69 @@ private void monitorPipeData() {
       return;
     }
 
-    while (true) {
-      List<PipeData> pipeDataList = tsFilePipe.pull(Long.MAX_VALUE);
-      if ((pipeDataList != null)
-          && (!pipeDataList.isEmpty())
-          && (pipeDataList.get(pipeDataList.size() - 1).getSerialNumber()
-              > lastPipeDataSerialNumber)) {
-        for (PipeData pipeData : pipeDataList) {
-          long pipeDataSerialNumber = pipeData.getSerialNumber();
-          if (pipeDataSerialNumber <= lastPipeDataSerialNumber) {
-            continue;
-          }
-          lastPipeDataSerialNumber = pipeData.getSerialNumber();
+    while (alive) {
+      try {
+        // == pull Pipe src data and insert them to pipeOpManager
+        List<PipeData> pipeDataList = tsFilePipe.pull(Long.MAX_VALUE);
+        if ((pipeDataList != null)
+            && (!pipeDataList.isEmpty())
+            && (pipeDataList.get(pipeDataList.size() - 1).getSerialNumber()
+                > lastPipeDataSerialNumber)) {
+          for (PipeData pipeData : pipeDataList) {
+            long pipeDataSerialNumber = pipeData.getSerialNumber();
+            if (pipeDataSerialNumber <= lastPipeDataSerialNumber) {
+              continue;
+            }
 
-          // extract the Tsfile PipeData
-          if (pipeData instanceof TsFilePipeData) {
-            TsFilePipeData tsFilePipeData = (TsFilePipeData) pipeData;
+            // == extract the Tsfile PipeData
+            if (pipeData instanceof TsFilePipeData) {
+              TsFilePipeData tsFilePipeData = (TsFilePipeData) pipeData;
+
+              String sgName = tsFilePipeData.getStorageGroupName();

Review Comment:
   Delete the storageGroupName field in TsFilePipe, you can use `StorageEngine.getSgByEngineFile()` instead



##########
server/src/main/java/org/apache/iotdb/db/sync/pipedata/DeletionPipeData.java:
##########
@@ -35,14 +36,20 @@
 public class DeletionPipeData extends PipeData {
   private static final Logger logger = LoggerFactory.getLogger(DeletionPipeData.class);
 
+  private String storageGroup;

Review Comment:
   This attribute is only used in the ext-pipe sender and is an unnecessary field in the data transfer. Adding the field will cause additional network overhead and is not recommended.



##########
server/src/main/java/org/apache/iotdb/db/sync/externalpipe/ExtPipePluginManager.java:
##########
@@ -176,46 +194,69 @@ private void monitorPipeData() {
       return;
     }
 
-    while (true) {
-      List<PipeData> pipeDataList = tsFilePipe.pull(Long.MAX_VALUE);
-      if ((pipeDataList != null)
-          && (!pipeDataList.isEmpty())
-          && (pipeDataList.get(pipeDataList.size() - 1).getSerialNumber()
-              > lastPipeDataSerialNumber)) {
-        for (PipeData pipeData : pipeDataList) {
-          long pipeDataSerialNumber = pipeData.getSerialNumber();
-          if (pipeDataSerialNumber <= lastPipeDataSerialNumber) {
-            continue;
-          }
-          lastPipeDataSerialNumber = pipeData.getSerialNumber();
+    while (alive) {
+      try {
+        // == pull Pipe src data and insert them to pipeOpManager
+        List<PipeData> pipeDataList = tsFilePipe.pull(Long.MAX_VALUE);
+        if ((pipeDataList != null)
+            && (!pipeDataList.isEmpty())
+            && (pipeDataList.get(pipeDataList.size() - 1).getSerialNumber()
+                > lastPipeDataSerialNumber)) {
+          for (PipeData pipeData : pipeDataList) {
+            long pipeDataSerialNumber = pipeData.getSerialNumber();
+            if (pipeDataSerialNumber <= lastPipeDataSerialNumber) {
+              continue;
+            }
 
-          // extract the Tsfile PipeData
-          if (pipeData instanceof TsFilePipeData) {
-            TsFilePipeData tsFilePipeData = (TsFilePipeData) pipeData;
+            // == extract the Tsfile PipeData
+            if (pipeData instanceof TsFilePipeData) {
+              TsFilePipeData tsFilePipeData = (TsFilePipeData) pipeData;
+
+              String sgName = tsFilePipeData.getStorageGroupName();
+              String tsFileFullName = tsFilePipeData.getTsFilePath();
+              String modsFileFullName = tsFilePipeData.getModsFilePath();
+              try {
+                pipeOpManager.appendTsFileOpBlock(
+                    sgName, tsFileFullName, modsFileFullName, pipeDataSerialNumber);
+                lastPipeDataSerialNumber = pipeDataSerialNumber;
+              } catch (IOException e) {
+                logger.error("monitorPipeData(), Can not append TsFile: {}" + tsFileFullName);
+              }
+              continue;
+            }
 
-            String sgName = tsFilePipeData.getStorageGroupName();
-            String tsFileFullName = tsFilePipeData.getTsFilePath();
-            String modsFileFullName = tsFilePipeData.getModsFilePath();
-            try {
-              pipeOpManager.appendTsFile(
-                  sgName, tsFileFullName, modsFileFullName, pipeDataSerialNumber);
-            } catch (IOException e) {
-              logger.error("monitorPipeData(), Can not append TsFile: {}" + tsFileFullName);
+            // == handle delete PipeData
+            if (pipeData instanceof DeletionPipeData) {

Review Comment:
   ```suggestion
               else if (pipeData instanceof DeletionPipeData) {
   ```



##########
server/src/main/java/org/apache/iotdb/db/sync/externalpipe/ExtPipePluginManager.java:
##########
@@ -54,32 +55,22 @@ public class ExtPipePluginManager {
   private Map<String, ExtPipePlugin> extPipePluginMap = new HashMap<>();
 
   private ExecutorService monitorService = Executors.newFixedThreadPool(1);

Review Comment:
   Use `IoTDBThreadPoolFactory` to create ExecutorService in ExtPipePluginManager and ExtPipePlugin



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org