You are viewing a plain-text version of this content. The canonical (HTML) version is available at the original mailing-list archive link, which is not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/19 07:03:53 UTC

[iotdb] branch xingtanzjr/mpp_issues updated (563fbe2218 -> fb7bf77ccc)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a change to branch xingtanzjr/mpp_issues
in repository https://gitbox.apache.org/repos/asf/iotdb.git


    from 563fbe2218 fix the issue that distribution planner cannot give correct result for SeriesScanNode
     new 9ac9044586 fix some issue....
     new fb7bf77ccc fix an issue that SourceHandle may become Blocked

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  2 +-
 .../iotdb/db/mpp/buffer/DataBlockManager.java      | 15 ++++----
 .../org/apache/iotdb/db/mpp/buffer/SinkHandle.java | 26 +++++++-------
 .../apache/iotdb/db/mpp/buffer/SourceHandle.java   | 34 +++++++++++-------
 .../apache/iotdb/db/mpp/execution/Coordinator.java |  8 +++--
 .../iotdb/db/mpp/execution/IQueryExecution.java    |  2 ++
 .../iotdb/db/mpp/execution/QueryExecution.java     | 40 ++++++++++++++++++++--
 .../db/mpp/execution/config/ConfigExecution.java   |  3 ++
 .../mpp/execution/scheduler/ClusterScheduler.java  |  7 ++++
 .../scheduler/SimpleFragInstanceDispatcher.java    |  6 +++-
 .../db/mpp/operator/source/SeriesScanOperator.java |  3 +-
 .../db/mpp/schedule/FragmentInstanceScheduler.java |  2 +-
 .../mpp/sql/planner/plan/DistributedQueryPlan.java | 11 ++++++
 .../iotdb/db/query/control/SessionManager.java     |  7 +++-
 .../thrift/impl/DataNodeTSIServiceImpl.java        | 11 +++++-
 .../apache/iotdb/db/utils/QueryDataSetUtils.java   |  9 +++--
 16 files changed, 141 insertions(+), 45 deletions(-)


[iotdb] 01/02: fix some issue....

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/mpp_issues
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9ac9044586490419b4be3a33857acac0507259b6
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon Apr 18 22:38:34 2022 +0800

    fix some issue....
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  2 +-
 .../iotdb/db/mpp/buffer/DataBlockManager.java      |  4 ++--
 .../org/apache/iotdb/db/mpp/buffer/SinkHandle.java | 14 ++++++++------
 .../apache/iotdb/db/mpp/buffer/SourceHandle.java   | 22 +++++++++++++++-------
 .../apache/iotdb/db/mpp/execution/Coordinator.java |  8 ++++++--
 .../iotdb/db/mpp/execution/QueryExecution.java     | 20 +++++++++++++++++++-
 .../mpp/execution/scheduler/ClusterScheduler.java  |  7 +++++++
 .../scheduler/SimpleFragInstanceDispatcher.java    |  6 +++++-
 .../db/mpp/operator/source/SeriesScanOperator.java |  3 ++-
 .../db/mpp/schedule/FragmentInstanceScheduler.java |  2 +-
 .../mpp/sql/planner/plan/DistributedQueryPlan.java | 11 +++++++++++
 .../apache/iotdb/db/utils/QueryDataSetUtils.java   |  7 ++++++-
 12 files changed, 83 insertions(+), 23 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 9540120af5..9a29c86d40 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -497,7 +497,7 @@ public class IoTDBConfig {
   private String rpcImplClassName = TSServiceImpl.class.getName();
 
   /** indicate whether current mode is mpp */
-  private boolean mppMode = false;
+  private boolean mppMode = true;
 
   /** Replace implementation class of influxdb protocol service */
   private String influxdbImplClassName = InfluxDBServiceImpl.class.getName();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
index a5278bdfd4..c6c642b1ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
@@ -264,7 +264,7 @@ public class DataBlockManager implements IDataBlockManager {
       throw new IllegalStateException("Sink handle for " + localFragmentInstanceId + " exists.");
     }
 
-    logger.info(
+    logger.debug(
         "Create sink handle to plan node {} of {} for {}",
         remotePlanNodeId,
         remoteFragmentInstanceId,
@@ -301,7 +301,7 @@ public class DataBlockManager implements IDataBlockManager {
               + " exists.");
     }
 
-    logger.info(
+    logger.debug(
         "Create source handle from {} for plan node {} of {}",
         remoteFragmentInstanceId,
         localPlanNodeId,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
index 0beab26e36..b0204bd80e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
@@ -160,7 +160,7 @@ public class SinkHandle implements ISinkHandle {
   }
 
   private void sendEndOfDataBlockEvent() throws TException {
-    logger.debug(
+    logger.info(
         "Send end of data block event to plan node {} of {}. {}",
         remotePlanNodeId,
         remoteFragmentInstanceId,
@@ -220,10 +220,12 @@ public class SinkHandle implements ISinkHandle {
     synchronized (this) {
       sequenceIdToTsBlock.clear();
       closed = true;
-      localMemoryManager
-          .getQueryPool()
-          .free(localFragmentInstanceId.getQueryId(), bufferRetainedSizeInBytes);
-      bufferRetainedSizeInBytes = 0;
+      if (bufferRetainedSizeInBytes > 0) {
+        localMemoryManager
+            .getQueryPool()
+            .free(localFragmentInstanceId.getQueryId(), bufferRetainedSizeInBytes);
+        bufferRetainedSizeInBytes = 0;
+      }
     }
     sinkHandleListener.onAborted(this);
     logger.info("Sink handle {} is aborted", this);
@@ -334,7 +336,7 @@ public class SinkHandle implements ISinkHandle {
 
     @Override
     public void run() {
-      logger.debug(
+      logger.info(
           "Send new data block event [{}, {}) to plan node {} of {}.",
           startSequenceId,
           startSequenceId + blockSizes.size(),
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
index 802634efa7..d1f1df86d8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
@@ -120,6 +120,7 @@ public class SourceHandle implements ISourceHandle {
 
       if (sequenceIdToTsBlock.isEmpty() && !isFinished()) {
         blocked = SettableFuture.create();
+        logger.info("[SourceHandle {}]: blocked is set to new Future.", localPlanNodeId);
       }
     }
     if (isFinished()) {
@@ -210,6 +211,7 @@ public class SourceHandle implements ISourceHandle {
   }
 
   synchronized void setNoMoreTsBlocks(int lastSequenceId) {
+    logger.info("[SourceHandle {}-{}]: No more TsBlock. {} ", localFragmentInstanceId, localPlanNodeId, remoteFragmentInstanceId);
     this.lastSequenceId = lastSequenceId;
     noMoreTsBlocks = true;
   }
@@ -223,9 +225,14 @@ public class SourceHandle implements ISourceHandle {
 
   @Override
   public synchronized void close() {
+    logger.info("[SourceHandle {}-{}]: closed ", localFragmentInstanceId, localPlanNodeId);
     if (closed) {
       return;
     }
+    if (blocked != null && !blocked.isDone()) {
+      blocked.cancel(true);
+      logger.info("[SourceHandle {}]: blocked is cancelled.", localPlanNodeId);
+    }
     sequenceIdToDataBlockSize.clear();
     if (bufferRetainedSizeInBytes > 0) {
       localMemoryManager
@@ -307,13 +314,12 @@ public class SourceHandle implements ISourceHandle {
 
     @Override
     public void run() {
-      logger.debug(
-          "Get data blocks [{}, {}) from {} for plan node {} of {}.",
+      logger.info(
+          "[SourceHandle-{}]: Get data blocks [{}, {}) from {}",
+          localPlanNodeId,
           startSequenceId,
           endSequenceId,
-          remoteFragmentInstanceId,
-          localPlanNodeId,
-          localFragmentInstanceId);
+          remoteFragmentInstanceId);
       GetDataBlockRequest req =
           new GetDataBlockRequest(remoteFragmentInstanceId, startSequenceId, endSequenceId);
       int attempt = 0;
@@ -335,6 +341,7 @@ public class SourceHandle implements ISourceHandle {
             }
             if (!blocked.isDone()) {
               blocked.set(null);
+              logger.info("[SourceHandle {}]: blocked is set null.", localPlanNodeId);
             }
           }
           executorService.submit(
@@ -375,8 +382,9 @@ public class SourceHandle implements ISourceHandle {
 
     @Override
     public void run() {
-      logger.debug(
-          "Send ack data block event [{}, {}) to {}.",
+      logger.info(
+          "[SourceHandle {}]: Send ack data block event [{}, {}) to {}.",
+          localPlanNodeId,
           startSequenceId,
           endSequenceId,
           remoteFragmentInstanceId);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index 0d45e76390..3d7a292e69 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -32,6 +32,8 @@ import org.apache.iotdb.db.mpp.sql.statement.ConfigStatement;
 import org.apache.iotdb.db.mpp.sql.statement.Statement;
 
 import org.apache.commons.lang3.Validate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -43,8 +45,10 @@ import java.util.concurrent.ScheduledExecutorService;
  * QueryExecution.
  */
 public class Coordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class);
+
   private static final String COORDINATOR_EXECUTOR_NAME = "MPPCoordinator";
-  private static final int COORDINATOR_EXECUTOR_SIZE = 2;
+  private static final int COORDINATOR_EXECUTOR_SIZE = 20;
   private static final String COORDINATOR_SCHEDULED_EXECUTOR_NAME = "MPPCoordinatorScheduled";
   private static final int COORDINATOR_SCHEDULED_EXECUTOR_SIZE = 2;
 
@@ -94,7 +98,7 @@ public class Coordinator {
             partitionFetcher,
             schemaFetcher);
     queryExecutionMap.put(queryId, execution);
-
+    LOG.info("[Query: {}] start QueryExecution. Statement: {}", queryId, sql);
     execution.start();
 
     return execution.getStatus();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 172a82db70..9dcb3e5fc3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -43,6 +43,8 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.sql.SQLException;
@@ -61,6 +63,8 @@ import static com.google.common.base.Throwables.throwIfUnchecked;
  * corresponding physical nodes. 3. Collect and monitor the progress/states of this query.
  */
 public class QueryExecution implements IQueryExecution {
+  private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class);
+
   private final MPPQueryContext context;
   private IScheduler scheduler;
   private final QueryStateMachine stateMachine;
@@ -112,10 +116,12 @@ public class QueryExecution implements IQueryExecution {
   public void start() {
     doLogicalPlan();
     doDistributedPlan();
+    LOG.info("[{}]: Distribution Plan: {}", this, distributedPlan);
     if (context.getQueryType() == QueryType.READ) {
       // The ResultHandle could only be initialized after distributed planning
       initResultHandle();
     }
+    LOG.info("[{}]: Start to schedule.", this);
     schedule();
   }
 
@@ -163,7 +169,12 @@ public class QueryExecution implements IQueryExecution {
   }
 
   /** Release the resources that current QueryExecution hold. */
-  private void releaseResource() {}
+  private void releaseResource() {
+    // close ResultHandle to unblock client's getResult request
+    if (resultHandle != null && !resultHandle.isClosed()) {
+      resultHandle.close();
+    }
+  }
 
   /**
    * This method will be called by the request thread from client connection. This method will block
@@ -175,9 +186,12 @@ public class QueryExecution implements IQueryExecution {
   @Override
   public TsBlock getBatchResult() {
     try {
+      LOG.info("[QueryExecution {}]: try to get result.", context.getQueryId());
       ListenableFuture<Void> blocked = resultHandle.isBlocked();
       blocked.get();
+      LOG.info("[QueryExecution {}]:  unblock. Cancelled: {}, Done: {}", context.getQueryId(), blocked.isCancelled(), blocked.isDone());
       if (resultHandle.isFinished()) {
+        LOG.info("[QueryExecution {}]:  result is null", context.getQueryId());
         return null;
       }
       return resultHandle.receive();
@@ -271,4 +285,8 @@ public class QueryExecution implements IQueryExecution {
   public boolean isQuery() {
     return context.getQueryType() == QueryType.READ;
   }
+
+  public String toString() {
+    return String.format("QueryExecution[%s]", context.getQueryId());
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
index 8b819d6cad..47c57c873d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
@@ -28,6 +28,8 @@ import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 
 import io.airlift.units.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.concurrent.ExecutionException;
@@ -43,6 +45,8 @@ import java.util.concurrent.ScheduledExecutorService;
  * this scheduler.
  */
 public class ClusterScheduler implements IScheduler {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ClusterScheduler.class);
+
   private MPPQueryContext queryContext;
   // The stateMachine of the QueryExecution owned by this QueryScheduler
   private QueryStateMachine stateMachine;
@@ -79,13 +83,16 @@ public class ClusterScheduler implements IScheduler {
 
   @Override
   public void start() {
+    LOGGER.info("[{}] start to dispatch fragment instance. size: {}", queryContext.getQueryId(), instances.size());
     stateMachine.transitionToDispatching();
     Future<FragInstanceDispatchResult> dispatchResultFuture = dispatcher.dispatch(instances);
 
     // NOTICE: the FragmentInstance may be dispatched to another Host due to consensus redirect.
     // So we need to start the state fetcher after the dispatching stage.
     try {
+      LOGGER.info("[{}] wait dispatch to be finished", queryContext.getQueryId());
       FragInstanceDispatchResult result = dispatchResultFuture.get();
+      LOGGER.info("[{}] dispatch finished: ", result.isSuccessful());
       if (!result.isSuccessful()) {
         stateMachine.transitionToFailed(new IllegalStateException("Fragment cannot be dispatched"));
         return;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
index 286f3df3e6..b39ec04406 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
@@ -27,6 +27,8 @@ import org.apache.iotdb.mpp.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -34,7 +36,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
 public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
-
+  private static final Logger LOGGER = LoggerFactory.getLogger(SimpleFragInstanceDispatcher.class);
   private final ExecutorService executor;
 
   public SimpleFragInstanceDispatcher(ExecutorService exeutor) {
@@ -63,7 +65,9 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
             TSendFragmentInstanceReq req =
                 new TSendFragmentInstanceReq(
                     new TFragmentInstance(buffer), groupId, instance.getType().toString());
+            LOGGER.info("start to dispatch fragment instance: {}", instance.getId());
             resp = client.sendFragmentInstance(req);
+            LOGGER.info("dispatch complete: {}", instance.getId());
             if (!resp.accepted) {
               break;
             }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
index 9e9946c323..b11f643810 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
@@ -107,6 +107,7 @@ public class SeriesScanOperator implements DataSourceOperator {
           return true;
         }
       }
+      System.out.println(String.format("[SeriesScanOperator-%s]: hasNext returned: %s", sourceId, hasCachedTsBlock));
       return hasCachedTsBlock;
     } catch (IOException e) {
       throw new RuntimeException("Error happened while scanning the file", e);
@@ -115,7 +116,7 @@ public class SeriesScanOperator implements DataSourceOperator {
 
   @Override
   public boolean isFinished() {
-    return finished || (finished = hasNext());
+    return finished || (finished = !hasNext());
   }
 
   private boolean readChunkData() throws IOException {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
index c7bdb95285..875894f47f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
@@ -66,7 +66,7 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
 
   private static final int MAX_CAPACITY = 1000; // TODO: load from config files
   private static final int WORKER_THREAD_NUM = 4; // TODO: load from config files
-  private static final int QUERY_TIMEOUT_MS = 60_000; // TODO: load from config files or requests
+  private static final int QUERY_TIMEOUT_MS = 10_000; // TODO: load from config files or requests
   private final ThreadGroup workerGroups;
   private final List<AbstractExecutor> threads;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
index 2df53235ad..d9a64788f0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
@@ -54,4 +54,15 @@ public class DistributedQueryPlan {
   public List<FragmentInstance> getInstances() {
     return instances;
   }
+
+  public String toString() {
+    StringBuilder ret = new StringBuilder();
+    ret.append("Instance Count: ").append(instances.size());
+    ret.append(System.lineSeparator());
+    for (FragmentInstance instance : instances) {
+      ret.append(instance);
+      ret.append(System.lineSeparator());
+    }
+    return ret.toString();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index 506f180049..aafc3ffd00 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.utils;
 
+import org.apache.iotdb.db.mpp.execution.Coordinator;
 import org.apache.iotdb.db.mpp.execution.IQueryExecution;
 import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
 import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
@@ -31,6 +32,8 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BitMap;
 import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -42,7 +45,7 @@ import java.util.List;
 
 /** TimeValuePairUtils to convert between thrift format and TsFile format. */
 public class QueryDataSetUtils {
-
+  private static final Logger LOG = LoggerFactory.getLogger(QueryDataSetUtils.class);
   private static final int FLAG = 0x01;
 
   private QueryDataSetUtils() {}
@@ -193,7 +196,9 @@ public class QueryDataSetUtils {
     int rowCount = 0;
     int[] valueOccupation = new int[columnNum];
     while (rowCount < fetchSize && queryExecution.hasNextResult()) {
+      LOG.info("[ToTSDataSet {}] hasNext return true.", queryExecution);
       TsBlock tsBlock = queryExecution.getBatchResult();
+      LOG.info("[ToTSDataSet {}] result got.", queryExecution);
       if (tsBlock == null) {
         break;
       }


[iotdb] 02/02: fix an issue that SourceHandle may become Blocked

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/mpp_issues
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit fb7bf77ccc67feb8e4734e8a28f123716e23d91c
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Tue Apr 19 15:03:17 2022 +0800

    fix an issue that SourceHandle may become Blocked
---
 .../iotdb/db/mpp/buffer/DataBlockManager.java      | 11 ++++++-----
 .../org/apache/iotdb/db/mpp/buffer/SinkHandle.java | 14 +++++---------
 .../apache/iotdb/db/mpp/buffer/SourceHandle.java   | 16 +++++++++-------
 .../iotdb/db/mpp/execution/IQueryExecution.java    |  2 ++
 .../iotdb/db/mpp/execution/QueryExecution.java     | 22 ++++++++++++++++++++--
 .../db/mpp/execution/config/ConfigExecution.java   |  3 +++
 .../iotdb/db/query/control/SessionManager.java     |  7 ++++++-
 .../thrift/impl/DataNodeTSIServiceImpl.java        | 11 ++++++++++-
 .../apache/iotdb/db/utils/QueryDataSetUtils.java   |  6 +++---
 9 files changed, 64 insertions(+), 28 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
index c6c642b1ef..3177888ba8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
@@ -138,7 +138,7 @@ public class DataBlockManager implements IDataBlockManager {
 
     @Override
     public void onEndOfDataBlockEvent(EndOfDataBlockEvent e) throws TException {
-      logger.debug(
+      logger.info(
           "End of data block event received, for plan node {} of {} from {}.",
           e.getTargetPlanNodeId(),
           e.getTargetFragmentInstanceId(),
@@ -175,11 +175,12 @@ public class DataBlockManager implements IDataBlockManager {
               .containsKey(sourceHandle.getLocalPlanNodeId())) {
         logger.info(
             "Resources of finished source handle {} has already been released", sourceHandle);
+      } else {
+        sourceHandles
+            .get(sourceHandle.getLocalFragmentInstanceId())
+            .remove(sourceHandle.getLocalPlanNodeId());
       }
-      sourceHandles
-          .get(sourceHandle.getLocalFragmentInstanceId())
-          .remove(sourceHandle.getLocalPlanNodeId());
-      if (sourceHandles.get(sourceHandle.getLocalFragmentInstanceId()).isEmpty()) {
+      if (sourceHandles.containsKey(sourceHandle.getLocalFragmentInstanceId()) && sourceHandles.get(sourceHandle.getLocalFragmentInstanceId()).isEmpty()) {
         sourceHandles.remove(sourceHandle.getLocalFragmentInstanceId());
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
index b0204bd80e..fc3cd825fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
@@ -160,11 +160,7 @@ public class SinkHandle implements ISinkHandle {
   }
 
   private void sendEndOfDataBlockEvent() throws TException {
-    logger.info(
-        "Send end of data block event to plan node {} of {}. {}",
-        remotePlanNodeId,
-        remoteFragmentInstanceId,
-        Thread.currentThread().getName());
+    logger.info("[SinkHandle {}]: send end of data block event.", this.getRemotePlanNodeId());
     int attempt = 0;
     EndOfDataBlockEvent endOfDataBlockEvent =
         new EndOfDataBlockEvent(
@@ -194,7 +190,7 @@ public class SinkHandle implements ISinkHandle {
 
   @Override
   public void close() throws IOException {
-    logger.info("Sink handle {} is being closed.", this);
+    logger.info("[SinkHandle {}]: is being closed.", this.getRemotePlanNodeId());
     if (throwable != null) {
       throw new IOException(throwable);
     }
@@ -211,12 +207,12 @@ public class SinkHandle implements ISinkHandle {
     } catch (TException e) {
       throw new IOException(e);
     }
-    logger.info("Sink handle {} is closed.", this);
+    logger.info("[SinkHandle {}] is closed.", this.getRemotePlanNodeId());
   }
 
   @Override
   public void abort() {
-    logger.info("Sink handle {} is being aborted.", this);
+    logger.info("[SinkHandle {}]: is being aborted.", this.getRemotePlanNodeId());
     synchronized (this) {
       sequenceIdToTsBlock.clear();
       closed = true;
@@ -228,7 +224,7 @@ public class SinkHandle implements ISinkHandle {
       }
     }
     sinkHandleListener.onAborted(this);
-    logger.info("Sink handle {} is aborted", this);
+    logger.info("[SinkHandle {}]: is aborted.", this.getRemotePlanNodeId());
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
index d1f1df86d8..01dc2f0fb1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
@@ -211,9 +211,14 @@ public class SourceHandle implements ISourceHandle {
   }
 
   synchronized void setNoMoreTsBlocks(int lastSequenceId) {
-    logger.info("[SourceHandle {}-{}]: No more TsBlock. {} ", localFragmentInstanceId, localPlanNodeId, remoteFragmentInstanceId);
+    logger.info("[SourceHandle {}]: No more TsBlock. {} ", localPlanNodeId, remoteFragmentInstanceId);
     this.lastSequenceId = lastSequenceId;
-    noMoreTsBlocks = true;
+    if (!blocked.isDone() && currSequenceId - 1 == lastSequenceId) {
+      logger.info("[SourceHandle {}]: all blocks are consumed. set blocked to null.", localPlanNodeId);
+      blocked.set(null);
+    } else {
+      logger.info("[SourceHandle {}]: No need to set blocked. Blocked: {}, Consumed: {} ", localPlanNodeId, blocked.isDone(), currSequenceId - 1 == lastSequenceId);
+    }
   }
 
   synchronized void updatePendingDataBlockInfo(int startSequenceId, List<Long> dataBlockSizes) {
@@ -225,7 +230,7 @@ public class SourceHandle implements ISourceHandle {
 
   @Override
   public synchronized void close() {
-    logger.info("[SourceHandle {}-{}]: closed ", localFragmentInstanceId, localPlanNodeId);
+    logger.info("[SourceHandle {}]: closed ", localPlanNodeId);
     if (closed) {
       return;
     }
@@ -247,10 +252,7 @@ public class SourceHandle implements ISourceHandle {
   @Override
   public boolean isFinished() {
     return throwable == null
-        && noMoreTsBlocks
-        && numActiveGetDataBlocksTask == 0
-        && currSequenceId - 1 == lastSequenceId
-        && sequenceIdToTsBlock.isEmpty();
+        && currSequenceId - 1 == lastSequenceId;
   }
 
   String getRemoteHostname() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java
index 2e0bd9c76d..af9633ee89 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java
@@ -28,6 +28,8 @@ public interface IQueryExecution {
 
   void stop();
 
+  void stopAndCleanup();
+
   ExecutionResult getStatus();
 
   TsBlock getBatchResult();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 9dcb3e5fc3..380361c3fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -110,6 +110,13 @@ public class QueryExecution implements IQueryExecution {
             return;
           }
           this.stop();
+          // In an abnormal terminal state no more results will arrive, so release the resources
+          // (including the ResultHandle) to unblock any client waiting in getBatchResult().
+          if (state == QueryState.FAILED
+              || state == QueryState.ABORTED
+              || state == QueryState.CANCELED) {
+            releaseResource();
+          }
         });
   }
 
@@ -160,18 +167,31 @@ public class QueryExecution implements IQueryExecution {
     this.distributedPlan = planner.planFragments();
   }
 
-  /** Abort the query and do cleanup work including QuerySchedule aborting and resource releasing */
+  /** Stop the workers of this query without releasing the resources it holds. */
   public void stop() {
     if (this.scheduler != null) {
       this.scheduler.stop();
     }
+  }
+
+  /** Stop the query and release all the resources it occupies, including the ResultHandle. */
+  public void stopAndCleanup() {
+    stop();
     releaseResource();
   }
 
   /** Release the resources that current QueryExecution hold. */
   private void releaseResource() {
     // close ResultHandle to unblock client's getResult request
+    // Do not close it merely because the QueryExecution has FINISHED: the client may still be
+    // fetching results. Within this class it is therefore only invoked when:
+    //   1. the client has fetched all the results and the ResultHandle is finished;
+    //   2. the client's connection is closed and all its QueryExecutions are cleaned up;
+    //   3. the query terminated abnormally (FAILED, ABORTED or CANCELED).
+    // Cases 2 and 3 must close a handle that is not finished yet, so only an already-closed
+    // handle is skipped here.
     if (resultHandle != null && !resultHandle.isClosed()) {
+      LOG.info("[QueryExecution {}]:  result handle is closed", context.getQueryId());
       resultHandle.close();
     }
   }
@@ -186,12 +206,18 @@ public class QueryExecution implements IQueryExecution {
   @Override
   public TsBlock getBatchResult() {
     try {
+      // A closed or finished handle has nothing more to deliver; release it and signal the end.
+      if (resultHandle.isClosed() || resultHandle.isFinished()) {
+        releaseResource();
+        return null;
+      }
       LOG.info("[QueryExecution {}]: try to get result.", context.getQueryId());
       ListenableFuture<Void> blocked = resultHandle.isBlocked();
       blocked.get();
       LOG.info("[QueryExecution {}]:  unblock. Cancelled: {}, Done: {}", context.getQueryId(), blocked.isCancelled(), blocked.isDone());
       if (resultHandle.isFinished()) {
         LOG.info("[QueryExecution {}]:  result is null", context.getQueryId());
+        releaseResource();
         return null;
       }
       return resultHandle.receive();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java
index be1c95871f..b134d4896b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java
@@ -103,6 +103,12 @@ public class ConfigExecution implements IQueryExecution {
   @Override
   public void stop() {}
 
+  @Override
+  public void stopAndCleanup() {
+    // stop() is currently a no-op; delegate so both hooks stay in sync if stop() gains logic.
+    stop();
+  }
+
   @Override
   public ExecutionResult getStatus() {
     try {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
index 20aa1d1b86..a8a0faca92 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
@@ -46,6 +46,7 @@ import java.util.TimeZone;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
 
 import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
 
@@ -215,6 +216,20 @@ public class SessionManager {
   }
 
+  /**
+   * Release the session's resources, releasing each of its queries via
+   * releaseQueryResourceNoExceptions.
+   */
   public boolean releaseSessionResource(long sessionId) {
+    return releaseSessionResource(sessionId, this::releaseQueryResourceNoExceptions);
+  }
+
+  /**
+   * Release the session's resources, delegating the release of each of its queries.
+   *
+   * @param sessionId id of the session whose resources are released
+   * @param releaseQueryResource called with the id of each query released with the session
+   */
+  public boolean releaseSessionResource(long sessionId, Consumer<Long> releaseQueryResource) {
     sessionIdToZoneId.remove(sessionId);
     sessionIdToClientVersion.remove(sessionId);
 
@@ -224,7 +239,7 @@ public class SessionManager {
         Set<Long> queryIdSet = statementIdToQueryId.remove(statementId);
         if (queryIdSet != null) {
           for (Long queryId : queryIdSet) {
-            releaseQueryResourceNoExceptions(queryId);
+            releaseQueryResource.accept(queryId);
           }
         }
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
index 2c13c68ad7..3de0a28cc2 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
@@ -562,7 +562,7 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
       ExecutionResult result =
           COORDINATOR.execute(
               statement,
-              new QueryId(String.valueOf(queryId)),
+              genQueryId(queryId),
               SESSION_MANAGER.getSessionInfo(req.sessionId),
               "",
               PARTITION_FETCHER,
@@ -756,8 +756,23 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
   public void handleClientExit() {
     Long sessionId = SESSION_MANAGER.getCurrSessionId();
     if (sessionId != null) {
+      SESSION_MANAGER.releaseSessionResource(sessionId, this::cleanupQueryExecution);
       TSCloseSessionReq req = new TSCloseSessionReq(sessionId);
       closeSession(req);
     }
   }
+
+  /** Stop the QueryExecution registered for {@code queryId} and release its resources. */
+  private void cleanupQueryExecution(Long queryId) {
+    // NOTE(review): getQueryExecution is assumed to return null for a query the coordinator no
+    // longer tracks. Skip such a query instead of throwing an NPE, which could abort the remaining
+    // releases in releaseSessionResource and keep handleClientExit from calling closeSession().
+    java.util.Optional.ofNullable(COORDINATOR.getQueryExecution(genQueryId(queryId)))
+        .ifPresent(execution -> execution.stopAndCleanup());
+  }
+
+  /** Build the coordinator QueryId that corresponds to a numeric query id. */
+  private QueryId genQueryId(long id) {
+    return new QueryId(String.valueOf(id));
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index aafc3ffd00..aca0b37906 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -195,10 +195,10 @@ public class QueryDataSetUtils {
 
     int rowCount = 0;
     int[] valueOccupation = new int[columnNum];
-    while (rowCount < fetchSize && queryExecution.hasNextResult()) {
-      LOG.info("[ToTSDataSet {}] hasNext return true.", queryExecution);
+    while (rowCount < fetchSize) {
+      LOG.debug("[ToTSDataSet {}] invoke queryExecution.getBatchResult.", queryExecution);
       TsBlock tsBlock = queryExecution.getBatchResult();
-      LOG.info("[ToTSDataSet {}] result got.", queryExecution);
+      LOG.debug("[ToTSDataSet {}] result got. Empty: {}", queryExecution, tsBlock == null);
       if (tsBlock == null) {
         break;
       }