You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/03/26 10:21:11 UTC
[iotdb] branch xingtanzjr/query_execution updated: complete basic fragment state fetcher
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/query_execution
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/xingtanzjr/query_execution by this push:
new 3e498df complete basic fragment state fetcher
3e498df is described below
commit 3e498dfce8f624913a3f08cbf598459334d12dcc
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Sat Mar 26 18:19:59 2022 +0800
complete basic fragment state fetcher
---
.../iotdb/db/mpp/common/MPPQueryContext.java | 4 ++
.../iotdb/db/mpp/execution/QueryExecution.java | 36 ++++++++++---
.../iotdb/db/mpp/execution/QueryStateMachine.java | 17 +++++-
.../scheduler/AbstractFragInsStateFetcher.java | 60 +++++++++++++++++++++
.../mpp/execution/scheduler/ClusterScheduler.java | 53 ++++++++++++++-----
.../scheduler/FixedRateFragInsStateFetcher.java | 61 ++++++++++++++++++++++
.../scheduler/IFragInstanceDispatcher.java | 2 +
...patcher.java => IFragInstanceStateFetcher.java} | 14 +----
.../scheduler/SimpleFragInstanceDispatcher.java | 5 ++
9 files changed, 221 insertions(+), 31 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
index 36ec40b..ee86665 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
@@ -42,4 +42,8 @@ public class MPPQueryContext {
public QueryId getQueryId() {
return queryId;
}
+
+ public QueryType getQueryType() {
+ return queryType;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 4a809a0..2c2fd26 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -59,10 +59,15 @@ public class QueryExecution {
this.context = context;
this.analysis = analyze(statement, context);
this.stateMachine = new QueryStateMachine(context.getQueryId());
- // TODO: (xingtanzjr) initialize the query scheduler according to configuration
- this.scheduler = new ClusterScheduler(stateMachine, distributedPlan.getInstances());
- // TODO: register callbacks in QueryStateMachine when the QueryExecution is aborted/finished
+ // We add the abort logic inside the QueryExecution.
+ // So that the other components can only focus on the state change.
+ stateMachine.addStateChangeListener(state -> {
+ if (!state.isDone()) {
+ return;
+ }
+ this.cleanup();
+ });
}
public void start() {
@@ -78,7 +83,9 @@ public class QueryExecution {
}
private void schedule() {
- this.scheduler = new ClusterScheduler(this.stateMachine, this.distributedPlan.getInstances());
+ // TODO: (xingtanzjr) initialize the query scheduler according to configuration
+ this.scheduler = new ClusterScheduler(stateMachine, distributedPlan.getInstances(), context.getQueryType());
+ // TODO: (xingtanzjr) how to make the schedule running asynchronously
this.scheduler.start();
}
@@ -95,6 +102,23 @@ public class QueryExecution {
}
/**
+ * Do cleanup work for current QueryExecution including QuerySchedule aborting and resource releasing
+ */
+ private void cleanup() {
+ if (this.scheduler != null) {
+ this.scheduler.abort();
+ }
+ releaseResource();
+ }
+
+ /**
+ * Release the resources that current QueryExecution hold.
+ */
+ private void releaseResource() {
+
+ }
+
+ /**
* This method will be called by the request thread from client connection. This method will block
* until one of these conditions occurs: 1. There is a batch of result 2. There is no more result
* 3. The query has been cancelled 4. The query is timeout This method wil
@@ -113,8 +137,8 @@ public class QueryExecution {
* @return ExecutionStatus. Contains the QueryId and the TSStatus.
*/
public ExecutionStatus getStatus() {
- // Although we monitor the state to transition to FINISHED, the future will return if any Terminated state is triggered
- ListenableFuture<QueryState> future = stateMachine.getStateChange(QueryState.FINISHED);
+ // Although we monitor the state to transition to RUNNING, the future will return if any Terminated state is triggered
+ ListenableFuture<QueryState> future = stateMachine.getStateChange(QueryState.RUNNING);
try {
QueryState state = future.get();
// TODO: (xingtanzjr) use more TSStatusCode if the QueryState isn't FINISHED
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
index 3585009..33b9049 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
@@ -20,8 +20,13 @@ package org.apache.iotdb.db.mpp.execution;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
/**
@@ -31,6 +36,7 @@ import java.util.concurrent.Executor;
public class QueryStateMachine {
private String name;
private StateMachine<QueryState> queryState;
+ private Map<FragmentInstanceId, FragmentInstanceState> fragInstanceStateMap;
// The executor will be used in all the state machines belonged to this query.
private Executor stateMachineExecutor;
@@ -38,7 +44,16 @@ public class QueryStateMachine {
public QueryStateMachine(QueryId queryId) {
this.name = String.format("QueryStateMachine[%s]", queryId);
this.stateMachineExecutor = getStateMachineExecutor();
- queryState = new StateMachine<>(queryId.toString(), this.stateMachineExecutor ,QueryState.QUEUED, QueryState.TERMINAL_INSTANCE_STATES);
+ this.fragInstanceStateMap = new ConcurrentHashMap<>();
+ this.queryState = new StateMachine<>(queryId.toString(), this.stateMachineExecutor ,QueryState.QUEUED, QueryState.TERMINAL_INSTANCE_STATES);
+ }
+
+ public void initialFragInstanceState(FragmentInstanceId id, FragmentInstanceState state) {
+ this.fragInstanceStateMap.put(id, state);
+ }
+
+ public void updateFragInstanceState(FragmentInstanceId id, FragmentInstanceState state) {
+ this.fragInstanceStateMap.put(id, state);
}
// TODO: (xingtanzjr) consider more suitable method for executor initialization
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateFetcher.java
new file mode 100644
index 0000000..0cd65f4
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateFetcher.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.scheduler;
+
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
+import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
+import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
+import org.apache.iotdb.mpp.rpc.thrift.InternalService;
+import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStateReq;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceStateResp;
+import org.apache.thrift.TException;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+public abstract class AbstractFragInsStateFetcher implements IFragInstanceStateFetcher {
+
+ protected QueryStateMachine stateMachine;
+ protected ExecutorService executor;
+ protected List<FragmentInstance> instances;
+
+ public AbstractFragInsStateFetcher(QueryStateMachine stateMachine, ExecutorService executor, List<FragmentInstance> instances) {
+ this.stateMachine = stateMachine;
+ this.executor = executor;
+ this.instances = instances;
+ }
+
+ public abstract void start();
+
+ protected FragmentInstanceState fetchState(FragmentInstance instance) throws TException {
+ InternalService.Client client = InternalServiceClientFactory
+ .getInternalServiceClient(instance.getHostEndpoint().ip, instance.getHostEndpoint().port);
+ TFragmentInstanceStateResp resp = client.fetchFragmentInstanceState(new TFetchFragmentInstanceStateReq(getTId(instance)));
+ return FragmentInstanceState.valueOf(resp.state);
+ }
+
+ private TFragmentInstanceId getTId(FragmentInstance instance) {
+ return new TFragmentInstanceId(instance.getId().getQueryId().getId(),
+ String.valueOf(instance.getId().getFragmentId().getId()), instance.getId().getInstanceId());
+ }
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
index 820da6e..c4050a3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
@@ -18,19 +18,18 @@
*/
package org.apache.iotdb.db.mpp.execution.scheduler;
+import io.airlift.units.Duration;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.execution.FragmentInfo;
-import org.apache.iotdb.db.mpp.execution.QueryState;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
-import io.airlift.units.Duration;
-
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
/**
* QueryScheduler is used to dispatch the fragment instances of a query to target nodes. And it will
@@ -42,14 +41,15 @@ import java.util.concurrent.FutureTask;
public class ClusterScheduler implements IScheduler {
// The stateMachine of the QueryExecution owned by this QueryScheduler
private QueryStateMachine stateMachine;
-
+ private QueryType queryType;
// The fragment instances which should be sent to corresponding Nodes.
private List<FragmentInstance> instances;
private IFragInstanceDispatcher dispatcher;
- public ClusterScheduler(QueryStateMachine stateMachine, List<FragmentInstance> instances) {
+ public ClusterScheduler(QueryStateMachine stateMachine, List<FragmentInstance> instances, QueryType queryType) {
this.stateMachine = stateMachine;
this.instances = instances;
+ this.queryType = queryType;
this.dispatcher = new SimpleFragInstanceDispatcher();
}
@@ -58,21 +58,50 @@ public class ClusterScheduler implements IScheduler {
// TODO: consider where the state transition should be put
stateMachine.transitionToDispatching();
Future<FragInstanceDispatchResult> dispatchResultFuture = dispatcher.dispatch(instances);
- //TODO: start the FragmentInstance state fetcher
+
+ // NOTICE: the FragmentInstance may be dispatched to another Host due to consensus redirect.
+ // So we need to start the state fetcher after the dispatching stage.
+ boolean success = waitDispatchingFinished(dispatchResultFuture);
+ // If the dispatch failed, we make the QueryState as failed, and return.
+ if (!success) {
+ stateMachine.transitionToFailed();
+ return;
+ }
+
+ // For the FragmentInstance of WRITE, it will be executed directly when dispatching.
+ if (queryType == QueryType.WRITE) {
+ stateMachine.transitionToFinished();
+ return;
+ }
+
+ // The FragmentInstances has been dispatched successfully to corresponding host, we mark the QueryState to Running
+ stateMachine.transitionToRunning();
+ instances.forEach(instance -> {
+ stateMachine.initialFragInstanceState(instance.getId(), FragmentInstanceState.RUNNING);
+ });
+
+ // TODO: (xingtanzjr) start the stateFetcher/heartbeat for each fragment instance
+
+ }
+
+ private boolean waitDispatchingFinished(Future<FragInstanceDispatchResult> dispatchResultFuture) {
try {
FragInstanceDispatchResult result = dispatchResultFuture.get();
if (result.isSuccessful()) {
- stateMachine.transitionToRunning();
- } else {
- stateMachine.transitionToFailed();
+ return true;
}
} catch (InterruptedException | ExecutionException e) {
- e.printStackTrace();
+ // TODO: (xingtanzjr) record the dispatch failure reason.
}
+ return false;
}
@Override
- public void abort() {}
+ public void abort() {
+ if (this.dispatcher != null) {
+ dispatcher.abort();
+ }
+ }
@Override
public Duration getTotalCpuTime() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateFetcher.java
new file mode 100644
index 0000000..583f250
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateFetcher.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.scheduler;
+
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
+import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
+import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
+import org.apache.thrift.TException;
+
+import java.util.List;
+import java.util.concurrent.*;
+
+public class FixedRateFragInsStateFetcher extends AbstractFragInsStateFetcher {
+ private static final long STATE_FETCH_INTERVAL_IN_MS = 1000;
+ private volatile boolean aborted = false;
+
+ public FixedRateFragInsStateFetcher(QueryStateMachine stateMachine, ExecutorService executor, List<FragmentInstance> instances) {
+ super(stateMachine, executor, instances);
+ }
+
+ @Override
+ public void start() {
+ while (!aborted) {
+ try {
+ Future<Boolean> future = executor.submit(() -> {
+ for (FragmentInstance instance : instances) {
+ try {
+ FragmentInstanceState state = fetchState(instance);
+ stateMachine.updateFragInstanceState(instance.getId(), state);
+ } catch (TException e) {
+ // TODO: do nothing ?
+ return false;
+ }
+ }
+ return true;
+ });
+ future.get();
+ Thread.sleep(STATE_FETCH_INTERVAL_IN_MS);
+ } catch (InterruptedException | ExecutionException e) {
+ // TODO:
+ }
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceDispatcher.java
index ce3b2d6..9fdedab 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceDispatcher.java
@@ -31,4 +31,6 @@ public interface IFragInstanceDispatcher {
* @return Boolean.
*/
Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance> instances);
+
+ void abort();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceStateFetcher.java
similarity index 68%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceDispatcher.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceStateFetcher.java
index ce3b2d6..33d0d53 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceStateFetcher.java
@@ -19,16 +19,6 @@
package org.apache.iotdb.db.mpp.execution.scheduler;
-import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
-
-import java.util.List;
-import java.util.concurrent.Future;
-
-public interface IFragInstanceDispatcher {
- /**
- * Dispatch all Fragment instances asynchronously
- * @param instances Fragment instance list
- * @return Boolean.
- */
- Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance> instances);
+public interface IFragInstanceStateFetcher {
+ void start();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
index ffe4758..f544c8e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
@@ -47,4 +47,9 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
new Thread(dispatchTask).start();
return dispatchTask;
}
+
+ @Override
+ public void abort() {
+
+ }
}