You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/09/04 03:27:27 UTC
[incubator-iotdb] 02/03: fix some acute bugs in testing
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch reimpl_sync
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit d287ba13fefa56a1909e96362def3fbb52bf8329
Author: lta <li...@163.com>
AuthorDate: Wed Sep 4 11:14:37 2019 +0800
fix some acute bugs in testing
---
.../receiver/recover/SyncReceiverLogAnalyzer.java | 2 ++
.../db/sync/receiver/transfer/SyncServiceImpl.java | 25 ++++++++++------
.../db/sync/sender/conf/SyncSenderConfig.java | 5 ++--
.../sync/sender/transfer/DataTransferManager.java | 35 +++++++++++++---------
4 files changed, 42 insertions(+), 25 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzer.java
index 375c8a1..b4899ce 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzer.java
@@ -22,6 +22,7 @@ import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
+import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.sync.receiver.load.FileLoader;
import org.apache.iotdb.db.sync.receiver.load.FileLoaderManager;
@@ -65,6 +66,7 @@ public class SyncReceiverLogAnalyzer implements ISyncReceiverLogAnalyzer {
// check the state
if (!new File(senderFolder, SyncConstant.SYNC_LOG_NAME).exists()) {
new File(senderFolder, SyncConstant.LOAD_LOG_NAME).delete();
+ FileUtils.deleteDirectory(new File(senderFolder, SyncConstant.RECEIVER_DATA_FOLDER_NAME));
return true;
}
if (FileLoaderManager.getInstance().containsFileLoader(senderFolder.getName())) {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
index 8a6064a..a83a245 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
@@ -27,10 +27,10 @@ import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -103,7 +103,9 @@ public class SyncServiceImpl implements SyncService.Iface {
if (currentFileWriter.get() != null && currentFileWriter.get().isOpen()) {
currentFileWriter.get().close();
}
- syncLog.get().close();
+ if (syncLog.get() != null) {
+ syncLog.get().close();
+ }
return SyncReceiverLogAnalyzer.getInstance().recover(senderName.get());
} catch (IOException e) {
logger.error("Check recovery state fail", e);
@@ -117,7 +119,7 @@ public class SyncServiceImpl implements SyncService.Iface {
initPath();
currentSG.remove();
FileLoader.createFileLoader(senderName.get(), syncFolderPath.get());
- syncLog.set(new SyncReceiverLogger(new File(getSyncDataPath(), SyncConstant.SYNC_LOG_NAME)));
+ syncLog.set(new SyncReceiverLogger(new File(syncFolderPath.get(), SyncConstant.SYNC_LOG_NAME)));
return getSuccessResult();
} catch (DiskSpaceInsufficientException | IOException e) {
logger.error("Can not receiver data from sender", e);
@@ -175,6 +177,7 @@ public class SyncServiceImpl implements SyncService.Iface {
} else {
file = new File(getSyncDataPath(), currentSG.get() + File.separatorChar + filename);
}
+ file.delete();
currentFile.set(file);
if (!file.getParentFile().exists()) {
file.getParentFile().mkdirs();
@@ -184,7 +187,8 @@ public class SyncServiceImpl implements SyncService.Iface {
}
currentFileWriter.set(new FileOutputStream(file).getChannel());
syncLog.get().startSyncTsFiles();
- } catch (IOException e) {
+ messageDigest.set(MessageDigest.getInstance(SyncConstant.MESSAGE_DIGIT_NAME));
+ } catch (IOException | NoSuchAlgorithmException e) {
logger.error("Can not init sync resource for file {}", filename, e);
return getErrorResult(
String.format("Can not init sync resource for file %s because %s", filename,
@@ -197,6 +201,7 @@ public class SyncServiceImpl implements SyncService.Iface {
public ResultStatus syncData(ByteBuffer buff) {
try {
currentFileWriter.get().write(buff);
+ buff.flip();
messageDigest.get().update(buff);
} catch (IOException e) {
logger.error("Can not sync data for file {}", currentFile.get().getAbsoluteFile(), e);
@@ -213,7 +218,7 @@ public class SyncServiceImpl implements SyncService.Iface {
try {
currentFileWriter.get().close();
if (!md5OfSender.equals(md5OfReceiver)) {
- FileUtils.forceDelete(currentFile.get());
+ currentFile.get().delete();
currentFileWriter.set(new FileOutputStream(currentFile.get()).getChannel());
} else {
if (currentFile.get().getName().endsWith(MetadataConstant.METADATA_LOG)) {
@@ -241,13 +246,15 @@ public class SyncServiceImpl implements SyncService.Iface {
new java.io.FileReader(currentFile.get()))) {
String metadataOperation;
while ((metadataOperation = br.readLine()) != null) {
- operation(metadataOperation);
+ try {
+ operation(metadataOperation);
+ } catch (IOException | MetadataErrorException | PathErrorException e) {
+ // multiple insert schema, ignore it.
+ }
}
- } catch (FileNotFoundException e) {
+ } catch (IOException e) {
logger.error("Cannot read the file {}.", currentFile.get().getAbsoluteFile(), e);
return false;
- } catch (IOException | MetadataErrorException | PathErrorException e) {
- // multiple insert schema, ignore it.
}
}
return true;
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java
index cc6f11c..3216558 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.sync.sender.conf;
import java.io.File;
+import java.util.ArrayList;
import java.util.List;
public class SyncSenderConfig {
@@ -27,7 +28,7 @@ public class SyncSenderConfig {
private int serverPort = 5555;
- private int syncPeriodInSecond = 10;
+ private int syncPeriodInSecond = 600;
private String senderFolderPath;
@@ -42,7 +43,7 @@ public class SyncSenderConfig {
/**
* Storage groups which participate in sync process
*/
- private List<String> storageGroupList;
+ private List<String> storageGroupList = new ArrayList<>();
/**
* Update paths based on data directory
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
index 0952fe5..3d550a1 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
@@ -132,16 +132,20 @@ public class DataTransferManager implements IDataTransferManager {
* running.
*/
private void verifySingleton() throws IOException {
- File lockFile = new File(config.getLockFilePath());
- if (!lockFile.getParentFile().exists()) {
- lockFile.getParentFile().mkdirs();
- }
- if (!lockFile.exists()) {
- lockFile.createNewFile();
- }
- if (!lockInstance(config.getLockFilePath())) {
- logger.error("Sync client is already running.");
- System.exit(1);
+ String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs();
+ for (String dataDir : dataDirs) {
+ config.update(dataDir);
+ File lockFile = new File(config.getLockFilePath());
+ if (!lockFile.getParentFile().exists()) {
+ lockFile.getParentFile().mkdirs();
+ }
+ if (!lockFile.exists()) {
+ lockFile.createNewFile();
+ }
+ if (!lockInstance(config.getLockFilePath())) {
+ logger.error("Sync client is already running.");
+ System.exit(1);
+ }
}
}
@@ -343,8 +347,9 @@ public class DataTransferManager implements IDataTransferManager {
try (BufferedReader br = new BufferedReader(new FileReader(getSchemaLogFile()));
ByteArrayOutputStream bos = new ByteArrayOutputStream(SyncConstant.DATA_CHUNK_SIZE)) {
schemaFileLinePos = 0;
- while (schemaFileLinePos++ <= schemaPos) {
+ while (schemaFileLinePos < schemaPos) {
br.readLine();
+ schemaFileLinePos++;
}
MessageDigest md = MessageDigest.getInstance(SyncConstant.MESSAGE_DIGIT_NAME);
String line;
@@ -353,8 +358,9 @@ public class DataTransferManager implements IDataTransferManager {
schemaFileLinePos++;
byte[] singleLineData = BytesUtils.stringToBytes(line);
bos.write(singleLineData);
- md.update(singleLineData);
+ bos.write("\r\n".getBytes());
if (cntLine++ == BATCH_LINE) {
+ md.update(bos.toByteArray());
ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
bos.reset();
ResultStatus status = serviceClient.syncData(buffToSend);
@@ -366,6 +372,7 @@ public class DataTransferManager implements IDataTransferManager {
}
}
if (bos.size() != 0) {
+ md.update(bos.toByteArray());
ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
bos.reset();
ResultStatus status = serviceClient.syncData(buffToSend);
@@ -376,7 +383,7 @@ public class DataTransferManager implements IDataTransferManager {
}
// check md5
- return checkMD5ForSchema((new BigInteger(1, md.digest())).toString(16));
+ return checkMD5ForSchema(new BigInteger(1, md.digest()).toString(16));
} catch (NoSuchAlgorithmException | IOException | TException e) {
logger.error("Can not finish transfer schema to receiver", e);
return false;
@@ -497,7 +504,7 @@ public class DataTransferManager implements IDataTransferManager {
try {
File snapshotFile = makeFileSnapshot(tsfile);
// firstly sync .restore file, then sync tsfile
- syncSingleFile(new File(snapshotFile, TsFileResource.RESOURCE_SUFFIX));
+ syncSingleFile(new File(snapshotFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX));
syncSingleFile(snapshotFile);
lastLocalFilesMap.get(sgName).add(tsfile);
syncLog.finishSyncTsfile(tsfile);