You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/11 04:50:59 UTC
[iotdb] 01/02: tmp save
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/scheduler
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit fbaaa4a273f11e7e1eeaec6cc36dfaff4abca161
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Tue May 10 23:13:38 2022 +0800
tmp save
---
.../mpp/FragmentInstanceDispatchException.java} | 22 +--
.../execution/fragment/FragmentInstanceInfo.java | 11 +-
.../org/apache/iotdb/db/mpp/plan/Coordinator.java | 11 ++
.../db/mpp/plan/execution/QueryExecution.java | 6 +-
.../db/mpp/plan/scheduler/ClusterScheduler.java | 6 +-
.../scheduler/FragmentInstanceDispatcherImpl.java | 191 +++++++++++++++++++++
.../iotdb/db/mpp/plan/plan/QueryPlannerTest.java | 1 +
7 files changed, 227 insertions(+), 21 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceInfo.java b/server/src/main/java/org/apache/iotdb/db/exception/mpp/FragmentInstanceDispatchException.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceInfo.java
copy to server/src/main/java/org/apache/iotdb/db/exception/mpp/FragmentInstanceDispatchException.java
index 42d915b42b..8f32997d6f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/mpp/FragmentInstanceDispatchException.java
@@ -16,25 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.execution.fragment;
-import org.apache.iotdb.consensus.common.DataSet;
+package org.apache.iotdb.db.exception.mpp;
-public class FragmentInstanceInfo implements DataSet {
- private final FragmentInstanceState state;
-
- private final long endTime;
-
- public FragmentInstanceInfo(FragmentInstanceState state, long endTime) {
- this.state = state;
- this.endTime = endTime;
- }
-
- public FragmentInstanceState getState() {
- return state;
- }
-
- public long getEndTime() {
- return endTime;
+/**
+ * Signals that a FragmentInstance could not be handed over to the node that should run it. The
+ * failure that caused it (for example an RPC error or a schema validation error) is kept as the
+ * cause.
+ */
+public class FragmentInstanceDispatchException extends Exception {
+
+  // Exception is Serializable; pin the serial form explicitly instead of relying on a
+  // compiler-computed default.
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * Creates a dispatch failure that wraps the underlying error.
+   *
+   * @param t the error that prevented the dispatch
+   */
+  public FragmentInstanceDispatchException(Throwable t) {
+    super(t);
 }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceInfo.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceInfo.java
index 42d915b42b..585c2845c5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceInfo.java
@@ -22,7 +22,7 @@ import org.apache.iotdb.consensus.common.DataSet;
public class FragmentInstanceInfo implements DataSet {
private final FragmentInstanceState state;
-
+ private String message;
private final long endTime;
public FragmentInstanceInfo(FragmentInstanceState state, long endTime) {
@@ -30,6 +30,11 @@ public class FragmentInstanceInfo implements DataSet {
this.endTime = endTime;
}
+ /**
+ * Creates an info object that, in addition to the state and end time, carries a free-form
+ * {@code message}.
+ *
+ * <p>NOTE(review): the message presumably explains the state (e.g. a failure reason); confirm
+ * against the code that populates it.
+ *
+ * @param state the state of the fragment instance
+ * @param endTime the end time recorded for the instance
+ * @param message the message returned later by {@code getMessage()}
+ */
+ public FragmentInstanceInfo(FragmentInstanceState state, long endTime, String message) {
+ this(state, endTime);
+ this.message = message;
+ }
+
public FragmentInstanceState getState() {
return state;
}
@@ -37,4 +42,8 @@ public class FragmentInstanceInfo implements DataSet {
public long getEndTime() {
return endTime;
}
+
+ /**
+ * Returns the message supplied at construction.
+ *
+ * @return the message, or {@code null} if this object was built with the two-argument
+ *     constructor, which never assigns it
+ */
+ public String getMessage() {
+ return message;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
index 23c417862c..c6f6c4a2b1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
@@ -53,6 +53,8 @@ public class Coordinator {
private static final String COORDINATOR_EXECUTOR_NAME = "MPPCoordinator";
private static final int COORDINATOR_EXECUTOR_SIZE = 10;
+ private static final String COORDINATOR_WRITE_EXECUTOR_NAME = "MPPCoordinatorWrite";
+ private static final int COORDINATOR_WRITE_EXECUTOR_SIZE = 10;
private static final String COORDINATOR_SCHEDULED_EXECUTOR_NAME = "MPPCoordinatorScheduled";
private static final int COORDINATOR_SCHEDULED_EXECUTOR_SIZE = 1;
@@ -73,6 +75,7 @@ public class Coordinator {
new DataNodeClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
private final ExecutorService executor;
+ private final ExecutorService writeOperationExecutor;
private final ScheduledExecutorService scheduledExecutor;
private static final Coordinator INSTANCE = new Coordinator();
@@ -82,6 +85,7 @@ public class Coordinator {
 private Coordinator() {
 this.queryExecutionMap = new ConcurrentHashMap<>();
 this.executor = getQueryExecutor();
+ // Separate pool for write-operation dispatch; it is passed to each QueryExecution alongside
+ // the query executor.
+ this.writeOperationExecutor = getWriteExecutor();
 this.scheduledExecutor = getScheduledExecutor();
 }
@@ -98,6 +102,7 @@ public class Coordinator {
statement,
queryContext,
executor,
+ writeOperationExecutor,
scheduledExecutor,
partitionFetcher,
schemaFetcher,
@@ -138,6 +143,12 @@ public class Coordinator {
return IoTDBThreadPoolFactory.newFixedThreadPool(
COORDINATOR_EXECUTOR_SIZE, COORDINATOR_EXECUTOR_NAME);
}
+
+ /**
+ * Creates the fixed-size pool ({@code COORDINATOR_WRITE_EXECUTOR_SIZE} threads, pool name
+ * {@code COORDINATOR_WRITE_EXECUTOR_NAME}) that runs write-operation dispatch tasks.
+ */
+ private ExecutorService getWriteExecutor() {
+ return IoTDBThreadPoolFactory.newFixedThreadPool(
+ COORDINATOR_WRITE_EXECUTOR_SIZE, COORDINATOR_WRITE_EXECUTOR_NAME);
+ }
+
// TODO: (xingtanzjr) need to redo once we have a concrete policy for the threadPool management
private ScheduledExecutorService getScheduledExecutor() {
return IoTDBThreadPoolFactory.newScheduledThreadPool(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 9f4b2b9d9d..0e5da23ad1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -86,10 +86,11 @@ public class QueryExecution implements IQueryExecution {
private DistributedQueryPlan distributedPlan;
private final ExecutorService executor;
+ private final ExecutorService writeOperationExecutor;
private final ScheduledExecutorService scheduledExecutor;
// TODO need to use factory to decide standalone or cluster
private final IPartitionFetcher partitionFetcher;
- // TODO need to use factory to decide standalone or cluster
+ // TODO need to use factory to decide standalone or cluster
private final ISchemaFetcher schemaFetcher;
// The result of QueryExecution will be written to the DataBlockManager in current Node.
@@ -103,11 +104,13 @@ public class QueryExecution implements IQueryExecution {
Statement statement,
MPPQueryContext context,
ExecutorService executor,
+ ExecutorService writeOperationExecutor,
ScheduledExecutorService scheduledExecutor,
IPartitionFetcher partitionFetcher,
ISchemaFetcher schemaFetcher,
IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
this.executor = executor;
+ this.writeOperationExecutor = writeOperationExecutor;
this.scheduledExecutor = scheduledExecutor;
this.context = context;
this.planOptimizers = new ArrayList<>();
@@ -176,6 +179,7 @@ public class QueryExecution implements IQueryExecution {
distributedPlan.getInstances(),
context.getQueryType(),
executor,
+ writeOperationExecutor,
scheduledExecutor,
internalServiceClientManager)
: new StandaloneScheduler(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
index ced23a70ac..dd2ca46271 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
@@ -58,6 +58,7 @@ public class ClusterScheduler implements IScheduler {
private List<FragmentInstance> instances;
private ExecutorService executor;
+ private ExecutorService writeOperationExecutor;
private ScheduledExecutorService scheduledExecutor;
private IFragInstanceDispatcher dispatcher;
@@ -70,6 +71,7 @@ public class ClusterScheduler implements IScheduler {
List<FragmentInstance> instances,
QueryType queryType,
ExecutorService executor,
+ ExecutorService writeOperationExecutor,
ScheduledExecutorService scheduledExecutor,
IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
this.queryContext = queryContext;
@@ -78,7 +80,9 @@ public class ClusterScheduler implements IScheduler {
this.queryType = queryType;
this.executor = executor;
this.scheduledExecutor = scheduledExecutor;
- this.dispatcher = new SimpleFragInstanceDispatcher(executor, internalServiceClientManager);
+ this.dispatcher =
+ new FragmentInstanceDispatcherImpl(
+ queryType, executor, writeOperationExecutor, internalServiceClientManager);
this.stateTracker =
new FixedRateFragInsStateTracker(
stateMachine, executor, scheduledExecutor, instances, internalServiceClientManager);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
new file mode 100644
index 0000000000..8922de59e3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.scheduler;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.consensus.ConsensusImpl;
+import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo;
+import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
+import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
+import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
+import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+/**
+ * Dispatches the {@link FragmentInstance}s of one query to the DataNodes hosting them.
+ *
+ * <p>If an instance's host internal endpoint matches this node's configured internal IP and port,
+ * the instance is executed directly through {@link ConsensusImpl}. Every other instance is
+ * serialized and sent to its host over the internal RPC service.
+ *
+ * <p>READ instances are dispatched one after another on {@code executor}. All other instances are
+ * dispatched in parallel, one task per instance, on {@code writeOperationExecutor}.
+ */
+public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
+
+  private static final Logger logger =
+      LoggerFactory.getLogger(FragmentInstanceDispatcherImpl.class);
+
+  // Fixed capacity of the buffer an instance is serialized into before a remote dispatch.
+  // NOTE(review): an instance whose serialized form exceeds this presumably overflows the buffer
+  // (BufferOverflowException, not wrapped in FragmentInstanceDispatchException); consider a
+  // growable output stream.
+  private static final int SERIALIZE_BUFFER_SIZE = 1024 * 1024;
+
+  private final ExecutorService executor;
+  private final ExecutorService writeOperationExecutor;
+  private final QueryType type;
+  private final String localhostIpAddr;
+  private final int localhostInternalPort;
+  private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
+      internalServiceClientManager;
+
+  /**
+   * Creates a dispatcher for the instances of one query.
+   *
+   * @param type READ selects sequential dispatch; any other type selects parallel write dispatch
+   * @param executor pool that runs the sequential READ dispatch
+   * @param writeOperationExecutor pool that runs one dispatch task per write instance
+   * @param internalServiceClientManager source of RPC clients for remote DataNodes
+   */
+  public FragmentInstanceDispatcherImpl(
+      QueryType type,
+      ExecutorService executor,
+      ExecutorService writeOperationExecutor,
+      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
+    this.type = type;
+    this.executor = executor;
+    this.writeOperationExecutor = writeOperationExecutor;
+    this.internalServiceClientManager = internalServiceClientManager;
+    this.localhostIpAddr = IoTDBDescriptor.getInstance().getConfig().getInternalIp();
+    this.localhostInternalPort = IoTDBDescriptor.getInstance().getConfig().getInternalPort();
+  }
+
+  /**
+   * Dispatches {@code instances} according to the query type this dispatcher was created with.
+   *
+   * @return a future holding whether every instance was accepted. If a dispatch threw, the future
+   *     fails with that error as its cause.
+   */
+  @Override
+  public Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance> instances) {
+    return type == QueryType.READ ? dispatchRead(instances) : dispatchWrite(instances);
+  }
+
+  // TODO: (xingtanzjr) currently we use a sequential dispatch policy for READ, which is
+  // unsafe for current FragmentInstance scheduler framework. We need to implement the
+  // topological dispatch according to dependency relations between FragmentInstances
+  private Future<FragInstanceDispatchResult> dispatchRead(List<FragmentInstance> instances) {
+    return executor.submit(
+        () -> {
+          for (FragmentInstance instance : instances) {
+            // Stop at the first rejected instance; later ones are never sent.
+            if (!dispatchOneInstance(instance)) {
+              return new FragInstanceDispatchResult(false);
+            }
+          }
+          return new FragInstanceDispatchResult(true);
+        });
+  }
+
+  /**
+   * Submits one dispatch task per instance to {@code writeOperationExecutor}. The calling thread
+   * then waits for the results in submission order.
+   *
+   * <p>The returned future is already complete. It reports failure at the first rejected instance
+   * in submission order. If a dispatch threw, it fails with that exception rather than the
+   * executor's {@link ExecutionException} wrapper, matching the READ path. Tasks still running at
+   * that point are not cancelled.
+   */
+  // TODO: (xingtanzjr) Return the detailed write states for each FragmentInstance
+  private Future<FragInstanceDispatchResult> dispatchWrite(List<FragmentInstance> instances) {
+    List<Future<Boolean>> futures = new LinkedList<>();
+    for (FragmentInstance instance : instances) {
+      futures.add(writeOperationExecutor.submit(() -> dispatchOneInstance(instance)));
+    }
+    SettableFuture<FragInstanceDispatchResult> resultFuture = SettableFuture.create();
+    for (Future<Boolean> future : futures) {
+      try {
+        if (!future.get()) {
+          resultFuture.set(new FragInstanceDispatchResult(false));
+          return resultFuture;
+        }
+      } catch (ExecutionException e) {
+        Throwable cause = e.getCause();
+        resultFuture.setException(cause != null ? cause : e);
+        return resultFuture;
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        resultFuture.setException(e);
+        return resultFuture;
+      }
+    }
+    resultFuture.set(new FragInstanceDispatchResult(true));
+    return resultFuture;
+  }
+
+  /** Sends {@code instance} to its host, either in-process or over RPC; true if accepted. */
+  private boolean dispatchOneInstance(FragmentInstance instance)
+      throws FragmentInstanceDispatchException {
+    TEndPoint endPoint = instance.getHostDataNode().getInternalEndPoint();
+    if (isDispatchedToLocal(endPoint)) {
+      return dispatchLocally(instance);
+    } else {
+      return dispatchRemote(instance, endPoint);
+    }
+  }
+
+  /**
+   * Whether {@code endPoint} is this node's internal endpoint. The port is compared as well as the
+   * IP, so another DataNode on the same host is not mistaken for this one.
+   */
+  private boolean isDispatchedToLocal(TEndPoint endPoint) {
+    return this.localhostIpAddr.equals(endPoint.getIp())
+        && this.localhostInternalPort == endPoint.getPort();
+  }
+
+  /**
+   * Serializes {@code instance} and sends it to {@code endPoint}.
+   *
+   * @return the {@code accepted} flag from the remote node's response
+   * @throws FragmentInstanceDispatchException wrapping the IOException or TException raised while
+   *     borrowing the client or calling the remote node
+   */
+  private boolean dispatchRemote(FragmentInstance instance, TEndPoint endPoint)
+      throws FragmentInstanceDispatchException {
+    try (SyncDataNodeInternalServiceClient client =
+        internalServiceClientManager.borrowClient(endPoint)) {
+      ByteBuffer buffer = ByteBuffer.allocate(SERIALIZE_BUFFER_SIZE);
+      instance.serializeRequest(buffer);
+      buffer.flip();
+      TConsensusGroupId groupId = instance.getRegionReplicaSet().getRegionId();
+      TSendFragmentInstanceReq req =
+          new TSendFragmentInstanceReq(
+              new TFragmentInstance(buffer), groupId, instance.getType().toString());
+      TSendFragmentInstanceResp resp = client.sendFragmentInstance(req);
+      return resp.accepted;
+    } catch (IOException | TException e) {
+      logger.error("can't connect to node {}", endPoint, e);
+      throw new FragmentInstanceDispatchException(e);
+    }
+  }
+
+  /**
+   * Runs {@code instance} on this node through {@link ConsensusImpl}.
+   *
+   * <p>READ succeeds unless the returned {@link FragmentInstanceInfo} reports a failed state.
+   * WRITE first schema-validates an {@link InsertNode} root, then succeeds only if the consensus
+   * write returns {@code SUCCESS_STATUS}.
+   *
+   * @throws FragmentInstanceDispatchException if schema validation fails
+   * @throws UnsupportedOperationException for an instance type other than READ or WRITE
+   */
+  private boolean dispatchLocally(FragmentInstance instance)
+      throws FragmentInstanceDispatchException {
+    ConsensusGroupId groupId =
+        ConsensusGroupId.Factory.createFromTConsensusGroupId(
+            instance.getRegionReplicaSet().getRegionId());
+    switch (instance.getType()) {
+      case READ:
+        // NOTE(review): assumes the read response always carries a FragmentInstanceInfo dataset;
+        // a null dataset would surface here as a NullPointerException -- confirm.
+        FragmentInstanceInfo info =
+            (FragmentInstanceInfo) ConsensusImpl.getInstance().read(groupId, instance).getDataset();
+        return !info.getState().isFailed();
+      case WRITE:
+        PlanNode planNode = instance.getFragment().getRoot();
+        if (planNode instanceof InsertNode) {
+          try {
+            SchemaValidator.validate((InsertNode) planNode);
+          } catch (SemanticException e) {
+            throw new FragmentInstanceDispatchException(e);
+          }
+        }
+        ConsensusWriteResponse resp = ConsensusImpl.getInstance().write(groupId, instance);
+        return TSStatusCode.SUCCESS_STATUS.getStatusCode() == resp.getStatus().getCode();
+    }
+    throw new UnsupportedOperationException(
+        String.format("unknown query type [%s]", instance.getType()));
+  }
+
+  /** No-op: submitted dispatch tasks are not tracked, so there is nothing to cancel here. */
+  @Override
+  public void abort() {}
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java
index c6694843a0..48b4137bb2 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java
@@ -78,6 +78,7 @@ public class QueryPlannerTest {
new TEndPoint(),
new TEndPoint()),
IoTDBThreadPoolFactory.newSingleThreadExecutor("test_query"),
+ IoTDBThreadPoolFactory.newSingleThreadExecutor("test_write_operation"),
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("test_query_scheduled"),
new FakePartitionFetcherImpl(),
new FakeSchemaFetcherImpl(),