You are viewing a plain-text version of this content. The canonical link for it (the "here" hyperlink on the original archive page) is not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/05 11:23:20 UTC

[iotdb] branch xingtanzjr/query_log updated: make log in DataBlockManager more accurate

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/query_log
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/xingtanzjr/query_log by this push:
     new 2b2d4223d4 make log in DataBlockManager more accurate
2b2d4223d4 is described below

commit 2b2d4223d4d4dc1751a549bbe772b902ae8e1119
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Thu May 5 19:23:09 2022 +0800

    make log in DataBlockManager more accurate
---
 .../statemachine/SchemaRegionStateMachine.java     |  5 +-
 .../execution/datatransfer/DataBlockManager.java   | 22 ++++----
 .../db/mpp/execution/datatransfer/SinkHandle.java  | 63 ++++++++++-----------
 .../mpp/execution/datatransfer/SourceHandle.java   | 66 ++++++++++++++--------
 .../scheduler/SimpleFragInstanceDispatcher.java    | 10 ++--
 5 files changed, 93 insertions(+), 73 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
index 4fbe1c4a0a..3221d36de4 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
@@ -78,7 +78,10 @@ public class SchemaRegionStateMachine extends BaseStateMachine {
 
   @Override
   protected DataSet read(FragmentInstance fragmentInstance) {
-    logger.info("SchemaRegionStateMachine[{}]: Execute read plan: FragmentInstance-{}", schemaRegion.getSchemaRegionId(), fragmentInstance.getId());
+    logger.info(
+        "SchemaRegionStateMachine[{}]: Execute read plan: FragmentInstance-{}",
+        schemaRegion.getSchemaRegionId(),
+        fragmentInstance.getId());
     return QUERY_INSTANCE_MANAGER.execSchemaQueryFragmentInstance(fragmentInstance, schemaRegion);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/DataBlockManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/DataBlockManager.java
index c889f18f6d..750ae2bd2d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/DataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/DataBlockManager.java
@@ -182,13 +182,12 @@ public class DataBlockManager implements IDataBlockManager {
 
     @Override
     public void onFinished(ISourceHandle sourceHandle) {
-      logger.info("Release resources of finished source handle {}", sourceHandle);
+      logger.info("{} finished and release resources", sourceHandle);
       if (!sourceHandles.containsKey(sourceHandle.getLocalFragmentInstanceId())
           || !sourceHandles
               .get(sourceHandle.getLocalFragmentInstanceId())
               .containsKey(sourceHandle.getLocalPlanNodeId())) {
-        logger.info(
-            "Resources of finished source handle {} has already been released", sourceHandle);
+        logger.info("{} resources has already been released", sourceHandle);
       } else {
         sourceHandles
             .get(sourceHandle.getLocalFragmentInstanceId())
@@ -202,6 +201,7 @@ public class DataBlockManager implements IDataBlockManager {
 
     @Override
     public void onAborted(ISourceHandle sourceHandle) {
+      logger.info("{}: onAborted is invoked", sourceHandle);
       onFinished(sourceHandle);
     }
 
@@ -228,11 +228,7 @@ public class DataBlockManager implements IDataBlockManager {
 
     @Override
     public void onFinish(ISinkHandle sinkHandle) {
-      logger.info("Release resources of finished sink handle {}", sourceHandles);
-      if (!sinkHandles.containsKey(sinkHandle.getLocalFragmentInstanceId())) {
-        logger.info("Resources of finished sink handle {} has already been released", sinkHandle);
-      }
-      sinkHandles.remove(sinkHandle.getLocalFragmentInstanceId());
+      removeFromDataBlockManager(sinkHandle);
       context.finished();
     }
 
@@ -243,15 +239,21 @@ public class DataBlockManager implements IDataBlockManager {
 
     @Override
     public void onAborted(ISinkHandle sinkHandle) {
-      logger.info("Release resources of aborted sink handle {}", sourceHandles);
+      logger.info("{} onAborted is invoked", sinkHandle);
+      removeFromDataBlockManager(sinkHandle);
+    }
+
+    private void removeFromDataBlockManager(ISinkHandle sinkHandle) {
+      logger.info("{} release resources of finished sink handle", sinkHandle);
       if (!sinkHandles.containsKey(sinkHandle.getLocalFragmentInstanceId())) {
-        logger.info("Resources of aborted sink handle {} has already been released", sinkHandle);
+        logger.info("{} resources already been released", sinkHandle);
       }
       sinkHandles.remove(sinkHandle.getLocalFragmentInstanceId());
     }
 
     @Override
     public void onFailure(ISinkHandle sinkHandle, Throwable t) {
+      // TODO: (xingtanzjr) should we remove the sinkHandle from DataBlockManager ?
       logger.error("Sink handle {} failed due to {}", sinkHandle, t);
       if (onFailureCallback != null) {
         onFailureCallback.call(t);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SinkHandle.java
index c06a3f1980..518544d7aa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SinkHandle.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.commons.lang3.Validate;
-import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,7 +42,6 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map.Entry;
-import java.util.StringJoiner;
 import java.util.concurrent.ExecutorService;
 
 import static com.google.common.util.concurrent.Futures.immediateFuture;
@@ -54,6 +52,7 @@ public class SinkHandle implements ISinkHandle {
   private static final Logger logger = LoggerFactory.getLogger(SinkHandle.class);
 
   public static final int MAX_ATTEMPT_TIMES = 3;
+  private static final long RETRY_INTERVAL_IN_MS = 1000L;
 
   private final TEndPoint remoteEndpoint;
   private final TFragmentInstanceId remoteFragmentInstanceId;
@@ -155,11 +154,7 @@ public class SinkHandle implements ISinkHandle {
   }
 
   private void sendEndOfDataBlockEvent() throws Exception {
-    logger.debug(
-        "Send end of data block event to plan node {} of {}. {}",
-        remotePlanNodeId,
-        remoteFragmentInstanceId,
-        Thread.currentThread().getName());
+    logger.info("{} send end of data block event", this);
     int attempt = 0;
     TEndOfDataBlockEvent endOfDataBlockEvent =
         new TEndOfDataBlockEvent(
@@ -173,29 +168,24 @@ public class SinkHandle implements ISinkHandle {
           dataBlockServiceClientManager.borrowClient(remoteEndpoint)) {
         client.onEndOfDataBlockEvent(endOfDataBlockEvent);
         break;
-      } catch (TException e) {
+      } catch (Throwable e) {
         logger.error(
-            "Failed to send end of data block event to plan node {} of {} due to {}, attempt times: {}",
-            remotePlanNodeId,
-            remoteFragmentInstanceId,
+            "{} Failed to send end of data block event due to {}, attempt times: {}",
+            this,
             e.getMessage(),
             attempt,
             e);
         if (attempt == MAX_ATTEMPT_TIMES) {
           throw e;
         }
-      } catch (IOException e) {
-        logger.error("can't connect to node {}", remoteEndpoint, e);
-        if (attempt == MAX_ATTEMPT_TIMES) {
-          throw e;
-        }
+        Thread.sleep(RETRY_INTERVAL_IN_MS);
       }
     }
   }
 
   @Override
   public synchronized void setNoMoreTsBlocks() {
-    logger.info("Setting no-more-tsblocks to {}", this);
+    logger.info("{} start to set no-more-tsblocks", this);
     if (aborted) {
       return;
     }
@@ -204,17 +194,19 @@ public class SinkHandle implements ISinkHandle {
     } catch (Exception e) {
       throw new RuntimeException("Send EndOfDataBlockEvent failed", e);
     }
+    logger.info("{} set noMoreTsBlocks to true", this);
     noMoreTsBlocks = true;
     if (isFinished()) {
+      logger.info("{} revoke onFinish() of sinkHandleListener", this);
       sinkHandleListener.onFinish(this);
     }
+    logger.info("{} revoke onEndOfBlocks() of sinkHandleListener", this);
     sinkHandleListener.onEndOfBlocks(this);
-    logger.info("No more tsblocks has been set to {}", this);
   }
 
   @Override
   public synchronized void abort() {
-    logger.info("Sink handle {} is being aborted.", this);
+    logger.info("{} is being aborted.", this);
     sequenceIdToTsBlock.clear();
     aborted = true;
     bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryCancel(blocked);
@@ -225,7 +217,7 @@ public class SinkHandle implements ISinkHandle {
       bufferRetainedSizeInBytes = 0;
     }
     sinkHandleListener.onAborted(this);
-    logger.info("Sink handle {} is aborted", this);
+    logger.info("{} is aborted", this);
   }
 
   @Override
@@ -304,12 +296,11 @@ public class SinkHandle implements ISinkHandle {
 
   @Override
   public String toString() {
-    return new StringJoiner(", ", SinkHandle.class.getSimpleName() + "[", "]")
-        .add("remoteEndpoint='" + remoteEndpoint + "'")
-        .add("remoteFragmentInstanceId=" + remoteFragmentInstanceId)
-        .add("remotePlanNodeId='" + remotePlanNodeId + "'")
-        .add("localFragmentInstanceId=" + localFragmentInstanceId)
-        .toString();
+    return String.format(
+        "Query[%s]-[%s-%s-SinkHandle]:",
+        localFragmentInstanceId.queryId,
+        localFragmentInstanceId.fragmentId,
+        localFragmentInstanceId.instanceId);
   }
 
   /**
@@ -333,12 +324,11 @@ public class SinkHandle implements ISinkHandle {
 
     @Override
     public void run() {
-      logger.debug(
-          "Send new data block event [{}, {}) to plan node {} of {}.",
+      logger.info(
+          "{} send new data block event [{}, {})",
+          SinkHandle.this,
           startSequenceId,
-          startSequenceId + blockSizes.size(),
-          remotePlanNodeId,
-          remoteFragmentInstanceId);
+          startSequenceId + blockSizes.size());
       int attempt = 0;
       TNewDataBlockEvent newDataBlockEvent =
           new TNewDataBlockEvent(
@@ -355,15 +345,20 @@ public class SinkHandle implements ISinkHandle {
           break;
         } catch (Throwable e) {
           logger.error(
-              "Failed to send new data block event to plan node {} of {} due to {}, attempt times: {}",
-              remotePlanNodeId,
-              remoteFragmentInstanceId,
+              "{} failed to send new data block event due to {}, attempt times: {}",
+              SinkHandle.this,
               e.getMessage(),
               attempt,
               e);
           if (attempt == MAX_ATTEMPT_TIMES) {
             sinkHandleListener.onFailure(SinkHandle.this, e);
           }
+          try {
+            Thread.sleep(RETRY_INTERVAL_IN_MS);
+          } catch (InterruptedException ex) {
+            Thread.currentThread().interrupt();
+            sinkHandleListener.onFailure(SinkHandle.this, e);
+          }
         }
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SourceHandle.java
index 1c8bf70880..10ed0e27b4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SourceHandle.java
@@ -42,7 +42,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.StringJoiner;
 import java.util.concurrent.ExecutorService;
 
 import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
@@ -52,6 +51,7 @@ public class SourceHandle implements ISourceHandle {
   private static final Logger logger = LoggerFactory.getLogger(SourceHandle.class);
 
   public static final int MAX_ATTEMPT_TIMES = 3;
+  private static final long RETRY_INTERVAL_IN_MS = 1000;
 
   private final TEndPoint remoteEndpoint;
   private final TFragmentInstanceId remoteFragmentInstanceId;
@@ -120,6 +120,7 @@ public class SourceHandle implements ISourceHandle {
         .free(localFragmentInstanceId.getQueryId(), tsBlock.getRetainedSizeInBytes());
 
     if (sequenceIdToTsBlock.isEmpty() && !isFinished()) {
+      logger.info("{}: no buffered TsBlock, blocked", this);
       blocked = SettableFuture.create();
     }
     if (isFinished()) {
@@ -189,6 +190,7 @@ public class SourceHandle implements ISourceHandle {
   }
 
   synchronized void setNoMoreTsBlocks(int lastSequenceId) {
+    logger.info("{}: receive NoMoreTsBlock event. ", this);
     this.lastSequenceId = lastSequenceId;
     if (!blocked.isDone() && remoteTsBlockedConsumedUp()) {
       blocked.set(null);
@@ -199,6 +201,11 @@ public class SourceHandle implements ISourceHandle {
   }
 
   synchronized void updatePendingDataBlockInfo(int startSequenceId, List<Long> dataBlockSizes) {
+    logger.info(
+        "{}: receive newDataBlockEvent. [{}, {})",
+        this,
+        startSequenceId,
+        startSequenceId + dataBlockSizes.size());
     for (int i = 0; i < dataBlockSizes.size(); i++) {
       sequenceIdToDataBlockSize.put(i + startSequenceId, dataBlockSizes.get(i));
     }
@@ -267,12 +274,12 @@ public class SourceHandle implements ISourceHandle {
 
   @Override
   public String toString() {
-    return new StringJoiner(", ", SourceHandle.class.getSimpleName() + "[", "]")
-        .add("remoteEndpoint='" + remoteEndpoint + "'")
-        .add("remoteFragmentInstanceId=" + remoteFragmentInstanceId)
-        .add("localFragmentInstanceId=" + localFragmentInstanceId)
-        .add("localPlanNodeId='" + localPlanNodeId + "'")
-        .toString();
+    return String.format(
+        "Query[%s]-[%s-%s-SourceHandle-%s]",
+        localFragmentInstanceId.getQueryId(),
+        localFragmentInstanceId.getFragmentId(),
+        localFragmentInstanceId.getInstanceId(),
+        localPlanNodeId);
   }
 
   /** Get data blocks from an upstream fragment instance. */
@@ -300,13 +307,11 @@ public class SourceHandle implements ISourceHandle {
 
     @Override
     public void run() {
-      logger.debug(
-          "Get data blocks [{}, {}) from {} for plan node {} of {}.",
+      logger.info(
+          "{}: try to get data blocks [{}, {}) ",
+          SourceHandle.this,
           startSequenceId,
-          endSequenceId,
-          remoteFragmentInstanceId,
-          localPlanNodeId,
-          localFragmentInstanceId);
+          endSequenceId);
       TGetDataBlockRequest req =
           new TGetDataBlockRequest(remoteFragmentInstanceId, startSequenceId, endSequenceId);
       int attempt = 0;
@@ -320,6 +325,7 @@ public class SourceHandle implements ISourceHandle {
             TsBlock tsBlock = serde.deserialize(byteBuffer);
             tsBlocks.add(tsBlock);
           }
+          logger.info("{}: got data blocks. count: {}", SourceHandle.this, tsBlocks.size());
           synchronized (SourceHandle.this) {
             if (aborted) {
               return;
@@ -336,8 +342,8 @@ public class SourceHandle implements ISourceHandle {
           break;
         } catch (Throwable e) {
           logger.error(
-              "Failed to get data block from {} due to {}, attempt times: {}",
-              remoteFragmentInstanceId,
+              "{}: failed to get data block {}, attempt times: {}",
+              SourceHandle.this,
               e.getMessage(),
               attempt);
           if (attempt == MAX_ATTEMPT_TIMES) {
@@ -349,6 +355,14 @@ public class SourceHandle implements ISourceHandle {
               sourceHandleListener.onFailure(SourceHandle.this, e);
             }
           }
+          try {
+            Thread.sleep(RETRY_INTERVAL_IN_MS);
+          } catch (InterruptedException ex) {
+            Thread.currentThread().interrupt();
+            synchronized (SourceHandle.this) {
+              sourceHandleListener.onFailure(SourceHandle.this, e);
+            }
+          }
         }
       }
     }
@@ -366,11 +380,11 @@ public class SourceHandle implements ISourceHandle {
 
     @Override
     public void run() {
-      logger.debug(
-          "Send ack data block event [{}, {}) to {}.",
+      logger.info(
+          "{}: send ack data block event [{}, {}).",
+          SourceHandle.this,
           startSequenceId,
-          endSequenceId,
-          remoteFragmentInstanceId);
+          endSequenceId);
       int attempt = 0;
       TAcknowledgeDataBlockEvent acknowledgeDataBlockEvent =
           new TAcknowledgeDataBlockEvent(remoteFragmentInstanceId, startSequenceId, endSequenceId);
@@ -382,14 +396,22 @@ public class SourceHandle implements ISourceHandle {
           break;
         } catch (Throwable e) {
           logger.error(
-              "Failed to send ack data block event [{}, {}) to {} due to {}, attempt times: {}",
+              "{}: failed to send ack data block event [{}, {}) due to {}, attempt times: {}",
+              SourceHandle.this,
               startSequenceId,
               endSequenceId,
-              remoteFragmentInstanceId,
               e.getMessage(),
               attempt);
           if (attempt == MAX_ATTEMPT_TIMES) {
-            synchronized (this) {
+            synchronized (SourceHandle.this) {
+              sourceHandleListener.onFailure(SourceHandle.this, e);
+            }
+          }
+          try {
+            Thread.sleep(RETRY_INTERVAL_IN_MS);
+          } catch (InterruptedException ex) {
+            Thread.currentThread().interrupt();
+            synchronized (SourceHandle.this) {
               sourceHandleListener.onFailure(SourceHandle.this, e);
             }
           }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleFragInstanceDispatcher.java
index 780860e824..969ff3cca1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleFragInstanceDispatcher.java
@@ -24,10 +24,10 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
-
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,7 +60,7 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
             TEndPoint endPoint = instance.getHostDataNode().getInternalEndPoint();
             // TODO: (jackie tien) change the port
             try (SyncDataNodeInternalServiceClient client =
-                     internalServiceClientManager.borrowClient(endPoint)) {
+                internalServiceClientManager.borrowClient(endPoint)) {
               // TODO: (xingtanzjr) consider how to handle the buffer here
               ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
               instance.serializeRequest(buffer);
@@ -68,8 +68,7 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
               TConsensusGroupId groupId = instance.getRegionReplicaSet().getRegionId();
               TSendFragmentInstanceReq req =
                   new TSendFragmentInstanceReq(
-                      new TFragmentInstance(buffer), groupId,
-                      instance.getType().toString());
+                      new TFragmentInstance(buffer), groupId, instance.getType().toString());
               resp = client.sendFragmentInstance(req);
             } catch (IOException e) {
               LOGGER.error("can't connect to node {}", endPoint, e);
@@ -84,6 +83,5 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
   }
 
   @Override
-  public void abort() {
-  }
+  public void abort() {}
 }