You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2020/12/22 16:31:06 UTC
[iotdb] 01/01: fix sync schema bug
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch fix_sync_schema_bug
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 128e1914d6d259db1c5504688ba84c05c07ddcde
Author: lta <li...@163.com>
AuthorDate: Wed Dec 23 00:30:16 2020 +0800
fix sync schema bug
---
.../db/sync/receiver/transfer/SyncServiceImpl.java | 5 ++-
.../iotdb/db/sync/sender/transfer/SyncClient.java | 45 ++++++----------------
2 files changed, 14 insertions(+), 36 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
index 7d8df48..3572bac 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
@@ -270,10 +270,11 @@ public class SyncServiceImpl implements SyncService.Iface {
private void loadMetadata() {
logger.info("Start to load metadata in sync process.");
if (currentFile.get().exists()) {
- try (MLogReader mLogReader = new MLogReader(config.getSchemaDir(), MetadataConstant.METADATA_LOG)) {
+ try (MLogReader mLogReader = new MLogReader(currentFile.get())) {
while (mLogReader.hasNext()) {
- PhysicalPlan plan = mLogReader.next();
+ PhysicalPlan plan = null;
try {
+ plan = mLogReader.next();
if (plan == null) {
continue;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
index 838d74e..5e347f6 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
@@ -72,7 +72,6 @@ import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.service.sync.thrift.ConfirmInfo;
import org.apache.iotdb.service.sync.thrift.SyncService;
import org.apache.iotdb.service.sync.thrift.SyncStatus;
-import org.apache.iotdb.tsfile.utils.BytesUtils;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
@@ -89,8 +88,6 @@ public class SyncClient implements ISyncClient {
private static SyncSenderConfig config = SyncSenderDescriptor.getInstance().getConfig();
- private static final int BATCH_LINE = 1000;
-
private static final int TIMEOUT_MS = 1000;
/**
@@ -99,7 +96,7 @@ public class SyncClient implements ISyncClient {
* location is recorded once after each synchronization task for the next synchronization task to
* use.
*/
- private int schemaFileLinePos;
+ private int schemaFilePos;
private TTransport transport;
@@ -365,45 +362,25 @@ public class SyncClient implements ISyncClient {
}
private boolean tryToSyncSchema() {
- int schemaPos = readSyncSchemaPos(getSchemaPosFile());
+ schemaFilePos = readSyncSchemaPos(getSchemaPosFile());
// start to sync file data and get digest of this file.
- try (BufferedReader br = new BufferedReader(new FileReader(getSchemaLogFile()));
+ try (FileInputStream fis = new FileInputStream(getSchemaLogFile());
ByteArrayOutputStream bos = new ByteArrayOutputStream(SyncConstant.DATA_CHUNK_SIZE)) {
- schemaFileLinePos = 0;
- String line;
- while (schemaFileLinePos < schemaPos) {
- line = br.readLine();
- schemaFileLinePos++;
- }
+ fis.skip(schemaFilePos);
MessageDigest md = MessageDigest.getInstance(SyncConstant.MESSAGE_DIGIT_NAME);
- int cntLine = 0;
- while ((line = br.readLine()) != null) {
- schemaFileLinePos++;
- byte[] singleLineData = BytesUtils.stringToBytes(line);
- bos.write(singleLineData);
- bos.write("\r\n".getBytes());
- if (cntLine++ == BATCH_LINE) {
- md.update(bos.toByteArray());
- ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
- bos.reset();
- SyncStatus status = serviceClient.syncData(buffToSend);
- if (status.code != SUCCESS_CODE) {
- logger.error("Receiver failed to receive metadata because {}, retry.", status.msg);
- return false;
- }
- cntLine = 0;
- }
- }
- if (bos.size() != 0) {
- md.update(bos.toByteArray());
+ byte[] buffer = new byte[SyncConstant.DATA_CHUNK_SIZE];
+ int dataLength;
+ while ((dataLength = fis.read(buffer)) != -1) {
+ bos.write(buffer, 0, dataLength);
+ md.update(buffer, 0, dataLength);
ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
- bos.reset();
SyncStatus status = serviceClient.syncData(buffToSend);
if (status.code != SUCCESS_CODE) {
logger.error("Receiver failed to receive metadata because {}, retry.", status.msg);
return false;
}
+ schemaFilePos += dataLength;
}
// check digest
@@ -453,7 +430,7 @@ public class SyncClient implements ISyncClient {
syncSchemaLogFile.createNewFile();
}
try (BufferedWriter br = new BufferedWriter(new FileWriter(syncSchemaLogFile))) {
- br.write(Integer.toString(schemaFileLinePos));
+ br.write(Integer.toString(schemaFilePos));
}
} catch (IOException e) {
logger.error("Can not find file {}", syncSchemaLogFile.getAbsoluteFile(), e);