You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this conversion.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/05 11:23:20 UTC
[iotdb] branch xingtanzjr/query_log updated: make log in DataBlockManager more accurate
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/query_log
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/xingtanzjr/query_log by this push:
new 2b2d4223d4 make log in DataBlockManager more accurate
2b2d4223d4 is described below
commit 2b2d4223d4d4dc1751a549bbe772b902ae8e1119
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Thu May 5 19:23:09 2022 +0800
make log in DataBlockManager more accurate
---
.../statemachine/SchemaRegionStateMachine.java | 5 +-
.../execution/datatransfer/DataBlockManager.java | 22 ++++----
.../db/mpp/execution/datatransfer/SinkHandle.java | 63 ++++++++++-----------
.../mpp/execution/datatransfer/SourceHandle.java | 66 ++++++++++++++--------
.../scheduler/SimpleFragInstanceDispatcher.java | 10 ++--
5 files changed, 93 insertions(+), 73 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
index 4fbe1c4a0a..3221d36de4 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
@@ -78,7 +78,10 @@ public class SchemaRegionStateMachine extends BaseStateMachine {
@Override
protected DataSet read(FragmentInstance fragmentInstance) {
- logger.info("SchemaRegionStateMachine[{}]: Execute read plan: FragmentInstance-{}", schemaRegion.getSchemaRegionId(), fragmentInstance.getId());
+ logger.info(
+ "SchemaRegionStateMachine[{}]: Execute read plan: FragmentInstance-{}",
+ schemaRegion.getSchemaRegionId(),
+ fragmentInstance.getId());
return QUERY_INSTANCE_MANAGER.execSchemaQueryFragmentInstance(fragmentInstance, schemaRegion);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/DataBlockManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/DataBlockManager.java
index c889f18f6d..750ae2bd2d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/DataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/DataBlockManager.java
@@ -182,13 +182,12 @@ public class DataBlockManager implements IDataBlockManager {
@Override
public void onFinished(ISourceHandle sourceHandle) {
- logger.info("Release resources of finished source handle {}", sourceHandle);
+ logger.info("{} finished and release resources", sourceHandle);
if (!sourceHandles.containsKey(sourceHandle.getLocalFragmentInstanceId())
|| !sourceHandles
.get(sourceHandle.getLocalFragmentInstanceId())
.containsKey(sourceHandle.getLocalPlanNodeId())) {
- logger.info(
- "Resources of finished source handle {} has already been released", sourceHandle);
+ logger.info("{} resources has already been released", sourceHandle);
} else {
sourceHandles
.get(sourceHandle.getLocalFragmentInstanceId())
@@ -202,6 +201,7 @@ public class DataBlockManager implements IDataBlockManager {
@Override
public void onAborted(ISourceHandle sourceHandle) {
+ logger.info("{}: onAborted is invoked", sourceHandle);
onFinished(sourceHandle);
}
@@ -228,11 +228,7 @@ public class DataBlockManager implements IDataBlockManager {
@Override
public void onFinish(ISinkHandle sinkHandle) {
- logger.info("Release resources of finished sink handle {}", sourceHandles);
- if (!sinkHandles.containsKey(sinkHandle.getLocalFragmentInstanceId())) {
- logger.info("Resources of finished sink handle {} has already been released", sinkHandle);
- }
- sinkHandles.remove(sinkHandle.getLocalFragmentInstanceId());
+ removeFromDataBlockManager(sinkHandle);
context.finished();
}
@@ -243,15 +239,21 @@ public class DataBlockManager implements IDataBlockManager {
@Override
public void onAborted(ISinkHandle sinkHandle) {
- logger.info("Release resources of aborted sink handle {}", sourceHandles);
+ logger.info("{} onAborted is invoked", sinkHandle);
+ removeFromDataBlockManager(sinkHandle);
+ }
+
+ private void removeFromDataBlockManager(ISinkHandle sinkHandle) {
+ logger.info("{} release resources of finished sink handle", sinkHandle);
if (!sinkHandles.containsKey(sinkHandle.getLocalFragmentInstanceId())) {
- logger.info("Resources of aborted sink handle {} has already been released", sinkHandle);
+ logger.info("{} resources already been released", sinkHandle);
}
sinkHandles.remove(sinkHandle.getLocalFragmentInstanceId());
}
@Override
public void onFailure(ISinkHandle sinkHandle, Throwable t) {
+ // TODO: (xingtanzjr) should we remove the sinkHandle from DataBlockManager ?
logger.error("Sink handle {} failed due to {}", sinkHandle, t);
if (onFailureCallback != null) {
onFailureCallback.call(t);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SinkHandle.java
index c06a3f1980..518544d7aa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SinkHandle.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.commons.lang3.Validate;
-import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,7 +42,6 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map.Entry;
-import java.util.StringJoiner;
import java.util.concurrent.ExecutorService;
import static com.google.common.util.concurrent.Futures.immediateFuture;
@@ -54,6 +52,7 @@ public class SinkHandle implements ISinkHandle {
private static final Logger logger = LoggerFactory.getLogger(SinkHandle.class);
public static final int MAX_ATTEMPT_TIMES = 3;
+ private static final long RETRY_INTERVAL_IN_MS = 1000L;
private final TEndPoint remoteEndpoint;
private final TFragmentInstanceId remoteFragmentInstanceId;
@@ -155,11 +154,7 @@ public class SinkHandle implements ISinkHandle {
}
private void sendEndOfDataBlockEvent() throws Exception {
- logger.debug(
- "Send end of data block event to plan node {} of {}. {}",
- remotePlanNodeId,
- remoteFragmentInstanceId,
- Thread.currentThread().getName());
+ logger.info("{} send end of data block event", this);
int attempt = 0;
TEndOfDataBlockEvent endOfDataBlockEvent =
new TEndOfDataBlockEvent(
@@ -173,29 +168,24 @@ public class SinkHandle implements ISinkHandle {
dataBlockServiceClientManager.borrowClient(remoteEndpoint)) {
client.onEndOfDataBlockEvent(endOfDataBlockEvent);
break;
- } catch (TException e) {
+ } catch (Throwable e) {
logger.error(
- "Failed to send end of data block event to plan node {} of {} due to {}, attempt times: {}",
- remotePlanNodeId,
- remoteFragmentInstanceId,
+ "{} Failed to send end of data block event due to {}, attempt times: {}",
+ this,
e.getMessage(),
attempt,
e);
if (attempt == MAX_ATTEMPT_TIMES) {
throw e;
}
- } catch (IOException e) {
- logger.error("can't connect to node {}", remoteEndpoint, e);
- if (attempt == MAX_ATTEMPT_TIMES) {
- throw e;
- }
+ Thread.sleep(RETRY_INTERVAL_IN_MS);
}
}
}
@Override
public synchronized void setNoMoreTsBlocks() {
- logger.info("Setting no-more-tsblocks to {}", this);
+ logger.info("{} start to set no-more-tsblocks", this);
if (aborted) {
return;
}
@@ -204,17 +194,19 @@ public class SinkHandle implements ISinkHandle {
} catch (Exception e) {
throw new RuntimeException("Send EndOfDataBlockEvent failed", e);
}
+ logger.info("{} set noMoreTsBlocks to true", this);
noMoreTsBlocks = true;
if (isFinished()) {
+ logger.info("{} revoke onFinish() of sinkHandleListener", this);
sinkHandleListener.onFinish(this);
}
+ logger.info("{} revoke onEndOfBlocks() of sinkHandleListener", this);
sinkHandleListener.onEndOfBlocks(this);
- logger.info("No more tsblocks has been set to {}", this);
}
@Override
public synchronized void abort() {
- logger.info("Sink handle {} is being aborted.", this);
+ logger.info("{} is being aborted.", this);
sequenceIdToTsBlock.clear();
aborted = true;
bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryCancel(blocked);
@@ -225,7 +217,7 @@ public class SinkHandle implements ISinkHandle {
bufferRetainedSizeInBytes = 0;
}
sinkHandleListener.onAborted(this);
- logger.info("Sink handle {} is aborted", this);
+ logger.info("{} is aborted", this);
}
@Override
@@ -304,12 +296,11 @@ public class SinkHandle implements ISinkHandle {
@Override
public String toString() {
- return new StringJoiner(", ", SinkHandle.class.getSimpleName() + "[", "]")
- .add("remoteEndpoint='" + remoteEndpoint + "'")
- .add("remoteFragmentInstanceId=" + remoteFragmentInstanceId)
- .add("remotePlanNodeId='" + remotePlanNodeId + "'")
- .add("localFragmentInstanceId=" + localFragmentInstanceId)
- .toString();
+ return String.format(
+ "Query[%s]-[%s-%s-SinkHandle]:",
+ localFragmentInstanceId.queryId,
+ localFragmentInstanceId.fragmentId,
+ localFragmentInstanceId.instanceId);
}
/**
@@ -333,12 +324,11 @@ public class SinkHandle implements ISinkHandle {
@Override
public void run() {
- logger.debug(
- "Send new data block event [{}, {}) to plan node {} of {}.",
+ logger.info(
+ "{} send new data block event [{}, {})",
+ SinkHandle.this,
startSequenceId,
- startSequenceId + blockSizes.size(),
- remotePlanNodeId,
- remoteFragmentInstanceId);
+ startSequenceId + blockSizes.size());
int attempt = 0;
TNewDataBlockEvent newDataBlockEvent =
new TNewDataBlockEvent(
@@ -355,15 +345,20 @@ public class SinkHandle implements ISinkHandle {
break;
} catch (Throwable e) {
logger.error(
- "Failed to send new data block event to plan node {} of {} due to {}, attempt times: {}",
- remotePlanNodeId,
- remoteFragmentInstanceId,
+ "{} failed to send new data block event due to {}, attempt times: {}",
+ SinkHandle.this,
e.getMessage(),
attempt,
e);
if (attempt == MAX_ATTEMPT_TIMES) {
sinkHandleListener.onFailure(SinkHandle.this, e);
}
+ try {
+ Thread.sleep(RETRY_INTERVAL_IN_MS);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ sinkHandleListener.onFailure(SinkHandle.this, e);
+ }
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SourceHandle.java
index 1c8bf70880..10ed0e27b4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SourceHandle.java
@@ -42,7 +42,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.StringJoiner;
import java.util.concurrent.ExecutorService;
import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
@@ -52,6 +51,7 @@ public class SourceHandle implements ISourceHandle {
private static final Logger logger = LoggerFactory.getLogger(SourceHandle.class);
public static final int MAX_ATTEMPT_TIMES = 3;
+ private static final long RETRY_INTERVAL_IN_MS = 1000;
private final TEndPoint remoteEndpoint;
private final TFragmentInstanceId remoteFragmentInstanceId;
@@ -120,6 +120,7 @@ public class SourceHandle implements ISourceHandle {
.free(localFragmentInstanceId.getQueryId(), tsBlock.getRetainedSizeInBytes());
if (sequenceIdToTsBlock.isEmpty() && !isFinished()) {
+ logger.info("{}: no buffered TsBlock, blocked", this);
blocked = SettableFuture.create();
}
if (isFinished()) {
@@ -189,6 +190,7 @@ public class SourceHandle implements ISourceHandle {
}
synchronized void setNoMoreTsBlocks(int lastSequenceId) {
+ logger.info("{}: receive NoMoreTsBlock event. ", this);
this.lastSequenceId = lastSequenceId;
if (!blocked.isDone() && remoteTsBlockedConsumedUp()) {
blocked.set(null);
@@ -199,6 +201,11 @@ public class SourceHandle implements ISourceHandle {
}
synchronized void updatePendingDataBlockInfo(int startSequenceId, List<Long> dataBlockSizes) {
+ logger.info(
+ "{}: receive newDataBlockEvent. [{}, {})",
+ this,
+ startSequenceId,
+ startSequenceId + dataBlockSizes.size());
for (int i = 0; i < dataBlockSizes.size(); i++) {
sequenceIdToDataBlockSize.put(i + startSequenceId, dataBlockSizes.get(i));
}
@@ -267,12 +274,12 @@ public class SourceHandle implements ISourceHandle {
@Override
public String toString() {
- return new StringJoiner(", ", SourceHandle.class.getSimpleName() + "[", "]")
- .add("remoteEndpoint='" + remoteEndpoint + "'")
- .add("remoteFragmentInstanceId=" + remoteFragmentInstanceId)
- .add("localFragmentInstanceId=" + localFragmentInstanceId)
- .add("localPlanNodeId='" + localPlanNodeId + "'")
- .toString();
+ return String.format(
+ "Query[%s]-[%s-%s-SourceHandle-%s]",
+ localFragmentInstanceId.getQueryId(),
+ localFragmentInstanceId.getFragmentId(),
+ localFragmentInstanceId.getInstanceId(),
+ localPlanNodeId);
}
/** Get data blocks from an upstream fragment instance. */
@@ -300,13 +307,11 @@ public class SourceHandle implements ISourceHandle {
@Override
public void run() {
- logger.debug(
- "Get data blocks [{}, {}) from {} for plan node {} of {}.",
+ logger.info(
+ "{}: try to get data blocks [{}, {}) ",
+ SourceHandle.this,
startSequenceId,
- endSequenceId,
- remoteFragmentInstanceId,
- localPlanNodeId,
- localFragmentInstanceId);
+ endSequenceId);
TGetDataBlockRequest req =
new TGetDataBlockRequest(remoteFragmentInstanceId, startSequenceId, endSequenceId);
int attempt = 0;
@@ -320,6 +325,7 @@ public class SourceHandle implements ISourceHandle {
TsBlock tsBlock = serde.deserialize(byteBuffer);
tsBlocks.add(tsBlock);
}
+ logger.info("{}: got data blocks. count: {}", SourceHandle.this, tsBlocks.size());
synchronized (SourceHandle.this) {
if (aborted) {
return;
@@ -336,8 +342,8 @@ public class SourceHandle implements ISourceHandle {
break;
} catch (Throwable e) {
logger.error(
- "Failed to get data block from {} due to {}, attempt times: {}",
- remoteFragmentInstanceId,
+ "{}: failed to get data block {}, attempt times: {}",
+ SourceHandle.this,
e.getMessage(),
attempt);
if (attempt == MAX_ATTEMPT_TIMES) {
@@ -349,6 +355,14 @@ public class SourceHandle implements ISourceHandle {
sourceHandleListener.onFailure(SourceHandle.this, e);
}
}
+ try {
+ Thread.sleep(RETRY_INTERVAL_IN_MS);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ synchronized (SourceHandle.this) {
+ sourceHandleListener.onFailure(SourceHandle.this, e);
+ }
+ }
}
}
}
@@ -366,11 +380,11 @@ public class SourceHandle implements ISourceHandle {
@Override
public void run() {
- logger.debug(
- "Send ack data block event [{}, {}) to {}.",
+ logger.info(
+ "{}: send ack data block event [{}, {}).",
+ SourceHandle.this,
startSequenceId,
- endSequenceId,
- remoteFragmentInstanceId);
+ endSequenceId);
int attempt = 0;
TAcknowledgeDataBlockEvent acknowledgeDataBlockEvent =
new TAcknowledgeDataBlockEvent(remoteFragmentInstanceId, startSequenceId, endSequenceId);
@@ -382,14 +396,22 @@ public class SourceHandle implements ISourceHandle {
break;
} catch (Throwable e) {
logger.error(
- "Failed to send ack data block event [{}, {}) to {} due to {}, attempt times: {}",
+ "{}: failed to send ack data block event [{}, {}) due to {}, attempt times: {}",
+ SourceHandle.this,
startSequenceId,
endSequenceId,
- remoteFragmentInstanceId,
e.getMessage(),
attempt);
if (attempt == MAX_ATTEMPT_TIMES) {
- synchronized (this) {
+ synchronized (SourceHandle.this) {
+ sourceHandleListener.onFailure(SourceHandle.this, e);
+ }
+ }
+ try {
+ Thread.sleep(RETRY_INTERVAL_IN_MS);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ synchronized (SourceHandle.this) {
sourceHandleListener.onFailure(SourceHandle.this, e);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleFragInstanceDispatcher.java
index 780860e824..969ff3cca1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleFragInstanceDispatcher.java
@@ -24,10 +24,10 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
-
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,7 +60,7 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
TEndPoint endPoint = instance.getHostDataNode().getInternalEndPoint();
// TODO: (jackie tien) change the port
try (SyncDataNodeInternalServiceClient client =
- internalServiceClientManager.borrowClient(endPoint)) {
+ internalServiceClientManager.borrowClient(endPoint)) {
// TODO: (xingtanzjr) consider how to handle the buffer here
ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
instance.serializeRequest(buffer);
@@ -68,8 +68,7 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
TConsensusGroupId groupId = instance.getRegionReplicaSet().getRegionId();
TSendFragmentInstanceReq req =
new TSendFragmentInstanceReq(
- new TFragmentInstance(buffer), groupId,
- instance.getType().toString());
+ new TFragmentInstance(buffer), groupId, instance.getType().toString());
resp = client.sendFragmentInstance(req);
} catch (IOException e) {
LOGGER.error("can't connect to node {}", endPoint, e);
@@ -84,6 +83,5 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
}
@Override
- public void abort() {
- }
+ public void abort() {}
}