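You are viewing a plain-text version of this content. The link to its canonical version (the word "here" on the original page) was a hyperlink and is not preserved in this plain-text rendering.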
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/03/26 10:21:11 UTC

[iotdb] branch xingtanzjr/query_execution updated: complete basic fragment state fetcher

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/query_execution
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/xingtanzjr/query_execution by this push:
     new 3e498df  complete basic fragment state fetcher
3e498df is described below

commit 3e498dfce8f624913a3f08cbf598459334d12dcc
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Sat Mar 26 18:19:59 2022 +0800

    complete basic fragment state fetcher
---
 .../iotdb/db/mpp/common/MPPQueryContext.java       |  4 ++
 .../iotdb/db/mpp/execution/QueryExecution.java     | 36 ++++++++++---
 .../iotdb/db/mpp/execution/QueryStateMachine.java  | 17 +++++-
 .../scheduler/AbstractFragInsStateFetcher.java     | 60 +++++++++++++++++++++
 .../mpp/execution/scheduler/ClusterScheduler.java  | 53 ++++++++++++++-----
 .../scheduler/FixedRateFragInsStateFetcher.java    | 61 ++++++++++++++++++++++
 .../scheduler/IFragInstanceDispatcher.java         |  2 +
 ...patcher.java => IFragInstanceStateFetcher.java} | 14 +----
 .../scheduler/SimpleFragInstanceDispatcher.java    |  5 ++
 9 files changed, 221 insertions(+), 31 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
index 36ec40b..ee86665 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
@@ -42,4 +42,8 @@ public class MPPQueryContext {
   public QueryId getQueryId() {
     return queryId;
   }
+
+  public QueryType getQueryType() {
+    return queryType;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 4a809a0..2c2fd26 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -59,10 +59,15 @@ public class QueryExecution {
     this.context = context;
     this.analysis = analyze(statement, context);
     this.stateMachine = new QueryStateMachine(context.getQueryId());
-    // TODO: (xingtanzjr) initialize the query scheduler according to configuration
-    this.scheduler = new ClusterScheduler(stateMachine, distributedPlan.getInstances());
 
-    // TODO: register callbacks in QueryStateMachine when the QueryExecution is aborted/finished
+    // We add the abort logic inside the QueryExecution.
+    // So that the other components can only focus on the state change.
+    stateMachine.addStateChangeListener(state -> {
+      if (!state.isDone()) {
+        return;
+      }
+      this.cleanup();
+    });
   }
 
   public void start() {
@@ -78,7 +83,9 @@ public class QueryExecution {
   }
 
   private void schedule() {
-    this.scheduler = new ClusterScheduler(this.stateMachine, this.distributedPlan.getInstances());
+    // TODO: (xingtanzjr) initialize the query scheduler according to configuration
+    this.scheduler = new ClusterScheduler(stateMachine, distributedPlan.getInstances(), context.getQueryType());
+    // TODO: (xingtanzjr) how to make the schedule running asynchronously
     this.scheduler.start();
   }
 
@@ -95,6 +102,23 @@ public class QueryExecution {
   }
 
   /**
+   * Do cleanup work for current QueryExecution including QuerySchedule aborting and resource releasing
+   */
+  private void cleanup() {
+    if (this.scheduler != null) {
+      this.scheduler.abort();
+    }
+    releaseResource();
+  }
+
+  /**
+   * Release the resources that current QueryExecution hold.
+   */
+  private void releaseResource() {
+
+  }
+
+  /**
    * This method will be called by the request thread from client connection. This method will block
    * until one of these conditions occurs: 1. There is a batch of result 2. There is no more result
    * 3. The query has been cancelled 4. The query is timeout This method wil
@@ -113,8 +137,8 @@ public class QueryExecution {
    * @return ExecutionStatus. Contains the QueryId and the TSStatus.
    */
   public ExecutionStatus getStatus() {
-    // Although we monitor the state to transition to FINISHED, the future will return if any Terminated state is triggered
-    ListenableFuture<QueryState> future =  stateMachine.getStateChange(QueryState.FINISHED);
+    // Although we monitor the state to transition to RUNNING, the future will return if any Terminated state is triggered
+    ListenableFuture<QueryState> future =  stateMachine.getStateChange(QueryState.RUNNING);
     try {
       QueryState state = future.get();
       // TODO: (xingtanzjr) use more TSStatusCode if the QueryState isn't FINISHED
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
index 3585009..33b9049 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
@@ -20,8 +20,13 @@ package org.apache.iotdb.db.mpp.execution;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 
 /**
@@ -31,6 +36,7 @@ import java.util.concurrent.Executor;
 public class QueryStateMachine {
     private String name;
     private StateMachine<QueryState> queryState;
+    private Map<FragmentInstanceId, FragmentInstanceState> fragInstanceStateMap;
 
     // The executor will be used in all the state machines belonged to this query.
     private Executor stateMachineExecutor;
@@ -38,7 +44,16 @@ public class QueryStateMachine {
     public QueryStateMachine(QueryId queryId) {
         this.name = String.format("QueryStateMachine[%s]", queryId);
         this.stateMachineExecutor = getStateMachineExecutor();
-        queryState = new StateMachine<>(queryId.toString(), this.stateMachineExecutor ,QueryState.QUEUED, QueryState.TERMINAL_INSTANCE_STATES);
+        this.fragInstanceStateMap = new ConcurrentHashMap<>();
+        this.queryState = new StateMachine<>(queryId.toString(), this.stateMachineExecutor ,QueryState.QUEUED, QueryState.TERMINAL_INSTANCE_STATES);
+    }
+
+    public void initialFragInstanceState(FragmentInstanceId id, FragmentInstanceState state) {
+        this.fragInstanceStateMap.put(id, state);
+    }
+
+    public void updateFragInstanceState(FragmentInstanceId id, FragmentInstanceState state) {
+        this.fragInstanceStateMap.put(id, state);
     }
 
     // TODO: (xingtanzjr) consider more suitable method for executor initialization
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateFetcher.java
new file mode 100644
index 0000000..0cd65f4
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateFetcher.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.scheduler;
+
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
+import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
+import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
+import org.apache.iotdb.mpp.rpc.thrift.InternalService;
+import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStateReq;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceStateResp;
+import org.apache.thrift.TException;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+public abstract class AbstractFragInsStateFetcher implements IFragInstanceStateFetcher {
+
+    protected QueryStateMachine stateMachine;
+    protected ExecutorService executor;
+    protected List<FragmentInstance> instances;
+
+    public AbstractFragInsStateFetcher(QueryStateMachine stateMachine, ExecutorService executor, List<FragmentInstance> instances) {
+        this.stateMachine = stateMachine;
+        this.executor = executor;
+        this.instances = instances;
+    }
+
+    public abstract void start();
+
+    protected FragmentInstanceState fetchState(FragmentInstance instance) throws TException {
+        InternalService.Client client = InternalServiceClientFactory
+                .getInternalServiceClient(instance.getHostEndpoint().ip, instance.getHostEndpoint().port);
+        TFragmentInstanceStateResp resp = client.fetchFragmentInstanceState(new TFetchFragmentInstanceStateReq(getTId(instance)));
+        return FragmentInstanceState.valueOf(resp.state);
+    }
+
+    private TFragmentInstanceId getTId(FragmentInstance instance) {
+        return new TFragmentInstanceId(instance.getId().getQueryId().getId(),
+                String.valueOf(instance.getId().getFragmentId().getId()), instance.getId().getInstanceId());
+    }
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
index 820da6e..c4050a3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
@@ -18,19 +18,18 @@
  */
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
+import io.airlift.units.Duration;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.execution.FragmentInfo;
-import org.apache.iotdb.db.mpp.execution.QueryState;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
 import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 
-import io.airlift.units.Duration;
-
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
 
 /**
  * QueryScheduler is used to dispatch the fragment instances of a query to target nodes. And it will
@@ -42,14 +41,15 @@ import java.util.concurrent.FutureTask;
 public class ClusterScheduler implements IScheduler {
   // The stateMachine of the QueryExecution owned by this QueryScheduler
   private QueryStateMachine stateMachine;
-
+  private QueryType queryType;
   // The fragment instances which should be sent to corresponding Nodes.
   private List<FragmentInstance> instances;
   private IFragInstanceDispatcher dispatcher;
 
-  public ClusterScheduler(QueryStateMachine stateMachine, List<FragmentInstance> instances) {
+  public ClusterScheduler(QueryStateMachine stateMachine, List<FragmentInstance> instances, QueryType queryType) {
     this.stateMachine = stateMachine;
     this.instances = instances;
+    this.queryType = queryType;
     this.dispatcher = new SimpleFragInstanceDispatcher();
   }
 
@@ -58,21 +58,50 @@ public class ClusterScheduler implements IScheduler {
     // TODO: consider where the state transition should be put
     stateMachine.transitionToDispatching();
     Future<FragInstanceDispatchResult> dispatchResultFuture = dispatcher.dispatch(instances);
-    //TODO: start the FragmentInstance state fetcher
+
+    // NOTICE: the FragmentInstance may be dispatched to another Host due to consensus redirect.
+    // So we need to start the state fetcher after the dispatching stage.
+    boolean success = waitDispatchingFinished(dispatchResultFuture);
+    // If the dispatch failed, we make the QueryState as failed, and return.
+    if (!success) {
+      stateMachine.transitionToFailed();
+      return;
+    }
+
+    // For the FragmentInstance of WRITE, it will be executed directly when dispatching.
+    if (queryType == QueryType.WRITE) {
+      stateMachine.transitionToFinished();
+      return;
+    }
+
+    // The FragmentInstances has been dispatched successfully to corresponding host, we mark the QueryState to Running
+    stateMachine.transitionToRunning();
+    instances.forEach(instance -> {
+      stateMachine.initialFragInstanceState(instance.getId(), FragmentInstanceState.RUNNING);
+    });
+
+    // TODO: (xingtanzjr) start the stateFetcher/heartbeat for each fragment instance
+
+  }
+
+  private boolean waitDispatchingFinished(Future<FragInstanceDispatchResult> dispatchResultFuture) {
     try {
       FragInstanceDispatchResult result = dispatchResultFuture.get();
       if (result.isSuccessful()) {
-        stateMachine.transitionToRunning();
-      } else {
-        stateMachine.transitionToFailed();
+        return true;
       }
     } catch (InterruptedException | ExecutionException e) {
-      e.printStackTrace();
+      // TODO: (xingtanzjr) record the dispatch failure reason.
     }
+    return false;
   }
 
   @Override
-  public void abort() {}
+  public void abort() {
+    if (this.dispatcher != null) {
+      dispatcher.abort();
+    }
+  }
 
   @Override
   public Duration getTotalCpuTime() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateFetcher.java
new file mode 100644
index 0000000..583f250
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateFetcher.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.scheduler;
+
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
+import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
+import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
+import org.apache.thrift.TException;
+
+import java.util.List;
+import java.util.concurrent.*;
+
+public class FixedRateFragInsStateFetcher extends AbstractFragInsStateFetcher {
+    private static final long STATE_FETCH_INTERVAL_IN_MS = 1000;
+    private volatile boolean aborted = false;
+
+    public FixedRateFragInsStateFetcher(QueryStateMachine stateMachine, ExecutorService executor, List<FragmentInstance> instances) {
+        super(stateMachine, executor, instances);
+    }
+
+    @Override
+    public void start() {
+        while (!aborted) {
+            try {
+                Future<Boolean> future = executor.submit(() -> {
+                    for (FragmentInstance instance : instances) {
+                        try {
+                            FragmentInstanceState state = fetchState(instance);
+                            stateMachine.updateFragInstanceState(instance.getId(), state);
+                        } catch (TException e) {
+                            // TODO: do nothing ?
+                            return false;
+                        }
+                    }
+                    return true;
+                });
+                future.get();
+                Thread.sleep(STATE_FETCH_INTERVAL_IN_MS);
+            } catch (InterruptedException | ExecutionException e) {
+                // TODO:
+            }
+        }
+    }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceDispatcher.java
index ce3b2d6..9fdedab 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceDispatcher.java
@@ -31,4 +31,6 @@ public interface IFragInstanceDispatcher {
      * @return Boolean.
      */
     Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance> instances);
+
+    void abort();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceStateFetcher.java
similarity index 68%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceDispatcher.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceStateFetcher.java
index ce3b2d6..33d0d53 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceStateFetcher.java
@@ -19,16 +19,6 @@
 
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
-import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
-
-import java.util.List;
-import java.util.concurrent.Future;
-
-public interface IFragInstanceDispatcher {
-    /**
-     * Dispatch all Fragment instances asynchronously
-     * @param instances Fragment instance list
-     * @return Boolean.
-     */
-    Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance> instances);
+public interface IFragInstanceStateFetcher {
+    void start();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
index ffe4758..f544c8e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
@@ -47,4 +47,9 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
         new Thread(dispatchTask).start();
         return dispatchTask;
     }
+
+    @Override
+    public void abort() {
+
+    }
 }