You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2022/08/10 07:08:44 UTC

[GitHub] [iotdb] jamber001 opened a new pull request, #6947: [IOTDB-3465] ext pipe support DeletionPipeData

jamber001 opened a new pull request, #6947:
URL: https://github.com/apache/iotdb/pull/6947

   https://issues.apache.org/jira/browse/IOTDB-3465


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [iotdb] jamber001 commented on a diff in pull request #6947: [IOTDB-3465] ext pipe support DeletionPipeData

Posted by GitBox <gi...@apache.org>.
jamber001 commented on code in PR #6947:
URL: https://github.com/apache/iotdb/pull/6947#discussion_r953402127


##########
server/src/main/java/org/apache/iotdb/db/sync/externalpipe/ExtPipePluginManager.java:
##########
@@ -54,32 +55,22 @@ public class ExtPipePluginManager {
   private Map<String, ExtPipePlugin> extPipePluginMap = new HashMap<>();
 
   private ExecutorService monitorService = Executors.newFixedThreadPool(1);

Review Comment:
   Thank comments from @Cpaulyz .
   I have just corrected it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [iotdb] jamber001 commented on a diff in pull request #6947: [IOTDB-3465] ext pipe support DeletionPipeData

Posted by GitBox <gi...@apache.org>.
jamber001 commented on code in PR #6947:
URL: https://github.com/apache/iotdb/pull/6947#discussion_r953414492


##########
server/src/main/java/org/apache/iotdb/db/sync/externalpipe/operation/Operation.java:
##########
@@ -19,22 +19,37 @@
 
 package org.apache.iotdb.db.sync.externalpipe.operation;
 
-/**
- * Operations represents changes of the server's state, including metadata changes and data changes.
- */
-public class Operation {
+/** Operation represents the data changes of the server. */
+public abstract class Operation {
+  public enum OperationType {
+    INSERT,
+    DELETE;
+  }
+
+  private OperationType operationType;
+
   private final String storageGroup;
   // The data rang is [startIndex, endIndex),
   // i.e. valid data include startIndex and not include endIndex
   private long startIndex;
   private long endIndex;
 
-  public Operation(String storageGroup, long startIndex, long endIndex) {
+  public Operation(
+      OperationType operationType, String storageGroup, long startIndex, long endIndex) {
+    this.operationType = operationType;
     this.storageGroup = storageGroup;
     this.startIndex = startIndex;
     this.endIndex = endIndex;
   }
 
+  public OperationType getOperationType() {
+    return operationType;
+  }
+
+  public String getOperationTypeStr() {

Review Comment:
   thank you.  I have changed it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [iotdb] jamber001 commented on a diff in pull request #6947: [IOTDB-3465] ext pipe support DeletionPipeData

Posted by GitBox <gi...@apache.org>.
jamber001 commented on code in PR #6947:
URL: https://github.com/apache/iotdb/pull/6947#discussion_r953396353


##########
node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java:
##########
@@ -254,60 +254,87 @@ private boolean alterPrefixPathInternal(
   }
 
   /**
-   * Test if this PartialPath matches a full path. This partialPath acts as a full path pattern.
-   * rPath is supposed to be a full timeseries path without wildcards. e.g. "root.sg.device.*"
-   * matches path "root.sg.device.s1" whereas it does not match "root.sg.device" and
-   * "root.sg.vehicle.s1"
+   * Test if current PartialPath matches a full path. Current partialPath acts as a full path
+   * pattern. rPath is supposed to be a full timeseries path without wildcards. e.g.
+   * "root.sg.device.*" matches path "root.sg.device.s1" whereas it does not match "root.sg.device"
+   * and "root.sg.vehicle.s1"
    *
    * @param rPath a plain full path of a timeseries
    * @return true if a successful match, otherwise return false
    */
   public boolean matchFullPath(PartialPath rPath) {
-    return matchFullPath(rPath.getNodes(), 0, 0, false);
+    return matchPath(rPath.getNodes(), 0, 0, false, false);
   }
 
-  private boolean matchFullPath(
-      String[] pathNodes, int pathIndex, int patternIndex, boolean multiLevelWild) {
+  /**
+   * Check if current pattern PartialPath can match 1 prefix path.
+   *
+   * <p>1) Current partialPath acts as a full path pattern.
+   *
+   * <p>2) Input parameter prefixPath is 1 prefix of time-series path.
+   *
+   * <p>For example:
+   *
+   * <p>1) Pattern "root.sg1.d1.*" can matche prefix path "root.sg1.d1.s1", "root.sg1.d1",
+   * "root.sg1", "root" etc.
+   *
+   * <p>1) Pattern "root.sg1.d1.*" does not match prefix path "root.sg2", "root.sg1.d2".
+   *
+   * @param prefixPath
+   * @return
+   */
+  public boolean matchPrefixPath(PartialPath prefixPath) {
+    return matchPath(prefixPath.getNodes(), 0, 0, false, true);
+  }

Review Comment:
   @MarcosZyk  Thank your coments.
   1) I have just corrected syntax mistakes in comment lines.
   2) Yeah, old one has been renamed to prefixMatchFullPath.  
        I just added some comments for this funciotn for easier understanding.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [iotdb] jamber001 commented on a diff in pull request #6947: [IOTDB-3465] ext pipe support DeletionPipeData

Posted by GitBox <gi...@apache.org>.
jamber001 commented on code in PR #6947:
URL: https://github.com/apache/iotdb/pull/6947#discussion_r953401705


##########
server/src/main/java/org/apache/iotdb/db/sync/pipedata/DeletionPipeData.java:
##########
@@ -35,14 +36,20 @@
 public class DeletionPipeData extends PipeData {
   private static final Logger logger = LoggerFactory.getLogger(DeletionPipeData.class);
 
+  private String storageGroup;

Review Comment:
   Thank @Cpaulyz 
   This storageGroup is very important field for ext-pipe.
   Because it cant let ext-pipe module and customers' plugins  do data's parallel processing.
   So I have to add this new field.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [iotdb] MarcosZyk commented on a diff in pull request #6947: [IOTDB-3465] ext pipe support DeletionPipeData

Posted by GitBox <gi...@apache.org>.
MarcosZyk commented on code in PR #6947:
URL: https://github.com/apache/iotdb/pull/6947#discussion_r945557774


##########
node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java:
##########
@@ -254,60 +254,87 @@ private boolean alterPrefixPathInternal(
   }
 
   /**
-   * Test if this PartialPath matches a full path. This partialPath acts as a full path pattern.
-   * rPath is supposed to be a full timeseries path without wildcards. e.g. "root.sg.device.*"
-   * matches path "root.sg.device.s1" whereas it does not match "root.sg.device" and
-   * "root.sg.vehicle.s1"
+   * Test if current PartialPath matches a full path. Current partialPath acts as a full path
+   * pattern. rPath is supposed to be a full timeseries path without wildcards. e.g.
+   * "root.sg.device.*" matches path "root.sg.device.s1" whereas it does not match "root.sg.device"
+   * and "root.sg.vehicle.s1"
    *
    * @param rPath a plain full path of a timeseries
    * @return true if a successful match, otherwise return false
    */
   public boolean matchFullPath(PartialPath rPath) {
-    return matchFullPath(rPath.getNodes(), 0, 0, false);
+    return matchPath(rPath.getNodes(), 0, 0, false, false);
   }
 
-  private boolean matchFullPath(
-      String[] pathNodes, int pathIndex, int patternIndex, boolean multiLevelWild) {
+  /**
+   * Check if current pattern PartialPath can match 1 prefix path.
+   *
+   * <p>1) Current partialPath acts as a full path pattern.
+   *
+   * <p>2) Input parameter prefixPath is 1 prefix of time-series path.
+   *
+   * <p>For example:
+   *
+   * <p>1) Pattern "root.sg1.d1.*" can matche prefix path "root.sg1.d1.s1", "root.sg1.d1",
+   * "root.sg1", "root" etc.
+   *
+   * <p>1) Pattern "root.sg1.d1.*" does not match prefix path "root.sg2", "root.sg1.d2".
+   *
+   * @param prefixPath
+   * @return
+   */
+  public boolean matchPrefixPath(PartialPath prefixPath) {
+    return matchPath(prefixPath.getNodes(), 0, 0, false, true);
+  }

Review Comment:
   Please correct some syntax mistakes. 
   
   It seems this interface has different behavior from the old one and the old one has been renamed to ```prefixMatchFullPath```. The names are confusing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [iotdb] Cpaulyz commented on a diff in pull request #6947: [IOTDB-3465] ext pipe support DeletionPipeData

Posted by GitBox <gi...@apache.org>.
Cpaulyz commented on code in PR #6947:
URL: https://github.com/apache/iotdb/pull/6947#discussion_r945588373


##########
example/ext-pipe-plugin-example/src/main/java/org/apache/iotdb/extpipe/ExtPipeSinkWriterImpl.java:
##########
@@ -49,88 +49,85 @@ public void open() {
   }
 
   @Override
-  public synchronized void insertBoolean(String[] path, long timestamp, boolean value)
+  public void insertBoolean(String sgName, String[] path, long timestamp, boolean value)
       throws IOException {
     //== Here, handle inserted Boolean type data from IoTDB.
     //extSession.insertBoolean(...);
     //...
   }
 
   @Override
-  public synchronized void insertInt32(String[] path, long timestamp, int value)
+  public void insertInt32(String sgName, String[] path, long timestamp, int value)
       throws IOException {
     //== Here, handle inserted Int32 type data from IoTDB.
     //extSession.insertInt32(...);
     //...
   }
 
   @Override
-  public synchronized void insertInt64(String[] path, long timestamp, long value)
-      throws IOException {
+  public void insertInt64(String sgName, String[] path, long time, long value) throws IOException {
     //== Here, handle inserted Int64 type data from IoTDB.
     //...
   }
 
   @Override
-  public synchronized void insertFloat(String[] path, long timestamp, float value)
-      throws IOException {
+  public void insertFloat(String sgName, String[] path, long time, float value) throws IOException {
     //== Here, handle inserted float type data from IoTDB.
     //extSession.insertFloat(...);
     //...
   }
 
   @Override
-  public synchronized void insertDouble(String[] path, long timestamp, double value)
-      throws IOException {
+  public void insertDouble(String sgName, String[] path, long time, double value) throws IOException {
     //== Here, handle inserted double type data from IoTDB.
     //extSession.insertDouble(...);
     //...
   }
 
   @Override
-  public synchronized void insertText(String[] path, long timestamp, String value)
-      throws IOException {
+  public void insertText(String sgName, String[] path, long time, String value) throws IOException {
     //== Here, handle inserted Text type data from IoTDB.
     //extSession.insertText(...);
     //..
   }
 
-  @Override
-  public synchronized void insertVector(String[] path, DataType[] dataTypes, long timestamp,
-                                        Object[] values)
-      throws IOException {
-    //== Here, handle inserted Vector type data from IoTDB.
-    //extSession.insertVector(...);
-    //...
-  }
+//  @Override
+//  public void insertVector(String[] path, DataType[] dataTypes, long timestamp,
+//                                        Object[] values)
+//      throws IOException {
+//    //== Here, handle inserted Vector type data from IoTDB.
+//    //extSession.insertVector(...);
+//    //...
+//  }

Review Comment:
   Uncomment



##########
example/ext-pipe-plugin-example/pom.xml:
##########
@@ -21,16 +21,18 @@
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
+    <groupId>org.apache.iotdb</groupId>
     <artifactId>iotdb-ext-pipe-example</artifactId>
+    <version>0.14.0-SNAPSHOT</version>

Review Comment:
   We should make it easy for users to package and register this example. Some changed is needed:
   1. Change name to ext-pipe-example
   2. Add to modules list in parent pom.xml
   3. `mvn spotless:apply`
   4. Add the necessary files in resources/META-INFO/services to enabled SPI.
   ```suggestion
   
       <parent>
           <groupId>org.apache.iotdb</groupId>
           <artifactId>iotdb-examples</artifactId>
           <version>0.14.0-SNAPSHOT</version>
           <relativePath>../pom.xml</relativePath>
       </parent>
   ```



##########
server/src/main/java/org/apache/iotdb/db/sync/externalpipe/operation/Operation.java:
##########
@@ -19,22 +19,37 @@
 
 package org.apache.iotdb.db.sync.externalpipe.operation;
 
-/**
- * Operations represents changes of the server's state, including metadata changes and data changes.
- */
-public class Operation {
+/** Operation represents the data changes of the server. */
+public abstract class Operation {
+  public enum OperationType {
+    INSERT,
+    DELETE;
+  }
+
+  private OperationType operationType;
+
   private final String storageGroup;
   // The data rang is [startIndex, endIndex),
   // i.e. valid data include startIndex and not include endIndex
   private long startIndex;
   private long endIndex;
 
-  public Operation(String storageGroup, long startIndex, long endIndex) {
+  public Operation(
+      OperationType operationType, String storageGroup, long startIndex, long endIndex) {
+    this.operationType = operationType;
     this.storageGroup = storageGroup;
     this.startIndex = startIndex;
     this.endIndex = endIndex;
   }
 
+  public OperationType getOperationType() {
+    return operationType;
+  }
+
+  public String getOperationTypeStr() {

Review Comment:
   ```suggestion
     public String getOperationTypeName() {
   ```



##########
server/src/main/java/org/apache/iotdb/db/sync/externalpipe/ExtPipePluginManager.java:
##########
@@ -176,46 +194,69 @@ private void monitorPipeData() {
       return;
     }
 
-    while (true) {
-      List<PipeData> pipeDataList = tsFilePipe.pull(Long.MAX_VALUE);
-      if ((pipeDataList != null)
-          && (!pipeDataList.isEmpty())
-          && (pipeDataList.get(pipeDataList.size() - 1).getSerialNumber()
-              > lastPipeDataSerialNumber)) {
-        for (PipeData pipeData : pipeDataList) {
-          long pipeDataSerialNumber = pipeData.getSerialNumber();
-          if (pipeDataSerialNumber <= lastPipeDataSerialNumber) {
-            continue;
-          }
-          lastPipeDataSerialNumber = pipeData.getSerialNumber();
+    while (alive) {
+      try {
+        // == pull Pipe src data and insert them to pipeOpManager
+        List<PipeData> pipeDataList = tsFilePipe.pull(Long.MAX_VALUE);
+        if ((pipeDataList != null)
+            && (!pipeDataList.isEmpty())
+            && (pipeDataList.get(pipeDataList.size() - 1).getSerialNumber()
+                > lastPipeDataSerialNumber)) {
+          for (PipeData pipeData : pipeDataList) {
+            long pipeDataSerialNumber = pipeData.getSerialNumber();
+            if (pipeDataSerialNumber <= lastPipeDataSerialNumber) {
+              continue;
+            }
 
-          // extract the Tsfile PipeData
-          if (pipeData instanceof TsFilePipeData) {
-            TsFilePipeData tsFilePipeData = (TsFilePipeData) pipeData;
+            // == extract the Tsfile PipeData
+            if (pipeData instanceof TsFilePipeData) {
+              TsFilePipeData tsFilePipeData = (TsFilePipeData) pipeData;
+
+              String sgName = tsFilePipeData.getStorageGroupName();

Review Comment:
   Delete the storageGroupName field in TsFilePipe, you can use `StorageEngine.getSgByEngineFile()` instead



##########
server/src/main/java/org/apache/iotdb/db/sync/pipedata/DeletionPipeData.java:
##########
@@ -35,14 +36,20 @@
 public class DeletionPipeData extends PipeData {
   private static final Logger logger = LoggerFactory.getLogger(DeletionPipeData.class);
 
+  private String storageGroup;

Review Comment:
   This attribute is only used in the ext-pipe sender and is an unnecessary field in the data transfer. Adding the field will cause additional network overhead and is not recommended.



##########
server/src/main/java/org/apache/iotdb/db/sync/externalpipe/ExtPipePluginManager.java:
##########
@@ -176,46 +194,69 @@ private void monitorPipeData() {
       return;
     }
 
-    while (true) {
-      List<PipeData> pipeDataList = tsFilePipe.pull(Long.MAX_VALUE);
-      if ((pipeDataList != null)
-          && (!pipeDataList.isEmpty())
-          && (pipeDataList.get(pipeDataList.size() - 1).getSerialNumber()
-              > lastPipeDataSerialNumber)) {
-        for (PipeData pipeData : pipeDataList) {
-          long pipeDataSerialNumber = pipeData.getSerialNumber();
-          if (pipeDataSerialNumber <= lastPipeDataSerialNumber) {
-            continue;
-          }
-          lastPipeDataSerialNumber = pipeData.getSerialNumber();
+    while (alive) {
+      try {
+        // == pull Pipe src data and insert them to pipeOpManager
+        List<PipeData> pipeDataList = tsFilePipe.pull(Long.MAX_VALUE);
+        if ((pipeDataList != null)
+            && (!pipeDataList.isEmpty())
+            && (pipeDataList.get(pipeDataList.size() - 1).getSerialNumber()
+                > lastPipeDataSerialNumber)) {
+          for (PipeData pipeData : pipeDataList) {
+            long pipeDataSerialNumber = pipeData.getSerialNumber();
+            if (pipeDataSerialNumber <= lastPipeDataSerialNumber) {
+              continue;
+            }
 
-          // extract the Tsfile PipeData
-          if (pipeData instanceof TsFilePipeData) {
-            TsFilePipeData tsFilePipeData = (TsFilePipeData) pipeData;
+            // == extract the Tsfile PipeData
+            if (pipeData instanceof TsFilePipeData) {
+              TsFilePipeData tsFilePipeData = (TsFilePipeData) pipeData;
+
+              String sgName = tsFilePipeData.getStorageGroupName();
+              String tsFileFullName = tsFilePipeData.getTsFilePath();
+              String modsFileFullName = tsFilePipeData.getModsFilePath();
+              try {
+                pipeOpManager.appendTsFileOpBlock(
+                    sgName, tsFileFullName, modsFileFullName, pipeDataSerialNumber);
+                lastPipeDataSerialNumber = pipeDataSerialNumber;
+              } catch (IOException e) {
+                logger.error("monitorPipeData(), Can not append TsFile: {}" + tsFileFullName);
+              }
+              continue;
+            }
 
-            String sgName = tsFilePipeData.getStorageGroupName();
-            String tsFileFullName = tsFilePipeData.getTsFilePath();
-            String modsFileFullName = tsFilePipeData.getModsFilePath();
-            try {
-              pipeOpManager.appendTsFile(
-                  sgName, tsFileFullName, modsFileFullName, pipeDataSerialNumber);
-            } catch (IOException e) {
-              logger.error("monitorPipeData(), Can not append TsFile: {}" + tsFileFullName);
+            // == handle delete PipeData
+            if (pipeData instanceof DeletionPipeData) {

Review Comment:
   ```suggestion
               else if (pipeData instanceof DeletionPipeData) {
   ```



##########
server/src/main/java/org/apache/iotdb/db/sync/externalpipe/ExtPipePluginManager.java:
##########
@@ -54,32 +55,22 @@ public class ExtPipePluginManager {
   private Map<String, ExtPipePlugin> extPipePluginMap = new HashMap<>();
 
   private ExecutorService monitorService = Executors.newFixedThreadPool(1);

Review Comment:
   Use `IoTDBThreadPoolFactory` to create ExecutorService in ExtPipePluginManager and ExtPipePlugin



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [iotdb] jamber001 commented on a diff in pull request #6947: [IOTDB-3465] ext pipe support DeletionPipeData

Posted by GitBox <gi...@apache.org>.
jamber001 commented on code in PR #6947:
URL: https://github.com/apache/iotdb/pull/6947#discussion_r953465430


##########
example/ext-pipe-plugin-example/pom.xml:
##########
@@ -21,16 +21,18 @@
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
+    <groupId>org.apache.iotdb</groupId>
     <artifactId>iotdb-ext-pipe-example</artifactId>
+    <version>0.14.0-SNAPSHOT</version>

Review Comment:
   Thank you.
   I did below changes
   1) Change artifactId to ext-pipe-example. 
   2) Add the necessary files in resources/META-INFO/services to enabled SPI.
   3) mvn spotless:apply
   
   For left comments, I do not do change.  Because
   1) This plugin dev example codes will not depend on IoTDB project but only depend on external-pipe-api, which is to facilitate customers' development.
   2) We do not want customers to dev plugin using IoTDB project .pom.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [iotdb] ericpai merged pull request #6947: [IOTDB-3465] ext pipe support DeletionPipeData

Posted by GitBox <gi...@apache.org>.
ericpai merged PR #6947:
URL: https://github.com/apache/iotdb/pull/6947


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [iotdb] yschengzi commented on pull request #6947: [IOTDB-3465] ext pipe support DeletionPipeData

Posted by GitBox <gi...@apache.org>.
yschengzi commented on PR #6947:
URL: https://github.com/apache/iotdb/pull/6947#issuecomment-1217981698

   > @MarcosZyk please help review changes about `PartialPath`.
   > 
   > @yschengzi please help review changes about main process about `ExtPipePlugin`/`ExtPipePluginManager`.
   
   LGTM-


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [iotdb] jamber001 commented on a diff in pull request #6947: [IOTDB-3465] ext pipe support DeletionPipeData

Posted by GitBox <gi...@apache.org>.
jamber001 commented on code in PR #6947:
URL: https://github.com/apache/iotdb/pull/6947#discussion_r953418462


##########
server/src/main/java/org/apache/iotdb/db/sync/externalpipe/ExtPipePluginManager.java:
##########
@@ -176,46 +194,69 @@ private void monitorPipeData() {
       return;
     }
 
-    while (true) {
-      List<PipeData> pipeDataList = tsFilePipe.pull(Long.MAX_VALUE);
-      if ((pipeDataList != null)
-          && (!pipeDataList.isEmpty())
-          && (pipeDataList.get(pipeDataList.size() - 1).getSerialNumber()
-              > lastPipeDataSerialNumber)) {
-        for (PipeData pipeData : pipeDataList) {
-          long pipeDataSerialNumber = pipeData.getSerialNumber();
-          if (pipeDataSerialNumber <= lastPipeDataSerialNumber) {
-            continue;
-          }
-          lastPipeDataSerialNumber = pipeData.getSerialNumber();
+    while (alive) {
+      try {
+        // == pull Pipe src data and insert them to pipeOpManager
+        List<PipeData> pipeDataList = tsFilePipe.pull(Long.MAX_VALUE);
+        if ((pipeDataList != null)
+            && (!pipeDataList.isEmpty())
+            && (pipeDataList.get(pipeDataList.size() - 1).getSerialNumber()
+                > lastPipeDataSerialNumber)) {
+          for (PipeData pipeData : pipeDataList) {
+            long pipeDataSerialNumber = pipeData.getSerialNumber();
+            if (pipeDataSerialNumber <= lastPipeDataSerialNumber) {
+              continue;
+            }
 
-          // extract the Tsfile PipeData
-          if (pipeData instanceof TsFilePipeData) {
-            TsFilePipeData tsFilePipeData = (TsFilePipeData) pipeData;
+            // == extract the Tsfile PipeData
+            if (pipeData instanceof TsFilePipeData) {
+              TsFilePipeData tsFilePipeData = (TsFilePipeData) pipeData;
+
+              String sgName = tsFilePipeData.getStorageGroupName();
+              String tsFileFullName = tsFilePipeData.getTsFilePath();
+              String modsFileFullName = tsFilePipeData.getModsFilePath();
+              try {
+                pipeOpManager.appendTsFileOpBlock(
+                    sgName, tsFileFullName, modsFileFullName, pipeDataSerialNumber);
+                lastPipeDataSerialNumber = pipeDataSerialNumber;
+              } catch (IOException e) {
+                logger.error("monitorPipeData(), Can not append TsFile: {}" + tsFileFullName);
+              }
+              continue;
+            }
 
-            String sgName = tsFilePipeData.getStorageGroupName();
-            String tsFileFullName = tsFilePipeData.getTsFilePath();
-            String modsFileFullName = tsFilePipeData.getModsFilePath();
-            try {
-              pipeOpManager.appendTsFile(
-                  sgName, tsFileFullName, modsFileFullName, pipeDataSerialNumber);
-            } catch (IOException e) {
-              logger.error("monitorPipeData(), Can not append TsFile: {}" + tsFileFullName);
+            // == handle delete PipeData
+            if (pipeData instanceof DeletionPipeData) {

Review Comment:
   I have just changed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [iotdb] jamber001 commented on a diff in pull request #6947: [IOTDB-3465] ext pipe support DeletionPipeData

Posted by GitBox <gi...@apache.org>.
jamber001 commented on code in PR #6947:
URL: https://github.com/apache/iotdb/pull/6947#discussion_r953397623


##########
example/ext-pipe-plugin-example/src/main/java/org/apache/iotdb/extpipe/ExtPipeSinkWriterImpl.java:
##########
@@ -49,88 +49,85 @@ public void open() {
   }
 
   @Override
-  public synchronized void insertBoolean(String[] path, long timestamp, boolean value)
+  public void insertBoolean(String sgName, String[] path, long timestamp, boolean value)
       throws IOException {
     //== Here, handle inserted Boolean type data from IoTDB.
     //extSession.insertBoolean(...);
     //...
   }
 
   @Override
-  public synchronized void insertInt32(String[] path, long timestamp, int value)
+  public void insertInt32(String sgName, String[] path, long timestamp, int value)
       throws IOException {
     //== Here, handle inserted Int32 type data from IoTDB.
     //extSession.insertInt32(...);
     //...
   }
 
   @Override
-  public synchronized void insertInt64(String[] path, long timestamp, long value)
-      throws IOException {
+  public void insertInt64(String sgName, String[] path, long time, long value) throws IOException {
     //== Here, handle inserted Int64 type data from IoTDB.
     //...
   }
 
   @Override
-  public synchronized void insertFloat(String[] path, long timestamp, float value)
-      throws IOException {
+  public void insertFloat(String sgName, String[] path, long time, float value) throws IOException {
     //== Here, handle inserted float type data from IoTDB.
     //extSession.insertFloat(...);
     //...
   }
 
   @Override
-  public synchronized void insertDouble(String[] path, long timestamp, double value)
-      throws IOException {
+  public void insertDouble(String sgName, String[] path, long time, double value) throws IOException {
     //== Here, handle inserted double type data from IoTDB.
     //extSession.insertDouble(...);
     //...
   }
 
   @Override
-  public synchronized void insertText(String[] path, long timestamp, String value)
-      throws IOException {
+  public void insertText(String sgName, String[] path, long time, String value) throws IOException {
     //== Here, handle inserted Text type data from IoTDB.
     //extSession.insertText(...);
     //..
   }
 
-  @Override
-  public synchronized void insertVector(String[] path, DataType[] dataTypes, long timestamp,
-                                        Object[] values)
-      throws IOException {
-    //== Here, handle inserted Vector type data from IoTDB.
-    //extSession.insertVector(...);
-    //...
-  }
+//  @Override
+//  public void insertVector(String[] path, DataType[] dataTypes, long timestamp,
+//                                        Object[] values)
+//      throws IOException {
+//    //== Here, handle inserted Vector type data from IoTDB.
+//    //extSession.insertVector(...);
+//    //...
+//  }

Review Comment:
   Thank @Cpaulyz 
   In current design, this interface method will not be used.
   So I have just removed this part.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [iotdb] jamber001 commented on a diff in pull request #6947: [IOTDB-3465] ext pipe support DeletionPipeData

Posted by GitBox <gi...@apache.org>.
jamber001 commented on code in PR #6947:
URL: https://github.com/apache/iotdb/pull/6947#discussion_r953413483


##########
server/src/main/java/org/apache/iotdb/db/sync/externalpipe/ExtPipePluginManager.java:
##########
@@ -176,46 +194,69 @@ private void monitorPipeData() {
       return;
     }
 
-    while (true) {
-      List<PipeData> pipeDataList = tsFilePipe.pull(Long.MAX_VALUE);
-      if ((pipeDataList != null)
-          && (!pipeDataList.isEmpty())
-          && (pipeDataList.get(pipeDataList.size() - 1).getSerialNumber()
-              > lastPipeDataSerialNumber)) {
-        for (PipeData pipeData : pipeDataList) {
-          long pipeDataSerialNumber = pipeData.getSerialNumber();
-          if (pipeDataSerialNumber <= lastPipeDataSerialNumber) {
-            continue;
-          }
-          lastPipeDataSerialNumber = pipeData.getSerialNumber();
+    while (alive) {
+      try {
+        // == pull Pipe src data and insert them to pipeOpManager
+        List<PipeData> pipeDataList = tsFilePipe.pull(Long.MAX_VALUE);
+        if ((pipeDataList != null)
+            && (!pipeDataList.isEmpty())
+            && (pipeDataList.get(pipeDataList.size() - 1).getSerialNumber()
+                > lastPipeDataSerialNumber)) {
+          for (PipeData pipeData : pipeDataList) {
+            long pipeDataSerialNumber = pipeData.getSerialNumber();
+            if (pipeDataSerialNumber <= lastPipeDataSerialNumber) {
+              continue;
+            }
 
-          // extract the Tsfile PipeData
-          if (pipeData instanceof TsFilePipeData) {
-            TsFilePipeData tsFilePipeData = (TsFilePipeData) pipeData;
+            // == extract the Tsfile PipeData
+            if (pipeData instanceof TsFilePipeData) {
+              TsFilePipeData tsFilePipeData = (TsFilePipeData) pipeData;
+
+              String sgName = tsFilePipeData.getStorageGroupName();

Review Comment:
   @Cpaulyz  Thank your comments.
   Here, I prefer to keep current design (using storageGroupName ). Because I feel curent desgin has higher effeciency.   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org