You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/08/30 02:43:41 UTC

[incubator-iotdb] branch reimpl_sync updated (99e577c -> eeb1e3c)

This is an automated email from the ASF dual-hosted git repository.

lta pushed a change to branch reimpl_sync
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


    from 99e577c  complete sync receiver end
     new 0d96f69  update
     new 35f06a8  complete load module in receiver end
     new eeb1e3c  add load log and clean up in receiver end

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../iotdb/db/sync/receiver/SyncServerManager.java  |    3 +
 .../iotdb/db/sync/receiver/load/FileLoader.java    |  148 ++-
 .../db/sync/receiver/load/FileLoaderManager.java   |   93 ++
 .../iotdb/db/sync/receiver/load/IFileLoader.java   |   11 +-
 .../iotdb/db/sync/receiver/load/ILoadLogger.java   |   18 +
 .../iotdb/db/sync/receiver/load/LoadLogger.java    |   50 +
 .../sync/receiver/recover/SyncReceiverLogger.java  |   18 +
 .../db/sync/receiver/transfer/SyncServiceImpl.java |   50 +-
 .../apache/iotdb/db/sync/sender/conf/Constans.java |    3 +
 .../db/sync/sender/conf/SyncSenderConfig.java      |    7 +
 .../sync/sender/transfer/DataTransferManager.java  |   38 +-
 .../storagegroup/FileNodeManagerBenchmark.java     |    2 +-
 .../db/sync/sender/MultipleClientSyncTest.java     |  226 ----
 .../iotdb/db/sync/sender/SingleClientSyncTest.java | 1192 ++++++++++----------
 .../iotdb/db/sync/sender/SyncFileManagerTest.java  |  748 ++++++------
 .../apache/iotdb/db/sync/test/SyncTestClient1.java |  258 -----
 .../apache/iotdb/db/sync/test/SyncTestClient2.java |  262 -----
 .../apache/iotdb/db/sync/test/SyncTestClient3.java |  282 -----
 .../java/org/apache/iotdb/db/sync/test/Utils.java  |   44 -
 .../iotdb/db/{sync/test => utils}/RandomNum.java   |    2 +-
 service-rpc/src/main/thrift/sync.thrift            |    2 +-
 21 files changed, 1383 insertions(+), 2074 deletions(-)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderManager.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sync/receiver/load/ILoadLogger.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sync/receiver/load/LoadLogger.java
 delete mode 100644 server/src/test/java/org/apache/iotdb/db/sync/sender/MultipleClientSyncTest.java
 delete mode 100644 server/src/test/java/org/apache/iotdb/db/sync/test/SyncTestClient1.java
 delete mode 100644 server/src/test/java/org/apache/iotdb/db/sync/test/SyncTestClient2.java
 delete mode 100644 server/src/test/java/org/apache/iotdb/db/sync/test/SyncTestClient3.java
 delete mode 100644 server/src/test/java/org/apache/iotdb/db/sync/test/Utils.java
 rename server/src/test/java/org/apache/iotdb/db/{sync/test => utils}/RandomNum.java (98%)


[incubator-iotdb] 02/03: complete load module in receiver end

Posted by lt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch reimpl_sync
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 35f06a88c21e2fa99d03d44c4e1633197fcad5b4
Author: lta <li...@163.com>
AuthorDate: Thu Aug 29 21:00:57 2019 +0800

    complete load module in receiver end
---
 .../iotdb/db/sync/receiver/SyncServerManager.java  |   3 +
 .../iotdb/db/sync/receiver/load/FileLoader.java    | 105 +++++++++++++++++++--
 .../db/sync/receiver/load/FileLoaderManager.java   |  93 ++++++++++++++++++
 .../iotdb/db/sync/receiver/load/IFileLoader.java   |   6 ++
 .../sync/receiver/recover/SyncReceiverLogger.java  |  18 ++++
 .../db/sync/receiver/transfer/SyncServiceImpl.java |  18 ++--
 .../apache/iotdb/db/sync/sender/conf/Constans.java |   2 +
 .../db/sync/sender/conf/SyncSenderConfig.java      |   7 ++
 .../sync/sender/transfer/DataTransferManager.java  |  38 +++++++-
 service-rpc/src/main/thrift/sync.thrift            |   2 +-
 10 files changed, 275 insertions(+), 17 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
index 1d37539..faba821 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.service.IService;
 import org.apache.iotdb.db.service.ServiceType;
+import org.apache.iotdb.db.sync.receiver.load.FileLoaderManager;
 import org.apache.iotdb.db.sync.receiver.transfer.SyncServiceImpl;
 import org.apache.iotdb.service.sync.thrift.SyncService;
 import org.apache.iotdb.service.sync.thrift.SyncService.Processor;
@@ -65,6 +66,7 @@ public class SyncServerManager implements IService {
     if (!conf.isSyncEnable()) {
       return;
     }
+    FileLoaderManager.getInstance().start();
     if (conf.getIpWhiteList() == null) {
       logger.error(
           "Sync server failed to start because IP white list is null, please set IP white list.");
@@ -83,6 +85,7 @@ public class SyncServerManager implements IService {
   @Override
   public void stop() {
     if (conf.isSyncEnable()) {
+      FileLoaderManager.getInstance().stop();
       ((SyncServiceThread) syncServerThread).close();
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
index c09b0ec..7681433 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
@@ -1,33 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.iotdb.db.sync.receiver.load;
 
 import java.io.File;
+import java.util.ArrayDeque;
+import org.apache.iotdb.db.sync.sender.conf.Constans;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class FileLoader implements IFileLoader {
 
-  private FileLoader(){
+  private static final Logger LOGGER = LoggerFactory.getLogger(FileLoader.class);
 
+  private static final int WAIT_TIME = 1000;
+
+  private File syncEndFile;
+
+  private String senderName;
+
+  private ArrayDeque<LoadTask> queue = new ArrayDeque<>();
+
+  private volatile boolean endSync = false;
+
+  private FileLoader(String senderName, String syncFolderPath) {
+    this.senderName = senderName;
+    this.syncEndFile = new File(syncFolderPath, Constans.SYNC_END);
+    FileLoaderManager.getInstance().addFileLoader(senderName, this);
+    FileLoaderManager.getInstance().addLoadTaskRunner(loadTaskRunner);
   }
 
-  public static FileLoader getInstance(){
-    return FileLoaderHolder.INSTANCE;
+  public static FileLoader createFileLoader(String senderName, String syncFolderPath) {
+    return new FileLoader(senderName, syncFolderPath);
   }
 
+  private Runnable loadTaskRunner = () -> {
+    try {
+      while (true) {
+        if (endSync) {
+          cleanUp();
+          break;
+        }
+        if (queue.isEmpty()) {
+          synchronized (queue) {
+            if (queue.isEmpty()) {
+              queue.wait(WAIT_TIME);
+            }
+          }
+        }
+        if (!queue.isEmpty()) {
+          handleLoadTask(queue.poll());
+        }
+      }
+    } catch (InterruptedException e) {
+      LOGGER.info("Close the run load task thread.");
+    }
+  };
+
   @Override
   public void addDeletedFileName(String sgName, File deletedFile) {
-
+    synchronized (queue) {
+      queue.add(new LoadTask(sgName, deletedFile, LoadType.DELETE));
+      queue.notify();
+    }
   }
 
   @Override
   public void addTsfile(String sgName, File tsfile) {
+    synchronized (queue) {
+      queue.add(new LoadTask(sgName, tsfile, LoadType.ADD));
+      queue.notify();
+    }
+  }
+
+  @Override
+  public void endSync() {
+    this.endSync = true;
+  }
+
+  @Override
+  public void handleLoadTask(LoadTask task) {
 
   }
 
+  @Override
+  public void cleanUp() {
+    FileLoaderManager.getInstance().removeFileLoader(senderName);
+  }
+
+  class LoadTask {
 
-  private static class FileLoaderHolder{
-    private static final FileLoader INSTANCE = new FileLoader();
+    String sgName;
+    File file;
+    LoadType type;
 
-    private static FileLoader getInstance(){
-      return INSTANCE;
+    LoadTask(String sgName, File file, LoadType type) {
+      this.sgName = sgName;
+      this.file = file;
+      this.type = type;
     }
   }
+
+  private enum LoadType {
+    DELETE, ADD;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderManager.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderManager.java
new file mode 100644
index 0000000..2b86777
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderManager.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.sync.receiver.load;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FileLoaderManager {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(FileLoaderManager.class);
+
+  private static final int WAIT_TIMEOUT = 2000;
+
+  private ConcurrentHashMap<String, FileLoader> fileLoaderMap;
+
+  private ExecutorService loadTaskRunnerPool;
+
+
+  private FileLoaderManager() {
+  }
+
+  public static FileLoaderManager getInstance() {
+    return FileLoaderManagerHolder.INSTANCE;
+  }
+
+  public void addFileLoader(String senderName, FileLoader fileLoader){
+    fileLoaderMap.put(senderName, fileLoader);
+  }
+
+  public void removeFileLoader(String senderName){
+    fileLoaderMap.remove(senderName);
+  }
+
+  public FileLoader getFileLoader(String senderName) {
+    return fileLoaderMap.get(senderName);
+  }
+
+  public void addLoadTaskRunner(Runnable taskRunner){
+    loadTaskRunnerPool.submit(taskRunner);
+  }
+
+  public void start() {
+    if (fileLoaderMap == null) {
+      fileLoaderMap = new ConcurrentHashMap<>();
+    }
+    if (loadTaskRunnerPool == null) {
+      loadTaskRunnerPool = Executors.newCachedThreadPool();
+    }
+  }
+
+  public void stop() { // shut the runner pool down, then release the manager's state
+    loadTaskRunnerPool.shutdownNow();
+    int totalWaitTime = WAIT_TIMEOUT;
+    try {
+      while (!loadTaskRunnerPool.awaitTermination(WAIT_TIMEOUT, TimeUnit.MILLISECONDS)) {
+        LOGGER.info("File load manager thread pool doesn't exit after {}ms.", totalWaitTime);
+        totalWaitTime += WAIT_TIMEOUT;
+      }
+    } catch (InterruptedException e) {
+      LOGGER.error("Interrupted while waiting file load manager thread pool to exit. ", e);
+      Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
+    }
+    // Drop the registry only after waiting for the runners: FileLoader.cleanUp() calls
+    // removeFileLoader(), which would hit a null map if it were cleared up front.
+    fileLoaderMap = null;
+    loadTaskRunnerPool = null;
+  }
+
+  private static class FileLoaderManagerHolder {
+
+    private static final FileLoaderManager INSTANCE = new FileLoaderManager();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/IFileLoader.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/IFileLoader.java
index c2d55b0..98aeca9 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/IFileLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/IFileLoader.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.sync.receiver.load;
 
 import java.io.File;
+import org.apache.iotdb.db.sync.receiver.load.FileLoader.LoadTask;
 
 public interface IFileLoader {
 
@@ -26,4 +27,9 @@ public interface IFileLoader {
 
   void addTsfile(String sgName, File tsfile);
 
+  void endSync();
+
+  void handleLoadTask(LoadTask task);
+
+  void cleanUp();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogger.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogger.java
index 9a8cc96..b43cfa3 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogger.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogger.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.iotdb.db.sync.receiver.recover;
 
 import java.io.BufferedWriter;
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
index 7156ef7..e2f45a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
@@ -43,6 +43,7 @@ import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.metadata.MetadataConstant;
 import org.apache.iotdb.db.metadata.MetadataOperationType;
 import org.apache.iotdb.db.sync.receiver.load.FileLoader;
+import org.apache.iotdb.db.sync.receiver.load.FileLoaderManager;
 import org.apache.iotdb.db.sync.receiver.recover.SyncReceiverLogger;
 import org.apache.iotdb.db.sync.sender.conf.Constans;
 import org.apache.iotdb.db.utils.FilePathUtils;
@@ -71,7 +72,7 @@ public class SyncServiceImpl implements SyncService.Iface {
 
   private ThreadLocal<SyncReceiverLogger> syncLog = new ThreadLocal<>();
 
-  private ThreadLocal<String> senderIp = new ThreadLocal<>();
+  private ThreadLocal<String> senderName = new ThreadLocal<>();
 
   private ThreadLocal<File> currentFile = new ThreadLocal<>();
 
@@ -83,10 +84,10 @@ public class SyncServiceImpl implements SyncService.Iface {
    * Verify IP address of sender
    */
   @Override
-  public ResultStatus check(String ipAddress) {
+  public ResultStatus check(String ipAddress, String uuid) {
     Thread.currentThread().setName(ThreadName.SYNC_SERVER.getName());
     if (SyncUtils.verifyIPSegment(config.getIpWhiteList(), ipAddress)) {
-      senderIp.set(ipAddress);
+      senderName.set(ipAddress + "-" + uuid);
       if (checkRecovery()) {
         return getSuccessResult();
       } else {
@@ -105,7 +106,7 @@ public class SyncServiceImpl implements SyncService.Iface {
       }
       return true;
     } catch (IOException e) {
-      e.printStackTrace();
+      logger.error("Check recovery state fail", e);
       return false;
     }
   }
@@ -115,6 +116,7 @@ public class SyncServiceImpl implements SyncService.Iface {
     try {
       initPath();
       currentSG.remove();
+      FileLoader.createFileLoader(senderName.get(), syncFolderPath.get());
       syncLog.set(new SyncReceiverLogger(new File(getSyncDataPath(), Constans.SYNC_LOG_NAME)));
       return getSuccessResult();
     } catch (DiskSpaceInsufficientException | IOException e) {
@@ -130,7 +132,7 @@ public class SyncServiceImpl implements SyncService.Iface {
     String dataDir = DirectoryManager.getInstance().getNextFolderForSequenceFile();
     syncFolderPath
         .set(FilePathUtils.regularizePath(dataDir) + Constans.SYNC_RECEIVER + File.separatorChar
-            + senderIp.get());
+            + senderName.get());
   }
 
   /**
@@ -153,7 +155,8 @@ public class SyncServiceImpl implements SyncService.Iface {
   public ResultStatus syncDeletedFileName(String fileName) throws TException {
     try {
       syncLog.get().finishSyncDeletedFileName(new File(fileName));
-      FileLoader.getInstance().addDeletedFileName(currentSG.get(), new File(fileName));
+      FileLoaderManager.getInstance().getFileLoader(senderName.get())
+          .addDeletedFileName(currentSG.get(), new File(fileName));
     } catch (IOException e) {
       logger.error("Can not sync deleted file", e);
       return getErrorResult(
@@ -217,7 +220,8 @@ public class SyncServiceImpl implements SyncService.Iface {
         } else {
           if (!currentFile.get().getName().endsWith(TsFileResource.RESOURCE_SUFFIX)) {
             syncLog.get().finishSyncTsfile(currentFile.get());
-            FileLoader.getInstance().addTsfile(currentSG.get(), currentFile.get());
+            FileLoaderManager.getInstance().getFileLoader(senderName.get())
+                .addTsfile(currentSG.get(), currentFile.get());
           }
         }
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
index 576b416..0825774 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
@@ -40,6 +40,8 @@ public class Constans {
 
   public static final String LOCK_FILE_NAME = "sync_lock";
 
+  public static final String UUID_FILE_NAME = "uuid.txt";
+
   public static final String SCHEMA_POS_FILE_NAME = "sync_schema_pos";
 
   public static final String LAST_LOCAL_FILE_NAME = "last_local_files.txt";
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java
index d6028ca..4a0ae8b 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java
@@ -32,6 +32,8 @@ public class SyncSenderConfig {
 
   private String lockFilePath;
 
+  private String uuidPath;
+
   private String lastFileInfo;
 
   private String snapshotPath;
@@ -43,6 +45,7 @@ public class SyncSenderConfig {
     senderFolderPath = dataDirectory + File.separatorChar + Constans.SYNC_SENDER + File.separatorChar +
         getSyncReceiverName();
     lockFilePath = senderFolderPath + File.separatorChar + Constans.LOCK_FILE_NAME;
+    uuidPath = senderFolderPath + File.separatorChar + Constans.UUID_FILE_NAME;
     lastFileInfo = senderFolderPath + File.separatorChar + Constans.LAST_LOCAL_FILE_NAME;
     snapshotPath = senderFolderPath + File.separatorChar + Constans.DATA_SNAPSHOT_NAME;
     if(!new File(snapshotPath).exists()){
@@ -106,6 +109,10 @@ public class SyncSenderConfig {
     this.snapshotPath = snapshotPath;
   }
 
+  public String getUuidPath() {
+    return uuidPath;
+  }
+
   public String getSyncReceiverName() {
     return serverIp + Constans.SYNC_DIR_NAME_SEPARATOR + serverPort;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
index ce67c27..ec75c0a 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
@@ -19,6 +19,7 @@ import java.io.BufferedWriter;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.IOException;
@@ -35,6 +36,7 @@ import java.security.NoSuchAlgorithmException;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
@@ -275,7 +277,8 @@ public class DataTransferManager implements IDataTransferManager {
   @Override
   public void confirmIdentity() throws SyncConnectionException {
     try {
-      ResultStatus status = serviceClient.check(InetAddress.getLocalHost().getHostAddress());
+      ResultStatus status = serviceClient.check(InetAddress.getLocalHost().getHostAddress(),
+          getOrCreateUUID(config.getUuidPath()));
       if (!status.success) {
         throw new SyncConnectionException(
             "The receiver rejected the synchronization task because " + status.errorMsg);
@@ -286,6 +289,39 @@ public class DataTransferManager implements IDataTransferManager {
     }
   }
 
+  /**
+   * UUID marks the identity of sender for receiver; it is created and stored if absent or empty.
+   */
+  public String getOrCreateUUID(String uuidPath) throws IOException {
+    File file = new File(uuidPath);
+    String uuid = null;
+    if (!file.getParentFile().exists()) {
+      file.getParentFile().mkdirs();
+    }
+    if (file.exists()) {
+      try (BufferedReader bf = new BufferedReader((new FileReader(uuidPath)))) {
+        uuid = bf.readLine(); // null when the file is empty
+      } catch (IOException e) {
+        logger.error("Cannot read UUID from file{}", file.getPath());
+        throw new IOException(e);
+      }
+    }
+    if (uuid == null || uuid.isEmpty()) { // no usable UUID on disk: generate and persist one
+      uuid = generateUUID();
+      try (FileOutputStream out = new FileOutputStream(file)) {
+        out.write(uuid.getBytes());
+      } catch (IOException e) {
+        logger.error("Cannot insert UUID to file {}", file.getPath());
+        throw new IOException(e);
+      }
+    }
+    return uuid;
+  }
+
+  private String generateUUID() {
+    return UUID.randomUUID().toString().replaceAll("-", "");
+  }
+
   @Override
   public void syncSchema() throws SyncConnectionException, TException {
     int retryCount = 0;
diff --git a/service-rpc/src/main/thrift/sync.thrift b/service-rpc/src/main/thrift/sync.thrift
index 2064c4e..17bc241 100755
--- a/service-rpc/src/main/thrift/sync.thrift
+++ b/service-rpc/src/main/thrift/sync.thrift
@@ -29,7 +29,7 @@ struct ResultStatus{
 }
 
 service SyncService{
-	ResultStatus check(1:string address)
+	ResultStatus check(1:string address, 2:string uuid)
 	ResultStatus startSync();
 	ResultStatus init(1:string storageGroupName)
 	ResultStatus syncDeletedFileName(1:string fileName)


[incubator-iotdb] 03/03: add load log and clean up in receiver end

Posted by lt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch reimpl_sync
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit eeb1e3c6e0156e3d7851331445ea8c5defadfb6b
Author: lta <li...@163.com>
AuthorDate: Fri Aug 30 10:43:23 2019 +0800

    add load log and clean up in receiver end
---
 .../iotdb/db/sync/receiver/load/FileLoader.java    |   77 +-
 .../iotdb/db/sync/receiver/load/IFileLoader.java   |    7 +-
 .../iotdb/db/sync/receiver/load/ILoadLogger.java   |   18 +
 .../iotdb/db/sync/receiver/load/LoadLogger.java    |   50 +
 .../db/sync/receiver/transfer/SyncServiceImpl.java |    9 +-
 .../apache/iotdb/db/sync/sender/conf/Constans.java |    1 +
 .../storagegroup/FileNodeManagerBenchmark.java     |    2 +-
 .../iotdb/db/sync/sender/SingleClientSyncTest.java | 1192 ++++++++++----------
 .../iotdb/db/sync/sender/SyncFileManagerTest.java  |  748 ++++++------
 .../java/org/apache/iotdb/db/utils/RandomNum.java  |   70 ++
 10 files changed, 1178 insertions(+), 996 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
index 7681433..8db09c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.sync.receiver.load;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayDeque;
 import org.apache.iotdb.db.sync.sender.conf.Constans;
 import org.slf4j.Logger;
@@ -30,22 +31,28 @@ public class FileLoader implements IFileLoader {
 
   private static final int WAIT_TIME = 1000;
 
-  private File syncEndFile;
+  private String syncFolderPath;
 
   private String senderName;
 
   private ArrayDeque<LoadTask> queue = new ArrayDeque<>();
 
+  private LoadLogger loadLog;
+
+  private LoadType curType = LoadType.NONE;
+
   private volatile boolean endSync = false;
 
-  private FileLoader(String senderName, String syncFolderPath) {
+  private FileLoader(String senderName, String syncFolderPath) throws IOException {
     this.senderName = senderName;
-    this.syncEndFile = new File(syncFolderPath, Constans.SYNC_END);
+    this.syncFolderPath = syncFolderPath;
+    this.loadLog = new LoadLogger(new File(syncFolderPath, Constans.LOAD_LOG_NAME));
     FileLoaderManager.getInstance().addFileLoader(senderName, this);
     FileLoaderManager.getInstance().addLoadTaskRunner(loadTaskRunner);
   }
 
-  public static FileLoader createFileLoader(String senderName, String syncFolderPath) {
+  public static FileLoader createFileLoader(String senderName, String syncFolderPath)
+      throws IOException {
     return new FileLoader(senderName, syncFolderPath);
   }
 
@@ -67,23 +74,23 @@ public class FileLoader implements IFileLoader {
           handleLoadTask(queue.poll());
         }
       }
-    } catch (InterruptedException e) {
-      LOGGER.info("Close the run load task thread.");
+    } catch (InterruptedException | IOException e) {
+      LOGGER.error("Can not handle load task", e);
     }
   };
 
   @Override
-  public void addDeletedFileName(String sgName, File deletedFile) {
+  public void addDeletedFileName(File deletedFile) {
     synchronized (queue) {
-      queue.add(new LoadTask(sgName, deletedFile, LoadType.DELETE));
+      queue.add(new LoadTask(deletedFile, LoadType.DELETE));
       queue.notify();
     }
   }
 
   @Override
-  public void addTsfile(String sgName, File tsfile) {
+  public void addTsfile(File tsfile) {
     synchronized (queue) {
-      queue.add(new LoadTask(sgName, tsfile, LoadType.ADD));
+      queue.add(new LoadTask(tsfile, LoadType.ADD));
       queue.notify();
     }
   }
@@ -94,29 +101,63 @@ public class FileLoader implements IFileLoader {
   }
 
   @Override
-  public void handleLoadTask(LoadTask task) {
+  public void handleLoadTask(LoadTask task) throws IOException {
+    switch (task.type) { // dispatch on the kind of work that was queued
+      case ADD: // queued by addTsfile
+        loadNewTsfile(task.file);
+        break;
+      case DELETE: // queued by addDeletedFileName
+        loadDeletedFile(task.file);
+        break;
+      default:
+        LOGGER.error("Wrong load task type {}", task.type);
+    }
+  }
 
+  private void loadDeletedFile(File file) throws IOException {
+    if (curType != LoadType.DELETE) {
+      loadLog.startLoadDeletedFiles();
+      curType = LoadType.DELETE;
+    }
+    // TODO load deleted file
+    loadLog.finishLoadDeletedFile(file);
   }
 
+  private void loadNewTsfile(File file) throws IOException {
+    if (curType != LoadType.ADD) {
+      loadLog.startLoadTsFiles();
+      curType = LoadType.ADD;
+    }
+    // TODO load new tsfile
+    loadLog.finishLoadTsfile(file);
+  }
+
+
   @Override
   public void cleanUp() {
-    FileLoaderManager.getInstance().removeFileLoader(senderName);
+    try {
+      loadLog.close();
+      new File(syncFolderPath, Constans.SYNC_LOG_NAME).deleteOnExit();
+      new File(syncFolderPath, Constans.LOAD_LOG_NAME).deleteOnExit();
+      new File(syncFolderPath, Constans.SYNC_END).deleteOnExit();
+      FileLoaderManager.getInstance().removeFileLoader(senderName);
+    } catch (IOException e) {
+      LOGGER.error("Can not clean up sync resource.", e);
+    }
   }
 
   class LoadTask {
 
-    String sgName;
-    File file;
-    LoadType type;
+    private File file;
+    private LoadType type;
 
-    LoadTask(String sgName, File file, LoadType type) {
-      this.sgName = sgName;
+    LoadTask(File file, LoadType type) {
       this.file = file;
       this.type = type;
     }
   }
 
   private enum LoadType {
-    DELETE, ADD;
+    DELETE, ADD, NONE
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/IFileLoader.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/IFileLoader.java
index 98aeca9..69f6fda 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/IFileLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/IFileLoader.java
@@ -19,17 +19,18 @@
 package org.apache.iotdb.db.sync.receiver.load;
 
 import java.io.File;
+import java.io.IOException;
 import org.apache.iotdb.db.sync.receiver.load.FileLoader.LoadTask;
 
 public interface IFileLoader {
 
-  void addDeletedFileName(String sgName, File deletedFile);
+  void addDeletedFileName(File deletedFile);
 
-  void addTsfile(String sgName, File tsfile);
+  void addTsfile(File tsfile);
 
   void endSync();
 
-  void handleLoadTask(LoadTask task);
+  void handleLoadTask(LoadTask task) throws IOException;
 
   void cleanUp();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/ILoadLogger.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/ILoadLogger.java
new file mode 100644
index 0000000..2cad379
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/ILoadLogger.java
@@ -0,0 +1,18 @@
+package org.apache.iotdb.db.sync.receiver.load;
+
+import java.io.File;
+import java.io.IOException;
+
+public interface ILoadLogger {
+
+  void startLoadDeletedFiles() throws IOException;
+
+  void finishLoadDeletedFile(File file) throws IOException;
+
+  void startLoadTsFiles() throws IOException;
+
+  void finishLoadTsfile(File file) throws IOException;
+
+  void close() throws IOException;
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/LoadLogger.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/LoadLogger.java
new file mode 100644
index 0000000..31b4c48
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/LoadLogger.java
@@ -0,0 +1,50 @@
+package org.apache.iotdb.db.sync.receiver.load;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+
+/** Appends load-progress records (phase markers, finished file paths) to a log file. */
+public class LoadLogger implements ILoadLogger {
+  public static final String LOAD_DELETED_FILE_NAME_START = "load deleted files start";
+  public static final String LOAD_TSFILE_START = "load tsfile start";
+  private BufferedWriter bw;
+
+  /** Opens {@code logFile} for writing, truncating any existing content. */
+  public LoadLogger(File logFile) throws IOException {
+    this.bw = new BufferedWriter(new FileWriter(logFile));
+  }
+
+  @Override
+  public void startLoadDeletedFiles() throws IOException {
+    appendLine(LOAD_DELETED_FILE_NAME_START);
+  }
+
+  @Override
+  public void finishLoadDeletedFile(File file) throws IOException {
+    appendLine(file.getAbsolutePath());
+  }
+
+  @Override
+  public void startLoadTsFiles() throws IOException {
+    appendLine(LOAD_TSFILE_START);
+  }
+
+  @Override
+  public void finishLoadTsfile(File file) throws IOException {
+    appendLine(file.getAbsolutePath());
+  }
+
+  @Override
+  public void close() throws IOException {
+    bw.close();
+  }
+
+  /** Writes {@code entry} as its own line and flushes it to the file immediately. */
+  private void appendLine(String entry) throws IOException {
+    bw.write(entry);
+    bw.newLine();
+    bw.flush();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
index e2f45a5..525244d 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
@@ -87,7 +87,7 @@ public class SyncServiceImpl implements SyncService.Iface {
   public ResultStatus check(String ipAddress, String uuid) {
     Thread.currentThread().setName(ThreadName.SYNC_SERVER.getName());
     if (SyncUtils.verifyIPSegment(config.getIpWhiteList(), ipAddress)) {
-      senderName.set(ipAddress + "-" + uuid);
+      senderName.set(ipAddress + Constans.SYNC_DIR_NAME_SEPARATOR + uuid);
       if (checkRecovery()) {
         return getSuccessResult();
       } else {
@@ -156,7 +156,7 @@ public class SyncServiceImpl implements SyncService.Iface {
     try {
       syncLog.get().finishSyncDeletedFileName(new File(fileName));
       FileLoaderManager.getInstance().getFileLoader(senderName.get())
-          .addDeletedFileName(currentSG.get(), new File(fileName));
+          .addDeletedFileName(new File(fileName));
     } catch (IOException e) {
       logger.error("Can not sync deleted file", e);
       return getErrorResult(
@@ -169,7 +169,7 @@ public class SyncServiceImpl implements SyncService.Iface {
   public ResultStatus initSyncData(String filename) throws TException {
     try {
       File file;
-      if (currentSG.get() == null) {
+      if (currentSG.get() == null) { // schema mlog.txt file
         file = new File(getSyncDataPath(), filename);
       } else {
         file = new File(getSyncDataPath(), currentSG.get() + File.separatorChar + filename);
@@ -221,7 +221,7 @@ public class SyncServiceImpl implements SyncService.Iface {
           if (!currentFile.get().getName().endsWith(TsFileResource.RESOURCE_SUFFIX)) {
             syncLog.get().finishSyncTsfile(currentFile.get());
             FileLoaderManager.getInstance().getFileLoader(senderName.get())
-                .addTsfile(currentSG.get(), currentFile.get());
+                .addTsfile(currentFile.get());
           }
         }
       }
@@ -306,6 +306,7 @@ public class SyncServiceImpl implements SyncService.Iface {
     try {
       syncLog.get().close();
       new File(getSyncDataPath(), Constans.SYNC_END).createNewFile();
+      FileLoaderManager.getInstance().getFileLoader(senderName.get()).endSync();
     } catch (IOException e) {
       logger.error("Can not end sync", e);
       return getErrorResult(String.format("Can not end sync because %s", e.getMessage()));
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
index 0825774..d3f8ff1 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
@@ -73,5 +73,6 @@ public class Constans {
 
   public static final String SYNC_END = "sync.end";
 
+  public static final String LOAD_LOG_NAME = "load.log";
 
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/FileNodeManagerBenchmark.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/FileNodeManagerBenchmark.java
index 7d16042..d98022e 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/FileNodeManagerBenchmark.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/FileNodeManagerBenchmark.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.db.exception.MetadataErrorException;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-import org.apache.iotdb.db.sync.test.RandomNum;
+import org.apache.iotdb.db.utils.RandomNum;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/sender/SingleClientSyncTest.java b/server/src/test/java/org/apache/iotdb/db/sync/sender/SingleClientSyncTest.java
index 4a7ac7e..4a258a5 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/sender/SingleClientSyncTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/sender/SingleClientSyncTest.java
@@ -1,596 +1,596 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.sync.sender;
-
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.HashSet;
-import java.util.Set;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.StartupException;
-import org.apache.iotdb.db.exception.SyncConnectionException;
-import org.apache.iotdb.db.integration.Constant;
-import org.apache.iotdb.db.service.IoTDB;
-import org.apache.iotdb.db.sync.sender.conf.Constans;
-import org.apache.iotdb.db.sync.sender.conf.SyncSenderConfig;
-import org.apache.iotdb.db.sync.sender.conf.SyncSenderDescriptor;
-import org.apache.iotdb.db.sync.sender.transfer.DataTransferManager;
-import org.apache.iotdb.db.utils.EnvironmentUtils;
-import org.apache.iotdb.jdbc.Config;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The test is to run a complete sync function Before you run the test, make sure receiver has been
- * cleaned up and inited.
- */
-public class SingleClientSyncTest {
-
-  DataTransferManager fileSenderImpl = DataTransferManager.getInstance();
-  private IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
-  private String serverIpTest = "192.168.130.7";
-  private SyncSenderConfig config = SyncSenderDescriptor.getInstance().getConfig();
-  private Set<String> dataSender = new HashSet<>();
-  private Set<String> dataReceiver = new HashSet<>();
-  private boolean success = true;
-  private IoTDB deamon;
-  private static final String[] sqls1 = new String[]{"SET STORAGE GROUP TO root.vehicle",
-      "SET STORAGE GROUP TO root.test",
-      "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE",
-      "CREATE TIMESERIES root.vehicle.d0.s1 WITH DATATYPE=TEXT, ENCODING=PLAIN",
-      "CREATE TIMESERIES root.vehicle.d1.s2 WITH DATATYPE=FLOAT, ENCODING=RLE",
-      "CREATE TIMESERIES root.vehicle.d1.s3 WITH DATATYPE=BOOLEAN, ENCODING=PLAIN",
-      "CREATE TIMESERIES root.test.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE",
-      "CREATE TIMESERIES root.test.d0.s1 WITH DATATYPE=TEXT, ENCODING=PLAIN",
-      "CREATE TIMESERIES root.test.d1.g0.s0 WITH DATATYPE=INT32, ENCODING=RLE",
-      "insert into root.vehicle.d0(timestamp,s0) values(10,100)",
-      "insert into root.vehicle.d0(timestamp,s0,s1) values(12,101,'102')",
-      "insert into root.vehicle.d0(timestamp,s1) values(19,'103')",
-      "insert into root.vehicle.d1(timestamp,s2) values(11,104.0)",
-      "insert into root.vehicle.d1(timestamp,s2,s3) values(15,105.0,true)",
-      "insert into root.vehicle.d1(timestamp,s3) values(17,false)",
-      "insert into root.vehicle.d0(timestamp,s0) values(20,1000)",
-      "insert into root.vehicle.d0(timestamp,s0,s1) values(22,1001,'1002')",
-      "insert into root.vehicle.d0(timestamp,s1) values(29,'1003')",
-      "insert into root.vehicle.d1(timestamp,s2) values(21,1004.0)",
-      "insert into root.vehicle.d1(timestamp,s2,s3) values(25,1005.0,true)",
-      "insert into root.vehicle.d1(timestamp,s3) values(27,true)",
-      "insert into root.test.d0(timestamp,s0) values(10,106)",
-      "insert into root.test.d0(timestamp,s0,s1) values(14,107,'108')",
-      "insert into root.test.d0(timestamp,s1) values(16,'109')",
-      "insert into root.test.d1.g0(timestamp,s0) values(1,110)",
-      "insert into root.test.d0(timestamp,s0) values(30,1006)",
-      "insert into root.test.d0(timestamp,s0,s1) values(34,1007,'1008')",
-      "insert into root.test.d0(timestamp,s1) values(36,'1090')",
-      "insert into root.test.d1.g0(timestamp,s0) values(10,1100)", "merge", "flush",};
-  private static final String[] sqls2 = new String[]{
-      "insert into root.vehicle.d0(timestamp,s0) values(6,120)",
-      "insert into root.vehicle.d0(timestamp,s0,s1) values(38,121,'122')",
-      "insert into root.vehicle.d0(timestamp,s1) values(9,'123')",
-      "insert into root.vehicle.d0(timestamp,s0) values(16,128)",
-      "insert into root.vehicle.d0(timestamp,s0,s1) values(18,189,'198')",
-      "insert into root.vehicle.d0(timestamp,s1) values(99,'1234')",
-      "insert into root.vehicle.d1(timestamp,s2) values(14,1024.0)",
-      "insert into root.vehicle.d1(timestamp,s2,s3) values(29,1205.0,true)",
-      "insert into root.vehicle.d1(timestamp,s3) values(33,true)",
-      "insert into root.test.d0(timestamp,s0) values(15,126)",
-      "insert into root.test.d0(timestamp,s0,s1) values(8,127,'128')",
-      "insert into root.test.d0(timestamp,s1) values(20,'129')",
-      "insert into root.test.d1.g0(timestamp,s0) values(14,430)",
-      "insert into root.test.d0(timestamp,s0) values(150,426)",
-      "insert into root.test.d0(timestamp,s0,s1) values(80,427,'528')",
-      "insert into root.test.d0(timestamp,s1) values(2,'1209')",
-      "insert into root.test.d1.g0(timestamp,s0) values(4,330)", "merge", "flush",};
-  private static final String[] sqls3 = new String[]{"SET STORAGE GROUP TO root.iotdb",
-      "SET STORAGE GROUP TO root.flush",
-      "CREATE TIMESERIES root.iotdb.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE",
-      "CREATE TIMESERIES root.iotdb.d0.s1 WITH DATATYPE=TEXT, ENCODING=PLAIN",
-      "CREATE TIMESERIES root.iotdb.d1.s2 WITH DATATYPE=FLOAT, ENCODING=RLE",
-      "CREATE TIMESERIES root.iotdb.d1.s3 WITH DATATYPE=BOOLEAN, ENCODING=PLAIN",
-      "CREATE TIMESERIES root.flush.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE",
-      "CREATE TIMESERIES root.flush.d0.s1 WITH DATATYPE=TEXT, ENCODING=PLAIN",
-      "CREATE TIMESERIES root.flush.d1.g0.s0 WITH DATATYPE=INT32, ENCODING=RLE",
-      "insert into root.iotdb.d0(timestamp,s0) values(3,100)",
-      "insert into root.iotdb.d0(timestamp,s0,s1) values(22,101,'102')",
-      "insert into root.iotdb.d0(timestamp,s1) values(24,'103')",
-      "insert into root.iotdb.d1(timestamp,s2) values(21,104.0)",
-      "insert into root.iotdb.d1(timestamp,s2,s3) values(25,105.0,true)",
-      "insert into root.iotdb.d1(timestamp,s3) values(27,false)",
-      "insert into root.iotdb.d0(timestamp,s0) values(30,1000)",
-      "insert into root.iotdb.d0(timestamp,s0,s1) values(202,101,'102')",
-      "insert into root.iotdb.d0(timestamp,s1) values(44,'103')",
-      "insert into root.iotdb.d1(timestamp,s2) values(1,404.0)",
-      "insert into root.iotdb.d1(timestamp,s2,s3) values(250,10.0,true)",
-      "insert into root.iotdb.d1(timestamp,s3) values(207,false)",
-      "insert into root.flush.d0(timestamp,s0) values(20,106)",
-      "insert into root.flush.d0(timestamp,s0,s1) values(14,107,'108')",
-      "insert into root.flush.d1.g0(timestamp,s0) values(1,110)",
-      "insert into root.flush.d0(timestamp,s0) values(200,1006)",
-      "insert into root.flush.d0(timestamp,s0,s1) values(1004,1007,'1080')",
-      "insert into root.flush.d1.g0(timestamp,s0) values(1000,910)",
-      "insert into root.vehicle.d0(timestamp,s0) values(209,130)",
-      "insert into root.vehicle.d0(timestamp,s0,s1) values(206,131,'132')",
-      "insert into root.vehicle.d0(timestamp,s1) values(70,'33')",
-      "insert into root.vehicle.d1(timestamp,s2) values(204,14.0)",
-      "insert into root.vehicle.d1(timestamp,s2,s3) values(29,135.0,false)",
-      "insert into root.vehicle.d1(timestamp,s3) values(14,false)",
-      "insert into root.test.d0(timestamp,s0) values(19,136)",
-      "insert into root.test.d0(timestamp,s0,s1) values(7,137,'138')",
-      "insert into root.test.d0(timestamp,s1) values(30,'139')",
-      "insert into root.test.d1.g0(timestamp,s0) values(4,150)",
-      "insert into root.test.d0(timestamp,s0) values(1900,1316)",
-      "insert into root.test.d0(timestamp,s0,s1) values(700,1307,'1038')",
-      "insert into root.test.d0(timestamp,s1) values(3000,'1309')",
-      "insert into root.test.d1.g0(timestamp,s0) values(400,1050)", "merge", "flush",};
-  private boolean testFlag = Constant.testFlag;
-  private static final String SYNC_CLIENT = Constans.SYNC_SENDER;
-  private static final Logger logger = LoggerFactory.getLogger(SingleClientSyncTest.class);
-
-  public static void main(String[] args) throws Exception {
-    SingleClientSyncTest singleClientPostBackTest = new SingleClientSyncTest();
-    singleClientPostBackTest.setUp();
-    singleClientPostBackTest.testPostback();
-    singleClientPostBackTest.tearDown();
-    System.exit(0);
-  }
-
-  public void setConfig() {
-    config.setSenderFolderPath(
-        config.getDataDirectory() + SYNC_CLIENT + File.separator + Constans.UUID_FILE_NAME);
-    config.setLastFileInfo(
-        config.getDataDirectory() + SYNC_CLIENT + File.separator + Constans.LAST_LOCAL_FILE_NAME);
-    String[] sequenceFileDirectory = config.getSeqFileDirectory();
-    String[] snapshots = new String[config.getSeqFileDirectory().length];
-    for (int i = 0; i < config.getSeqFileDirectory().length; i++) {
-      sequenceFileDirectory[i] = new File(sequenceFileDirectory[i]).getAbsolutePath();
-      if (!sequenceFileDirectory[i].endsWith(File.separator)) {
-        sequenceFileDirectory[i] = sequenceFileDirectory[i] + File.separator;
-      }
-      snapshots[i] =
-          sequenceFileDirectory[i] + SYNC_CLIENT + File.separator + Constans.DATA_SNAPSHOT_NAME
-              + File.separator;
-    }
-    config.setSnapshotPaths(snapshots);
-    config.setSeqFileDirectory(sequenceFileDirectory);
-    config.setServerIp(serverIpTest);
-    fileSenderImpl.setConfig(config);
-  }
-
-  public void setUp() throws StartupException, IOException {
-    if (testFlag) {
-      EnvironmentUtils.closeStatMonitor();
-      deamon = IoTDB.getInstance();
-      deamon.active();
-      EnvironmentUtils.envSetUp();
-    }
-    setConfig();
-  }
-
-  public void tearDown() throws Exception {
-    if (testFlag) {
-      deamon.stop();
-      EnvironmentUtils.cleanEnv();
-    }
-    if (success) {
-      logger.debug("Test succeed!");
-    } else {
-      logger.debug("Test failed!");
-    }
-  }
-
-  public void testPostback() throws IOException, SyncConnectionException, ClassNotFoundException, SQLException, InterruptedException {
-    if (testFlag) {
-      // the first time to sync
-      logger.debug("It's the first time to sync!");
-      try {
-        Class.forName(Config.JDBC_DRIVER_NAME);
-        try (Connection connection = DriverManager
-            .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
-                "root")) {
-          Statement statement = connection.createStatement();
-          for (String sql : sqls1) {
-            statement.execute(sql);
-          }
-          statement.close();
-        }
-      } catch (SQLException | ClassNotFoundException e) {
-        fail(e.getMessage());
-      }
-
-      fileSenderImpl.sync();
-
-      // Compare data of sender and receiver
-      dataSender.clear();
-      try {
-        Class.forName(Config.JDBC_DRIVER_NAME);
-        try (Connection connection = DriverManager
-            .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
-                "root")) {
-          Statement statement = connection.createStatement();
-          boolean hasResultSet = statement.execute("select * from root.vehicle");
-          if (hasResultSet) {
-            ResultSet res = statement.getResultSet();
-            while (res.next()) {
-              dataSender.add(res.getString("Time") + res.getString("root.vehicle.d0.s0")
-                  + res.getString("root.vehicle.d0.s1") + res.getString("root.vehicle.d1.s2")
-                  + res.getString("root.vehicle.d1.s3"));
-              logger.debug(res.getString("Time") + " | " + res.getString("root.vehicle.d0.s0")
-                  + res.getString("root.vehicle.d0.s1") + res.getString("root.vehicle.d1.s2")
-                  + res.getString("root.vehicle.d1.s3"));
-            }
-          }
-          hasResultSet = statement.execute("select * from root.test");
-          if (hasResultSet) {
-            ResultSet res = statement.getResultSet();
-            while (res.next()) {
-              dataSender.add(res.getString("Time") + res.getString("root.test.d0.s0")
-                  + res.getString("root.test.d0.s1") + res.getString("root.test.d1.g0.s0"));
-              logger.debug(res.getString("Time") + " | " + res.getString("root.test.d0.s0")
-                  + res.getString("root.test.d0.s1") + res.getString("root.test.d1.g0.s0"));
-            }
-          }
-          statement.close();
-        } catch (Exception e) {
-          logger.error("", e);
-        }
-      } catch (ClassNotFoundException e) {
-        fail(e.getMessage());
-        Thread.currentThread().interrupt();
-      }
-
-      dataReceiver.clear();
-      try {
-        Class.forName(Config.JDBC_DRIVER_NAME);
-        Connection connection = null;
-        try {
-          connection = DriverManager
-              .getConnection(String.format("jdbc:iotdb://%s:6667/", serverIpTest), "root",
-                  "root");
-          Statement statement = connection.createStatement();
-          boolean hasResultSet = statement.execute("select * from root.vehicle");
-          if (hasResultSet) {
-            ResultSet res = statement.getResultSet();
-            while (res.next()) {
-              dataReceiver.add(res.getString("Time") + res.getString("root.vehicle.d0.s0")
-                  + res.getString("root.vehicle.d0.s1") + res.getString("root.vehicle.d1.s2")
-                  + res.getString("root.vehicle.d1.s3"));
-              logger.debug(res.getString("Time") + " | " + res.getString("root.vehicle.d0.s0")
-                  + res.getString("root.vehicle.d0.s1") + res.getString("root.vehicle.d1.s2")
-                  + res.getString("root.vehicle.d1.s3"));
-            }
-          }
-
-          hasResultSet = statement.execute("select * from root.test");
-          if (hasResultSet) {
-            ResultSet res = statement.getResultSet();
-            while (res.next()) {
-              dataReceiver.add(res.getString("Time") + res.getString("root.test.d0.s0")
-                  + res.getString("root.test.d0.s1") + res.getString("root.test.d1.g0.s0"));
-              logger.debug(res.getString("Time") + " | " + res.getString("root.test.d0.s0")
-                  + res.getString("root.test.d0.s1") + res.getString("root.test.d1.g0.s0"));
-            }
-          }
-          statement.close();
-        } catch (Exception e) {
-          logger.error("", e);
-        } finally {
-          if (connection != null) {
-            connection.close();
-          }
-        }
-      } catch (ClassNotFoundException | SQLException e) {
-        fail(e.getMessage());
-        Thread.currentThread().interrupt();
-      }
-      logger.debug(String.valueOf(dataSender.size()));
-      logger.debug(String.valueOf(dataReceiver.size()));
-      logger.debug(dataSender.toString());
-      logger.debug(dataReceiver.toString());
-      if (!(dataSender.size() == dataReceiver.size() && dataSender.containsAll(dataReceiver))) {
-        success = false;
-        return;
-      }
-
-      // the second time to sync
-      logger.debug("It's the second time to sync!");
-      try {
-        Class.forName(Config.JDBC_DRIVER_NAME);
-        Connection connection = null;
-        try {
-          connection = DriverManager
-              .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
-                  "root");
-          Statement statement = connection.createStatement();
-          for (String sql : sqls2) {
-            statement.execute(sql);
-          }
-          statement.close();
-        } catch (Exception e) {
-          logger.error("", e);
-        } finally {
-          if (connection != null) {
-            connection.close();
-          }
-        }
-      } catch (ClassNotFoundException | SQLException e) {
-        fail(e.getMessage());
-        Thread.currentThread().interrupt();
-      }
-
-      fileSenderImpl.sync();
-
-      // Compare data of sender and receiver
-      dataSender.clear();
-      try {
-        Class.forName(Config.JDBC_DRIVER_NAME);
-        Connection connection = null;
-        try {
-          connection = DriverManager
-              .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
-                  "root");
-          Statement statement = connection.createStatement();
-          boolean hasResultSet = statement.execute("select * from root.vehicle");
-          if (hasResultSet) {
-            ResultSet res = statement.getResultSet();
-            while (res.next()) {
-              dataSender.add(res.getString("Time") + res.getString("root.vehicle.d0.s0")
-                  + res.getString("root.vehicle.d0.s1") + res.getString("root.vehicle.d1.s2")
-                  + res.getString("root.vehicle.d1.s3"));
-              logger.debug(res.getString("Time") + " | " + res.getString("root.vehicle.d0.s0")
-                  + res.getString("root.vehicle.d0.s1") + res.getString("root.vehicle.d1.s2")
-                  + res.getString("root.vehicle.d1.s3"));
-            }
-          }
-          hasResultSet = statement.execute("select * from root.test");
-          if (hasResultSet) {
-            ResultSet res = statement.getResultSet();
-            while (res.next()) {
-              dataSender.add(res.getString("Time") + res.getString("root.test.d0.s0")
-                  + res.getString("root.test.d0.s1") + res.getString("root.test.d1.g0.s0"));
-              logger.debug(res.getString("Time") + " | " + res.getString("root.test.d0.s0")
-                  + res.getString("root.test.d0.s1") + res.getString("root.test.d1.g0.s0"));
-            }
-          }
-          statement.close();
-        } catch (Exception e) {
-          logger.error("", e);
-        } finally {
-          if (connection != null) {
-            connection.close();
-          }
-        }
-      } catch (ClassNotFoundException | SQLException e) {
-        fail(e.getMessage());
-        Thread.currentThread().interrupt();
-      }
-
-      dataReceiver.clear();
-      {
-        Class.forName(Config.JDBC_DRIVER_NAME);
-        Connection connection = null;
-        try {
-          connection = DriverManager
-              .getConnection(String.format("jdbc:iotdb://%s:6667/", serverIpTest), "root",
-                  "root");
-          Statement statement = connection.createStatement();
-          boolean hasResultSet = statement.execute("select * from root.vehicle");
-          if (hasResultSet) {
-            ResultSet res = statement.getResultSet();
-            while (res.next()) {
-              dataReceiver.add(res.getString("Time") + res.getString("root.vehicle.d0.s0")
-                  + res.getString("root.vehicle.d0.s1") + res.getString("root.vehicle.d1.s2")
-                  + res.getString("root.vehicle.d1.s3"));
-              logger.debug(res.getString("Time") + " | " + res.getString("root.vehicle.d0.s0")
-                  + res.getString("root.vehicle.d0.s1") + res.getString("root.vehicle.d1.s2")
-                  + res.getString("root.vehicle.d1.s3"));
-            }
-          }
-          hasResultSet = statement.execute("select * from root.test");
-          if (hasResultSet) {
-            ResultSet res = statement.getResultSet();
-            while (res.next()) {
-              dataReceiver.add(res.getString("Time") + res.getString("root.test.d0.s0")
-                  + res.getString("root.test.d0.s1") + res.getString("root.test.d1.g0.s0"));
-              logger.debug(res.getString("Time") + " | " + res.getString("root.test.d0.s0")
-                  + res.getString("root.test.d0.s1") + res.getString("root.test.d1.g0.s0"));
-            }
-          }
-          statement.close();
-        } catch (Exception e) {
-          logger.error("", e);
-        } finally {
-          if (connection != null) {
-            connection.close();
-          }
-        }
-      }
-      logger.debug(String.valueOf(dataSender.size()));
-      logger.debug(String.valueOf(dataReceiver.size()));
-      logger.debug(dataSender.toString());
-      logger.debug(dataReceiver.toString());
-      if (!(dataSender.size() == dataReceiver.size() && dataSender.containsAll(dataReceiver))) {
-        success = false;
-        return;
-      }
-
-      // the third time to sync
-      logger.debug("It's the third time to sync!");
-      try {
-        Class.forName(Config.JDBC_DRIVER_NAME);
-        try (Connection connection = DriverManager
-            .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
-                "root")) {
-          Statement statement = connection.createStatement();
-          for (String sql : sqls3) {
-            statement.execute(sql);
-          }
-          statement.close();
-        } catch (Exception e) {
-          logger.error("", e);
-        }
-      } catch (ClassNotFoundException e) {
-        fail(e.getMessage());
-        Thread.currentThread().interrupt();
-      }
-
-      fileSenderImpl.sync();
-
-      // Compare data of sender and receiver
-      dataSender.clear();
-      try {
-        Class.forName(Config.JDBC_DRIVER_NAME);
-        try (Connection connection = DriverManager
-            .getConnection(String.format("jdbc:iotdb://%s:6667/", serverIpTest), "root",
-                "root")) {
-          Statement statement = connection.createStatement();
-          boolean hasResultSet = statement.execute("select * from root.vehicle");
-          if (hasResultSet) {
-            ResultSet res = statement.getResultSet();
-            while (res.next()) {
-              dataSender.add(res.getString("Time") + res.getString("root.vehicle.d0.s0")
-                  + res.getString("root.vehicle.d0.s1") + res.getString("root.vehicle.d1.s2")
-                  + res.getString("root.vehicle.d1.s3"));
-              logger.debug(res.getString("Time") + " | " + res.getString("root.vehicle.d0.s0")
-                  + res.getString("root.vehicle.d0.s1") + res.getString("root.vehicle.d1.s2")
-                  + res.getString("root.vehicle.d1.s3"));
-            }
-          }
-          hasResultSet = statement.execute("select * from root.test");
-          if (hasResultSet) {
-            ResultSet res = statement.getResultSet();
-            while (res.next()) {
-              dataSender.add(res.getString("Time") + res.getString("root.test.d0.s0")
-                  + res.getString("root.test.d0.s1") + res.getString("root.test.d1.g0.s0"));
-              logger.debug(res.getString("Time") + " | " + res.getString("root.test.d0.s0")
-                  + res.getString("root.test.d0.s1") + res.getString("root.test.d1.g0.s0"));
-            }
-          }
-          hasResultSet = statement.execute("select * from root.flush");
-          if (hasResultSet) {
-            ResultSet res = statement.getResultSet();
-            while (res.next()) {
-              dataSender.add(res.getString("Time") + res.getString("root.flush.d0.s0")
-                  + res.getString("root.flush.d0.s1") + res.getString("root.flush.d1.g0.s0"));
-              logger.debug(res.getString("Time") + " | " + res.getString("root.flush.d0.s0")
-                  + res.getString("root.flush.d0.s1") + res.getString("root.flush.d1.g0.s0"));
-            }
-          }
-          hasResultSet = statement.execute("select * from root.iotdb");
-          if (hasResultSet) {
-            ResultSet res = statement.getResultSet();
-            while (res.next()) {
-              dataSender.add(res.getString("Time") + res.getString("root.iotdb.d0.s0")
-                  + res.getString("root.iotdb.d0.s1") + res.getString("root.iotdb.d1.s2")
-                  + res.getString("root.iotdb.d1.s3"));
-              logger.debug(res.getString("Time") + res.getString("root.iotdb.d0.s0")
-                  + res.getString("root.iotdb.d0.s1") + res.getString("root.iotdb.d1.s2")
-                  + res.getString("root.iotdb.d1.s3"));
-            }
-          }
-          statement.close();
-        } catch (Exception e) {
-          logger.error("", e);
-        }
-      } catch (ClassNotFoundException e) {
-        fail(e.getMessage());
-        Thread.currentThread().interrupt();
-      }
-
-      dataReceiver.clear();
-      try {
-        Class.forName(Config.JDBC_DRIVER_NAME);
-        Connection connection = null;
-        try {
-          connection = DriverManager
-              .getConnection("jdbc:iotdb://192.168.130.8:6667/", "root", "root");
-          Statement statement = connection.createStatement();
-          boolean hasResultSet = statement.execute("select * from root.vehicle");
-          if (hasResultSet) {
-            ResultSet res = statement.getResultSet();
-            while (res.next()) {
-              dataReceiver.add(res.getString("Time") + res.getString("root.vehicle.d0.s0")
-                  + res.getString("root.vehicle.d0.s1") + res.getString("root.vehicle.d1.s2")
-                  + res.getString("root.vehicle.d1.s3"));
-              logger.debug(res.getString("Time") + " | " + res.getString("root.vehicle.d0.s0")
-                  + res.getString("root.vehicle.d0.s1") + res.getString("root.vehicle.d1.s2")
-                  + res.getString("root.vehicle.d1.s3"));
-            }
-          }
-          hasResultSet = statement.execute("select * from root.test");
-          if (hasResultSet) {
-            ResultSet res = statement.getResultSet();
-            while (res.next()) {
-              dataReceiver.add(res.getString("Time") + res.getString("root.test.d0.s0")
-                  + res.getString("root.test.d0.s1") + res.getString("root.test.d1.g0.s0"));
-              logger.debug(res.getString("Time") + " | " + res.getString("root.test.d0.s0")
-                  + res.getString("root.test.d0.s1") + res.getString("root.test.d1.g0.s0"));
-            }
-          }
-          hasResultSet = statement.execute("select * from root.flush");
-          if (hasResultSet) {
-            ResultSet res = statement.getResultSet();
-            while (res.next()) {
-              dataReceiver.add(res.getString("Time") + res.getString("root.flush.d0.s0")
-                  + res.getString("root.flush.d0.s1") + res.getString("root.flush.d1.g0.s0"));
-              logger.debug(res.getString("Time") + " | " + res.getString("root.flush.d0.s0")
-                  + res.getString("root.flush.d0.s1") + res.getString("root.flush.d1.g0.s0"));
-            }
-          }
-          hasResultSet = statement.execute("select * from root.iotdb");
-          if (hasResultSet) {
-            ResultSet res = statement.getResultSet();
-            while (res.next()) {
-              dataReceiver.add(res.getString("Time") + res.getString("root.iotdb.d0.s0")
-                  + res.getString("root.iotdb.d0.s1") + res.getString("root.iotdb.d1.s2")
-                  + res.getString("root.iotdb.d1.s3"));
-              logger.debug(res.getString("Time") + res.getString("root.iotdb.d0.s0")
-                  + res.getString("root.iotdb.d0.s1") + res.getString("root.iotdb.d1.s2")
-                  + res.getString("root.iotdb.d1.s3"));
-            }
-          }
-          statement.close();
-        } catch (Exception e) {
-          logger.error("", e);
-        } finally {
-          if (connection != null) {
-            connection.close();
-          }
-        }
-      } catch (ClassNotFoundException | SQLException e) {
-        fail(e.getMessage());
-        Thread.currentThread().interrupt();
-      }
-      logger.debug(String.valueOf(dataSender.size()));
-      logger.debug(String.valueOf(dataReceiver.size()));
-      logger.debug(String.valueOf(dataSender));
-      logger.debug(String.valueOf(dataReceiver));
-      if (!(dataSender.size() == dataReceiver.size() && dataSender.containsAll(dataReceiver))) {
-        success = false;
-      }
-    }
-  }
-}
+///**
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements.  See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership.  The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License.  You may obtain a copy of the License at
+// *
+// *     http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing,
+// * software distributed under the License is distributed on an
+// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// * KIND, either express or implied.  See the License for the
+// * specific language governing permissions and limitations
+// * under the License.
+// */
+//package org.apache.iotdb.db.sync.sender;
+//
+//import static org.junit.Assert.fail;
+//
+//import java.io.File;
+//import java.io.IOException;
+//import java.sql.Connection;
+//import java.sql.DriverManager;
+//import java.sql.ResultSet;
+//import java.sql.SQLException;
+//import java.sql.Statement;
+//import java.util.HashSet;
+//import java.util.Set;
+//import org.apache.iotdb.db.conf.IoTDBConfig;
+//import org.apache.iotdb.db.conf.IoTDBDescriptor;
+//import org.apache.iotdb.db.exception.StartupException;
+//import org.apache.iotdb.db.exception.SyncConnectionException;
+//import org.apache.iotdb.db.integration.Constant;
+//import org.apache.iotdb.db.service.IoTDB;
+//import org.apache.iotdb.db.sync.sender.conf.Constans;
+//import org.apache.iotdb.db.sync.sender.conf.SyncSenderConfig;
+//import org.apache.iotdb.db.sync.sender.conf.SyncSenderDescriptor;
+//import org.apache.iotdb.db.sync.sender.transfer.DataTransferManager;
+//import org.apache.iotdb.db.utils.EnvironmentUtils;
+//import org.apache.iotdb.jdbc.Config;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+//
+///**
+// * The test is to run a complete sync function Before you run the test, make sure receiver has been
+// * cleaned up and inited.
+// */
+//public class SingleClientSyncTest {
+//
+//  DataTransferManager fileSenderImpl = DataTransferManager.getInstance();
+//  private IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
+//  private String serverIpTest = "192.168.130.7";
+//  private SyncSenderConfig config = SyncSenderDescriptor.getInstance().getConfig();
+//  private Set<String> dataSender = new HashSet<>();
+//  private Set<String> dataReceiver = new HashSet<>();
+//  private boolean success = true;
+//  private IoTDB deamon;
+//  private static final String[] sqls1 = new String[]{"SET STORAGE GROUP TO root.vehicle",
+//      "SET STORAGE GROUP TO root.test",
+//      "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE",
+//      "CREATE TIMESERIES root.vehicle.d0.s1 WITH DATATYPE=TEXT, ENCODING=PLAIN",
+//      "CREATE TIMESERIES root.vehicle.d1.s2 WITH DATATYPE=FLOAT, ENCODING=RLE",
+//      "CREATE TIMESERIES root.vehicle.d1.s3 WITH DATATYPE=BOOLEAN, ENCODING=PLAIN",
+//      "CREATE TIMESERIES root.test.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE",
+//      "CREATE TIMESERIES root.test.d0.s1 WITH DATATYPE=TEXT, ENCODING=PLAIN",
+//      "CREATE TIMESERIES root.test.d1.g0.s0 WITH DATATYPE=INT32, ENCODING=RLE",
+//      "insert into root.vehicle.d0(timestamp,s0) values(10,100)",
+//      "insert into root.vehicle.d0(timestamp,s0,s1) values(12,101,'102')",
+//      "insert into root.vehicle.d0(timestamp,s1) values(19,'103')",
+//      "insert into root.vehicle.d1(timestamp,s2) values(11,104.0)",
+//      "insert into root.vehicle.d1(timestamp,s2,s3) values(15,105.0,true)",
+//      "insert into root.vehicle.d1(timestamp,s3) values(17,false)",
+//      "insert into root.vehicle.d0(timestamp,s0) values(20,1000)",
+//      "insert into root.vehicle.d0(timestamp,s0,s1) values(22,1001,'1002')",
+//      "insert into root.vehicle.d0(timestamp,s1) values(29,'1003')",
+//      "insert into root.vehicle.d1(timestamp,s2) values(21,1004.0)",
+//      "insert into root.vehicle.d1(timestamp,s2,s3) values(25,1005.0,true)",
+//      "insert into root.vehicle.d1(timestamp,s3) values(27,true)",
+//      "insert into root.test.d0(timestamp,s0) values(10,106)",
+//      "insert into root.test.d0(timestamp,s0,s1) values(14,107,'108')",
+//      "insert into root.test.d0(timestamp,s1) values(16,'109')",
+//      "insert into root.test.d1.g0(timestamp,s0) values(1,110)",
+//      "insert into root.test.d0(timestamp,s0) values(30,1006)",
+//      "insert into root.test.d0(timestamp,s0,s1) values(34,1007,'1008')",
+//      "insert into root.test.d0(timestamp,s1) values(36,'1090')",
+//      "insert into root.test.d1.g0(timestamp,s0) values(10,1100)", "merge", "flush",};
+//  private static final String[] sqls2 = new String[]{
+//      "insert into root.vehicle.d0(timestamp,s0) values(6,120)",
+//      "insert into root.vehicle.d0(timestamp,s0,s1) values(38,121,'122')",
+//      "insert into root.vehicle.d0(timestamp,s1) values(9,'123')",
+//      "insert into root.vehicle.d0(timestamp,s0) values(16,128)",
+//      "insert into root.vehicle.d0(timestamp,s0,s1) values(18,189,'198')",
+//      "insert into root.vehicle.d0(timestamp,s1) values(99,'1234')",
+//      "insert into root.vehicle.d1(timestamp,s2) values(14,1024.0)",
+//      "insert into root.vehicle.d1(timestamp,s2,s3) values(29,1205.0,true)",
+//      "insert into root.vehicle.d1(timestamp,s3) values(33,true)",
+//      "insert into root.test.d0(timestamp,s0) values(15,126)",
+//      "insert into root.test.d0(timestamp,s0,s1) values(8,127,'128')",
+//      "insert into root.test.d0(timestamp,s1) values(20,'129')",
+//      "insert into root.test.d1.g0(timestamp,s0) values(14,430)",
+//      "insert into root.test.d0(timestamp,s0) values(150,426)",
+//      "insert into root.test.d0(timestamp,s0,s1) values(80,427,'528')",
+//      "insert into root.test.d0(timestamp,s1) values(2,'1209')",
+//      "insert into root.test.d1.g0(timestamp,s0) values(4,330)", "merge", "flush",};
+//  private static final String[] sqls3 = new String[]{"SET STORAGE GROUP TO root.iotdb",
+//      "SET STORAGE GROUP TO root.flush",
+//      "CREATE TIMESERIES root.iotdb.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE",
+//      "CREATE TIMESERIES root.iotdb.d0.s1 WITH DATATYPE=TEXT, ENCODING=PLAIN",
+//      "CREATE TIMESERIES root.iotdb.d1.s2 WITH DATATYPE=FLOAT, ENCODING=RLE",
+//      "CREATE TIMESERIES root.iotdb.d1.s3 WITH DATATYPE=BOOLEAN, ENCODING=PLAIN",
+//      "CREATE TIMESERIES root.flush.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE",
+//      "CREATE TIMESERIES root.flush.d0.s1 WITH DATATYPE=TEXT, ENCODING=PLAIN",
+//      "CREATE TIMESERIES root.flush.d1.g0.s0 WITH DATATYPE=INT32, ENCODING=RLE",
+//      "insert into root.iotdb.d0(timestamp,s0) values(3,100)",
+//      "insert into root.iotdb.d0(timestamp,s0,s1) values(22,101,'102')",
+//      "insert into root.iotdb.d0(timestamp,s1) values(24,'103')",
+//      "insert into root.iotdb.d1(timestamp,s2) values(21,104.0)",
+//      "insert into root.iotdb.d1(timestamp,s2,s3) values(25,105.0,true)",
+//      "insert into root.iotdb.d1(timestamp,s3) values(27,false)",
+//      "insert into root.iotdb.d0(timestamp,s0) values(30,1000)",
+//      "insert into root.iotdb.d0(timestamp,s0,s1) values(202,101,'102')",
+//      "insert into root.iotdb.d0(timestamp,s1) values(44,'103')",
+//      "insert into root.iotdb.d1(timestamp,s2) values(1,404.0)",
+//      "insert into root.iotdb.d1(timestamp,s2,s3) values(250,10.0,true)",
+//      "insert into root.iotdb.d1(timestamp,s3) values(207,false)",
+//      "insert into root.flush.d0(timestamp,s0) values(20,106)",
+//      "insert into root.flush.d0(timestamp,s0,s1) values(14,107,'108')",
+//      "insert into root.flush.d1.g0(timestamp,s0) values(1,110)",
+//      "insert into root.flush.d0(timestamp,s0) values(200,1006)",
+//      "insert into root.flush.d0(timestamp,s0,s1) values(1004,1007,'1080')",
+//      "insert into root.flush.d1.g0(timestamp,s0) values(1000,910)",
+//      "insert into root.vehicle.d0(timestamp,s0) values(209,130)",
+//      "insert into root.vehicle.d0(timestamp,s0,s1) values(206,131,'132')",
+//      "insert into root.vehicle.d0(timestamp,s1) values(70,'33')",
+//      "insert into root.vehicle.d1(timestamp,s2) values(204,14.0)",
+//      "insert into root.vehicle.d1(timestamp,s2,s3) values(29,135.0,false)",
+//      "insert into root.vehicle.d1(timestamp,s3) values(14,false)",
+//      "insert into root.test.d0(timestamp,s0) values(19,136)",
+//      "insert into root.test.d0(timestamp,s0,s1) values(7,137,'138')",
+//      "insert into root.test.d0(timestamp,s1) values(30,'139')",
+//      "insert into root.test.d1.g0(timestamp,s0) values(4,150)",
+//      "insert into root.test.d0(timestamp,s0) values(1900,1316)",
+//      "insert into root.test.d0(timestamp,s0,s1) values(700,1307,'1038')",
+//      "insert into root.test.d0(timestamp,s1) values(3000,'1309')",
+//      "insert into root.test.d1.g0(timestamp,s0) values(400,1050)", "merge", "flush",};
+//  private boolean testFlag = Constant.testFlag;
+//  private static final String SYNC_CLIENT = Constans.SYNC_SENDER;
+//  private static final Logger logger = LoggerFactory.getLogger(SingleClientSyncTest.class);
+//
+//  public static void main(String[] args) throws Exception {
+//    SingleClientSyncTest singleClientPostBackTest = new SingleClientSyncTest();
+//    singleClientPostBackTest.setUp();
+//    singleClientPostBackTest.testPostback();
+//    singleClientPostBackTest.tearDown();
+//    System.exit(0);
+//  }
+//
+//  public void setConfig() {
+//    config.setSenderFolderPath(
+//        config.getDataDirectory() + SYNC_CLIENT + File.separator + Constans.UUID_FILE_NAME);
+//    config.setLastFileInfo(
+//        config.getDataDirectory() + SYNC_CLIENT + File.separator + Constans.LAST_LOCAL_FILE_NAME);
+//    String[] sequenceFileDirectory = config.getSeqFileDirectory();
+//    String[] snapshots = new String[config.getSeqFileDirectory().length];
+//    for (int i = 0; i < config.getSeqFileDirectory().length; i++) {
+//      sequenceFileDirectory[i] = new File(sequenceFileDirectory[i]).getAbsolutePath();
+//      if (!sequenceFileDirectory[i].endsWith(File.separator)) {
+//        sequenceFileDirectory[i] = sequenceFileDirectory[i] + File.separator;
+//      }
+//      snapshots[i] =
+//          sequenceFileDirectory[i] + SYNC_CLIENT + File.separator + Constans.DATA_SNAPSHOT_NAME
+//              + File.separator;
+//    }
+//    config.setSnapshotPaths(snapshots);
+//    config.setSeqFileDirectory(sequenceFileDirectory);
+//    config.setServerIp(serverIpTest);
+//    fileSenderImpl.setConfig(config);
+//  }
+//
+//  public void setUp() throws StartupException, IOException {
+//    if (testFlag) {
+//      EnvironmentUtils.closeStatMonitor();
+//      deamon = IoTDB.getInstance();
+//      deamon.active();
+//      EnvironmentUtils.envSetUp();
+//    }
+//    setConfig();
+//  }
+//
+//  public void tearDown() throws Exception {
+//    if (testFlag) {
+//      deamon.stop();
+//      EnvironmentUtils.cleanEnv();
+//    }
+//    if (success) {
+//      logger.debug("Test succeed!");
+//    } else {
+//      logger.debug("Test failed!");
+//    }
+//  }
+//
+//  public void testPostback() throws IOException, SyncConnectionException, ClassNotFoundException, SQLException, InterruptedException {
+//    if (testFlag) {
+//      // the first time to sync
+//      logger.debug("It's the first time to sync!");
+//      try {
+//        Class.forName(Config.JDBC_DRIVER_NAME);
+//        try (Connection connection = DriverManager
+//            .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
+//                "root")) {
+//          Statement statement = connection.createStatement();
+//          for (String sql : sqls1) {
+//            statement.execute(sql);
+//          }
+//          statement.close();
+//        }
+//      } catch (SQLException | ClassNotFoundException e) {
+//        fail(e.getMessage());
+//      }
+//
+//      fileSenderImpl.sync();
+//
+//      // Compare data of sender and receiver
+//      dataSender.clear();
+//      try {
+//        Class.forName(Config.JDBC_DRIVER_NAME);
+//        try (Connection connection = DriverManager
+//            .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
+//                "root")) {
+//          Statement statement = connection.createStatement();
+//          boolean hasResultSet = statement.execute("select * from root.vehicle");
+//          if (hasResultSet) {
+//            ResultSet res = statement.getResultSet();
+//            while (res.next()) {
+//              dataSender.add(res.getString("Time") + res.getString("root.vehicle.d0.s0")
+//                  + res.getString("root.vehicle.d0.s1") + res.getString("root.vehicle.d1.s2")
+//                  + res.getString("root.vehicle.d1.s3"));
+//              logger.debug(res.getString("Time") + " | " + res.getString("root.vehicle.d0.s0")
+//                  + res.getString("root.vehicle.d0.s1") + res.getString("root.vehicle.d1.s2")
+//                  + res.getString("root.vehicle.d1.s3"));
+//            }
+//          }
+//          hasResultSet = statement.execute("select * from root.test");
+//          if (hasResultSet) {
+//            ResultSet res = statement.getResultSet();
+//            while (res.next()) {
+//              dataSender.add(res.getString("Time") + res.getString("root.test.d0.s0")
+//                  + res.getString("root.test.d0.s1") + res.getString("root.test.d1.g0.s0"));
+//              logger.debug(res.getString("Time") + " | " + res.getString("root.test.d0.s0")
+//                  + res.getString("root.test.d0.s1") + res.getString("root.test.d1.g0.s0"));
+//            }
+//          }
+//          statement.close();
+//        } catch (Exception e) {
+//          logger.error("", e);
+//        }
+//      } catch (ClassNotFoundException e) {
+//        fail(e.getMessage());
+//        Thread.currentThread().interrupt();
+//      }
+//
+//      dataReceiver.clear();
+//      try {
+//        Class.forName(Config.JDBC_DRIVER_NAME);
+//        Connection connection = null;
+//        try {
+//          connection = DriverManager
+//              .getConnection(String.format("jdbc:iotdb://%s:6667/", serverIpTest), "root",
+//                  "root");
+//          Statement statement = connection.createStatement();
+//          boolean hasResultSet = statement.execute("select * from root.vehicle");
+//          if (hasResultSet) {
+//            ResultSet res = statement.getResultSet();
+//            while (res.next()) {
+//              dataReceiver.add(res.getString("Time") + res.getString("root.vehicle.d0.s0")
+//                  + res.getString("root.vehicle.d0.s1") + res.getString("root.vehicle.d1.s2")
+//                  + res.getString("root.vehicle.d1.s3"));
+//              logger.debug(res.getString("Time") + " | " + res.getString("root.vehicle.d0.s0")
+//                  + res.getString("root.vehicle.d0.s1") + res.getString("root.vehicle.d1.s2")
+//                  + res.getString("root.vehicle.d1.s3"));
+//            }
+//          }
+//
+//          hasResultSet = statement.execute("select * from root.test");
+//          if (hasResultSet) {
+//            ResultSet res = statement.getResultSet();
+//            while (res.next()) {
+//              dataReceiver.add(res.getString("Time") + res.getString("root.test.d0.s0")
+//                  + res.getString("root.test.d0.s1") + res.getString("root.test.d1.g0.s0"));
+//              logger.debug(res.getString("Time") + " | " + res.getString("root.test.d0.s0")
+//                  + res.getString("root.test.d0.s1") + res.getString("root.test.d1.g0.s0"));
+//            }
+//          }
+//          statement.close();
+//        } catch (Exception e) {
+//          logger.error("", e);
+//        } finally {
+//          if (connection != null) {
+//            connection.close();
+//          }
+//        }
+//      } catch (ClassNotFoundException | SQLException e) {
+//        fail(e.getMessage());
+//        Thread.currentThread().interrupt();
+//      }
+//      logger.debug(String.valueOf(dataSender.size()));
+//      logger.debug(String.valueOf(dataReceiver.size()));
+//      logger.debug(dataSender.toString());
+//      logger.debug(dataReceiver.toString());
+//      if (!(dataSender.size() == dataReceiver.size() && dataSender.containsAll(dataReceiver))) {
+//        success = false;
+//        return;
+//      }
+//
+//      // the second time to sync
+//      logger.debug("It's the second time to sync!");
+//      try {
+//        Class.forName(Config.JDBC_DRIVER_NAME);
+//        Connection connection = null;
+//        try {
+//          connection = DriverManager
+//              .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
+//                  "root");
+//          Statement statement = connection.createStatement();
+//          for (String sql : sqls2) {
+//            statement.execute(sql);
+//          }
+//          statement.close();
+//        } catch (Exception e) {
+//          logger.error("", e);
+//        } finally {
+//          if (connection != null) {
+//            connection.close();
+//          }
+//        }
+//      } catch (ClassNotFoundException | SQLException e) {
+//        fail(e.getMessage());
+//        Thread.currentThread().interrupt();
+//      }
+//
+//      fileSenderImpl.sync();
+//
+//      // Compare data of sender and receiver
+//      dataSender.clear();
+//      try {
+//        Class.forName(Config.JDBC_DRIVER_NAME);
+//        Connection connection = null;
+//        try {
+//          connection = DriverManager
+//              .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
+//                  "root");
+//          Statement statement = connection.createStatement();
+//          boolean hasResultSet = statement.execute("select * from root.vehicle");
+//          if (hasResultSet) {
+//            ResultSet res = statement.getResultSet();
+//            while (res.next()) {
+//              dataSender.add(res.getString("Time") + res.getString("root.vehicle.d0.s0")
+//                  + res.getString("root.vehicle.d0.s1") + res.getString("root.vehicle.d1.s2")
+//                  + res.getString("root.vehicle.d1.s3"));
+//              logger.debug(res.getString("Time") + " | " + res.getString("root.vehicle.d0.s0")
+//                  + res.getString("root.vehicle.d0.s1") + res.getString("root.vehicle.d1.s2")
+//                  + res.getString("root.vehicle.d1.s3"));
+//            }
+//          }
+//          hasResultSet = statement.execute("select * from root.test");
+//          if (hasResultSet) {
+//            ResultSet res = statement.getResultSet();
+//            while (res.next()) {
+//              dataSender.add(res.getString("Time") + res.getString("root.test.d0.s0")
+//                  + res.getString("root.test.d0.s1") + res.getString("root.test.d1.g0.s0"));
+//              logger.debug(res.getString("Time") + " | " + res.getString("root.test.d0.s0")
+//                  + res.getString("root.test.d0.s1") + res.getString("root.test.d1.g0.s0"));
+//            }
+//          }
+//          statement.close();
+//        } catch (Exception e) {
+//          logger.error("", e);
+//        } finally {
+//          if (connection != null) {
+//            connection.close();
+//          }
+//        }
+//      } catch (ClassNotFoundException | SQLException e) {
+//        fail(e.getMessage());
+//        Thread.currentThread().interrupt();
+//      }
+//
+//      dataReceiver.clear();
+//      {
+//        Class.forName(Config.JDBC_DRIVER_NAME);
+//        Connection connection = null;
+//        try {
+//          connection = DriverManager
+//              .getConnection(String.format("jdbc:iotdb://%s:6667/", serverIpTest), "root",
+//                  "root");
+//          Statement statement = connection.createStatement();
+//          boolean hasResultSet = statement.execute("select * from root.vehicle");
+//          if (hasResultSet) {
+//            ResultSet res = statement.getResultSet();
+//            while (res.next()) {
+//              dataReceiver.add(res.getString("Time") + res.getString("root.vehicle.d0.s0")
+//                  + res.getString("root.vehicle.d0.s1") + res.getString("root.vehicle.d1.s2")
+//                  + res.getString("root.vehicle.d1.s3"));
+//              logger.debug(res.getString("Time") + " | " + res.getString("root.vehicle.d0.s0")
+//                  + res.getString("root.vehicle.d0.s1") + res.getString("root.vehicle.d1.s2")
+//                  + res.getString("root.vehicle.d1.s3"));
+//            }
+//          }
+//          hasResultSet = statement.execute("select * from root.test");
+//          if (hasResultSet) {
+//            ResultSet res = statement.getResultSet();
+//            while (res.next()) {
+//              dataReceiver.add(res.getString("Time") + res.getString("root.test.d0.s0")
+//                  + res.getString("root.test.d0.s1") + res.getString("root.test.d1.g0.s0"));
+//              logger.debug(res.getString("Time") + " | " + res.getString("root.test.d0.s0")
+//                  + res.getString("root.test.d0.s1") + res.getString("root.test.d1.g0.s0"));
+//            }
+//          }
+//          statement.close();
+//        } catch (Exception e) {
+//          logger.error("", e);
+//        } finally {
+//          if (connection != null) {
+//            connection.close();
+//          }
+//        }
+//      }
+//      logger.debug(String.valueOf(dataSender.size()));
+//      logger.debug(String.valueOf(dataReceiver.size()));
+//      logger.debug(dataSender.toString());
+//      logger.debug(dataReceiver.toString());
+//      if (!(dataSender.size() == dataReceiver.size() && dataSender.containsAll(dataReceiver))) {
+//        success = false;
+//        return;
+//      }
+//
+//      // the third time to sync
+//      logger.debug("It's the third time to sync!");
+//      try {
+//        Class.forName(Config.JDBC_DRIVER_NAME);
+//        try (Connection connection = DriverManager
+//            .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
+//                "root")) {
+//          Statement statement = connection.createStatement();
+//          for (String sql : sqls3) {
+//            statement.execute(sql);
+//          }
+//          statement.close();
+//        } catch (Exception e) {
+//          logger.error("", e);
+//        }
+//      } catch (ClassNotFoundException e) {
+//        fail(e.getMessage());
+//        Thread.currentThread().interrupt();
+//      }
+//
+//      fileSenderImpl.sync();
+//
+//      // Compare data of sender and receiver
+//      dataSender.clear();
+//      try {
+//        Class.forName(Config.JDBC_DRIVER_NAME);
+//        try (Connection connection = DriverManager
+//            .getConnection(String.format("jdbc:iotdb://%s:6667/", serverIpTest), "root",
+//                "root")) {
+//          Statement statement = connection.createStatement();
+//          boolean hasResultSet = statement.execute("select * from root.vehicle");
+//          if (hasResultSet) {
+//            ResultSet res = statement.getResultSet();
+//            while (res.next()) {
+//              dataSender.add(res.getString("Time") + res.getString("root.vehicle.d0.s0")
+//                  + res.getString("root.vehicle.d0.s1") + res.getString("root.vehicle.d1.s2")
+//                  + res.getString("root.vehicle.d1.s3"));
+//              logger.debug(res.getString("Time") + " | " + res.getString("root.vehicle.d0.s0")
+//                  + res.getString("root.vehicle.d0.s1") + res.getString("root.vehicle.d1.s2")
+//                  + res.getString("root.vehicle.d1.s3"));
+//            }
+//          }
+//          hasResultSet = statement.execute("select * from root.test");
+//          if (hasResultSet) {
+//            ResultSet res = statement.getResultSet();
+//            while (res.next()) {
+//              dataSender.add(res.getString("Time") + res.getString("root.test.d0.s0")
+//                  + res.getString("root.test.d0.s1") + res.getString("root.test.d1.g0.s0"));
+//              logger.debug(res.getString("Time") + " | " + res.getString("root.test.d0.s0")
+//                  + res.getString("root.test.d0.s1") + res.getString("root.test.d1.g0.s0"));
+//            }
+//          }
+//          hasResultSet = statement.execute("select * from root.flush");
+//          if (hasResultSet) {
+//            ResultSet res = statement.getResultSet();
+//            while (res.next()) {
+//              dataSender.add(res.getString("Time") + res.getString("root.flush.d0.s0")
+//                  + res.getString("root.flush.d0.s1") + res.getString("root.flush.d1.g0.s0"));
+//              logger.debug(res.getString("Time") + " | " + res.getString("root.flush.d0.s0")
+//                  + res.getString("root.flush.d0.s1") + res.getString("root.flush.d1.g0.s0"));
+//            }
+//          }
+//          hasResultSet = statement.execute("select * from root.iotdb");
+//          if (hasResultSet) {
+//            ResultSet res = statement.getResultSet();
+//            while (res.next()) {
+//              dataSender.add(res.getString("Time") + res.getString("root.iotdb.d0.s0")
+//                  + res.getString("root.iotdb.d0.s1") + res.getString("root.iotdb.d1.s2")
+//                  + res.getString("root.iotdb.d1.s3"));
+//              logger.debug(res.getString("Time") + res.getString("root.iotdb.d0.s0")
+//                  + res.getString("root.iotdb.d0.s1") + res.getString("root.iotdb.d1.s2")
+//                  + res.getString("root.iotdb.d1.s3"));
+//            }
+//          }
+//          statement.close();
+//        } catch (Exception e) {
+//          logger.error("", e);
+//        }
+//      } catch (ClassNotFoundException e) {
+//        fail(e.getMessage());
+//        Thread.currentThread().interrupt();
+//      }
+//
+//      dataReceiver.clear();
+//      try {
+//        Class.forName(Config.JDBC_DRIVER_NAME);
+//        Connection connection = null;
+//        try {
+//          connection = DriverManager
+//              .getConnection("jdbc:iotdb://192.168.130.8:6667/", "root", "root");
+//          Statement statement = connection.createStatement();
+//          boolean hasResultSet = statement.execute("select * from root.vehicle");
+//          if (hasResultSet) {
+//            ResultSet res = statement.getResultSet();
+//            while (res.next()) {
+//              dataReceiver.add(res.getString("Time") + res.getString("root.vehicle.d0.s0")
+//                  + res.getString("root.vehicle.d0.s1") + res.getString("root.vehicle.d1.s2")
+//                  + res.getString("root.vehicle.d1.s3"));
+//              logger.debug(res.getString("Time") + " | " + res.getString("root.vehicle.d0.s0")
+//                  + res.getString("root.vehicle.d0.s1") + res.getString("root.vehicle.d1.s2")
+//                  + res.getString("root.vehicle.d1.s3"));
+//            }
+//          }
+//          hasResultSet = statement.execute("select * from root.test");
+//          if (hasResultSet) {
+//            ResultSet res = statement.getResultSet();
+//            while (res.next()) {
+//              dataReceiver.add(res.getString("Time") + res.getString("root.test.d0.s0")
+//                  + res.getString("root.test.d0.s1") + res.getString("root.test.d1.g0.s0"));
+//              logger.debug(res.getString("Time") + " | " + res.getString("root.test.d0.s0")
+//                  + res.getString("root.test.d0.s1") + res.getString("root.test.d1.g0.s0"));
+//            }
+//          }
+//          hasResultSet = statement.execute("select * from root.flush");
+//          if (hasResultSet) {
+//            ResultSet res = statement.getResultSet();
+//            while (res.next()) {
+//              dataReceiver.add(res.getString("Time") + res.getString("root.flush.d0.s0")
+//                  + res.getString("root.flush.d0.s1") + res.getString("root.flush.d1.g0.s0"));
+//              logger.debug(res.getString("Time") + " | " + res.getString("root.flush.d0.s0")
+//                  + res.getString("root.flush.d0.s1") + res.getString("root.flush.d1.g0.s0"));
+//            }
+//          }
+//          hasResultSet = statement.execute("select * from root.iotdb");
+//          if (hasResultSet) {
+//            ResultSet res = statement.getResultSet();
+//            while (res.next()) {
+//              dataReceiver.add(res.getString("Time") + res.getString("root.iotdb.d0.s0")
+//                  + res.getString("root.iotdb.d0.s1") + res.getString("root.iotdb.d1.s2")
+//                  + res.getString("root.iotdb.d1.s3"));
+//              logger.debug(res.getString("Time") + res.getString("root.iotdb.d0.s0")
+//                  + res.getString("root.iotdb.d0.s1") + res.getString("root.iotdb.d1.s2")
+//                  + res.getString("root.iotdb.d1.s3"));
+//            }
+//          }
+//          statement.close();
+//        } catch (Exception e) {
+//          logger.error("", e);
+//        } finally {
+//          if (connection != null) {
+//            connection.close();
+//          }
+//        }
+//      } catch (ClassNotFoundException | SQLException e) {
+//        fail(e.getMessage());
+//        Thread.currentThread().interrupt();
+//      }
+//      logger.debug(String.valueOf(dataSender.size()));
+//      logger.debug(String.valueOf(dataReceiver.size()));
+//      logger.debug(String.valueOf(dataSender));
+//      logger.debug(String.valueOf(dataReceiver));
+//      if (!(dataSender.size() == dataReceiver.size() && dataSender.containsAll(dataReceiver))) {
+//        success = false;
+//      }
+//    }
+//  }
+//}
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/sender/SyncFileManagerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/sender/SyncFileManagerTest.java
index 322dfe0..354ef74 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/sender/SyncFileManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/sender/SyncFileManagerTest.java
@@ -1,374 +1,374 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.sync.sender;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.Set;
-import org.apache.iotdb.db.sync.sender.conf.Constans;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SyncFileManagerTest {
-
-  private static final String POST_BACK_DIRECTORY_TEST = Constans.SYNC_SENDER + File.separator;
-  private static final String LAST_FILE_INFO_TEST =
-      POST_BACK_DIRECTORY_TEST + Constans.LAST_LOCAL_FILE_NAME;
-  private static final String SENDER_FILE_PATH_TEST = POST_BACK_DIRECTORY_TEST + "data";
-  private SyncFileManager manager = SyncFileManager.getInstance();
-  private static final Logger logger = LoggerFactory.getLogger(SyncFileManagerTest.class);
-
-  @Before
-  public void setUp() throws IOException, InterruptedException {
-    File file = new File(LAST_FILE_INFO_TEST);
-    if (!file.getParentFile().exists()) {
-      file.getParentFile().mkdirs();
-    }
-    if (!file.exists() && !file.createNewFile()) {
-      logger.error("Can not create new file {}", file.getPath());
-    }
-    file = new File(SENDER_FILE_PATH_TEST);
-    if (!file.exists()) {
-      file.mkdirs();
-    }
-    manager.setCurrentLocalFiles(new HashMap<>());
-  }
-
-  @After
-  public void tearDown() throws InterruptedException {
-    delete(new File(POST_BACK_DIRECTORY_TEST));
-    new File(POST_BACK_DIRECTORY_TEST).delete();
-  }
-
-  public void delete(File file) {
-    if (file.isFile() || file.list().length == 0) {
-      file.delete();
-    } else {
-      File[] files = file.listFiles();
-      for (File f : files) {
-        delete(f);
-        f.delete();
-      }
-    }
-  }
-
-  @Test // It tests two classes : backupNowLocalFileInfo and getLastLocalFileList
-  public void testBackupCurrentLocalFileInfo() throws IOException {
-    Map<String, Set<String>> allFileList = new HashMap<>();
-
-    Random r = new Random(0);
-    for (int i = 0; i < 3; i++) {
-      for (int j = 0; j < 5; j++) {
-        if (!allFileList.containsKey(String.valueOf(i))) {
-          allFileList.put(String.valueOf(i), new HashSet<>());
-        }
-        String rand = String.valueOf(r.nextInt(10000));
-        String fileName =
-            SENDER_FILE_PATH_TEST + File.separator + i + File.separator + rand;
-        File file = new File(fileName);
-        allFileList.get(String.valueOf(i)).add(file.getPath());
-        if (!file.getParentFile().exists()) {
-          file.getParentFile().mkdirs();
-        }
-        if (!file.exists() && !file.createNewFile()) {
-          logger.error("Can not create new file {}", file.getPath());
-        }
-      }
-    }
-    Set<String> lastFileList;
-
-    // lastFileList is empty
-    manager.getLastLocalFileList(LAST_FILE_INFO_TEST);
-    lastFileList = manager.getLastLocalFiles();
-    assert (lastFileList.isEmpty());
-
-    // add some files
-    manager.getCurrentLocalFileList(new String[]{SENDER_FILE_PATH_TEST});
-    manager.backupNowLocalFileInfo(LAST_FILE_INFO_TEST);
-    manager.getLastLocalFileList(LAST_FILE_INFO_TEST);
-    lastFileList = manager.getLastLocalFiles();
-    for (Entry<String, Set<String>> entry : allFileList.entrySet()) {
-      assert (lastFileList.containsAll(entry.getValue()));
-    }
-
-    // add some files and delete some files
-    r = new Random(1);
-    for (int i = 0; i < 3; i++) {
-      for (int j = 0; j < 5; j++) {
-        if (!allFileList.containsKey(String.valueOf(i))) {
-          allFileList.put(String.valueOf(i), new HashSet<>());
-        }
-        String rand = String.valueOf(r.nextInt(10000));
-        String fileName =
-            SENDER_FILE_PATH_TEST + File.separator + i + File.separator + rand;
-        File file = new File(fileName);
-        allFileList.get(String.valueOf(i)).add(file.getPath());
-        if (!file.getParentFile().exists()) {
-          file.getParentFile().mkdirs();
-        }
-        if (!file.exists() && !file.createNewFile()) {
-          logger.error("Can not create new file {}", file.getPath());
-        }
-      }
-    }
-    int count = 0;
-    Map<String, Set<String>> deleteFile = new HashMap<>();
-    for (Entry<String, Set<String>> entry : allFileList.entrySet()) {
-      deleteFile.put(entry.getKey(), new HashSet<>());
-      for (String path : entry.getValue()) {
-        count++;
-        if (count % 3 == 0) {
-          deleteFile.get(entry.getKey()).add(path);
-        }
-      }
-    }
-    for (Entry<String, Set<String>> entry : deleteFile.entrySet()) {
-      for (String path : entry.getValue()) {
-        new File(path).delete();
-        allFileList.get(entry.getKey()).remove(path);
-      }
-    }
-    manager.getCurrentLocalFileList(new String[]{SENDER_FILE_PATH_TEST});
-    manager.backupNowLocalFileInfo(LAST_FILE_INFO_TEST);
-    manager.getLastLocalFileList(LAST_FILE_INFO_TEST);
-    lastFileList = manager.getLastLocalFiles();
-    for (Entry<String, Set<String>> entry : allFileList.entrySet()) {
-      assert (lastFileList.containsAll(entry.getValue()));
-    }
-  }
-
-  @Test
-  public void testGetCurrentLocalFileList() throws IOException {
-    Map<String, Set<String>> allFileList = new HashMap<>();
-    Map<String, Set<String>> fileList;
-
-    // nowLocalList is empty
-    manager.getCurrentLocalFileList(new String[]{SENDER_FILE_PATH_TEST});
-    fileList = manager.getCurrentLocalFiles();
-    assert (isEmpty(fileList));
-
-    // add some files
-    Random r = new Random(0);
-    for (int i = 0; i < 3; i++) {
-      for (int j = 0; j < 5; j++) {
-        if (!allFileList.containsKey(String.valueOf(i))) {
-          allFileList.put(String.valueOf(i), new HashSet<>());
-        }
-        String rand = String.valueOf(r.nextInt(10000));
-        String fileName =
-            SENDER_FILE_PATH_TEST + File.separator + i + File.separator + rand;
-        File file = new File(fileName);
-        allFileList.get(String.valueOf(i)).add(file.getPath());
-        if (!file.getParentFile().exists()) {
-          file.getParentFile().mkdirs();
-        }
-        if (!file.exists() && !file.createNewFile()) {
-          logger.error("Can not create new file {}", file.getPath());
-        }
-      }
-    }
-    manager.getCurrentLocalFileList(new String[]{SENDER_FILE_PATH_TEST});
-    fileList = manager.getCurrentLocalFiles();
-    assert (allFileList.size() == fileList.size());
-    for (Entry<String, Set<String>> entry : fileList.entrySet()) {
-      assert (allFileList.containsKey(entry.getKey()));
-      assert (allFileList.get(entry.getKey()).containsAll(entry.getValue()));
-    }
-
-    // delete some files and add some files
-    int count = 0;
-    Map<String, Set<String>> deleteFile = new HashMap<>();
-    for (Entry<String, Set<String>> entry : allFileList.entrySet()) {
-      deleteFile.put(entry.getKey(), new HashSet<>());
-      for (String path : entry.getValue()) {
-        count++;
-        if (count % 3 == 0) {
-          deleteFile.get(entry.getKey()).add(path);
-        }
-      }
-    }
-    for (Entry<String, Set<String>> entry : deleteFile.entrySet()) {
-      for (String path : entry.getValue()) {
-        new File(path).delete();
-        allFileList.get(entry.getKey()).remove(path);
-      }
-    }
-    r = new Random(1);
-    for (int i = 0; i < 3; i++) {
-      for (int j = 0; j < 5; j++) {
-        if (!allFileList.containsKey(String.valueOf(i))) {
-          allFileList.put(String.valueOf(i), new HashSet<>());
-        }
-        String rand = String.valueOf(r.nextInt(10000));
-        String fileName =
-            SENDER_FILE_PATH_TEST + File.separator + i + File.separator + rand;
-        File file = new File(fileName);
-        allFileList.get(String.valueOf(i)).add(file.getPath());
-        if (!file.getParentFile().exists()) {
-          file.getParentFile().mkdirs();
-        }
-        if (!file.exists() && !file.createNewFile()) {
-          logger.error("Can not create new file {}", file.getPath());
-        }
-      }
-    }
-    manager.setCurrentLocalFiles(new HashMap<>());
-    manager.getCurrentLocalFileList(new String[]{SENDER_FILE_PATH_TEST});
-    fileList = manager.getCurrentLocalFiles();
-    assert (allFileList.size() == fileList.size());
-    for (Entry<String, Set<String>> entry : fileList.entrySet()) {
-      assert (allFileList.containsKey(entry.getKey()));
-      logger.debug("allFileList");
-      for (String a : allFileList.get(entry.getKey())) {
-        logger.debug(a);
-      }
-      logger.debug("FileList");
-      for (String a : entry.getValue()) {
-        logger.debug(a);
-      }
-      assert (allFileList.get(entry.getKey()).containsAll(entry.getValue()));
-    }
-  }
-
-  @Test
-  public void testGetValidFileList() throws IOException {
-    Map<String, Set<String>> allFileList;
-    Map<String, Set<String>> newFileList = new HashMap<>();
-    Map<String, Set<String>> sendingFileList;
-    Set<String> lastLocalList;
-
-    // nowSendingList is empty
-
-    manager.setCurrentLocalFiles(new HashMap<>());
-    manager.getCurrentLocalFileList(new String[]{SENDER_FILE_PATH_TEST});
-    allFileList = manager.getCurrentLocalFiles();
-    manager.getLastLocalFileList(LAST_FILE_INFO_TEST);
-    lastLocalList = manager.getLastLocalFiles();
-    manager.getValidFileList();
-    assert (lastLocalList.isEmpty());
-    assert (isEmpty(allFileList));
-
-    // add some files
-    newFileList.clear();
-    manager.backupNowLocalFileInfo(LAST_FILE_INFO_TEST);
-    Random r = new Random(0);
-    for (int i = 0; i < 3; i++) {
-      for (int j = 0; j < 5; j++) {
-        if (!allFileList.containsKey(String.valueOf(i))) {
-          allFileList.put(String.valueOf(i), new HashSet<>());
-        }
-        if (!newFileList.containsKey(String.valueOf(i))) {
-          newFileList.put(String.valueOf(i), new HashSet<>());
-        }
-        String rand = String.valueOf(r.nextInt(10000));
-        String fileName =
-            SENDER_FILE_PATH_TEST + File.separator + i + File.separator + rand;
-        File file = new File(fileName);
-        allFileList.get(String.valueOf(i)).add(file.getPath());
-        newFileList.get(String.valueOf(i)).add(file.getPath());
-        if (!file.getParentFile().exists()) {
-          file.getParentFile().mkdirs();
-        }
-        if (!file.exists() && !file.createNewFile()) {
-          logger.error("Can not create new file {}", file.getPath());
-        }
-      }
-    }
-    manager.getCurrentLocalFileList(new String[]{SENDER_FILE_PATH_TEST});
-    allFileList = manager.getCurrentLocalFiles();
-    manager.backupNowLocalFileInfo(LAST_FILE_INFO_TEST);
-    manager.getLastLocalFileList(LAST_FILE_INFO_TEST);
-    manager.getValidFileList();
-    sendingFileList = manager.getValidAllFiles();
-    assert (sendingFileList.size() == newFileList.size());
-    for (Entry<String, Set<String>> entry : sendingFileList.entrySet()) {
-      assert (newFileList.containsKey(entry.getKey()));
-      assert (newFileList.get(entry.getKey()).containsAll(entry.getValue()));
-    }
-
-    // delete some files and add some files
-    int count = 0;
-    Map<String, Set<String>> deleteFile = new HashMap<>();
-    for (Entry<String, Set<String>> entry : allFileList.entrySet()) {
-      deleteFile.put(entry.getKey(), new HashSet<>());
-      for (String path : entry.getValue()) {
-        count++;
-        if (count % 3 == 0) {
-          deleteFile.get(entry.getKey()).add(path);
-        }
-      }
-    }
-    for (Entry<String, Set<String>> entry : deleteFile.entrySet()) {
-      for (String path : entry.getValue()) {
-        new File(path).delete();
-        allFileList.get(entry.getKey()).remove(path);
-      }
-    }
-    newFileList.clear();
-    r = new Random(1);
-    for (int i = 0; i < 3; i++) {
-      for (int j = 0; j < 5; j++) {
-        if (!allFileList.containsKey(String.valueOf(i))) {
-          allFileList.put(String.valueOf(i), new HashSet<>());
-        }
-        if (!newFileList.containsKey(String.valueOf(i))) {
-          newFileList.put(String.valueOf(i), new HashSet<>());
-        }
-        String rand = String.valueOf(r.nextInt(10000));
-        String fileName =
-            SENDER_FILE_PATH_TEST + File.separator + i + File.separator + rand;
-        File file = new File(fileName);
-        allFileList.get(String.valueOf(i)).add(file.getPath());
-        newFileList.get(String.valueOf(i)).add(file.getPath());
-        if (!file.getParentFile().exists()) {
-          file.getParentFile().mkdirs();
-        }
-        if (!file.exists() && !file.createNewFile()) {
-          logger.error("Can not create new file {}", file.getPath());
-        }
-      }
-    }
-    manager.getCurrentLocalFileList(new String[]{SENDER_FILE_PATH_TEST});
-    manager.getLastLocalFileList(LAST_FILE_INFO_TEST);
-    manager.getValidFileList();
-    sendingFileList = manager.getValidAllFiles();
-    assert (sendingFileList.size() == newFileList.size());
-    for (Entry<String, Set<String>> entry : sendingFileList.entrySet()) {
-      assert (newFileList.containsKey(entry.getKey()));
-      assert (newFileList.get(entry.getKey()).containsAll(entry.getValue()));
-    }
-  }
-
-  private boolean isEmpty(Map<String, Set<String>> sendingFileList) {
-    for (Entry<String, Set<String>> entry : sendingFileList.entrySet()) {
-      if (!entry.getValue().isEmpty()) {
-        return false;
-      }
-    }
-    return true;
-  }
-}
\ No newline at end of file
+///**
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements.  See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership.  The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License.  You may obtain a copy of the License at
+// *
+// *     http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing,
+// * software distributed under the License is distributed on an
+// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// * KIND, either express or implied.  See the License for the
+// * specific language governing permissions and limitations
+// * under the License.
+// */
+//package org.apache.iotdb.db.sync.sender;
+//
+//import java.io.File;
+//import java.io.IOException;
+//import java.util.HashMap;
+//import java.util.HashSet;
+//import java.util.Map;
+//import java.util.Map.Entry;
+//import java.util.Random;
+//import java.util.Set;
+//import org.apache.iotdb.db.sync.sender.conf.Constans;
+//import org.junit.After;
+//import org.junit.Before;
+//import org.junit.Test;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+//
+//public class SyncFileManagerTest {
+//
+//  private static final String POST_BACK_DIRECTORY_TEST = Constans.SYNC_SENDER + File.separator;
+//  private static final String LAST_FILE_INFO_TEST =
+//      POST_BACK_DIRECTORY_TEST + Constans.LAST_LOCAL_FILE_NAME;
+//  private static final String SENDER_FILE_PATH_TEST = POST_BACK_DIRECTORY_TEST + "data";
+//  private SyncFileManager manager = SyncFileManager.getInstance();
+//  private static final Logger logger = LoggerFactory.getLogger(SyncFileManagerTest.class);
+//
+//  @Before
+//  public void setUp() throws IOException, InterruptedException {
+//    File file = new File(LAST_FILE_INFO_TEST);
+//    if (!file.getParentFile().exists()) {
+//      file.getParentFile().mkdirs();
+//    }
+//    if (!file.exists() && !file.createNewFile()) {
+//      logger.error("Can not create new file {}", file.getPath());
+//    }
+//    file = new File(SENDER_FILE_PATH_TEST);
+//    if (!file.exists()) {
+//      file.mkdirs();
+//    }
+//    manager.setCurrentLocalFiles(new HashMap<>());
+//  }
+//
+//  @After
+//  public void tearDown() throws InterruptedException {
+//    delete(new File(POST_BACK_DIRECTORY_TEST));
+//    new File(POST_BACK_DIRECTORY_TEST).delete();
+//  }
+//
+//  public void delete(File file) {
+//    if (file.isFile() || file.list().length == 0) {
+//      file.delete();
+//    } else {
+//      File[] files = file.listFiles();
+//      for (File f : files) {
+//        delete(f);
+//        f.delete();
+//      }
+//    }
+//  }
+//
+//  @Test // It tests two classes : backupNowLocalFileInfo and getLastLocalFileList
+//  public void testBackupCurrentLocalFileInfo() throws IOException {
+//    Map<String, Set<String>> allFileList = new HashMap<>();
+//
+//    Random r = new Random(0);
+//    for (int i = 0; i < 3; i++) {
+//      for (int j = 0; j < 5; j++) {
+//        if (!allFileList.containsKey(String.valueOf(i))) {
+//          allFileList.put(String.valueOf(i), new HashSet<>());
+//        }
+//        String rand = String.valueOf(r.nextInt(10000));
+//        String fileName =
+//            SENDER_FILE_PATH_TEST + File.separator + i + File.separator + rand;
+//        File file = new File(fileName);
+//        allFileList.get(String.valueOf(i)).add(file.getPath());
+//        if (!file.getParentFile().exists()) {
+//          file.getParentFile().mkdirs();
+//        }
+//        if (!file.exists() && !file.createNewFile()) {
+//          logger.error("Can not create new file {}", file.getPath());
+//        }
+//      }
+//    }
+//    Set<String> lastFileList;
+//
+//    // lastFileList is empty
+//    manager.getLastLocalFileList(LAST_FILE_INFO_TEST);
+//    lastFileList = manager.getLastLocalFiles();
+//    assert (lastFileList.isEmpty());
+//
+//    // add some files
+//    manager.getCurrentLocalFileList(new String[]{SENDER_FILE_PATH_TEST});
+//    manager.backupNowLocalFileInfo(LAST_FILE_INFO_TEST);
+//    manager.getLastLocalFileList(LAST_FILE_INFO_TEST);
+//    lastFileList = manager.getLastLocalFiles();
+//    for (Entry<String, Set<String>> entry : allFileList.entrySet()) {
+//      assert (lastFileList.containsAll(entry.getValue()));
+//    }
+//
+//    // add some files and delete some files
+//    r = new Random(1);
+//    for (int i = 0; i < 3; i++) {
+//      for (int j = 0; j < 5; j++) {
+//        if (!allFileList.containsKey(String.valueOf(i))) {
+//          allFileList.put(String.valueOf(i), new HashSet<>());
+//        }
+//        String rand = String.valueOf(r.nextInt(10000));
+//        String fileName =
+//            SENDER_FILE_PATH_TEST + File.separator + i + File.separator + rand;
+//        File file = new File(fileName);
+//        allFileList.get(String.valueOf(i)).add(file.getPath());
+//        if (!file.getParentFile().exists()) {
+//          file.getParentFile().mkdirs();
+//        }
+//        if (!file.exists() && !file.createNewFile()) {
+//          logger.error("Can not create new file {}", file.getPath());
+//        }
+//      }
+//    }
+//    int count = 0;
+//    Map<String, Set<String>> deleteFile = new HashMap<>();
+//    for (Entry<String, Set<String>> entry : allFileList.entrySet()) {
+//      deleteFile.put(entry.getKey(), new HashSet<>());
+//      for (String path : entry.getValue()) {
+//        count++;
+//        if (count % 3 == 0) {
+//          deleteFile.get(entry.getKey()).add(path);
+//        }
+//      }
+//    }
+//    for (Entry<String, Set<String>> entry : deleteFile.entrySet()) {
+//      for (String path : entry.getValue()) {
+//        new File(path).delete();
+//        allFileList.get(entry.getKey()).remove(path);
+//      }
+//    }
+//    manager.getCurrentLocalFileList(new String[]{SENDER_FILE_PATH_TEST});
+//    manager.backupNowLocalFileInfo(LAST_FILE_INFO_TEST);
+//    manager.getLastLocalFileList(LAST_FILE_INFO_TEST);
+//    lastFileList = manager.getLastLocalFiles();
+//    for (Entry<String, Set<String>> entry : allFileList.entrySet()) {
+//      assert (lastFileList.containsAll(entry.getValue()));
+//    }
+//  }
+//
+//  @Test
+//  public void testGetCurrentLocalFileList() throws IOException {
+//    Map<String, Set<String>> allFileList = new HashMap<>();
+//    Map<String, Set<String>> fileList;
+//
+//    // nowLocalList is empty
+//    manager.getCurrentLocalFileList(new String[]{SENDER_FILE_PATH_TEST});
+//    fileList = manager.getCurrentLocalFiles();
+//    assert (isEmpty(fileList));
+//
+//    // add some files
+//    Random r = new Random(0);
+//    for (int i = 0; i < 3; i++) {
+//      for (int j = 0; j < 5; j++) {
+//        if (!allFileList.containsKey(String.valueOf(i))) {
+//          allFileList.put(String.valueOf(i), new HashSet<>());
+//        }
+//        String rand = String.valueOf(r.nextInt(10000));
+//        String fileName =
+//            SENDER_FILE_PATH_TEST + File.separator + i + File.separator + rand;
+//        File file = new File(fileName);
+//        allFileList.get(String.valueOf(i)).add(file.getPath());
+//        if (!file.getParentFile().exists()) {
+//          file.getParentFile().mkdirs();
+//        }
+//        if (!file.exists() && !file.createNewFile()) {
+//          logger.error("Can not create new file {}", file.getPath());
+//        }
+//      }
+//    }
+//    manager.getCurrentLocalFileList(new String[]{SENDER_FILE_PATH_TEST});
+//    fileList = manager.getCurrentLocalFiles();
+//    assert (allFileList.size() == fileList.size());
+//    for (Entry<String, Set<String>> entry : fileList.entrySet()) {
+//      assert (allFileList.containsKey(entry.getKey()));
+//      assert (allFileList.get(entry.getKey()).containsAll(entry.getValue()));
+//    }
+//
+//    // delete some files and add some files
+//    int count = 0;
+//    Map<String, Set<String>> deleteFile = new HashMap<>();
+//    for (Entry<String, Set<String>> entry : allFileList.entrySet()) {
+//      deleteFile.put(entry.getKey(), new HashSet<>());
+//      for (String path : entry.getValue()) {
+//        count++;
+//        if (count % 3 == 0) {
+//          deleteFile.get(entry.getKey()).add(path);
+//        }
+//      }
+//    }
+//    for (Entry<String, Set<String>> entry : deleteFile.entrySet()) {
+//      for (String path : entry.getValue()) {
+//        new File(path).delete();
+//        allFileList.get(entry.getKey()).remove(path);
+//      }
+//    }
+//    r = new Random(1);
+//    for (int i = 0; i < 3; i++) {
+//      for (int j = 0; j < 5; j++) {
+//        if (!allFileList.containsKey(String.valueOf(i))) {
+//          allFileList.put(String.valueOf(i), new HashSet<>());
+//        }
+//        String rand = String.valueOf(r.nextInt(10000));
+//        String fileName =
+//            SENDER_FILE_PATH_TEST + File.separator + i + File.separator + rand;
+//        File file = new File(fileName);
+//        allFileList.get(String.valueOf(i)).add(file.getPath());
+//        if (!file.getParentFile().exists()) {
+//          file.getParentFile().mkdirs();
+//        }
+//        if (!file.exists() && !file.createNewFile()) {
+//          logger.error("Can not create new file {}", file.getPath());
+//        }
+//      }
+//    }
+//    manager.setCurrentLocalFiles(new HashMap<>());
+//    manager.getCurrentLocalFileList(new String[]{SENDER_FILE_PATH_TEST});
+//    fileList = manager.getCurrentLocalFiles();
+//    assert (allFileList.size() == fileList.size());
+//    for (Entry<String, Set<String>> entry : fileList.entrySet()) {
+//      assert (allFileList.containsKey(entry.getKey()));
+//      logger.debug("allFileList");
+//      for (String a : allFileList.get(entry.getKey())) {
+//        logger.debug(a);
+//      }
+//      logger.debug("FileList");
+//      for (String a : entry.getValue()) {
+//        logger.debug(a);
+//      }
+//      assert (allFileList.get(entry.getKey()).containsAll(entry.getValue()));
+//    }
+//  }
+//
+//  @Test
+//  public void testGetValidFileList() throws IOException {
+//    Map<String, Set<String>> allFileList;
+//    Map<String, Set<String>> newFileList = new HashMap<>();
+//    Map<String, Set<String>> sendingFileList;
+//    Set<String> lastLocalList;
+//
+//    // nowSendingList is empty
+//
+//    manager.setCurrentLocalFiles(new HashMap<>());
+//    manager.getCurrentLocalFileList(new String[]{SENDER_FILE_PATH_TEST});
+//    allFileList = manager.getCurrentLocalFiles();
+//    manager.getLastLocalFileList(LAST_FILE_INFO_TEST);
+//    lastLocalList = manager.getLastLocalFiles();
+//    manager.getValidFileList();
+//    assert (lastLocalList.isEmpty());
+//    assert (isEmpty(allFileList));
+//
+//    // add some files
+//    newFileList.clear();
+//    manager.backupNowLocalFileInfo(LAST_FILE_INFO_TEST);
+//    Random r = new Random(0);
+//    for (int i = 0; i < 3; i++) {
+//      for (int j = 0; j < 5; j++) {
+//        if (!allFileList.containsKey(String.valueOf(i))) {
+//          allFileList.put(String.valueOf(i), new HashSet<>());
+//        }
+//        if (!newFileList.containsKey(String.valueOf(i))) {
+//          newFileList.put(String.valueOf(i), new HashSet<>());
+//        }
+//        String rand = String.valueOf(r.nextInt(10000));
+//        String fileName =
+//            SENDER_FILE_PATH_TEST + File.separator + i + File.separator + rand;
+//        File file = new File(fileName);
+//        allFileList.get(String.valueOf(i)).add(file.getPath());
+//        newFileList.get(String.valueOf(i)).add(file.getPath());
+//        if (!file.getParentFile().exists()) {
+//          file.getParentFile().mkdirs();
+//        }
+//        if (!file.exists() && !file.createNewFile()) {
+//          logger.error("Can not create new file {}", file.getPath());
+//        }
+//      }
+//    }
+//    manager.getCurrentLocalFileList(new String[]{SENDER_FILE_PATH_TEST});
+//    allFileList = manager.getCurrentLocalFiles();
+//    manager.backupNowLocalFileInfo(LAST_FILE_INFO_TEST);
+//    manager.getLastLocalFileList(LAST_FILE_INFO_TEST);
+//    manager.getValidFileList();
+//    sendingFileList = manager.getValidAllFiles();
+//    assert (sendingFileList.size() == newFileList.size());
+//    for (Entry<String, Set<String>> entry : sendingFileList.entrySet()) {
+//      assert (newFileList.containsKey(entry.getKey()));
+//      assert (newFileList.get(entry.getKey()).containsAll(entry.getValue()));
+//    }
+//
+//    // delete some files and add some files
+//    int count = 0;
+//    Map<String, Set<String>> deleteFile = new HashMap<>();
+//    for (Entry<String, Set<String>> entry : allFileList.entrySet()) {
+//      deleteFile.put(entry.getKey(), new HashSet<>());
+//      for (String path : entry.getValue()) {
+//        count++;
+//        if (count % 3 == 0) {
+//          deleteFile.get(entry.getKey()).add(path);
+//        }
+//      }
+//    }
+//    for (Entry<String, Set<String>> entry : deleteFile.entrySet()) {
+//      for (String path : entry.getValue()) {
+//        new File(path).delete();
+//        allFileList.get(entry.getKey()).remove(path);
+//      }
+//    }
+//    newFileList.clear();
+//    r = new Random(1);
+//    for (int i = 0; i < 3; i++) {
+//      for (int j = 0; j < 5; j++) {
+//        if (!allFileList.containsKey(String.valueOf(i))) {
+//          allFileList.put(String.valueOf(i), new HashSet<>());
+//        }
+//        if (!newFileList.containsKey(String.valueOf(i))) {
+//          newFileList.put(String.valueOf(i), new HashSet<>());
+//        }
+//        String rand = String.valueOf(r.nextInt(10000));
+//        String fileName =
+//            SENDER_FILE_PATH_TEST + File.separator + i + File.separator + rand;
+//        File file = new File(fileName);
+//        allFileList.get(String.valueOf(i)).add(file.getPath());
+//        newFileList.get(String.valueOf(i)).add(file.getPath());
+//        if (!file.getParentFile().exists()) {
+//          file.getParentFile().mkdirs();
+//        }
+//        if (!file.exists() && !file.createNewFile()) {
+//          logger.error("Can not create new file {}", file.getPath());
+//        }
+//      }
+//    }
+//    manager.getCurrentLocalFileList(new String[]{SENDER_FILE_PATH_TEST});
+//    manager.getLastLocalFileList(LAST_FILE_INFO_TEST);
+//    manager.getValidFileList();
+//    sendingFileList = manager.getValidAllFiles();
+//    assert (sendingFileList.size() == newFileList.size());
+//    for (Entry<String, Set<String>> entry : sendingFileList.entrySet()) {
+//      assert (newFileList.containsKey(entry.getKey()));
+//      assert (newFileList.get(entry.getKey()).containsAll(entry.getValue()));
+//    }
+//  }
+//
+//  private boolean isEmpty(Map<String, Set<String>> sendingFileList) {
+//    for (Entry<String, Set<String>> entry : sendingFileList.entrySet()) {
+//      if (!entry.getValue().isEmpty()) {
+//        return false;
+//      }
+//    }
+//    return true;
+//  }
+//}
\ No newline at end of file
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/RandomNum.java b/server/src/test/java/org/apache/iotdb/db/utils/RandomNum.java
new file mode 100644
index 0000000..436dc28
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/utils/RandomNum.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.utils;
+
+import java.util.Random;
+
+/**
+ * Static helpers that produce random numbers and random text from one shared {@link Random}.
+ */
+public class RandomNum {
+
+  /** Characters that {@link #getRandomText(int)} draws from. */
+  private static final String TEXT_ALPHABET = "abcdefghijklmnopqrstuvwxyz0123456789";
+
+  private static final Random random = new Random();
+
+  private RandomNum() {
+    throw new IllegalStateException("Utility class");
+  }
+
+  /**
+   * get random long between min and max, both inclusive.
+   *
+   * @param min -the smallest value that may be returned
+   * @param max -the largest value that may be returned
+   * @throws IllegalArgumentException if min is greater than max
+   */
+  public static long getRandomLong(long min, long max) {
+    if (min > max) {
+      throw new IllegalArgumentException("min " + min + " is greater than max " + max);
+    }
+    long span = max - min + 1;
+    if (span > 0) {
+      // floorMod never yields a negative remainder, unlike % applied to a negative nextLong()
+      return Math.floorMod(random.nextLong(), span) + min;
+    }
+    // span overflowed, so [min, max] covers at least half of all longs: sample until one fits
+    long candidate = random.nextLong();
+    while (candidate < min || candidate > max) {
+      candidate = random.nextLong();
+    }
+    return candidate;
+  }
+
+  /**
+   * get random int between min (inclusive) and max (exclusive). When min equals max the range
+   * is empty and min itself is returned.
+   *
+   * @param min -the smallest value that may be returned
+   * @param max -the exclusive upper bound
+   * @throws IllegalArgumentException if min is greater than max
+   */
+  public static int getRandomInt(int min, int max) {
+    if (min > max) {
+      throw new IllegalArgumentException("min " + min + " is greater than max " + max);
+    }
+    if (min == max) {
+      return min;
+    }
+    // widen to long so that max - min cannot overflow for ranges wider than Integer.MAX_VALUE
+    long span = (long) max - min;
+    return (int) (min + Math.floorMod(random.nextLong(), span));
+  }
+
+  /**
+   * get random float between min and max.
+   */
+  public static float getRandomFloat(float min, float max) {
+    return (random.nextFloat() * (max - min) + min);
+  }
+
+  /**
+   * get random int between 0 (inclusive) and frequency (exclusive).
+   *
+   * @param frequency -the exclusive upper bound, must be positive
+   * @throws IllegalArgumentException if frequency is not positive
+   */
+  public static int getAbnormalData(int frequency) {
+    // nextInt(bound) never returns a negative value, unlike nextInt() % frequency
+    return random.nextInt(frequency);
+  }
+
+  /**
+   * get random text consisting of lowercase letters and numbers.
+   *
+   * @param length -the size of random text; a value of zero or less gives an empty text
+   */
+  public static String getRandomText(int length) {
+    StringBuilder text = new StringBuilder(Math.max(length, 0));
+    for (int i = 0; i < length; i++) {
+      text.append(TEXT_ALPHABET.charAt(random.nextInt(TEXT_ALPHABET.length())));
+    }
+    return text.toString();
+  }
+}
\ No newline at end of file


[incubator-iotdb] 01/03: update

Posted by lt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch reimpl_sync
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 0d96f69b3fc19ffc96a696017b0b80c4867c883f
Author: lta <li...@163.com>
AuthorDate: Sun Aug 25 19:35:46 2019 +0800

    update
---
 .../db/sync/receiver/transfer/SyncServiceImpl.java |  31 ++-
 .../db/sync/sender/MultipleClientSyncTest.java     | 226 -----------------
 .../org/apache/iotdb/db/sync/test/RandomNum.java   |  70 -----
 .../apache/iotdb/db/sync/test/SyncTestClient1.java | 258 -------------------
 .../apache/iotdb/db/sync/test/SyncTestClient2.java | 262 -------------------
 .../apache/iotdb/db/sync/test/SyncTestClient3.java | 282 ---------------------
 .../java/org/apache/iotdb/db/sync/test/Utils.java  |  44 ----
 7 files changed, 21 insertions(+), 1152 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
index e2149b6..7156ef7 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
@@ -67,8 +67,6 @@ public class SyncServiceImpl implements SyncService.Iface {
 
   private ThreadLocal<String> syncFolderPath = new ThreadLocal<>();
 
-  private ThreadLocal<String> syncDataPath = new ThreadLocal<>();
-
   private ThreadLocal<String> currentSG = new ThreadLocal<>();
 
   private ThreadLocal<SyncReceiverLogger> syncLog = new ThreadLocal<>();
@@ -101,7 +99,15 @@ public class SyncServiceImpl implements SyncService.Iface {
   }
 
   private boolean checkRecovery() {
-    return true;
+    try {
+      if (currentFileWriter.get() != null && currentFileWriter.get().isOpen()) {
+        currentFileWriter.get().close();
+      }
+      return true;
+    } catch (IOException e) {
+      e.printStackTrace();
+      return false;
+    }
   }
 
   @Override
@@ -109,7 +115,7 @@ public class SyncServiceImpl implements SyncService.Iface {
     try {
       initPath();
       currentSG.remove();
-      syncLog.set(new SyncReceiverLogger(new File(syncDataPath.get(), Constans.SYNC_LOG_NAME)));
+      syncLog.set(new SyncReceiverLogger(new File(getSyncDataPath(), Constans.SYNC_LOG_NAME)));
       return getSuccessResult();
     } catch (DiskSpaceInsufficientException | IOException e) {
       logger.error("Can not receiver data from sender", e);
@@ -125,8 +131,6 @@ public class SyncServiceImpl implements SyncService.Iface {
     syncFolderPath
         .set(FilePathUtils.regularizePath(dataDir) + Constans.SYNC_RECEIVER + File.separatorChar
             + senderIp.get());
-    syncDataPath
-        .set(syncFolderPath.get() + File.separatorChar + Constans.RECEIVER_DATA_FOLDER_NAME);
   }
 
   /**
@@ -163,14 +167,17 @@ public class SyncServiceImpl implements SyncService.Iface {
     try {
       File file;
       if (currentSG.get() == null) {
-        file = new File(syncDataPath.get(), filename);
+        file = new File(getSyncDataPath(), filename);
       } else {
-        file = new File(syncDataPath.get(), currentSG.get() + File.separatorChar + filename);
+        file = new File(getSyncDataPath(), currentSG.get() + File.separatorChar + filename);
       }
       currentFile.set(file);
       if (!file.getParentFile().exists()) {
         file.getParentFile().mkdirs();
       }
+      if (currentFileWriter.get() != null && currentFileWriter.get().isOpen()) {
+        currentFileWriter.get().close();
+      }
       currentFileWriter.set(new FileOutputStream(file).getChannel());
       syncLog.get().startSyncTsFiles();
     } catch (IOException e) {
@@ -294,14 +301,18 @@ public class SyncServiceImpl implements SyncService.Iface {
   public ResultStatus endSync() throws TException {
     try {
       syncLog.get().close();
-      new File(syncDataPath.get(), Constans.SYNC_END).createNewFile();
+      new File(getSyncDataPath(), Constans.SYNC_END).createNewFile();
     } catch (IOException e) {
       logger.error("Can not end sync", e);
-      return getErrorResult(String.format("Can not end sync because {}", e.getMessage()));
+      return getErrorResult(String.format("Can not end sync because %s", e.getMessage()));
     }
     return getSuccessResult();
   }
 
+  private String getSyncDataPath() {
+    return syncFolderPath.get() + File.separatorChar + Constans.RECEIVER_DATA_FOLDER_NAME;
+  }
+
   private ResultStatus getSuccessResult() {
     return new ResultStatus(true, null, null);
   }
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/sender/MultipleClientSyncTest.java b/server/src/test/java/org/apache/iotdb/db/sync/sender/MultipleClientSyncTest.java
deleted file mode 100644
index 52a7138..0000000
--- a/server/src/test/java/org/apache/iotdb/db/sync/sender/MultipleClientSyncTest.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.sync.sender;
-
-import static org.junit.Assert.fail;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import org.apache.iotdb.jdbc.Config;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MultipleClientSyncTest {
-
-  Map<String, ArrayList<String>> timeseriesList = new HashMap();
-  Map<String, ArrayList<String>> timeseriesList1 = new HashMap();
-  private static final Logger logger = LoggerFactory.getLogger(MultipleClientSyncTest.class);
-  private Set<String> dataSender = new HashSet<>();
-  private Set<String> dataReceiver = new HashSet<>();
-
-  public static void main(String[] args) throws IOException {
-    MultipleClientSyncTest multipleClientSyncTest = new MultipleClientSyncTest();
-    multipleClientSyncTest.testPostback();
-  }
-
-  public void testPostback() throws IOException {
-
-    timeseriesList1.put("root.vehicle_history1", new ArrayList<>());
-    timeseriesList1.put("root.vehicle_alarm1", new ArrayList<>());
-    timeseriesList1.put("root.vehicle_temp1", new ArrayList<>());
-    timeseriesList1.put("root.range_event1", new ArrayList<>());
-    timeseriesList.put("root.vehicle_history", new ArrayList<>());
-    timeseriesList.put("root.vehicle_alarm", new ArrayList<>());
-    timeseriesList.put("root.vehicle_temp", new ArrayList<>());
-    timeseriesList.put("root.range_event", new ArrayList<>());
-
-    File file = new File("CreateTimeseries1.txt");
-    BufferedReader reader = new BufferedReader(new FileReader(file));
-    String line;
-    while ((line = reader.readLine()) != null) {
-      String timeseries = line.split(" ")[2];
-      for (String storageGroup : timeseriesList.keySet()) {
-        if (timeseries.startsWith(storageGroup + ".")) {
-          String timesery = timeseries.substring((storageGroup + ".").length());
-          timeseriesList.get(storageGroup).add(timesery);
-          break;
-        }
-      }
-    }
-
-    file = new File("CreateTimeseries2.txt");
-    reader = new BufferedReader(new FileReader(file));
-    while ((line = reader.readLine()) != null) {
-      String timeseries = line.split(" ")[2];
-      for (String storageGroup : timeseriesList1.keySet()) {
-        if (timeseries.startsWith(storageGroup + ".")) {
-          String timesery = timeseries.substring((storageGroup + ".").length());
-          timeseriesList1.get(storageGroup).add(timesery);
-          break;
-        }
-      }
-    }
-
-    for (String storageGroup : timeseriesList.keySet()) {
-      String sqlFormat = "select %s from %s";
-      logger.debug(String.format("%s:", storageGroup));
-      int count = 0;
-      int count1 = 0;
-      int count2 = 0;
-      for (String timesery : timeseriesList.get(storageGroup)) {
-        count++;
-        count1 = 0;
-        count2 = 0;
-        dataSender.clear();
-        dataReceiver.clear();
-        try {
-          Class.forName(Config.JDBC_DRIVER_NAME);
-          Connection connection = null;
-          Connection connection1 = null;
-          try {
-            connection = DriverManager
-                .getConnection("jdbc:iotdb://192.168.130.14:6667/", "root", "root");
-            connection1 = DriverManager
-                .getConnection("jdbc:iotdb://192.168.130.16:6667/", "root", "root");
-            Statement statement = connection.createStatement();
-            Statement statement1 = connection1.createStatement();
-            String sql = String.format(sqlFormat, timesery, storageGroup);
-            boolean hasResultSet = statement.execute(sql);
-            boolean hasResultSet1 = statement1.execute(sql);
-            if (hasResultSet) {
-              ResultSet res = statement.getResultSet();
-              while (res.next()) {
-                count1++;
-                dataSender
-                    .add(res.getString("Time") + res.getString(storageGroup + "." + timesery));
-              }
-            }
-            if (hasResultSet1) {
-              ResultSet res = statement1.getResultSet();
-              while (res.next()) {
-                count2++;
-                dataReceiver
-                    .add(res.getString("Time") + res.getString(storageGroup + "." + timesery));
-              }
-            }
-            assert ((dataSender.size() == dataReceiver.size()) && dataSender
-                .containsAll(dataReceiver));
-            statement.close();
-            statement1.close();
-          } catch (Exception e) {
-            logger.error("", e);
-          } finally {
-            if (connection != null) {
-              connection.close();
-            }
-            if (connection1 != null) {
-              connection1.close();
-            }
-          }
-        } catch (ClassNotFoundException | SQLException e) {
-          fail(e.getMessage());
-        }
-        if (count > 20) {
-          break;
-        }
-        logger.debug(String.valueOf(count1));
-        logger.debug(String.valueOf(count2));
-      }
-    }
-
-    for (String storageGroup : timeseriesList1.keySet()) {
-      String sqlFormat = "select %s from %s";
-      logger.debug(String.format("%s:", storageGroup));
-      int count = 0;
-      int count1;
-      int count2;
-      for (String timesery : timeseriesList1.get(storageGroup)) {
-        count++;
-        count1 = 0;
-        count2 = 0;
-        dataSender.clear();
-        dataReceiver.clear();
-        try {
-          Class.forName(Config.JDBC_DRIVER_NAME);
-          Connection connection = null;
-          Connection connection1 = null;
-          try {
-            connection = DriverManager
-                .getConnection("jdbc:iotdb://192.168.130.15:6667/", "root", "root");
-            connection1 = DriverManager
-                .getConnection("jdbc:iotdb://192.168.130.16:6667/", "root", "root");
-            Statement statement = connection.createStatement();
-            Statement statement1 = connection1.createStatement();
-            String sql = String.format(sqlFormat, timesery, storageGroup);
-            boolean hasResultSet = statement.execute(sql);
-            boolean hasResultSet1 = statement1.execute(sql);
-            if (hasResultSet) {
-              ResultSet res = statement.getResultSet();
-              while (res.next()) {
-                count1++;
-                dataSender
-                    .add(res.getString("Time") + res.getString(storageGroup + "." + timesery));
-              }
-            }
-            if (hasResultSet1) {
-              ResultSet res = statement1.getResultSet();
-              while (res.next()) {
-                count2++;
-                dataReceiver
-                    .add(res.getString("Time") + res.getString(storageGroup + "." + timesery));
-              }
-            }
-            assert ((dataSender.size() == dataReceiver.size()) && dataSender
-                .containsAll(dataReceiver));
-            statement.close();
-            statement1.close();
-          } catch (Exception e) {
-            logger.error("", e);
-          } finally {
-            if (connection != null) {
-              connection.close();
-            }
-            if (connection1 != null) {
-              connection1.close();
-            }
-          }
-        } catch (ClassNotFoundException | SQLException e) {
-          fail(e.getMessage());
-        }
-        if (count > 20) {
-          break;
-        }
-        logger.debug(String.valueOf(count1));
-        logger.debug(String.valueOf(count2));
-      }
-    }
-  }
-}
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/test/RandomNum.java b/server/src/test/java/org/apache/iotdb/db/sync/test/RandomNum.java
deleted file mode 100644
index 125e9d3..0000000
--- a/server/src/test/java/org/apache/iotdb/db/sync/test/RandomNum.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.sync.test;
-
-import java.util.Random;
-
-public class RandomNum {
-
-  private static Random random = new Random();
-
-  private RandomNum() {
-    throw new IllegalStateException("Utility class");
-  }
-
-  public static long getRandomLong(long min, long max) {
-    return random.nextLong() % (max - min + 1) + min;
-  }
-
-  public static int getRandomInt(int min, int max) {
-    return (random.nextInt(10000) % (max - min) + min);
-  }
-
-  /**
-   * get random float between min and max.
-   */
-  public static float getRandomFloat(float min, float max) {
-
-    return (random.nextFloat() * (max - min) + min);
-  }
-
-  /**
-   * get random int between 0 and frequency.
-   */
-  public static int getAbnormalData(int frequency) {
-    return random.nextInt() % frequency;
-  }
-
-  /**
-   * get random text consisting of lowercase letters and numbers.
-   *
-   * @param length -the size of random text
-   */
-  public static String getRandomText(int length) {
-
-    String base = "abcdefghijklmnopqrstuvwxyz0123456789";
-    StringBuilder st = new StringBuilder();
-    for (int i = 0; i < length; i++) {
-      int number = random.nextInt(base.length());
-      st = st.append(base.charAt(number));
-    }
-    return st.toString();
-
-  }
-}
\ No newline at end of file
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/test/SyncTestClient1.java b/server/src/test/java/org/apache/iotdb/db/sync/test/SyncTestClient1.java
deleted file mode 100644
index e7e7b9e..0000000
--- a/server/src/test/java/org/apache/iotdb/db/sync/test/SyncTestClient1.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.sync.test;
-
-import static org.apache.iotdb.db.sync.test.RandomNum.getRandomInt;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.iotdb.db.conf.IoTDBConstant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * SyncTestClient1 is used to generate data of whole timeseries (simulating jilian scene) to test stability of
- * sync function.
- */
-public class SyncTestClient1 {
-
-  private static final int TIME_INTERVAL = 0;
-  private static final int TOTAL_DATA = 2000000;
-  private static final int ABNORMAL_MAX_INT = 0;
-  private static final int ABNORMAL_MIN_INT = -10;
-  private static final int ABNORMAL_MAX_FLOAT = 0;
-  private static final int ABNORMAL_MIN_FLOAT = -10;
-  private static final int ABNORMAL_FREQUENCY = Integer.MAX_VALUE;
-  private static final int ABNORMAL_LENGTH = 0;
-  private static final int MIN_INT = 0;
-  private static final int MAX_INT = 14;
-  private static final int MIN_FLOAT = 20;
-  private static final int MAX_FLOAT = 30;
-  private static final int STRING_LENGTH = 5;
-  private static final int BATCH_SQL = 10000;
-  private static final Logger logger = LoggerFactory.getLogger(SyncTestClient1.class);
-
-  /**
-   * generate time series map from file.
-   *
-   * @param inputFilePath input file path
-   * @return Map
-   * @throws Exception Exception
-   */
-  public static Map<String, String> generateTimeseriesMapFromFile(String inputFilePath)
-      throws IOException {
-
-    Map<String, String> timeseriesMap = new HashMap<>();
-
-    File file = new File(inputFilePath);
-    try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
-      String line;
-      while ((line = reader.readLine()) != null) {
-
-        String timeseries = line.split(" ")[2];
-        String dataType = line.split("DATATYPE = ")[1].split(",")[0].trim();
-        String encodingType = line.split("ENCODING = ")[1].split(";")[0].trim();
-        timeseriesMap.put(timeseries, dataType + "," + encodingType);
-      }
-    }
-    return timeseriesMap;
-
-  }
-
-  /**
-   * create time series.
-   *
-   * @param statement statement
-   * @param timeseriesMap time series map
-   */
-  public static void createTimeseries(Statement statement, Map<String, String> timeseriesMap) {
-
-    try {
-      String createTimeseriesSql = "CREATE TIMESERIES <timeseries> WITH DATATYPE=<datatype>, "
-          + "ENCODING=<encode>";
-
-      int sqlCount = 0;
-      for (Map.Entry<String, String> entry : timeseriesMap.entrySet()) {
-        String key = entry.getKey();
-        String properties = entry.getValue();
-        String sql = createTimeseriesSql.replace("<timeseries>", key)
-            .replace("<datatype>", Utils.getType(properties))
-            .replace("<encode>", Utils.getEncode(properties));
-        statement.addBatch(sql);
-        sqlCount++;
-        if (sqlCount >= BATCH_SQL) {
-          statement.executeBatch();
-          statement.clearBatch();
-          sqlCount = 0;
-        }
-      }
-      statement.executeBatch();
-      statement.clearBatch();
-    } catch (Exception e) {
-      logger.error("", e);
-    }
-  }
-
-  /**
-   * set storage group.
-   *
-   * @param statement statement
-   * @param storageGroupList storage group list
-   * @throws SQLException SQLException
-   */
-  public static void setStorageGroup(Statement statement, List<String> storageGroupList)
-      throws SQLException {
-    try {
-      String setStorageGroupSql = "SET STORAGE GROUP TO <prefixpath>";
-      for (String str : storageGroupList) {
-        String sql = setStorageGroupSql.replace("<prefixpath>", str);
-        statement.execute(sql);
-      }
-    } catch (Exception e) {
-      logger.error("", e);
-    }
-  }
-
-  /**
-   * random insert data.
-   *
-   * @param statement statement
-   * @param timeseriesMap time series map
-   * @throws Exception Exception
-   */
-  public static void randomInsertData(Statement statement, Map<String, String> timeseriesMap)
-      throws SQLException, InterruptedException {
-    String insertDataSql = "INSERT INTO %s (timestamp, %s) VALUES (%s, %s)";
-    int abnormalCount = 0;
-    int abnormalFlag = 1;
-    int sqlCount = 0;
-
-    for (int i = 0; i < TOTAL_DATA; i++) {
-
-      long time = System.currentTimeMillis();
-
-      if (i % ABNORMAL_FREQUENCY == 250) {
-        abnormalFlag = 0;
-      }
-
-      for (Map.Entry<String, String> entry : timeseriesMap.entrySet()) {
-        String key = entry.getKey();
-        String type = Utils.getType(entry.getValue());
-        String path = Utils.getPath(key);
-        String sensor = Utils.getSensor(key);
-        String sql = "";
-
-        if (type.equals("INT32")) {
-          int value;
-          if (abnormalFlag == 0) {
-            value = getRandomInt(ABNORMAL_MIN_INT, ABNORMAL_MAX_INT);
-          } else {
-            value = getRandomInt(MIN_INT, MAX_INT);
-          }
-          sql = String.format(insertDataSql, path, sensor, time, value);
-        } else if (type.equals("FLOAT")) {
-          float value;
-          if (abnormalFlag == 0) {
-            value = RandomNum.getRandomFloat(ABNORMAL_MIN_FLOAT, ABNORMAL_MAX_FLOAT);
-          } else {
-            value = RandomNum.getRandomFloat(MIN_FLOAT, MAX_FLOAT);
-          }
-          sql = String.format(insertDataSql, path, sensor, time, value);
-        } else if (type.equals("TEXT")) {
-          String value;
-          value = RandomNum.getRandomText(STRING_LENGTH);
-          sql = String.format(insertDataSql, path, sensor, time, "\"" + value + "\"");
-        }
-
-        statement.addBatch(sql);
-        sqlCount++;
-        if (sqlCount >= BATCH_SQL) {
-          statement.executeBatch();
-          statement.clearBatch();
-          sqlCount = 0;
-        }
-      }
-
-      if (abnormalFlag == 0) {
-        abnormalCount += 1;
-      }
-      if (abnormalCount >= ABNORMAL_LENGTH) {
-        abnormalCount = 0;
-        abnormalFlag = 1;
-      }
-    }
-    statement.executeBatch();
-    statement.clearBatch();
-  }
-
-  /**
-   * main function.
-   *
-   * @param args arguments
-   * @throws Exception Exception
-   */
-  public static void main(String[] args) throws Exception {
-
-    Statement statement = null;
-
-    String path =
-        new File(System.getProperty(IoTDBConstant.IOTDB_HOME, null)).getParent() + File.separator
-            + "src"
-            + File.separator + "test" + File.separator + "resources" + File.separator
-            + "CreateTimeseries1.txt";
-    Map<String, String> timeseriesMap = generateTimeseriesMapFromFile(path);
-
-    List<String> storageGroupList = new ArrayList<>();
-    storageGroupList.add("root.vehicle_history");
-    storageGroupList.add("root.vehicle_alarm");
-    storageGroupList.add("root.vehicle_temp");
-    storageGroupList.add("root.range_event");
-
-    try (Connection connection = DriverManager
-        .getConnection("jdbc:iotdb://localhost:6667/", "root", "root")) {
-      Class.forName("org.apache.iotdb.jdbc.IoTDBDriver");
-      statement = connection.createStatement();
-
-      setStorageGroup(statement, storageGroupList);
-      logger.debug("Finish set storage group.");
-      createTimeseries(statement, timeseriesMap);
-      logger.debug("Finish create timeseries.");
-      while (true) {
-        randomInsertData(statement, timeseriesMap);
-      }
-
-    } catch (Exception e) {
-      logger.error("", e);
-    } finally {
-      if (statement != null) {
-        statement.close();
-      }
-    }
-  }
-}
\ No newline at end of file
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/test/SyncTestClient2.java b/server/src/test/java/org/apache/iotdb/db/sync/test/SyncTestClient2.java
deleted file mode 100644
index 8f4de9f..0000000
--- a/server/src/test/java/org/apache/iotdb/db/sync/test/SyncTestClient2.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.sync.test;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.iotdb.db.conf.IoTDBConstant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * SyncTestClient2 is used to generate data of half timeseries (simulating jilian scene) to test stability of
- * sync function.
- */
-public class SyncTestClient2 {
-
-  private static final int TIME_INTERVAL = 0;
-  private static final int TOTAL_DATA = 2000000;
-  private static final int ABNORMAL_MAX_INT = 0;
-  private static final int ABNORMAL_MIN_INT = -10;
-  private static final int ABNORMAL_MAX_FLOAT = 0;
-  private static final int ABNORMAL_MIN_FLOAT = -10;
-  private static final int ABNORMAL_FREQUENCY = Integer.MAX_VALUE;
-  private static final int ABNORMAL_LENGTH = 0;
-  private static final int MIN_INT = 0;
-  private static final int MAX_INT = 14;
-  private static final int MIN_FLOAT = 20;
-  private static final int MAX_FLOAT = 30;
-  private static final int STRING_LENGTH = 5;
-  private static final int BATCH_SQL = 10000;
-  private static final Logger logger = LoggerFactory.getLogger(SyncTestClient2.class);
-
-  /**
-   * generate time series map from file.
-   *
-   * @param inputFilePath input file path
-   * @return map
-   * @throws Exception Exception
-   */
-  public static Map<String, String> generateTimeseriesMapFromFile(String inputFilePath)
-      throws IOException {
-
-    Map<String, String> timeseriesMap = new HashMap<>();
-
-    File file = new File(inputFilePath);
-    try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
-      String line;
-      while ((line = reader.readLine()) != null) {
-
-        String timeseries = line.split(" ")[2];
-        String dataType = line.split("DATATYPE = ")[1].split(",")[0].trim();
-        String encodingType = line.split("ENCODING = ")[1].split(";")[0].trim();
-        timeseriesMap.put(timeseries, dataType + "," + encodingType);
-      }
-    }
-
-    return timeseriesMap;
-
-  }
-
-  /**
-   * create time series.
-   *
-   * @param statement statement
-   * @param timeseriesMap time series map
-   * @throws SQLException SQLException
-   */
-  public static void createTimeseries(Statement statement, Map<String, String> timeseriesMap)
-      throws SQLException {
-
-    try {
-      String createTimeseriesSql = "CREATE TIMESERIES <timeseries> WITH DATATYPE=<datatype>, "
-          + "ENCODING=<encode>";
-
-      int sqlCount = 0;
-
-      for (Map.Entry<String, String> entry : timeseriesMap.entrySet()) {
-        String key = entry.getKey();
-        String properties = entry.getValue();
-        String sql = createTimeseriesSql.replace("<timeseries>", key)
-            .replace("<datatype>", Utils.getType(properties))
-            .replace("<encode>", Utils.getEncode(properties));
-
-        statement.addBatch(sql);
-        sqlCount++;
-        if (sqlCount >= BATCH_SQL) {
-          statement.executeBatch();
-          statement.clearBatch();
-          sqlCount = 0;
-        }
-      }
-      statement.executeBatch();
-      statement.clearBatch();
-    } catch (Exception e) {
-      logger.error("", e);
-    }
-  }
-
-  /**
-   * set storage group.
-   *
-   * @param statement statement
-   * @param storageGroupList storage group list
-   * @throws SQLException SQLException
-   */
-  public static void setStorageGroup(Statement statement, List<String> storageGroupList)
-      throws SQLException {
-
-    try {
-      String setStorageGroupSql = "SET STORAGE GROUP TO <prefixpath>";
-      for (String str : storageGroupList) {
-        String sql = setStorageGroupSql.replace("<prefixpath>", str);
-        statement.execute(sql);
-      }
-    } catch (Exception e) {
-      logger.error("", e);
-    }
-  }
-
-  /**
-   * randomly insert data.
-   *
-   * @param statement statement
-   * @param timeseriesMap time series map
-   */
-  public static void randomInsertData(Statement statement, Map<String, String> timeseriesMap)
-      throws SQLException, InterruptedException {
-    String insertDataSql = "INSERT INTO %s (timestamp, %s) VALUES (%s, %s)";
-    int abnormalCount = 0;
-    int abnormalFlag = 1;
-
-    int sqlCount = 0;
-
-    for (int i = 0; i < TOTAL_DATA; i++) {
-
-      long time = System.currentTimeMillis();
-
-      if (i % ABNORMAL_FREQUENCY == 250) {
-        abnormalFlag = 0;
-      }
-
-      for (Map.Entry<String, String> entry : timeseriesMap.entrySet()) {
-        String key = entry.getKey();
-        String type = Utils.getType(entry.getValue());
-        String path = Utils.getPath(key);
-        String sensor = Utils.getSensor(key);
-        String sql = "";
-
-        if (type.equals("INT32")) {
-          int value;
-          if (abnormalFlag == 0) {
-            value = RandomNum.getRandomInt(ABNORMAL_MIN_INT, ABNORMAL_MAX_INT);
-          } else {
-            value = RandomNum.getRandomInt(MIN_INT, MAX_INT);
-          }
-          sql = String.format(insertDataSql, path, sensor, time, value);
-        } else if (type.equals("FLOAT")) {
-          float value;
-          if (abnormalFlag == 0) {
-            value = RandomNum.getRandomFloat(ABNORMAL_MIN_FLOAT, ABNORMAL_MAX_FLOAT);
-          } else {
-            value = RandomNum.getRandomFloat(MIN_FLOAT, MAX_FLOAT);
-          }
-          sql = String.format(insertDataSql, path, sensor, time,value);
-        } else if (type.equals("TEXT")) {
-          String value;
-          value = RandomNum.getRandomText(STRING_LENGTH);
-          sql = String.format(insertDataSql, path, sensor, time, "\"" + value + "\"");
-        }
-
-        statement.addBatch(sql);
-        sqlCount++;
-        if (sqlCount >= BATCH_SQL) {
-          statement.executeBatch();
-          statement.clearBatch();
-          sqlCount = 0;
-        }
-      }
-
-      if (abnormalFlag == 0) {
-        abnormalCount += 1;
-      }
-      if (abnormalCount >= ABNORMAL_LENGTH) {
-        abnormalCount = 0;
-        abnormalFlag = 1;
-      }
-    }
-    statement.executeBatch();
-    statement.clearBatch();
-  }
-
-  /**
-   * main function.
-   *
-   * @param args arguments
-   * @throws Exception Exception
-   */
-  public static void main(String[] args) throws Exception {
-
-    Statement statement = null;
-
-    String path =
-        new File(System.getProperty(IoTDBConstant.IOTDB_HOME, null)).getParent() + File.separator
-            + "src"
-            + File.separator + "test" + File.separator + "resources" + File.separator
-            + "CreateTimeseries2.txt";
-    Map<String, String> timeseriesMap = generateTimeseriesMapFromFile(path);
-
-    List<String> storageGroupList = new ArrayList<>();
-    storageGroupList.add("root.vehicle_history1");
-    storageGroupList.add("root.vehicle_alarm1");
-    storageGroupList.add("root.vehicle_temp1");
-    storageGroupList.add("root.range_event1");
-
-    try (Connection connection = DriverManager
-        .getConnection("jdbc:iotdb://localhost:6667/", "root", "root")) {
-      Class.forName("org.apache.iotdb.jdbc.IoTDBDriver");
-      statement = connection.createStatement();
-
-      setStorageGroup(statement, storageGroupList);
-      logger.debug("Finish set storage group.");
-      createTimeseries(statement, timeseriesMap);
-      logger.debug("Finish create timeseries.");
-      while (true) {
-        randomInsertData(statement, timeseriesMap);
-      }
-
-    } catch (Exception e) {
-      logger.error("", e);
-    } finally {
-      if (statement != null) {
-        statement.close();
-      }
-    }
-  }
-}
\ No newline at end of file
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/test/SyncTestClient3.java b/server/src/test/java/org/apache/iotdb/db/sync/test/SyncTestClient3.java
deleted file mode 100644
index 5cfd935..0000000
--- a/server/src/test/java/org/apache/iotdb/db/sync/test/SyncTestClient3.java
+++ /dev/null
@@ -1,282 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.sync.test;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.iotdb.db.conf.IoTDBConstant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * SyncTestClient3 is used to generate data of another half timeseries (simulating jilian scene) which is
- * different to those in SyncTestClient2 to test stability of sync function.
- */
-public class SyncTestClient3 {
-
-  private static final int TIME_INTERVAL = 0;
-  private static final int TOTAL_DATA = 2000000;
-  private static final int ABNORMAL_MAX_INT = 0;
-  private static final int ABNORMAL_MIN_INT = -10;
-  private static final int ABNORMAL_MAX_FLOAT = 0;
-  private static final int ABNORMAL_MIN_FLOAT = -10;
-  private static final int ABNORMAL_FREQUENCY = Integer.MAX_VALUE;
-  private static final int ABNORMAL_LENGTH = 0;
-  private static final int MIN_INT = 0;
-  private static final int MAX_INT = 14;
-  private static final int MIN_FLOAT = 20;
-  private static final int MAX_FLOAT = 30;
-  private static final int STRING_LENGTH = 5;
-  private static final int BATCH_SQL = 10000;
-  private static final Logger logger = LoggerFactory.getLogger(SyncTestClient3.class);
-
-  /**
-   * generate time series map from file.
-   *
-   * @param inputFilePath input file path
-   * @return map
-   * @throws Exception Exception
-   */
-  public static Map<String, String> generateTimeseriesMapFromFile(String inputFilePath)
-      throws IOException {
-
-    Map<String, String> timeseriesMap = new HashMap<>();
-
-    File file = new File(inputFilePath);
-    try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
-      String line;
-      while ((line = reader.readLine()) != null) {
-        String timeseries = line.split(" ")[2];
-        String dataType = line.split("DATATYPE = ")[1].split(",")[0].trim();
-        String encodingType = line.split("ENCODING = ")[1].split(";")[0].trim();
-        timeseriesMap.put(timeseries, dataType + "," + encodingType);
-      }
-    }
-
-    return timeseriesMap;
-
-  }
-
-  /**
-   * create time series.
-   *
-   * @param statement statement
-   * @param timeseriesMap time series map
-   * @throws SQLException SQLException
-   */
-  public static void createTimeseries(Statement statement, Statement statement1,
-      Map<String, String> timeseriesMap)
-      throws SQLException {
-
-    try {
-      String createTimeseriesSql = "CREATE TIMESERIES <timeseries> WITH DATATYPE=<datatype>, "
-          + "ENCODING=<encode>";
-
-      int sqlCount = 0;
-
-      for (Map.Entry<String, String> entry : timeseriesMap.entrySet()) {
-        String key = entry.getKey();
-        String properties = entry.getValue();
-        String sql = createTimeseriesSql.replace("<timeseries>", key)
-            .replace("<datatype>", Utils.getType(properties))
-            .replace("<encode>", Utils.getEncode(properties));
-
-        statement.addBatch(sql);
-        statement1.addBatch(sql);
-        sqlCount++;
-        if (sqlCount >= BATCH_SQL) {
-          statement.executeBatch();
-          statement.clearBatch();
-          statement1.executeBatch();
-          statement1.clearBatch();
-          sqlCount = 0;
-        }
-      }
-      statement.executeBatch();
-      statement.clearBatch();
-      statement1.executeBatch();
-      statement1.clearBatch();
-    } catch (Exception e) {
-      logger.error("", e);
-    }
-  }
-
-  /**
-   * set storage group.
-   *
-   * @param statement statement
-   * @param storageGroupList storage group list
-   * @throws SQLException SQLException
-   */
-  public static void setStorageGroup(Statement statement, Statement statement1,
-      List<String> storageGroupList)
-      throws SQLException {
-    try {
-      String setStorageGroupSql = "SET STORAGE GROUP TO <prefixpath>";
-      for (String str : storageGroupList) {
-        String sql = setStorageGroupSql.replace("<prefixpath>", str);
-        statement.execute(sql);
-        statement1.execute(sql);
-      }
-    } catch (Exception e) {
-      logger.error("", e);
-    }
-  }
-
-  /**
-   * randomly insert data.
-   *
-   * @param statement statement
-   * @param timeseriesMap time series map
-   * @throws Exception Exception
-   */
-  public static void randomInsertData(Statement statement, Statement statement1,
-      Map<String, String> timeseriesMap) throws InterruptedException, SQLException {
-    String insertDataSql = "INSERT INTO %s (timestamp, %s) VALUES (%s, %s)";
-    int abnormalCount = 0;
-    int abnormalFlag = 1;
-
-    int sqlCount = 0;
-
-    for (int i = 0; i < TOTAL_DATA; i++) {
-
-      long time = System.currentTimeMillis();
-
-      if (i % ABNORMAL_FREQUENCY == 250) {
-        abnormalFlag = 0;
-      }
-
-      for (Map.Entry<String, String> entry : timeseriesMap.entrySet()) {
-        String key = entry.getKey();
-        String type = Utils.getType(entry.getValue());
-        String path = Utils.getPath(key);
-        String sensor = Utils.getSensor(key);
-        String sql = "";
-
-        if (type.equals("INT32")) {
-          int value;
-          if (abnormalFlag == 0) {
-            value = RandomNum.getRandomInt(ABNORMAL_MIN_INT, ABNORMAL_MAX_INT);
-          } else {
-            value = RandomNum.getRandomInt(MIN_INT, MAX_INT);
-          }
-          sql = String.format(insertDataSql, path, sensor, time, value);
-        } else if (type.equals("FLOAT")) {
-          float value;
-          if (abnormalFlag == 0) {
-            value = RandomNum.getRandomFloat(ABNORMAL_MIN_FLOAT, ABNORMAL_MAX_FLOAT);
-          } else {
-            value = RandomNum.getRandomFloat(MIN_FLOAT, MAX_FLOAT);
-          }
-          sql = String.format(insertDataSql, path, sensor, time, value);
-        } else if (type.equals("TEXT")) {
-          String value;
-          value = RandomNum.getRandomText(STRING_LENGTH);
-          sql = String.format(insertDataSql, path, sensor, time, "\"" + value + "\"");
-        }
-
-        statement.addBatch(sql);
-        statement1.addBatch(sql);
-        sqlCount++;
-        if (sqlCount >= BATCH_SQL) {
-          statement.executeBatch();
-          statement.clearBatch();
-          statement1.executeBatch();
-          statement1.clearBatch();
-          sqlCount = 0;
-        }
-      }
-
-      if (abnormalFlag == 0) {
-        abnormalCount += 1;
-      }
-      if (abnormalCount >= ABNORMAL_LENGTH) {
-        abnormalCount = 0;
-        abnormalFlag = 1;
-      }
-    }
-    statement.executeBatch();
-    statement.clearBatch();
-    statement1.executeBatch();
-    statement1.clearBatch();
-  }
-
-  /**
-   * main function.
-   *
-   * @param args arguments
-   * @throws Exception Exception
-   */
-  public static void main(String[] args) throws Exception {
-
-    Statement statement = null;
-    Statement statement1 = null;
-
-    String path =
-        new File(System.getProperty(IoTDBConstant.IOTDB_HOME, null)).getParent() + File.separator
-            + "src"
-            + File.separator + "test" + File.separator + "resources" + File.separator
-            + "CreateTimeseries3.txt";
-    Map<String, String> timeseriesMap = generateTimeseriesMapFromFile(path);
-
-    List<String> storageGroupList = new ArrayList<>();
-    storageGroupList.add("root.vehicle_history2");
-    storageGroupList.add("root.vehicle_alarm2");
-    storageGroupList.add("root.vehicle_temp2");
-    storageGroupList.add("root.range_event2");
-
-    try (Connection connection1 = DriverManager
-        .getConnection("jdbc:iotdb://192.168.130.17:6667/", "root", "root")) {
-      Class.forName("org.apache.iotdb.jdbc.IoTDBDriver");
-      try (Connection connection = DriverManager
-          .getConnection("jdbc:iotdb://localhost:6667/", "root", "root")) {
-        statement = connection.createStatement();
-
-        statement1 = connection1.createStatement();
-
-        setStorageGroup(statement, statement1, storageGroupList);
-        logger.debug("Finish set storage group.");
-        createTimeseries(statement, statement1, timeseriesMap);
-        logger.debug("Finish create timeseries.");
-        while (true) {
-          randomInsertData(statement, statement1, timeseriesMap);
-        }
-      }
-    } catch (Exception e) {
-      logger.error("", e);
-    } finally {
-      if (statement != null) {
-        statement.close();
-      }
-      if (statement1 != null) {
-        statement1.close();
-      }
-    }
-  }
-}
\ No newline at end of file
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/test/Utils.java b/server/src/test/java/org/apache/iotdb/db/sync/test/Utils.java
deleted file mode 100644
index bd24a10..0000000
--- a/server/src/test/java/org/apache/iotdb/db/sync/test/Utils.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.sync.test;
-
-public class Utils {
-
-  public static String getType(String properties) {
-    return properties.split(",")[0];
-  }
-
-  public static String getEncode(String properties) {
-    return properties.split(",")[1];
-  }
-
-  private Utils() {
-    throw new IllegalStateException("Utility class");
-  }
-
-  public static String getPath(String timeseries) {
-    int lastPointIndex = timeseries.lastIndexOf('.');
-    return timeseries.substring(0, lastPointIndex);
-  }
-
-  public static String getSensor(String timeseries) {
-    int lastPointIndex = timeseries.lastIndexOf('.');
-    return timeseries.substring(lastPointIndex + 1);
-  }
-}