You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/08/30 12:38:01 UTC
[incubator-iotdb] 02/03: add sync log recovery in receiver end
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch reimpl_sync
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 8b5123486fb2ac84ce230134c144e066c734c006
Author: lta <li...@163.com>
AuthorDate: Fri Aug 30 16:20:22 2019 +0800
add sync log recovery in receiver end
---
.../iotdb/db/sync/receiver/SyncServerManager.java | 7 +
.../iotdb/db/sync/receiver/load/FileLoader.java | 26 ++--
.../db/sync/receiver/load/FileLoaderManager.java | 4 +
.../iotdb/db/sync/receiver/load/ILoadLogger.java | 18 +++
.../iotdb/db/sync/receiver/load/LoadLogger.java | 23 +++-
.../LoadType.java} | 11 +-
.../receiver/recover/ISyncReceiverLogAnalyzer.java | 11 +-
.../receiver/recover/SyncReceiverLogAnalyzer.java | 152 +++++++++++++++++++++
.../sync/receiver/recover/SyncReceiverLogger.java | 5 +-
.../db/sync/receiver/transfer/SyncServiceImpl.java | 23 ++--
.../apache/iotdb/db/sync/sender/conf/Constans.java | 2 -
11 files changed, 245 insertions(+), 37 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
index faba821..c6e06cd 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.sync.receiver;
+import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.concurrent.ThreadName;
@@ -28,6 +29,7 @@ import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.service.IService;
import org.apache.iotdb.db.service.ServiceType;
import org.apache.iotdb.db.sync.receiver.load.FileLoaderManager;
+import org.apache.iotdb.db.sync.receiver.recover.SyncReceiverLogAnalyzer;
import org.apache.iotdb.db.sync.receiver.transfer.SyncServiceImpl;
import org.apache.iotdb.service.sync.thrift.SyncService;
import org.apache.iotdb.service.sync.thrift.SyncService.Processor;
@@ -67,6 +69,11 @@ public class SyncServerManager implements IService {
return;
}
FileLoaderManager.getInstance().start();
+ try {
+ SyncReceiverLogAnalyzer.getInstance().recoverAll();
+ } catch (IOException e) {
+ logger.error("Can not recover receiver sync state", e);
+ }
if (conf.getIpWhiteList() == null) {
logger.error(
"Sync server failed to start because IP white list is null, please set IP white list.");
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
index 8db09c0..d834487 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
@@ -56,14 +56,19 @@ public class FileLoader implements IFileLoader {
return new FileLoader(senderName, syncFolderPath);
}
+ public static FileLoader createFileLoader(File syncFolder)
+ throws IOException {
+ return new FileLoader(syncFolder.getName(), syncFolder.getAbsolutePath());
+ }
+
private Runnable loadTaskRunner = () -> {
try {
while (true) {
- if (endSync) {
- cleanUp();
- break;
- }
if (queue.isEmpty()) {
+ if (endSync) {
+ cleanUp();
+ break;
+ }
synchronized (queue) {
if (queue.isEmpty()) {
queue.wait(WAIT_TIME);
@@ -97,7 +102,9 @@ public class FileLoader implements IFileLoader {
@Override
public void endSync() {
- this.endSync = true;
+ if (!endSync && FileLoaderManager.getInstance().containsFileLoader(senderName)) {
+ this.endSync = true;
+ }
}
@Override
@@ -139,13 +146,16 @@ public class FileLoader implements IFileLoader {
loadLog.close();
new File(syncFolderPath, Constans.SYNC_LOG_NAME).deleteOnExit();
new File(syncFolderPath, Constans.LOAD_LOG_NAME).deleteOnExit();
- new File(syncFolderPath, Constans.SYNC_END).deleteOnExit();
FileLoaderManager.getInstance().removeFileLoader(senderName);
} catch (IOException e) {
LOGGER.error("Can not clean up sync resource.", e);
}
}
+ public void setCurType(LoadType curType) {
+ this.curType = curType;
+ }
+
class LoadTask {
private File file;
@@ -156,8 +166,4 @@ public class FileLoader implements IFileLoader {
this.type = type;
}
}
-
- private enum LoadType {
- DELETE, ADD, NONE
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderManager.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderManager.java
index 2b86777..8fc341a 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderManager.java
@@ -55,6 +55,10 @@ public class FileLoaderManager {
return fileLoaderMap.get(senderName);
}
+ public boolean containsFileLoader(String senderName){
+ return fileLoaderMap.containsKey(senderName);
+ }
+
public void addLoadTaskRunner(Runnable taskRunner){
loadTaskRunnerPool.submit(taskRunner);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/ILoadLogger.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/ILoadLogger.java
index 2cad379..15fd8f5 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/ILoadLogger.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/ILoadLogger.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.iotdb.db.sync.receiver.load;
import java.io.File;
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/LoadLogger.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/LoadLogger.java
index 31b4c48..881a637 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/LoadLogger.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/LoadLogger.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.iotdb.db.sync.receiver.load;
import java.io.BufferedWriter;
@@ -45,6 +63,9 @@ public class LoadLogger implements ILoadLogger {
@Override
public void close() throws IOException {
- bw.close();
+ if(bw != null) {
+ bw.close();
+ bw = null;
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/ISyncReceiverLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/LoadType.java
similarity index 82%
copy from server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/ISyncReceiverLogAnalyzer.java
copy to server/src/main/java/org/apache/iotdb/db/sync/receiver/load/LoadType.java
index 8015fb2..4742093 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/ISyncReceiverLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/LoadType.java
@@ -16,13 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.sync.receiver.recover;
+package org.apache.iotdb.db.sync.receiver.load;
-public interface ISyncReceiverLogAnalyzer {
-
- void recover();
-
- void scanLogger(String path);
-
- void clearLogger();
+public enum LoadType {
+ DELETE, ADD, NONE
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/ISyncReceiverLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/ISyncReceiverLogAnalyzer.java
index 8015fb2..a470de3 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/ISyncReceiverLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/ISyncReceiverLogAnalyzer.java
@@ -18,11 +18,16 @@
*/
package org.apache.iotdb.db.sync.receiver.recover;
+import java.io.File;
+import java.io.IOException;
+import org.apache.iotdb.db.sync.receiver.load.FileLoader;
+
public interface ISyncReceiverLogAnalyzer {
- void recover();
+ void recoverAll() throws IOException;
+
+ boolean recover(String senderName) throws IOException;
- void scanLogger(String path);
+ void scanLogger(FileLoader loader, File syncLog, File loadLog);
- void clearLogger();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzer.java
new file mode 100644
index 0000000..4ed7284
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzer.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.sync.receiver.recover;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.sync.receiver.load.FileLoader;
+import org.apache.iotdb.db.sync.receiver.load.FileLoaderManager;
+import org.apache.iotdb.db.sync.receiver.load.LoadLogger;
+import org.apache.iotdb.db.sync.receiver.load.LoadType;
+import org.apache.iotdb.db.sync.sender.conf.Constans;
+import org.apache.iotdb.db.utils.FilePathUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SyncReceiverLogAnalyzer implements ISyncReceiverLogAnalyzer {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(SyncReceiverLogAnalyzer.class);
+
+ private static final int WAIT_TIMEOUT = 2000;
+
+ private SyncReceiverLogAnalyzer() {
+
+ }
+
+ public static SyncReceiverLogAnalyzer getInstance() {
+ return SyncReceiverLogAnalyzerHolder.INSTANCE;
+ }
+
+ @Override
+ public void recoverAll() throws IOException {
+ String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs();
+ LOGGER.info("Start to recover all sync state for sync receiver.");
+ for (String dataDir : dataDirs) {
+ if (!new File(FilePathUtils.regularizePath(dataDir) + Constans.SYNC_RECEIVER).exists()) {
+ continue;
+ }
+ for (File syncFolder : new File(
+ FilePathUtils.regularizePath(dataDir) + Constans.SYNC_RECEIVER)
+ .listFiles()) {
+ recover(syncFolder);
+ }
+ }
+ LOGGER.info("Finish to recover all sync state for sync receiver.");
+ }
+
+ private boolean recover(File senderFolder) throws IOException {
+ // check the state
+ if (!new File(senderFolder, Constans.SYNC_LOG_NAME).exists()) {
+ new File(senderFolder, Constans.LOAD_LOG_NAME).deleteOnExit();
+ return true;
+ }
+ if (FileLoaderManager.getInstance().containsFileLoader(senderFolder.getName())) {
+ FileLoaderManager.getInstance().getFileLoader(senderFolder.getName()).endSync();
+ try {
+ Thread.sleep(WAIT_TIMEOUT); // wait for file loader to clean up resource
+ } catch (InterruptedException e) {
+ LOGGER.error("Can not wait for recovery to complete.", e);
+ }
+ } else {
+ scanLogger(FileLoader.createFileLoader(senderFolder),
+ new File(senderFolder, Constans.SYNC_LOG_NAME),
+ new File(senderFolder, Constans.LOAD_LOG_NAME));
+ }
+ return FileLoaderManager.getInstance().containsFileLoader(senderFolder.getName());
+ }
+
+ @Override
+ public boolean recover(String senderName) throws IOException {
+ String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs();
+ boolean recoverComplete = true;
+ for (String dataDir : dataDirs) {
+ if (!new File(FilePathUtils.regularizePath(dataDir) + Constans.SYNC_RECEIVER).exists()) {
+ continue;
+ }
+ for (File syncFolder : new File(
+ FilePathUtils.regularizePath(dataDir) + Constans.SYNC_RECEIVER)
+ .listFiles()) {
+ if (syncFolder.getName().equals(senderName)) {
+ recoverComplete &= recover(syncFolder);
+ }
+ }
+ }
+ return recoverComplete;
+ }
+
+ @Override
+ public void scanLogger(FileLoader loader, File syncLog, File loadLog) {
+ LoadType loadType = LoadType.NONE;
+ try (BufferedReader syncReader = new BufferedReader(new FileReader(syncLog))) {
+ String line;
+ try (BufferedReader loadReader = new BufferedReader(new FileReader(loadLog))) {
+ while ((line = loadReader.readLine()) != null) {
+ if (line.equals(LoadLogger.LOAD_DELETED_FILE_NAME_START)) {
+ loadType = LoadType.DELETE;
+ } else if (line.equals(LoadLogger.LOAD_TSFILE_START)) {
+ loadType = LoadType.ADD;
+ } else {
+ while (!syncReader.readLine().equals(line)) {
+ }
+ }
+ }
+ }
+ loader.setCurType(loadType);
+ while ((line = syncReader.readLine()) != null) {
+ if (line.equals(SyncReceiverLogger.SYNC_DELETED_FILE_NAME_START)) {
+ loadType = LoadType.DELETE;
+ } else if (line.equals(SyncReceiverLogger.SYNC_TSFILE_START)) {
+ loadType = LoadType.ADD;
+ } else {
+ switch (loadType) {
+ case ADD:
+ loader.addTsfile(new File(line));
+ break;
+ case DELETE:
+ loader.addDeletedFileName(new File(line));
+ break;
+ default:
+ LOGGER.error("Wrong load type {}", loadType);
+ }
+ }
+ }
+ loader.endSync();
+ } catch (IOException e) {
+ LOGGER.error("Can not scan log for recovery", e);
+ }
+ }
+
+ private static class SyncReceiverLogAnalyzerHolder {
+
+ private static final SyncReceiverLogAnalyzer INSTANCE = new SyncReceiverLogAnalyzer();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogger.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogger.java
index b43cfa3..b940758 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogger.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogger.java
@@ -63,6 +63,9 @@ public class SyncReceiverLogger implements ISyncReceiverLogger {
@Override
public void close() throws IOException {
- bw.close();
+ if(bw != null) {
+ bw.close();
+ bw = null;
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
index 525244d..8d2d61c 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.db.metadata.MetadataConstant;
import org.apache.iotdb.db.metadata.MetadataOperationType;
import org.apache.iotdb.db.sync.receiver.load.FileLoader;
import org.apache.iotdb.db.sync.receiver.load.FileLoaderManager;
+import org.apache.iotdb.db.sync.receiver.recover.SyncReceiverLogAnalyzer;
import org.apache.iotdb.db.sync.receiver.recover.SyncReceiverLogger;
import org.apache.iotdb.db.sync.sender.conf.Constans;
import org.apache.iotdb.db.utils.FilePathUtils;
@@ -62,8 +63,6 @@ public class SyncServiceImpl implements SyncService.Iface {
private static final Logger logger = LoggerFactory.getLogger(SyncServiceImpl.class);
- private static final MManager METADATA_MANGER = MManager.getInstance();
-
private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private ThreadLocal<String> syncFolderPath = new ThreadLocal<>();
@@ -104,7 +103,8 @@ public class SyncServiceImpl implements SyncService.Iface {
if (currentFileWriter.get() != null && currentFileWriter.get().isOpen()) {
currentFileWriter.get().close();
}
- return true;
+ syncLog.get().close();
+ return SyncReceiverLogAnalyzer.getInstance().recover(senderName.get());
} catch (IOException e) {
logger.error("Check recovery state fail", e);
return false;
@@ -269,32 +269,32 @@ public class SyncServiceImpl implements SyncService.Iface {
kv = args[k].split("=");
props.put(kv[0], kv[1]);
}
- METADATA_MANGER
+ MManager.getInstance()
.addPathToMTree(new Path(args[1]), TSDataType.deserialize(Short.valueOf(args[2])),
TSEncoding.deserialize(Short.valueOf(args[3])),
CompressionType.deserialize(Short.valueOf(args[4])),
props);
break;
case MetadataOperationType.DELETE_PATH_FROM_MTREE:
- METADATA_MANGER.deletePaths(Collections.singletonList(new Path(args[1])));
+ MManager.getInstance().deletePaths(Collections.singletonList(new Path(args[1])));
break;
case MetadataOperationType.SET_STORAGE_LEVEL_TO_MTREE:
- METADATA_MANGER.setStorageLevelToMTree(args[1]);
+ MManager.getInstance().setStorageLevelToMTree(args[1]);
break;
case MetadataOperationType.ADD_A_PTREE:
- METADATA_MANGER.addAPTree(args[1]);
+ MManager.getInstance().addAPTree(args[1]);
break;
case MetadataOperationType.ADD_A_PATH_TO_PTREE:
- METADATA_MANGER.addPathToPTree(args[1]);
+ MManager.getInstance().addPathToPTree(args[1]);
break;
case MetadataOperationType.DELETE_PATH_FROM_PTREE:
- METADATA_MANGER.deletePathFromPTree(args[1]);
+ MManager.getInstance().deletePathFromPTree(args[1]);
break;
case MetadataOperationType.LINK_MNODE_TO_PTREE:
- METADATA_MANGER.linkMNodeToPTree(args[1], args[2]);
+ MManager.getInstance().linkMNodeToPTree(args[1], args[2]);
break;
case MetadataOperationType.UNLINK_MNODE_FROM_PTREE:
- METADATA_MANGER.unlinkMNodeFromPTree(args[1], args[2]);
+ MManager.getInstance().unlinkMNodeFromPTree(args[1], args[2]);
break;
default:
logger.error("Unrecognizable command {}", cmd);
@@ -305,7 +305,6 @@ public class SyncServiceImpl implements SyncService.Iface {
public ResultStatus endSync() throws TException {
try {
syncLog.get().close();
- new File(getSyncDataPath(), Constans.SYNC_END).createNewFile();
FileLoaderManager.getInstance().getFileLoader(senderName.get()).endSync();
} catch (IOException e) {
logger.error("Can not end sync", e);
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
index d3f8ff1..d711aa5 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
@@ -71,8 +71,6 @@ public class Constans {
public static final String RECEIVER_DATA_FOLDER_NAME = "data";
- public static final String SYNC_END = "sync.end";
-
public static final String LOAD_LOG_NAME = "load.log";
}