You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/07/15 04:28:05 UTC

[iotdb] 01/01: add weak read consistency policy for MPP

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/weak_read
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit bd5006aabaa722cb01c36a305b88f975f60440a7
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Fri Jul 15 12:27:53 2022 +0800

    add weak read consistency policy for MPP
---
 .../iotdb/db/mpp/common/MPPQueryContext.java       |  4 +++
 .../apache/iotdb/db/mpp/common/SessionInfo.java    | 25 +++++++++++++++---
 .../SimpleFragmentParallelPlanner.java             | 28 ++++++++++++++++++++
 .../db/mpp/plan/planner/plan/FragmentInstance.java |  6 +++++
 .../iotdb/db/query/control/SessionManager.java     |  1 +
 .../iotdb/db/mpp/plan/plan/QueryPlannerTest.java   |  2 +-
 .../plan/scheduler/StandaloneSchedulerTest.java    | 30 ++++++++++++++++++----
 7 files changed, 86 insertions(+), 10 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
index de2cf66aba..153b1e8598 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
@@ -76,4 +76,8 @@ public class MPPQueryContext {
   public TEndPoint getLocalInternalEndpoint() {
     return localInternalEndpoint;
   }
+
+  public SessionInfo getSession() {
+    return session;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/SessionInfo.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/SessionInfo.java
index ad6426724b..ff53da75d4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/SessionInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/SessionInfo.java
@@ -18,9 +18,26 @@
  */
 package org.apache.iotdb.db.mpp.common;
 
-import java.time.ZoneId;
-
 public class SessionInfo {
-  private String userName;
-  private ZoneId zoneId;
+  private final long sessionId;
+  private final String userName;
+  private final String zoneId;
+
+  public SessionInfo(long sessionId, String userName, String zoneId) {
+    this.sessionId = sessionId;
+    this.userName = userName;
+    this.zoneId = zoneId;
+  }
+
+  public long getSessionId() {
+    return sessionId;
+  }
+
+  public String getUserName() {
+    return userName;
+  }
+
+  public String getZoneId() {
+    return zoneId;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index 50fd5d046a..688d49a702 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -18,7 +18,9 @@
  */
 package org.apache.iotdb.db.mpp.plan.planner.distribution;
 
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
@@ -100,11 +102,37 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
     // redirected
     // to another host when scheduling
     fragmentInstance.setDataRegionAndHost(regionReplicaSet);
+    fragmentInstance.setHostDataNode(selectTargetDataNode(regionReplicaSet));
+
     fragmentInstance.getFragment().setTypeProvider(analysis.getTypeProvider());
     instanceMap.putIfAbsent(fragment.getId(), fragmentInstance);
     fragmentInstanceList.add(fragmentInstance);
   }
 
+  /** Selects the host replica: index 0, or a sessionId-derived index when level is "weak". */
+  private TDataNodeLocation selectTargetDataNode(TRegionReplicaSet regionReplicaSet) {
+    if (regionReplicaSet == null
+        || regionReplicaSet.getDataNodeLocations() == null
+        || regionReplicaSet.getDataNodeLocations().isEmpty()) {
+      throw new IllegalArgumentException(
+          String.format("regionReplicaSet is invalid: %s", regionReplicaSet));
+    }
+    String readConsistencyLevel =
+        IoTDBDescriptor.getInstance().getConfig().getReadConsistencyLevel();
+    // TODO: (Chen Rongzhao) make the ReadConsistencyLevel values static constants or an enum
+    if (!"weak".equals(readConsistencyLevel)) {
+      return regionReplicaSet.getDataNodeLocations().get(0);
+    }
+    // floorMod keeps the index in [0, size) even if the session id is negative, where a plain
+    // % would yield a negative index and make get() throw IndexOutOfBoundsException.
+    int targetIndex =
+        (int)
+            Math.floorMod(
+                queryContext.getSession().getSessionId(),
+                (long) regionReplicaSet.getDataNodeLocationsSize());
+    return regionReplicaSet.getDataNodeLocations().get(targetIndex);
+  }
+
   private void calculateNodeTopologyBetweenInstance() {
     for (FragmentInstance instance : fragmentInstanceList) {
       PlanNode rootNode = instance.getFragment().getRoot();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
index 47e785a58a..f617ff9f02 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
@@ -87,6 +87,12 @@ public class FragmentInstance implements IConsensusRequest {
     }
   }
 
+  // setDataRegionAndHost() already assigns a host data node; this separate setter is kept so
+  // that callers with customized needs can replace that choice with a specific data node.
+  public void setHostDataNode(TDataNodeLocation hostDataNode) {
+    this.hostDataNode = hostDataNode;
+  }
+
   public TRegionReplicaSet getRegionReplicaSet() {
     return regionReplicaSet;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
index ffe0afd7b5..0a0584616d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
@@ -229,6 +229,7 @@ public class SessionManager {
     sessionIdToUsername.put(sessionId, username);
     sessionIdToZoneId.put(sessionId, ZoneId.of(zoneId));
     sessionIdToClientVersion.put(sessionId, clientVersion);
+    sessionIdToSessionInfo.put(sessionId, new SessionInfo(sessionId, username, zoneId));
 
     return sessionId;
   }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java
index 48b4137bb2..de492a843b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java
@@ -74,7 +74,7 @@ public class QueryPlannerTest {
             new MPPQueryContext(
                 querySql,
                 new QueryId("query1"),
-                new SessionInfo(),
+                new SessionInfo(1L, "fakeUsername", "fakeZoneId"),
                 new TEndPoint(),
                 new TEndPoint()),
             IoTDBThreadPoolFactory.newSingleThreadExecutor("test_query"),
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
index 6a19ade539..7a7d562b1c 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
@@ -140,7 +140,11 @@ public class StandaloneSchedulerTest {
     configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln.wf01.wt01.status"));
     MPPQueryContext context =
         new MPPQueryContext(
-            "", new QueryId("query1"), new SessionInfo(), new TEndPoint(), new TEndPoint());
+            "",
+            new QueryId("query1"),
+            new SessionInfo(1L, "fakeUsername", "fakeZoneId"),
+            new TEndPoint(),
+            new TEndPoint());
     ExecutorService executor = IoTDBThreadPoolFactory.newSingleThreadExecutor("Test");
     QueryStateMachine stateMachine = new QueryStateMachine(context.getQueryId(), executor);
 
@@ -239,7 +243,11 @@ public class StandaloneSchedulerTest {
     configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln.wf01.GPS"));
     MPPQueryContext context =
         new MPPQueryContext(
-            "", new QueryId("query1"), new SessionInfo(), new TEndPoint(), new TEndPoint());
+            "",
+            new QueryId("query1"),
+            new SessionInfo(1L, "fakeUsername", "fakeZoneId"),
+            new TEndPoint(),
+            new TEndPoint());
     ExecutorService executor = IoTDBThreadPoolFactory.newSingleThreadExecutor("Test");
     QueryStateMachine stateMachine = new QueryStateMachine(context.getQueryId(), executor);
 
@@ -348,7 +356,11 @@ public class StandaloneSchedulerTest {
     configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln.d3"));
     MPPQueryContext context =
         new MPPQueryContext(
-            "", new QueryId("query1"), new SessionInfo(), new TEndPoint(), new TEndPoint());
+            "",
+            new QueryId("query1"),
+            new SessionInfo(1L, "fakeUsername", "fakeZoneId"),
+            new TEndPoint(),
+            new TEndPoint());
     ExecutorService executor = IoTDBThreadPoolFactory.newSingleThreadExecutor("Test");
     QueryStateMachine stateMachine = new QueryStateMachine(context.getQueryId(), executor);
 
@@ -396,7 +408,11 @@ public class StandaloneSchedulerTest {
     configNode.getBelongedDataRegionIdWithAutoCreate(new PartialPath(deviceId));
     MPPQueryContext context =
         new MPPQueryContext(
-            "", new QueryId("query1"), new SessionInfo(), new TEndPoint(), new TEndPoint());
+            "",
+            new QueryId("query1"),
+            new SessionInfo(1L, "fakeUsername", "fakeZoneId"),
+            new TEndPoint(),
+            new TEndPoint());
     ExecutorService executor = IoTDBThreadPoolFactory.newSingleThreadExecutor("Test");
     QueryStateMachine stateMachine = new QueryStateMachine(context.getQueryId(), executor);
 
@@ -473,7 +489,11 @@ public class StandaloneSchedulerTest {
     configNode.getBelongedDataRegionIdWithAutoCreate(deviceId);
     MPPQueryContext context =
         new MPPQueryContext(
-            "", new QueryId("query1"), new SessionInfo(), new TEndPoint(), new TEndPoint());
+            "",
+            new QueryId("query1"),
+            new SessionInfo(1L, "fakeUsername", "fakeZoneId"),
+            new TEndPoint(),
+            new TEndPoint());
     ExecutorService executor = IoTDBThreadPoolFactory.newSingleThreadExecutor("Test");
     QueryStateMachine stateMachine = new QueryStateMachine(context.getQueryId(), executor);