You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/07/15 12:12:17 UTC
[iotdb] branch master updated: Add weak read consistency policy for MPP (#6681)
This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ba8bfad35c Add weak read consistency policy for MPP (#6681)
ba8bfad35c is described below
commit ba8bfad35cd1eb00eabf91349fad0d8e8e87869b
Author: Zhang.Jinrui <xi...@gmail.com>
AuthorDate: Fri Jul 15 20:12:11 2022 +0800
Add weak read consistency policy for MPP (#6681)
add weak read consistency policy for MPP, add npe check for session
---
.../iotdb/db/mpp/common/MPPQueryContext.java | 4 +++
.../apache/iotdb/db/mpp/common/SessionInfo.java | 25 +++++++++++++++---
.../SimpleFragmentParallelPlanner.java | 28 ++++++++++++++++++++
.../db/mpp/plan/planner/plan/FragmentInstance.java | 6 +++++
.../iotdb/db/query/control/SessionManager.java | 1 +
.../iotdb/db/mpp/plan/plan/QueryPlannerTest.java | 2 +-
.../plan/scheduler/StandaloneSchedulerTest.java | 30 ++++++++++++++++++----
7 files changed, 86 insertions(+), 10 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
index de2cf66aba..153b1e8598 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
@@ -76,4 +76,8 @@ public class MPPQueryContext {
public TEndPoint getLocalInternalEndpoint() {
return localInternalEndpoint;
}
+
+ public SessionInfo getSession() {
+ return session;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/SessionInfo.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/SessionInfo.java
index ad6426724b..ff53da75d4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/SessionInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/SessionInfo.java
@@ -18,9 +18,26 @@
*/
package org.apache.iotdb.db.mpp.common;
-import java.time.ZoneId;
-
public class SessionInfo {
- private String userName;
- private ZoneId zoneId;
+ private final long sessionId;
+ private final String userName;
+ private final String zoneId;
+
+ public SessionInfo(long sessionId, String userName, String zoneId) {
+ this.sessionId = sessionId;
+ this.userName = userName;
+ this.zoneId = zoneId;
+ }
+
+ public long getSessionId() {
+ return sessionId;
+ }
+
+ public String getUserName() {
+ return userName;
+ }
+
+ public String getZoneId() {
+ return zoneId;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index 50fd5d046a..80b92f4f45 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -18,7 +18,9 @@
*/
package org.apache.iotdb.db.mpp.plan.planner.distribution;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
@@ -100,11 +102,37 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
// redirected
// to another host when scheduling
fragmentInstance.setDataRegionAndHost(regionReplicaSet);
+ fragmentInstance.setHostDataNode(selectTargetDataNode(regionReplicaSet));
+
fragmentInstance.getFragment().setTypeProvider(analysis.getTypeProvider());
instanceMap.putIfAbsent(fragment.getId(), fragmentInstance);
fragmentInstanceList.add(fragmentInstance);
}
+ private TDataNodeLocation selectTargetDataNode(TRegionReplicaSet regionReplicaSet) {
+ if (regionReplicaSet == null
+ || regionReplicaSet.getDataNodeLocations() == null
+ || regionReplicaSet.getDataNodeLocations().size() == 0) {
+ throw new IllegalArgumentException(
+ String.format("regionReplicaSet is invalid: %s", regionReplicaSet));
+ }
+ String readConsistencyLevel =
+ IoTDBDescriptor.getInstance().getConfig().getReadConsistencyLevel();
+ // TODO: (Chen Rongzhao) need to make the values of ReadConsistencyLevel as static variable or
+ // enums
+ boolean selectRandomDataNode = "weak".equals(readConsistencyLevel);
+ int targetIndex;
+ if (!selectRandomDataNode || queryContext.getSession() == null) {
+ targetIndex = 0;
+ } else {
+ targetIndex =
+ (int)
+ (queryContext.getSession().getSessionId()
+ % regionReplicaSet.getDataNodeLocationsSize());
+ }
+ return regionReplicaSet.getDataNodeLocations().get(targetIndex);
+ }
+
private void calculateNodeTopologyBetweenInstance() {
for (FragmentInstance instance : fragmentInstanceList) {
PlanNode rootNode = instance.getFragment().getRoot();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
index 47e785a58a..f617ff9f02 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
@@ -87,6 +87,12 @@ public class FragmentInstance implements IConsensusRequest {
}
}
+ // Although the HostDataNode is set in method setDataRegionAndHost(),
+ // we still keep another method for customized needs
+ public void setHostDataNode(TDataNodeLocation hostDataNode) {
+ this.hostDataNode = hostDataNode;
+ }
+
public TRegionReplicaSet getRegionReplicaSet() {
return regionReplicaSet;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
index ffe0afd7b5..0a0584616d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
@@ -229,6 +229,7 @@ public class SessionManager {
sessionIdToUsername.put(sessionId, username);
sessionIdToZoneId.put(sessionId, ZoneId.of(zoneId));
sessionIdToClientVersion.put(sessionId, clientVersion);
+ sessionIdToSessionInfo.put(sessionId, new SessionInfo(sessionId, username, zoneId));
return sessionId;
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java
index 48b4137bb2..de492a843b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java
@@ -74,7 +74,7 @@ public class QueryPlannerTest {
new MPPQueryContext(
querySql,
new QueryId("query1"),
- new SessionInfo(),
+ new SessionInfo(1L, "fakeUsername", "fakeZoneId"),
new TEndPoint(),
new TEndPoint()),
IoTDBThreadPoolFactory.newSingleThreadExecutor("test_query"),
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
index 6a19ade539..7a7d562b1c 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
@@ -140,7 +140,11 @@ public class StandaloneSchedulerTest {
configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln.wf01.wt01.status"));
MPPQueryContext context =
new MPPQueryContext(
- "", new QueryId("query1"), new SessionInfo(), new TEndPoint(), new TEndPoint());
+ "",
+ new QueryId("query1"),
+ new SessionInfo(1L, "fakeUsername", "fakeZoneId"),
+ new TEndPoint(),
+ new TEndPoint());
ExecutorService executor = IoTDBThreadPoolFactory.newSingleThreadExecutor("Test");
QueryStateMachine stateMachine = new QueryStateMachine(context.getQueryId(), executor);
@@ -239,7 +243,11 @@ public class StandaloneSchedulerTest {
configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln.wf01.GPS"));
MPPQueryContext context =
new MPPQueryContext(
- "", new QueryId("query1"), new SessionInfo(), new TEndPoint(), new TEndPoint());
+ "",
+ new QueryId("query1"),
+ new SessionInfo(1L, "fakeUsername", "fakeZoneId"),
+ new TEndPoint(),
+ new TEndPoint());
ExecutorService executor = IoTDBThreadPoolFactory.newSingleThreadExecutor("Test");
QueryStateMachine stateMachine = new QueryStateMachine(context.getQueryId(), executor);
@@ -348,7 +356,11 @@ public class StandaloneSchedulerTest {
configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln.d3"));
MPPQueryContext context =
new MPPQueryContext(
- "", new QueryId("query1"), new SessionInfo(), new TEndPoint(), new TEndPoint());
+ "",
+ new QueryId("query1"),
+ new SessionInfo(1L, "fakeUsername", "fakeZoneId"),
+ new TEndPoint(),
+ new TEndPoint());
ExecutorService executor = IoTDBThreadPoolFactory.newSingleThreadExecutor("Test");
QueryStateMachine stateMachine = new QueryStateMachine(context.getQueryId(), executor);
@@ -396,7 +408,11 @@ public class StandaloneSchedulerTest {
configNode.getBelongedDataRegionIdWithAutoCreate(new PartialPath(deviceId));
MPPQueryContext context =
new MPPQueryContext(
- "", new QueryId("query1"), new SessionInfo(), new TEndPoint(), new TEndPoint());
+ "",
+ new QueryId("query1"),
+ new SessionInfo(1L, "fakeUsername", "fakeZoneId"),
+ new TEndPoint(),
+ new TEndPoint());
ExecutorService executor = IoTDBThreadPoolFactory.newSingleThreadExecutor("Test");
QueryStateMachine stateMachine = new QueryStateMachine(context.getQueryId(), executor);
@@ -473,7 +489,11 @@ public class StandaloneSchedulerTest {
configNode.getBelongedDataRegionIdWithAutoCreate(deviceId);
MPPQueryContext context =
new MPPQueryContext(
- "", new QueryId("query1"), new SessionInfo(), new TEndPoint(), new TEndPoint());
+ "",
+ new QueryId("query1"),
+ new SessionInfo(1L, "fakeUsername", "fakeZoneId"),
+ new TEndPoint(),
+ new TEndPoint());
ExecutorService executor = IoTDBThreadPoolFactory.newSingleThreadExecutor("Test");
QueryStateMachine stateMachine = new QueryStateMachine(context.getQueryId(), executor);