You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/08/30 02:43:43 UTC
[incubator-iotdb] 02/03: complete load module in receiver end
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch reimpl_sync
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 35f06a88c21e2fa99d03d44c4e1633197fcad5b4
Author: lta <li...@163.com>
AuthorDate: Thu Aug 29 21:00:57 2019 +0800
complete load module in receiver end
---
.../iotdb/db/sync/receiver/SyncServerManager.java | 3 +
.../iotdb/db/sync/receiver/load/FileLoader.java | 105 +++++++++++++++++++--
.../db/sync/receiver/load/FileLoaderManager.java | 93 ++++++++++++++++++
.../iotdb/db/sync/receiver/load/IFileLoader.java | 6 ++
.../sync/receiver/recover/SyncReceiverLogger.java | 18 ++++
.../db/sync/receiver/transfer/SyncServiceImpl.java | 18 ++--
.../apache/iotdb/db/sync/sender/conf/Constans.java | 2 +
.../db/sync/sender/conf/SyncSenderConfig.java | 7 ++
.../sync/sender/transfer/DataTransferManager.java | 38 +++++++-
service-rpc/src/main/thrift/sync.thrift | 2 +-
10 files changed, 275 insertions(+), 17 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
index 1d37539..faba821 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.service.IService;
import org.apache.iotdb.db.service.ServiceType;
+import org.apache.iotdb.db.sync.receiver.load.FileLoaderManager;
import org.apache.iotdb.db.sync.receiver.transfer.SyncServiceImpl;
import org.apache.iotdb.service.sync.thrift.SyncService;
import org.apache.iotdb.service.sync.thrift.SyncService.Processor;
@@ -65,6 +66,7 @@ public class SyncServerManager implements IService {
if (!conf.isSyncEnable()) {
return;
}
+ FileLoaderManager.getInstance().start();
if (conf.getIpWhiteList() == null) {
logger.error(
"Sync server failed to start because IP white list is null, please set IP white list.");
@@ -83,6 +85,7 @@ public class SyncServerManager implements IService {
@Override
public void stop() {
if (conf.isSyncEnable()) {
+ FileLoaderManager.getInstance().stop();
((SyncServiceThread) syncServerThread).close();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
index c09b0ec..7681433 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
@@ -1,33 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.iotdb.db.sync.receiver.load;
import java.io.File;
+import java.util.ArrayDeque;
+import org.apache.iotdb.db.sync.sender.conf.Constans;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class FileLoader implements IFileLoader {
- private FileLoader(){
+ private static final Logger LOGGER = LoggerFactory.getLogger(FileLoader.class);
+ private static final int WAIT_TIME = 1000;
+
+ private File syncEndFile;
+
+ private String senderName;
+
+ private ArrayDeque<LoadTask> queue = new ArrayDeque<>();
+
+ private volatile boolean endSync = false;
+
+ private FileLoader(String senderName, String syncFolderPath) {
+ this.senderName = senderName;
+ this.syncEndFile = new File(syncFolderPath, Constans.SYNC_END);
+ FileLoaderManager.getInstance().addFileLoader(senderName, this);
+ FileLoaderManager.getInstance().addLoadTaskRunner(loadTaskRunner);
}
- public static FileLoader getInstance(){
- return FileLoaderHolder.INSTANCE;
+ public static FileLoader createFileLoader(String senderName, String syncFolderPath) {
+ return new FileLoader(senderName, syncFolderPath);
}
+  /**
+   * Consumer loop executed on the FileLoaderManager thread pool: takes queued load tasks one at
+   * a time and hands them to handleLoadTask until endSync is set, then calls cleanUp().
+   */
+  private Runnable loadTaskRunner = () -> {
+    try {
+      while (!endSync) {
+        LoadTask task;
+        // ArrayDeque is not thread-safe, so the emptiness check and the poll must happen under
+        // the same lock the producers (addTsfile/addDeletedFileName) hold while adding.
+        synchronized (queue) {
+          if (queue.isEmpty()) {
+            // Bounded wait so that endSync is re-checked at least every WAIT_TIME ms.
+            queue.wait(WAIT_TIME);
+          }
+          task = queue.poll();
+        }
+        // The task is handled outside the lock so producers are not blocked while loading.
+        if (task != null) {
+          handleLoadTask(task);
+        }
+      }
+      cleanUp();
+    } catch (InterruptedException e) {
+      LOGGER.info("Close the run load task thread.");
+      // Restore the interrupt flag so the executor's worker thread can observe it.
+      Thread.currentThread().interrupt();
+    }
+  };
+
@Override
public void addDeletedFileName(String sgName, File deletedFile) {
-
+ synchronized (queue) {
+ queue.add(new LoadTask(sgName, deletedFile, LoadType.DELETE));
+ queue.notify();
+ }
}
@Override
public void addTsfile(String sgName, File tsfile) {
+ synchronized (queue) {
+ queue.add(new LoadTask(sgName, tsfile, LoadType.ADD));
+ queue.notify();
+ }
+ }
+
+ /**
+ * Marks the sync as finished. The load task runner checks this flag at the top of each loop
+ * iteration; once it sees the flag it calls cleanUp() and exits, without handling tasks that
+ * are still in the queue at that point.
+ */
+ @Override
+ public void endSync() {
+ this.endSync = true;
+ }
+
+ @Override
+ public void handleLoadTask(LoadTask task) {
}
+ /**
+ * Unregisters this loader from the FileLoaderManager under its sender name. Called by the
+ * load task runner once endSync has been observed.
+ */
+ @Override
+ public void cleanUp() {
+ FileLoaderManager.getInstance().removeFileLoader(senderName);
+ }
+
+ class LoadTask {
- private static class FileLoaderHolder{
- private static final FileLoader INSTANCE = new FileLoader();
+ String sgName;
+ File file;
+ LoadType type;
- private static FileLoader getInstance(){
- return INSTANCE;
+ LoadTask(String sgName, File file, LoadType type) {
+ this.sgName = sgName;
+ this.file = file;
+ this.type = type;
}
}
+
+ private enum LoadType {
+ DELETE, ADD;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderManager.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderManager.java
new file mode 100644
index 0000000..2b86777
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderManager.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.sync.receiver.load;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FileLoaderManager {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(FileLoaderManager.class);
+
+ private static final int WAIT_TIMEOUT = 2000;
+
+ private ConcurrentHashMap<String, FileLoader> fileLoaderMap;
+
+ private ExecutorService loadTaskRunnerPool;
+
+
+ private FileLoaderManager() {
+ }
+
+ public static FileLoaderManager getInstance() {
+ return FileLoaderManagerHolder.INSTANCE;
+ }
+
+ public void addFileLoader(String senderName, FileLoader fileLoader){
+ fileLoaderMap.put(senderName, fileLoader);
+ }
+
+ public void removeFileLoader(String senderName){
+ fileLoaderMap.remove(senderName);
+ }
+
+ public FileLoader getFileLoader(String senderName) {
+ return fileLoaderMap.get(senderName);
+ }
+
+ public void addLoadTaskRunner(Runnable taskRunner){
+ loadTaskRunnerPool.submit(taskRunner);
+ }
+
+ public void start() {
+ if (fileLoaderMap == null) {
+ fileLoaderMap = new ConcurrentHashMap<>();
+ }
+ if (loadTaskRunnerPool == null) {
+ loadTaskRunnerPool = Executors.newCachedThreadPool();
+ }
+ }
+
+ public void stop() {
+ fileLoaderMap = null;
+ loadTaskRunnerPool.shutdownNow();
+ int totalWaitTime = WAIT_TIMEOUT;
+ while (!loadTaskRunnerPool.isTerminated()) {
+ try {
+ if (!loadTaskRunnerPool.awaitTermination(WAIT_TIMEOUT, TimeUnit.MILLISECONDS)) {
+ LOGGER.info("File load manager thread pool doesn't exit after {}ms.",
+ +totalWaitTime);
+ }
+ totalWaitTime += WAIT_TIMEOUT;
+ } catch (InterruptedException e) {
+ LOGGER.error("Interrupted while waiting file load manager thread pool to exit. ", e);
+ }
+ }
+ loadTaskRunnerPool = null;
+ }
+
+ private static class FileLoaderManagerHolder {
+
+ private static final FileLoaderManager INSTANCE = new FileLoaderManager();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/IFileLoader.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/IFileLoader.java
index c2d55b0..98aeca9 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/IFileLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/IFileLoader.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.sync.receiver.load;
import java.io.File;
+import org.apache.iotdb.db.sync.receiver.load.FileLoader.LoadTask;
public interface IFileLoader {
@@ -26,4 +27,9 @@ public interface IFileLoader {
void addTsfile(String sgName, File tsfile);
+ void endSync();
+
+ void handleLoadTask(LoadTask task);
+
+ void cleanUp();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogger.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogger.java
index 9a8cc96..b43cfa3 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogger.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogger.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.iotdb.db.sync.receiver.recover;
import java.io.BufferedWriter;
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
index 7156ef7..e2f45a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
@@ -43,6 +43,7 @@ import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.metadata.MetadataConstant;
import org.apache.iotdb.db.metadata.MetadataOperationType;
import org.apache.iotdb.db.sync.receiver.load.FileLoader;
+import org.apache.iotdb.db.sync.receiver.load.FileLoaderManager;
import org.apache.iotdb.db.sync.receiver.recover.SyncReceiverLogger;
import org.apache.iotdb.db.sync.sender.conf.Constans;
import org.apache.iotdb.db.utils.FilePathUtils;
@@ -71,7 +72,7 @@ public class SyncServiceImpl implements SyncService.Iface {
private ThreadLocal<SyncReceiverLogger> syncLog = new ThreadLocal<>();
- private ThreadLocal<String> senderIp = new ThreadLocal<>();
+ private ThreadLocal<String> senderName = new ThreadLocal<>();
private ThreadLocal<File> currentFile = new ThreadLocal<>();
@@ -83,10 +84,10 @@ public class SyncServiceImpl implements SyncService.Iface {
* Verify IP address of sender
*/
@Override
- public ResultStatus check(String ipAddress) {
+ public ResultStatus check(String ipAddress, String uuid) {
Thread.currentThread().setName(ThreadName.SYNC_SERVER.getName());
if (SyncUtils.verifyIPSegment(config.getIpWhiteList(), ipAddress)) {
- senderIp.set(ipAddress);
+ senderName.set(ipAddress + "-" + uuid);
if (checkRecovery()) {
return getSuccessResult();
} else {
@@ -105,7 +106,7 @@ public class SyncServiceImpl implements SyncService.Iface {
}
return true;
} catch (IOException e) {
- e.printStackTrace();
+ logger.error("Check recovery state fail", e);
return false;
}
}
@@ -115,6 +116,7 @@ public class SyncServiceImpl implements SyncService.Iface {
try {
initPath();
currentSG.remove();
+ FileLoader.createFileLoader(senderName.get(), syncFolderPath.get());
syncLog.set(new SyncReceiverLogger(new File(getSyncDataPath(), Constans.SYNC_LOG_NAME)));
return getSuccessResult();
} catch (DiskSpaceInsufficientException | IOException e) {
@@ -130,7 +132,7 @@ public class SyncServiceImpl implements SyncService.Iface {
String dataDir = DirectoryManager.getInstance().getNextFolderForSequenceFile();
syncFolderPath
.set(FilePathUtils.regularizePath(dataDir) + Constans.SYNC_RECEIVER + File.separatorChar
- + senderIp.get());
+ + senderName.get());
}
/**
@@ -153,7 +155,8 @@ public class SyncServiceImpl implements SyncService.Iface {
public ResultStatus syncDeletedFileName(String fileName) throws TException {
try {
syncLog.get().finishSyncDeletedFileName(new File(fileName));
- FileLoader.getInstance().addDeletedFileName(currentSG.get(), new File(fileName));
+ FileLoaderManager.getInstance().getFileLoader(senderName.get())
+ .addDeletedFileName(currentSG.get(), new File(fileName));
} catch (IOException e) {
logger.error("Can not sync deleted file", e);
return getErrorResult(
@@ -217,7 +220,8 @@ public class SyncServiceImpl implements SyncService.Iface {
} else {
if (!currentFile.get().getName().endsWith(TsFileResource.RESOURCE_SUFFIX)) {
syncLog.get().finishSyncTsfile(currentFile.get());
- FileLoader.getInstance().addTsfile(currentSG.get(), currentFile.get());
+ FileLoaderManager.getInstance().getFileLoader(senderName.get())
+ .addTsfile(currentSG.get(), currentFile.get());
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
index 576b416..0825774 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
@@ -40,6 +40,8 @@ public class Constans {
public static final String LOCK_FILE_NAME = "sync_lock";
+ public static final String UUID_FILE_NAME = "uuid.txt";
+
public static final String SCHEMA_POS_FILE_NAME = "sync_schema_pos";
public static final String LAST_LOCAL_FILE_NAME = "last_local_files.txt";
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java
index d6028ca..4a0ae8b 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java
@@ -32,6 +32,8 @@ public class SyncSenderConfig {
private String lockFilePath;
+ private String uuidPath;
+
private String lastFileInfo;
private String snapshotPath;
@@ -43,6 +45,7 @@ public class SyncSenderConfig {
senderFolderPath = dataDirectory + File.separatorChar + Constans.SYNC_SENDER + File.separatorChar +
getSyncReceiverName();
lockFilePath = senderFolderPath + File.separatorChar + Constans.LOCK_FILE_NAME;
+ uuidPath = senderFolderPath + File.separatorChar + Constans.UUID_FILE_NAME;
lastFileInfo = senderFolderPath + File.separatorChar + Constans.LAST_LOCAL_FILE_NAME;
snapshotPath = senderFolderPath + File.separatorChar + Constans.DATA_SNAPSHOT_NAME;
if(!new File(snapshotPath).exists()){
@@ -106,6 +109,10 @@ public class SyncSenderConfig {
this.snapshotPath = snapshotPath;
}
+ /**
+ * Returns the path of the UUID file, which is located in the sender folder and named
+ * Constans.UUID_FILE_NAME.
+ */
+ public String getUuidPath() {
+ return uuidPath;
+ }
+
public String getSyncReceiverName() {
return serverIp + Constans.SYNC_DIR_NAME_SEPARATOR + serverPort;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
index ce67c27..ec75c0a 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
@@ -19,6 +19,7 @@ import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
@@ -35,6 +36,7 @@ import java.security.NoSuchAlgorithmException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
@@ -275,7 +277,8 @@ public class DataTransferManager implements IDataTransferManager {
@Override
public void confirmIdentity() throws SyncConnectionException {
try {
- ResultStatus status = serviceClient.check(InetAddress.getLocalHost().getHostAddress());
+ ResultStatus status = serviceClient.check(InetAddress.getLocalHost().getHostAddress(),
+ getOrCreateUUID(config.getUuidPath()));
if (!status.success) {
throw new SyncConnectionException(
"The receiver rejected the synchronization task because " + status.errorMsg);
@@ -286,6 +289,39 @@ public class DataTransferManager implements IDataTransferManager {
}
}
+ /**
+ * UUID marks the identity of sender for receiver.
+ */
+ public String getOrCreateUUID(String uuidPath) throws IOException {
+ File file = new File(uuidPath);
+ String uuid;
+ if (!file.getParentFile().exists()) {
+ file.getParentFile().mkdirs();
+ }
+ if (!file.exists()) {
+ try (FileOutputStream out = new FileOutputStream(file)) {
+ file.createNewFile();
+ uuid = generateUUID();
+ out.write(uuid.getBytes());
+ } catch (IOException e) {
+ logger.error("Cannot insert UUID to file {}", file.getPath());
+ throw new IOException(e);
+ }
+ } else {
+ try (BufferedReader bf = new BufferedReader((new FileReader(uuidPath)))) {
+ uuid = bf.readLine();
+ } catch (IOException e) {
+ logger.error("Cannot read UUID from file{}", file.getPath());
+ throw new IOException(e);
+ }
+ }
+ return uuid;
+ }
+
+ private String generateUUID() {
+ return UUID.randomUUID().toString().replaceAll("-", "");
+ }
+
@Override
public void syncSchema() throws SyncConnectionException, TException {
int retryCount = 0;
diff --git a/service-rpc/src/main/thrift/sync.thrift b/service-rpc/src/main/thrift/sync.thrift
index 2064c4e..17bc241 100755
--- a/service-rpc/src/main/thrift/sync.thrift
+++ b/service-rpc/src/main/thrift/sync.thrift
@@ -29,7 +29,7 @@ struct ResultStatus{
}
service SyncService{
- ResultStatus check(1:string address)
+ ResultStatus check(1:string address, 2:string uuid)
ResultStatus startSync();
ResultStatus init(1:string storageGroupName)
ResultStatus syncDeletedFileName(1:string fileName)