You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/04 13:10:44 UTC

[iotdb] branch xingtanzjr/query_log created (now e2e19a7f05)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a change to branch xingtanzjr/query_log
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at e2e19a7f05 tmp saved

This branch includes the following new commits:

     new e2e19a7f05 tmp saved

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: tmp saved

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/query_log
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e2e19a7f057c53d7bb65bc1acba07c1f23b928ec
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed May 4 21:10:30 2022 +0800

    tmp saved
---
 .../apache/iotdb/db/mpp/plan/analyze/Analyzer.java | 11 ++++
 .../db/mpp/plan/analyze/ClusterSchemaFetcher.java  |  5 +-
 .../db/mpp/plan/execution/QueryExecution.java      | 35 ++++++++++---
 .../plan/node/metedata/read/SchemaFetchNode.java   |  4 ++
 .../node/metedata/read/SeriesSchemaMergeNode.java  |  4 ++
 .../db/mpp/plan/scheduler/ClusterScheduler.java    | 11 +++-
 .../scheduler/SimpleFragInstanceDispatcher.java    | 58 ++++++++++------------
 7 files changed, 88 insertions(+), 40 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
index 0b282f5df3..5e20cd729d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
@@ -68,6 +68,9 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.expression.util.ExpressionOptimizer;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -80,6 +83,8 @@ import java.util.stream.Collectors;
 /** Analyze the statement and generate Analysis. */
 public class Analyzer {
 
+  private static final Logger logger = LoggerFactory.getLogger(Analyzer.class);
+
   private final MPPQueryContext context;
 
   private final IPartitionFetcher partitionFetcher;
@@ -96,6 +101,10 @@ public class Analyzer {
     return new AnalyzeVisitor().process(statement, context);
   }
 
+  private String getLogHeader() {
+    return String.format("Query[%s]:", context.getQueryId());
+  }
+
   /** This visitor is used to analyze each type of Statement and returns the {@link Analysis}. */
   private final class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> {
 
@@ -118,7 +127,9 @@ public class Analyzer {
             (QueryStatement) new ConcatPathRewriter().rewrite(queryStatement, patternTree);
 
         // request schema fetch API
+        logger.info("{} fetch query schema...", getLogHeader());
         SchemaTree schemaTree = schemaFetcher.fetchSchema(patternTree);
+        logger.info("{} fetch schema done", getLogHeader());
         // (xingtanzjr) If there is no leaf node in the schema tree, the query should be completed
         // immediately
         if (schemaTree.isEmpty()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
index 256b53779e..d0c86da11b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
@@ -75,7 +75,10 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
         coordinator.execute(schemaFetchStatement, queryId, null, "", partitionFetcher, this);
     // TODO: (xingtanzjr) throw exception
     if (executionResult.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      throw new RuntimeException("cannot fetch schema, status is: " + executionResult.status);
+      throw new RuntimeException(
+          String.format(
+              "cannot fetch schema, status is: %s, msg is: %s",
+              executionResult.status.getCode(), executionResult.status.getMessage()));
     }
     SchemaTree result = new SchemaTree();
     while (coordinator.getQueryExecution(queryId).hasNextResult()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index dce5dccc71..975b7d4e16 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.db.mpp.execution.QueryState;
 import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
 import org.apache.iotdb.db.mpp.execution.datatransfer.DataBlockService;
 import org.apache.iotdb.db.mpp.execution.datatransfer.ISourceHandle;
-import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 import org.apache.iotdb.db.mpp.plan.analyze.Analyzer;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
@@ -40,6 +39,7 @@ import org.apache.iotdb.db.mpp.plan.planner.DistributionPlanner;
 import org.apache.iotdb.db.mpp.plan.planner.LogicalPlanner;
 import org.apache.iotdb.db.mpp.plan.planner.plan.DistributedQueryPlan;
 import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeUtil;
 import org.apache.iotdb.db.mpp.plan.scheduler.ClusterScheduler;
 import org.apache.iotdb.db.mpp.plan.scheduler.IScheduler;
 import org.apache.iotdb.db.mpp.plan.scheduler.StandaloneScheduler;
@@ -70,7 +70,7 @@ import static com.google.common.base.Throwables.throwIfUnchecked;
  * corresponding physical nodes. 3. Collect and monitor the progress/states of this query.
  */
 public class QueryExecution implements IQueryExecution {
-  private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class);
+  private static final Logger logger = LoggerFactory.getLogger(QueryExecution.class);
 
   private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
@@ -136,6 +136,9 @@ public class QueryExecution implements IQueryExecution {
 
   public void start() {
     if (skipExecute()) {
+      logger.info(
+          "{} execution of query will be skipped. Transit to FINISHED immediately.",
+          getLogHeader());
       stateMachine.transitionToFinished();
       return;
     }
@@ -152,12 +155,13 @@ public class QueryExecution implements IQueryExecution {
   }
 
   // Analyze the statement in QueryContext. Generate the analysis this query need
-  private static Analysis analyze(
+  private Analysis analyze(
       Statement statement,
       MPPQueryContext context,
       IPartitionFetcher partitionFetcher,
       ISchemaFetcher schemaFetcher) {
     // initialize the variable `analysis`
+    logger.info("{} start to analyze query", getLogHeader());
     return new Analyzer(context, partitionFetcher, schemaFetcher).analyze(statement);
   }
 
@@ -186,14 +190,25 @@ public class QueryExecution implements IQueryExecution {
 
   // Use LogicalPlanner to do the logical query plan and logical optimization
   public void doLogicalPlan() {
+    logger.info("{} do logical plan...", getLogHeader());
     LogicalPlanner planner = new LogicalPlanner(this.context, this.planOptimizers);
     this.logicalPlan = planner.plan(this.analysis);
+    logger.info(
+        "{} logical plan is: \n {}",
+        getLogHeader(),
+        PlanNodeUtil.nodeToString(this.logicalPlan.getRootNode()));
   }
 
   // Generate the distributed plan and split it into fragments
   public void doDistributedPlan() {
+    logger.info("{} do distribution plan...", getLogHeader());
     DistributionPlanner planner = new DistributionPlanner(this.analysis, this.logicalPlan);
     this.distributedPlan = planner.planFragments();
+    logger.info(
+        "{} distribution plan done. Fragment instance count is {}, details is: \n {}",
+        getLogHeader(),
+        distributedPlan.getInstances().size(),
+        distributedPlan.getInstances());
   }
 
   // Stop the workers for this query
@@ -294,12 +309,16 @@ public class QueryExecution implements IQueryExecution {
           state == QueryState.FINISHED || state == QueryState.RUNNING
               ? TSStatusCode.SUCCESS_STATUS
               : TSStatusCode.QUERY_PROCESS_ERROR;
-      return new ExecutionResult(context.getQueryId(), RpcUtils.getStatus(statusCode));
+      return new ExecutionResult(
+          context.getQueryId(), RpcUtils.getStatus(statusCode, stateMachine.getFailureMessage()));
     } catch (InterruptedException | ExecutionException e) {
       // TODO: (xingtanzjr) use more accurate error handling
-      Thread.currentThread().interrupt();
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
       return new ExecutionResult(
-          context.getQueryId(), RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
+          context.getQueryId(),
+          RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, stateMachine.getFailureMessage()));
     }
   }
 
@@ -333,4 +352,8 @@ public class QueryExecution implements IQueryExecution {
   public String toString() {
     return String.format("QueryExecution[%s]", context.getQueryId());
   }
+
+  private String getLogHeader() {
+    return String.format("Query[%s]:", context.getQueryId());
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaFetchNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaFetchNode.java
index a43058adc4..fa62745926 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaFetchNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaFetchNode.java
@@ -75,4 +75,8 @@ public class SchemaFetchNode extends SchemaScanNode {
   public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
     return visitor.visitSchemaFetch(this, context);
   }
+
+  public String toString() {
+    return String.format("SchemaFetchNode-%s", getPlanNodeId());
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SeriesSchemaMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SeriesSchemaMergeNode.java
index a7c4f4052e..c501dc0384 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SeriesSchemaMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SeriesSchemaMergeNode.java
@@ -57,4 +57,8 @@ public class SeriesSchemaMergeNode extends AbstractSchemaMergeNode {
   public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
     return visitor.visitSchemaMerge(this, context);
   }
+
+  public String toString() {
+    return String.format("SchemaMergeNode-%s", getPlanNodeId());
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
index 48c4268034..db4bcfdb84 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
@@ -48,7 +48,7 @@ import java.util.concurrent.ScheduledExecutorService;
  * this scheduler.
  */
 public class ClusterScheduler implements IScheduler {
-  private static final Logger LOGGER = LoggerFactory.getLogger(ClusterScheduler.class);
+  private static final Logger logger = LoggerFactory.getLogger(ClusterScheduler.class);
 
   private MPPQueryContext queryContext;
   // The stateMachine of the QueryExecution owned by this QueryScheduler
@@ -90,6 +90,7 @@ public class ClusterScheduler implements IScheduler {
   @Override
   public void start() {
     stateMachine.transitionToDispatching();
+    logger.info("{} transit to DISPATCHING", getLogHeader());
     Future<FragInstanceDispatchResult> dispatchResultFuture = dispatcher.dispatch(instances);
 
     // NOTICE: the FragmentInstance may be dispatched to another Host due to consensus redirect.
@@ -102,7 +103,9 @@ public class ClusterScheduler implements IScheduler {
       }
     } catch (InterruptedException | ExecutionException e) {
       // If the dispatch failed, we make the QueryState as failed, and return.
-      Thread.currentThread().interrupt();
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
       stateMachine.transitionToFailed(e);
       return;
     }
@@ -156,4 +159,8 @@ public class ClusterScheduler implements IScheduler {
 
   // After sending, start to collect the states of these fragment instances
   private void startMonitorInstances() {}
+
+  private String getLogHeader() {
+    return String.format("Query[%s]", queryContext.getQueryId());
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleFragInstanceDispatcher.java
index fec10957bf..4c64c3b7b6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleFragInstanceDispatcher.java
@@ -19,20 +19,14 @@
 
 package org.apache.iotdb.db.mpp.plan.scheduler;
 
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
-import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
-import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
-import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -55,31 +49,33 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
   public Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance> instances) {
     return executor.submit(
         () -> {
-          TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp(false);
-          for (FragmentInstance instance : instances) {
-            TEndPoint endPoint = instance.getHostDataNode().getInternalEndPoint();
-            // TODO: (jackie tien) change the port
-            try (SyncDataNodeInternalServiceClient client =
-                internalServiceClientManager.borrowClient(endPoint)) {
-              // TODO: (xingtanzjr) consider how to handle the buffer here
-              ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
-              instance.serializeRequest(buffer);
-              buffer.flip();
-              TConsensusGroupId groupId = instance.getRegionReplicaSet().getRegionId();
-              TSendFragmentInstanceReq req =
-                  new TSendFragmentInstanceReq(
-                      new TFragmentInstance(buffer), groupId, instance.getType().toString());
-              resp = client.sendFragmentInstance(req);
-            } catch (IOException e) {
-              LOGGER.error("can't connect to node {}", endPoint, e);
-              throw e;
-            }
-
-            if (!resp.accepted) {
-              break;
-            }
-          }
-          return new FragInstanceDispatchResult(resp.accepted);
+          throw new RuntimeException("Dispatch Error");
+          //          TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp(false);
+          //          for (FragmentInstance instance : instances) {
+          //            TEndPoint endPoint = instance.getHostDataNode().getInternalEndPoint();
+          //            // TODO: (jackie tien) change the port
+          //            try (SyncDataNodeInternalServiceClient client =
+          //                internalServiceClientManager.borrowClient(endPoint)) {
+          //              // TODO: (xingtanzjr) consider how to handle the buffer here
+          //              ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
+          //              instance.serializeRequest(buffer);
+          //              buffer.flip();
+          //              TConsensusGroupId groupId = instance.getRegionReplicaSet().getRegionId();
+          //              TSendFragmentInstanceReq req =
+          //                  new TSendFragmentInstanceReq(
+          //                      new TFragmentInstance(buffer), groupId,
+          // instance.getType().toString());
+          //              resp = client.sendFragmentInstance(req);
+          //            } catch (IOException e) {
+          //              LOGGER.error("can't connect to node {}", endPoint, e);
+          //              throw e;
+          //            }
+          //
+          //            if (!resp.accepted) {
+          //              break;
+          //            }
+          //          }
+          //          return new FragInstanceDispatchResult(resp.accepted);
         });
   }