Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/19 07:03:54 UTC

[iotdb] 01/02: fix some issue....

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/mpp_issues
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9ac9044586490419b4be3a33857acac0507259b6
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon Apr 18 22:38:34 2022 +0800

    fix some issue....
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  2 +-
 .../iotdb/db/mpp/buffer/DataBlockManager.java      |  4 ++--
 .../org/apache/iotdb/db/mpp/buffer/SinkHandle.java | 14 ++++++++------
 .../apache/iotdb/db/mpp/buffer/SourceHandle.java   | 22 +++++++++++++++-------
 .../apache/iotdb/db/mpp/execution/Coordinator.java |  8 ++++++--
 .../iotdb/db/mpp/execution/QueryExecution.java     | 20 +++++++++++++++++++-
 .../mpp/execution/scheduler/ClusterScheduler.java  |  7 +++++++
 .../scheduler/SimpleFragInstanceDispatcher.java    |  6 +++++-
 .../db/mpp/operator/source/SeriesScanOperator.java |  3 ++-
 .../db/mpp/schedule/FragmentInstanceScheduler.java |  2 +-
 .../mpp/sql/planner/plan/DistributedQueryPlan.java | 11 +++++++++++
 .../apache/iotdb/db/utils/QueryDataSetUtils.java   |  7 ++++++-
 12 files changed, 83 insertions(+), 23 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 9540120af5..9a29c86d40 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -497,7 +497,7 @@ public class IoTDBConfig {
   private String rpcImplClassName = TSServiceImpl.class.getName();
 
   /** indicate whether current mode is mpp */
-  private boolean mppMode = false;
+  private boolean mppMode = true;
 
   /** Replace implementation class of influxdb protocol service */
   private String influxdbImplClassName = InfluxDBServiceImpl.class.getName();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
index a5278bdfd4..c6c642b1ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
@@ -264,7 +264,7 @@ public class DataBlockManager implements IDataBlockManager {
       throw new IllegalStateException("Sink handle for " + localFragmentInstanceId + " exists.");
     }
 
-    logger.info(
+    logger.debug(
         "Create sink handle to plan node {} of {} for {}",
         remotePlanNodeId,
         remoteFragmentInstanceId,
@@ -301,7 +301,7 @@ public class DataBlockManager implements IDataBlockManager {
               + " exists.");
     }
 
-    logger.info(
+    logger.debug(
         "Create source handle from {} for plan node {} of {}",
         remoteFragmentInstanceId,
         localPlanNodeId,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
index 0beab26e36..b0204bd80e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
@@ -160,7 +160,7 @@ public class SinkHandle implements ISinkHandle {
   }
 
   private void sendEndOfDataBlockEvent() throws TException {
-    logger.debug(
+    logger.info(
         "Send end of data block event to plan node {} of {}. {}",
         remotePlanNodeId,
         remoteFragmentInstanceId,
@@ -220,10 +220,12 @@ public class SinkHandle implements ISinkHandle {
     synchronized (this) {
       sequenceIdToTsBlock.clear();
       closed = true;
-      localMemoryManager
-          .getQueryPool()
-          .free(localFragmentInstanceId.getQueryId(), bufferRetainedSizeInBytes);
-      bufferRetainedSizeInBytes = 0;
+      if (bufferRetainedSizeInBytes > 0) {
+        localMemoryManager
+            .getQueryPool()
+            .free(localFragmentInstanceId.getQueryId(), bufferRetainedSizeInBytes);
+        bufferRetainedSizeInBytes = 0;
+      }
     }
     sinkHandleListener.onAborted(this);
     logger.info("Sink handle {} is aborted", this);
@@ -334,7 +336,7 @@ public class SinkHandle implements ISinkHandle {
 
     @Override
     public void run() {
-      logger.debug(
+      logger.info(
           "Send new data block event [{}, {}) to plan node {} of {}.",
           startSequenceId,
           startSequenceId + blockSizes.size(),
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
index 802634efa7..d1f1df86d8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
@@ -120,6 +120,7 @@ public class SourceHandle implements ISourceHandle {
 
       if (sequenceIdToTsBlock.isEmpty() && !isFinished()) {
         blocked = SettableFuture.create();
+        logger.info("[SourceHandle {}]: blocked is set to new Future.", localPlanNodeId);
       }
     }
     if (isFinished()) {
@@ -210,6 +211,7 @@ public class SourceHandle implements ISourceHandle {
   }
 
   synchronized void setNoMoreTsBlocks(int lastSequenceId) {
+    logger.info("[SourceHandle {}-{}]: No more TsBlock. {} ", localFragmentInstanceId, localPlanNodeId, remoteFragmentInstanceId);
     this.lastSequenceId = lastSequenceId;
     noMoreTsBlocks = true;
   }
@@ -223,9 +225,14 @@ public class SourceHandle implements ISourceHandle {
 
   @Override
   public synchronized void close() {
+    logger.info("[SourceHandle {}-{}]: closed ", localFragmentInstanceId, localPlanNodeId);
     if (closed) {
       return;
     }
+    if (blocked != null && !blocked.isDone()) {
+      blocked.cancel(true);
+      logger.info("[SourceHandle {}]: blocked is cancelled.", localPlanNodeId);
+    }
     sequenceIdToDataBlockSize.clear();
     if (bufferRetainedSizeInBytes > 0) {
       localMemoryManager
@@ -307,13 +314,12 @@ public class SourceHandle implements ISourceHandle {
 
     @Override
     public void run() {
-      logger.debug(
-          "Get data blocks [{}, {}) from {} for plan node {} of {}.",
+      logger.info(
+          "[SourceHandle-{}]: Get data blocks [{}, {}) from {}",
+          localPlanNodeId,
           startSequenceId,
           endSequenceId,
-          remoteFragmentInstanceId,
-          localPlanNodeId,
-          localFragmentInstanceId);
+          remoteFragmentInstanceId);
       GetDataBlockRequest req =
           new GetDataBlockRequest(remoteFragmentInstanceId, startSequenceId, endSequenceId);
       int attempt = 0;
@@ -335,6 +341,7 @@ public class SourceHandle implements ISourceHandle {
             }
             if (!blocked.isDone()) {
               blocked.set(null);
+              logger.info("[SourceHandle {}]: blocked is set null.", localPlanNodeId);
             }
           }
           executorService.submit(
@@ -375,8 +382,9 @@ public class SourceHandle implements ISourceHandle {
 
     @Override
     public void run() {
-      logger.debug(
-          "Send ack data block event [{}, {}) to {}.",
+      logger.info(
+          "[SourceHandle {}]: Send ack data block event [{}, {}) to {}.",
+          localPlanNodeId,
           startSequenceId,
           endSequenceId,
           remoteFragmentInstanceId);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index 0d45e76390..3d7a292e69 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -32,6 +32,8 @@ import org.apache.iotdb.db.mpp.sql.statement.ConfigStatement;
 import org.apache.iotdb.db.mpp.sql.statement.Statement;
 
 import org.apache.commons.lang3.Validate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -43,8 +45,10 @@ import java.util.concurrent.ScheduledExecutorService;
  * QueryExecution.
  */
 public class Coordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class);
+
   private static final String COORDINATOR_EXECUTOR_NAME = "MPPCoordinator";
-  private static final int COORDINATOR_EXECUTOR_SIZE = 2;
+  private static final int COORDINATOR_EXECUTOR_SIZE = 20;
   private static final String COORDINATOR_SCHEDULED_EXECUTOR_NAME = "MPPCoordinatorScheduled";
   private static final int COORDINATOR_SCHEDULED_EXECUTOR_SIZE = 2;
 
@@ -94,7 +98,7 @@ public class Coordinator {
             partitionFetcher,
             schemaFetcher);
     queryExecutionMap.put(queryId, execution);
-
+    LOG.info("[Query: {}] start QueryExecution. Statement: {}", queryId, sql);
     execution.start();
 
     return execution.getStatus();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 172a82db70..9dcb3e5fc3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -43,6 +43,8 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.sql.SQLException;
@@ -61,6 +63,8 @@ import static com.google.common.base.Throwables.throwIfUnchecked;
  * corresponding physical nodes. 3. Collect and monitor the progress/states of this query.
  */
 public class QueryExecution implements IQueryExecution {
+  private static final Logger LOG = LoggerFactory.getLogger(QueryExecution.class);
+
   private final MPPQueryContext context;
   private IScheduler scheduler;
   private final QueryStateMachine stateMachine;
@@ -112,10 +116,12 @@ public class QueryExecution implements IQueryExecution {
   public void start() {
     doLogicalPlan();
     doDistributedPlan();
+    LOG.info("[{}]: Distribution Plan: {}", this, distributedPlan);
     if (context.getQueryType() == QueryType.READ) {
       // The ResultHandle could only be initialized after distributed planning
       initResultHandle();
     }
+    LOG.info("[{}]: Start to schedule.", this);
     schedule();
   }
 
@@ -163,7 +169,12 @@ public class QueryExecution implements IQueryExecution {
   }
 
   /** Release the resources that current QueryExecution hold. */
-  private void releaseResource() {}
+  private void releaseResource() {
+    // close ResultHandle to unblock client's getResult request
+    if (resultHandle != null && !resultHandle.isClosed()) {
+      resultHandle.close();
+    }
+  }
 
   /**
    * This method will be called by the request thread from client connection. This method will block
@@ -175,9 +186,12 @@ public class QueryExecution implements IQueryExecution {
   @Override
   public TsBlock getBatchResult() {
     try {
+      LOG.info("[QueryExecution {}]: try to get result.", context.getQueryId());
       ListenableFuture<Void> blocked = resultHandle.isBlocked();
       blocked.get();
+      LOG.info("[QueryExecution {}]:  unblock. Cancelled: {}, Done: {}", context.getQueryId(), blocked.isCancelled(), blocked.isDone());
       if (resultHandle.isFinished()) {
+        LOG.info("[QueryExecution {}]:  result is null", context.getQueryId());
         return null;
       }
       return resultHandle.receive();
@@ -271,4 +285,8 @@ public class QueryExecution implements IQueryExecution {
   public boolean isQuery() {
     return context.getQueryType() == QueryType.READ;
   }
+
+  public String toString() {
+    return String.format("QueryExecution[%s]", context.getQueryId());
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
index 8b819d6cad..47c57c873d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
@@ -28,6 +28,8 @@ import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 
 import io.airlift.units.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.concurrent.ExecutionException;
@@ -43,6 +45,8 @@ import java.util.concurrent.ScheduledExecutorService;
  * this scheduler.
  */
 public class ClusterScheduler implements IScheduler {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ClusterScheduler.class);
+
   private MPPQueryContext queryContext;
   // The stateMachine of the QueryExecution owned by this QueryScheduler
   private QueryStateMachine stateMachine;
@@ -79,13 +83,16 @@ public class ClusterScheduler implements IScheduler {
 
   @Override
   public void start() {
+    LOGGER.info("[{}] start to dispatch fragment instance. size: {}", queryContext.getQueryId(), instances.size());
     stateMachine.transitionToDispatching();
     Future<FragInstanceDispatchResult> dispatchResultFuture = dispatcher.dispatch(instances);
 
     // NOTICE: the FragmentInstance may be dispatched to another Host due to consensus redirect.
     // So we need to start the state fetcher after the dispatching stage.
     try {
+      LOGGER.info("[{}] wait dispatch to be finished", queryContext.getQueryId());
       FragInstanceDispatchResult result = dispatchResultFuture.get();
+      LOGGER.info("[{}] dispatch finished: {}", queryContext.getQueryId(), result.isSuccessful());
       if (!result.isSuccessful()) {
         stateMachine.transitionToFailed(new IllegalStateException("Fragment cannot be dispatched"));
         return;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
index 286f3df3e6..b39ec04406 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
@@ -27,6 +27,8 @@ import org.apache.iotdb.mpp.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -34,7 +36,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
 public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
-
+  private static final Logger LOGGER = LoggerFactory.getLogger(SimpleFragInstanceDispatcher.class);
   private final ExecutorService executor;
 
   public SimpleFragInstanceDispatcher(ExecutorService exeutor) {
@@ -63,7 +65,9 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
             TSendFragmentInstanceReq req =
                 new TSendFragmentInstanceReq(
                     new TFragmentInstance(buffer), groupId, instance.getType().toString());
+            LOGGER.info("start to dispatch fragment instance: {}", instance.getId());
             resp = client.sendFragmentInstance(req);
+            LOGGER.info("dispatch complete: {}", instance.getId());
             if (!resp.accepted) {
               break;
             }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
index 9e9946c323..b11f643810 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
@@ -107,6 +107,7 @@ public class SeriesScanOperator implements DataSourceOperator {
           return true;
         }
       }
+      System.out.println(String.format("[SeriesScanOperator-%s]: hasNext returned: %s", sourceId, hasCachedTsBlock));
       return hasCachedTsBlock;
     } catch (IOException e) {
       throw new RuntimeException("Error happened while scanning the file", e);
@@ -115,7 +116,7 @@ public class SeriesScanOperator implements DataSourceOperator {
 
   @Override
   public boolean isFinished() {
-    return finished || (finished = hasNext());
+    return finished || (finished = !hasNext());
   }
 
   private boolean readChunkData() throws IOException {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
index c7bdb95285..875894f47f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
@@ -66,7 +66,7 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
 
   private static final int MAX_CAPACITY = 1000; // TODO: load from config files
   private static final int WORKER_THREAD_NUM = 4; // TODO: load from config files
-  private static final int QUERY_TIMEOUT_MS = 60_000; // TODO: load from config files or requests
+  private static final int QUERY_TIMEOUT_MS = 10_000; // TODO: load from config files or requests
   private final ThreadGroup workerGroups;
   private final List<AbstractExecutor> threads;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
index 2df53235ad..d9a64788f0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
@@ -54,4 +54,15 @@ public class DistributedQueryPlan {
   public List<FragmentInstance> getInstances() {
     return instances;
   }
+
+  public String toString() {
+    StringBuilder ret = new StringBuilder();
+    ret.append("Instance Count: ").append(instances.size());
+    ret.append(System.lineSeparator());
+    for (FragmentInstance instance : instances) {
+      ret.append(instance);
+      ret.append(System.lineSeparator());
+    }
+    return ret.toString();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index 506f180049..aafc3ffd00 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.utils;
 
+import org.apache.iotdb.db.mpp.execution.Coordinator;
 import org.apache.iotdb.db.mpp.execution.IQueryExecution;
 import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
 import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
@@ -31,6 +32,8 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BitMap;
 import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -42,7 +45,7 @@ import java.util.List;
 
 /** TimeValuePairUtils to convert between thrift format and TsFile format. */
 public class QueryDataSetUtils {
-
+  private static final Logger LOG = LoggerFactory.getLogger(QueryDataSetUtils.class);
   private static final int FLAG = 0x01;
 
   private QueryDataSetUtils() {}
@@ -193,7 +196,9 @@ public class QueryDataSetUtils {
     int rowCount = 0;
     int[] valueOccupation = new int[columnNum];
     while (rowCount < fetchSize && queryExecution.hasNextResult()) {
+      LOG.info("[ToTSDataSet {}] hasNext return true.", queryExecution);
       TsBlock tsBlock = queryExecution.getBatchResult();
+      LOG.info("[ToTSDataSet {}] result got.", queryExecution);
       if (tsBlock == null) {
         break;
       }