Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2022/03/03 02:43:36 UTC

[GitHub] [iotdb] yschengzi opened a new pull request #5159: [To new_sync][IOTDB-1907] implement customized sync process: sender 3

yschengzi opened a new pull request #5159:
URL: https://github.com/apache/iotdb/pull/5159


   Fix BufferedPipeDataQueue, add BufferedPipeDataQueue for sender, add recMsg in SenderService, add IT.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] qiaojialin merged pull request #5159: [To new_sync][IOTDB-1907] implement customized sync process: sender 3

Posted by GitBox <gi...@apache.org>.
qiaojialin merged pull request #5159:
URL: https://github.com/apache/iotdb/pull/5159


   





[GitHub] [iotdb] Cpaulyz commented on a change in pull request #5159: [To new_sync][IOTDB-1907] implement customized sync process: sender 3

Posted by GitBox <gi...@apache.org>.
Cpaulyz commented on a change in pull request #5159:
URL: https://github.com/apache/iotdb/pull/5159#discussion_r831812603



##########
File path: server/src/main/java/org/apache/iotdb/db/newsync/conf/SyncConstant.java
##########
@@ -26,15 +26,21 @@
 
   public static final String SENDER_PIPE_DIR_NAME = "sender";
   public static final String FINISH_COLLECT_LOCK_NAME = "finishCollect.lock";
+  public static final String PIPE_LOG_DIR_NAME = "pipe-log";

Review comment:
       The receiver also needs this constant; move it to the "common" part.

##########
File path: server/src/main/java/org/apache/iotdb/db/newsync/pipedata/DeletionPipeData.java
##########
@@ -65,20 +64,20 @@ public ILoader createLoader() {
     return new DeletionLoader(deletion);
   }
 
-  @Override
-  public void sendToTransport() {
-    try (ByteArrayOutputStream bytesStream = new ByteArrayOutputStream();
-        DataOutputStream stream = new DataOutputStream(bytesStream)) {
-      serialize(stream);
-      // senderTransport(bytesStream.toArray, this);
-      // System.out.println(this);
-    } catch (IOException e) {
-      logger.warn(
-          String.format(
-              "Serialize deletion pipeData %s error, can not send to transport, because %s.",
-              this, e));
-    }
-  }
+  //  @Override
+  //  public void sendToTransport() {
+  //    try (ByteArrayOutputStream bytesStream = new ByteArrayOutputStream();
+  //        DataOutputStream stream = new DataOutputStream(bytesStream)) {
+  //      serialize(stream);
+  //      // senderTransport(bytesStream.toArray, this);
+  //      // System.out.println(this);
+  //    } catch (IOException e) {
+  //      logger.warn(
+  //          String.format(
+  //              "Serialize deletion pipeData %s error, can not send to transport, because %s.",
+  //              this, e));
+  //    }
+  //  }

Review comment:
       Delete this commented-out code.

##########
File path: server/src/main/java/org/apache/iotdb/db/newsync/sender/service/SenderService.java
##########
@@ -175,35 +181,45 @@ public Pipe parseCreatePipePlan(CreatePipePlan plan, PipeSink pipeSink, long pip
       }
     }
 
-    // get TsFilePipe
-    //    PipeSink.Type pipeSinkType = pipeSink.getType();
-    //    if (!pipeSinkType.equals(PipeSink.Type.IoTDB)) {
-    //      throw new PipeException(
-    //          String.format(
-    //              "Wrong pipeSink type %s for create TsFilePipe.", pipeSinkType)); // internal
-    // error
-    //    }
-    return new TsFilePipe(
-        pipeCreateTime, plan.getPipeName(), pipeSink, plan.getDataStartTimestamp(), syncDelOp);
+    TsFilePipe pipe =
+        new TsFilePipe(
+            pipeCreateTime, plan.getPipeName(), pipeSink, plan.getDataStartTimestamp(), syncDelOp);
+    try {
+      if (!(pipeSink instanceof IoTDBPipeSink)) {
+        throw new PipeException(
+            String.format(
+                "Wrong pipeSink type %s for create pipe %s", pipeSink.getType(), pipe.getName()));
+      }
+      ITransportClient transportClient =
+          new TransportClient(
+              pipe, ((IoTDBPipeSink) pipeSink).getIp(), ((IoTDBPipeSink) pipeSink).getPort());
+      pipe.setTransportHandler(

Review comment:
       It's necessary to send a CREATE heartbeat to the receiver to register this pipe.
   
   Why not add a new function `create()` to the pipe, which creates the TransportClient and sends the CREATE heartbeat?
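
   The suggestion could look roughly like the sketch below. It is only an illustration: `PipeSketch`, `TransportClientSketch`, `RecordingClient`, `HeartbeatType`, the client factory, and the IP and port values are hypothetical stand-ins, not the real IoTDB classes or signatures. The only idea taken from the comment is that a single `create()` call builds the client and then sends the CREATE heartbeat.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

/** Hypothetical sketch; none of these types are the real IoTDB classes. */
public class PipeCreateSketch {

  /** Assumed heartbeat kinds; only CREATE is mentioned in the review. */
  enum HeartbeatType { CREATE, START, STOP, DROP }

  /** Stand-in for a transport client; the real interface may differ. */
  interface TransportClientSketch {
    void heartbeat(String pipeName, HeartbeatType type);
  }

  /** Records heartbeats in memory instead of sending them over the network. */
  static class RecordingClient implements TransportClientSketch {
    final List<String> sent = new ArrayList<>();

    @Override
    public void heartbeat(String pipeName, HeartbeatType type) {
      sent.add(type + ":" + pipeName);
    }
  }

  /** Stand-in for the pipe; it owns the client once create() has run. */
  static class PipeSketch {
    private final String name;
    private final BiFunction<String, Integer, TransportClientSketch> clientFactory;
    private TransportClientSketch client;

    PipeSketch(String name, BiFunction<String, Integer, TransportClientSketch> clientFactory) {
      this.name = name;
      this.clientFactory = clientFactory;
    }

    /** Builds the transport client, then registers this pipe on the receiver. */
    void create(String receiverIp, int receiverPort) {
      client = clientFactory.apply(receiverIp, receiverPort);
      client.heartbeat(name, HeartbeatType.CREATE);
    }
  }

  public static void main(String[] args) {
    RecordingClient client = new RecordingClient();
    // The factory ignores ip/port here because the client is an in-memory fake.
    PipeSketch pipe = new PipeSketch("demoPipe", (ip, port) -> client);
    pipe.create("127.0.0.1", 6670);
    System.out.println(client.sent);
  }
}
```

   With this shape, `parseCreatePipePlan` would only construct the pipe and call `create()`, so the sink type check and the client wiring would live in one place.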

##########
File path: server/src/main/java/org/apache/iotdb/db/newsync/transport/conf/TransportSenderConfig.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.newsync.transport.conf;
+
+import org.apache.iotdb.db.sync.conf.SyncConstant;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TransportSenderConfig {
+
+  private String serverIp = "127.0.0.1";
+
+  private int serverPort = 5555;
+
+  private int syncPeriodInSecond = 600;
+
+  private String senderFolderPath;
+
+  private String lastFileInfoPath;
+
+  private String snapshotPath;
+
+  /** The maximum number of retry when syncing a file to receiver fails. */
+  private int maxNumOfSyncFileRetry = 5;
+
+  /** Storage groups which participate in sync process */
+  private List<String> storageGroupList = new ArrayList<>();

Review comment:
       Is this parameter actually needed in the new sync?

##########
File path: server/src/main/java/org/apache/iotdb/db/newsync/pipedata/SchemaPipeData.java
##########
@@ -78,11 +78,11 @@ public ILoader createLoader() {
     return new SchemaLoader(plan);
   }
 
-  @Override
-  public void sendToTransport() {
-    // senderTransport(getBytes(), this);
-    // System.out.println(this);
-  }
+  //  @Override
+  //  public void sendToTransport() {
+  //    // senderTransport(getBytes(), this);
+  //    // System.out.println(this);
+  //  }

Review comment:
       Delete this commented-out code.



