You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/10/21 12:32:37 UTC

[iotdb] branch IOTDB-4619 updated (c8f61e87a0 -> 186452c497)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch IOTDB-4619
in repository https://gitbox.apache.org/repos/asf/iotdb.git


    from c8f61e87a0 Add IT for CQ Management and Execute & change into syntax
     new 02b760d0ac add CQState in import-check
     new 186452c497 fix some bugs

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../iotdb/confignode/manager/cq/CQManager.java     |   8 +-
 .../iotdb/confignode/manager/node/NodeManager.java |  27 ++--
 integration-test/import-control.xml                |   1 +
 .../org/apache/iotdb/it/env/RemoteServerEnv.java   |  13 +-
 .../confignode/it/IoTDBConfigNodeSnapshotIT.java   |   8 +-
 .../impl/DataNodeInternalRPCServiceImpl.java       | 165 ++++++++++-----------
 6 files changed, 110 insertions(+), 112 deletions(-)


[iotdb] 02/02: fix some bugs

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch IOTDB-4619
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 186452c4973c7649e7075df501851ae6c58ead0e
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Fri Oct 21 20:32:28 2022 +0800

    fix some bugs
---
 .../iotdb/confignode/manager/cq/CQManager.java     |  8 +++++--
 .../iotdb/confignode/manager/node/NodeManager.java | 27 ++++++++--------------
 .../org/apache/iotdb/it/env/RemoteServerEnv.java   | 13 ++++++++++-
 .../confignode/it/IoTDBConfigNodeSnapshotIT.java   |  8 +++----
 4 files changed, 32 insertions(+), 24 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
index b0995f4496..90e8a32a58 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
@@ -117,7 +117,9 @@ public class CQManager {
     try {
       // 1. shutdown previous cq schedule thread pool
       try {
-        executor.shutdown();
+        if (executor != null) {
+          executor.shutdown();
+        }
       } catch (Throwable t) {
         // just print the error log because we should make sure we can start a new cq schedule pool
         // successfully in the next steps
@@ -166,6 +168,8 @@ public class CQManager {
     } finally {
       lock.writeLock().unlock();
     }
-    previous.shutdown();
+    if (previous != null) {
+      previous.shutdown();
+    }
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 875017c8f2..844971242e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -85,6 +85,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
@@ -133,12 +134,15 @@ public class NodeManager {
       IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Unknown-DataNode-Detector");
   private final Set<TDataNodeLocation> oldUnknownNodes;
 
+  private final Random random;
+
   public NodeManager(IManager configManager, NodeInfo nodeInfo) {
     this.configManager = configManager;
     this.nodeInfo = nodeInfo;
     this.removeConfigNodeLock = new ReentrantLock();
     this.nodeCacheMap = new ConcurrentHashMap<>();
     this.oldUnknownNodes = new HashSet<>();
+    this.random = new Random(System.currentTimeMillis());
   }
 
   private void setGlobalConfig(DataNodeRegisterResp dataSet) {
@@ -829,26 +833,15 @@ public class NodeManager {
    * @return TDataNodeLocation
    */
   public Optional<TDataNodeLocation> getLowestLoadDataNode() {
-    AtomicInteger result = new AtomicInteger(-1);
-    AtomicLong lowestLoadScore = new AtomicLong(Long.MAX_VALUE);
-
-    nodeCacheMap.forEach(
-        (dataNodeId, heartbeatCache) -> {
-          long score = heartbeatCache.getLoadScore();
-          if (heartbeatCache.getNodeStatus() == NodeStatus.Running
-              && score < lowestLoadScore.get()) {
-            result.set(dataNodeId);
-            lowestLoadScore.set(score);
-          }
-        });
+    // TODO: return the real lowest-load DataNode once the scoring algorithm is implemented
+    List<TDataNodeConfiguration> targetDataNodeList =
+        filterDataNodeThroughStatus(NodeStatus.Running);
 
-    if (result.get() == -1) {
+    if (targetDataNodeList == null || targetDataNodeList.isEmpty()) {
       return Optional.empty();
     } else {
-      LOGGER.info(
-          "get the lowest load DataNode, NodeID: [{}], LoadScore: [{}]", result, lowestLoadScore);
-      return Optional.of(
-          configManager.getNodeManager().getRegisteredDataNodeLocations().get(result.get()));
+      int index = random.nextInt(targetDataNodeList.size());
+      return Optional.of(targetDataNodeList.get(index).location);
     }
   }
 
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
index 95cf12bf35..438989e432 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
@@ -18,7 +18,11 @@
  */
 package org.apache.iotdb.it.env;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
+import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.itbase.env.BaseEnv;
 import org.apache.iotdb.jdbc.Config;
 import org.apache.iotdb.jdbc.Constant;
@@ -148,7 +152,14 @@ public class RemoteServerEnv implements BaseEnv {
 
   @Override
   public IConfigNodeRPCService.Iface getLeaderConfigNodeConnection() throws IOException {
-    return null;
+    IClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager =
+        new IClientManager.Factory<TEndPoint, SyncConfigNodeIServiceClient>()
+            .createClientManager(
+                new DataNodeClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory());
+    try (SyncConfigNodeIServiceClient client =
+        clientManager.borrowClient(new TEndPoint(ip_addr, 22277))) {
+      return client;
+    }
   }
 
   @Override
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java
index 8c38aa1c7e..5898c817f2 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java
@@ -291,8 +291,8 @@ public class IoTDBConfigNodeSnapshotIT {
   }
 
   private Set<TCQEntry> createCQs(SyncConfigNodeIServiceClient client) throws TException {
-    String sql1 = "create cq testCq1 BEGIN select s1 into root.backup.d1.s1 from root.sg.d1 END";
-    String sql2 = "create cq testCq1 BEGIN select s1 into root.backup.d2.s1 from root.sg.d2 END";
+    String sql1 = "create cq testCq1 BEGIN select s1 into root.backup.d1(s1) from root.sg.d1 END";
+    String sql2 = "create cq testCq2 BEGIN select s1 into root.backup.d2(s1) from root.sg.d2 END";
     TCreateCQReq req1 =
         new TCreateCQReq(
             "testCq1",
@@ -301,7 +301,7 @@ public class IoTDBConfigNodeSnapshotIT {
             1000,
             0,
             (byte) 0,
-            "select s1 into root.backup.d1.s1 from root.sg.d1",
+            "select s1 into root.backup.d1(s1) from root.sg.d1",
             sql1,
             "Asia");
     TCreateCQReq req2 =
@@ -312,7 +312,7 @@ public class IoTDBConfigNodeSnapshotIT {
             1000,
             0,
             (byte) 1,
-            "select s1 into root.backup.d2.s1 from root.sg.d2",
+            "select s1 into root.backup.d2(s1) from root.sg.d2",
             sql2,
             "Asia");
 


[iotdb] 01/02: add CQState in import-check

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch IOTDB-4619
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 02b760d0ac429b76def0b609bcca48d995047786
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Fri Oct 21 18:09:56 2022 +0800

    add CQState in import-check
---
 integration-test/import-control.xml                |   1 +
 .../impl/DataNodeInternalRPCServiceImpl.java       | 165 ++++++++++-----------
 2 files changed, 78 insertions(+), 88 deletions(-)

diff --git a/integration-test/import-control.xml b/integration-test/import-control.xml
index c5c350e7a4..bd29bc42e2 100644
--- a/integration-test/import-control.xml
+++ b/integration-test/import-control.xml
@@ -57,6 +57,7 @@
     <allow class="org.apache.commons.codec.digest.DigestUtils" />
     <allow class="org.apache.iotdb.commons.trigger.service.TriggerExecutableManager" />
     <allow class="org.apache.iotdb.commons.trigger.TriggerInformation" />
+    <allow class="org.apache.iotdb.commons.cq.CQState" />
     <allow pkg="org\.apache\.iotdb\.common\.rpc\.thrift.*" regex="true" />
     <allow pkg="org\.apache\.iotdb\.confignode\.rpc\.thrift.*" regex="true" />
     <allow pkg="org\.apache\.iotdb\.commons\.client\.sync.*" regex="true" />
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index d014d1052a..777ee4e21d 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.consensus.SchemaRegionId;
@@ -51,7 +50,6 @@ import org.apache.iotdb.db.auth.AuthorizerManager;
 import org.apache.iotdb.db.client.ConfigNodeInfo;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.conf.OperationType;
 import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
 import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
 import org.apache.iotdb.db.engine.StorageEngineV2;
@@ -79,14 +77,7 @@ import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
-import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
-import org.apache.iotdb.db.mpp.plan.expression.Expression;
-import org.apache.iotdb.db.mpp.plan.expression.binary.LogicAndExpression;
-import org.apache.iotdb.db.mpp.plan.expression.leaf.ConstantOperand;
-import org.apache.iotdb.db.mpp.plan.expression.leaf.TimestampOperand;
-import org.apache.iotdb.db.mpp.plan.expression.ternary.BetweenExpression;
-import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
@@ -97,8 +88,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeleteTimeS
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.RollbackSchemaBlackListNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
 import org.apache.iotdb.db.mpp.plan.scheduler.load.LoadTsFileScheduler;
-import org.apache.iotdb.db.mpp.plan.statement.component.WhereCondition;
-import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
 import org.apache.iotdb.db.query.control.SessionManager;
 import org.apache.iotdb.db.service.DataNode;
 import org.apache.iotdb.db.service.RegionMigrateService;
@@ -167,8 +156,6 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.trigger.api.enums.FailureStrategy;
 import org.apache.iotdb.trigger.api.enums.TriggerEvent;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 
 import com.google.common.collect.ImmutableList;
@@ -185,13 +172,10 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
-import static org.apache.iotdb.db.service.basic.ServiceProvider.QUERY_FREQUENCY_RECORDER;
-import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
 
 public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface {
 
@@ -600,79 +584,84 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
   @Override
   public TSStatus executeCQ(TExecuteCQ req) throws TException {
 
-    long sessionId =
-        SESSION_MANAGER.requestSessionId(req.cqId, req.zoneId, IoTDBConstant.ClientVersion.V_0_13);
-
-    try {
-      QueryStatement s =
-          (QueryStatement)
-              StatementGenerator.createStatement(
-                  req.queryBody, SESSION_MANAGER.getZoneId(sessionId));
-      if (s == null) {
-        return RpcUtils.getStatus(
-            TSStatusCode.SQL_PARSE_ERROR, "This operation type is not supported");
-      }
-
-      // 1. add time filter in where
-      Expression timeFilter =
-          new BetweenExpression(
-              new TimestampOperand(),
-              new ConstantOperand(TSDataType.INT64, String.valueOf(req.startTime)),
-              new ConstantOperand(TSDataType.INT64, String.valueOf(req.endTime)));
-      if (s.getWhereCondition() != null) {
-        s.getWhereCondition()
-            .setPredicate(new LogicAndExpression(timeFilter, s.getWhereCondition().getPredicate()));
-      } else {
-        s.setWhereCondition(new WhereCondition(timeFilter));
-      }
-
-      // 2. add time rage in group by time
-      if (s.getGroupByTimeComponent() != null) {
-        s.getGroupByTimeComponent().setStartTime(req.startTime);
-        s.getGroupByTimeComponent().setEndTime(req.endTime);
-      }
-
-      QUERY_FREQUENCY_RECORDER.incrementAndGet();
-
-      long queryId =
-          SESSION_MANAGER.requestQueryId(SESSION_MANAGER.requestStatementId(sessionId), true);
-      // create and cache dataset
-      ExecutionResult result =
-          COORDINATOR.execute(
-              s,
-              queryId,
-              SESSION_MANAGER.getSessionInfo(sessionId),
-              req.queryBody,
-              PARTITION_FETCHER,
-              SCHEMA_FETCHER,
-              req.getTimeout());
-
-      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
-          && result.status.code != TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
-        return result.status;
-      }
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
 
-      IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
-
-      try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
-        if (queryExecution != null) {
-          // consume up all the result
-          while (true) {
-            Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult();
-            if (!optionalTsBlock.isPresent()) {
-              break;
-            }
-          }
-        }
-        return result.status;
-      }
-    } catch (Exception e) {
-      // TODO call the coordinator to release query resource
-      return onQueryException(e, "\"" + req.queryBody + "\". " + OperationType.EXECUTE_STATEMENT);
-    } finally {
-      SESSION_MANAGER.releaseSessionResource(sessionId, this::cleanupQueryExecution);
-      SESSION_MANAGER.closeSession(sessionId);
-    }
+    //    long sessionId =
+    //        SESSION_MANAGER.requestSessionId(req.cqId, req.zoneId,
+    // IoTDBConstant.ClientVersion.V_0_13);
+    //
+    //    try {
+    //      QueryStatement s =
+    //          (QueryStatement)
+    //              StatementGenerator.createStatement(
+    //                  req.queryBody, SESSION_MANAGER.getZoneId(sessionId));
+    //      if (s == null) {
+    //        return RpcUtils.getStatus(
+    //            TSStatusCode.SQL_PARSE_ERROR, "This operation type is not supported");
+    //      }
+    //
+    //      // 1. add time filter in where
+    //      Expression timeFilter =
+    //          new BetweenExpression(
+    //              new TimestampOperand(),
+    //              new ConstantOperand(TSDataType.INT64, String.valueOf(req.startTime)),
+    //              new ConstantOperand(TSDataType.INT64, String.valueOf(req.endTime)));
+    //      if (s.getWhereCondition() != null) {
+    //        s.getWhereCondition()
+    //            .setPredicate(new LogicAndExpression(timeFilter,
+    // s.getWhereCondition().getPredicate()));
+    //      } else {
+    //        s.setWhereCondition(new WhereCondition(timeFilter));
+    //      }
+    //
+    //      // 2. add time range in group by time
+    //      if (s.getGroupByTimeComponent() != null) {
+    //        s.getGroupByTimeComponent().setStartTime(req.startTime);
+    //        s.getGroupByTimeComponent().setEndTime(req.endTime);
+    //      }
+    //
+    //      QUERY_FREQUENCY_RECORDER.incrementAndGet();
+    //
+    //      long queryId =
+    //          SESSION_MANAGER.requestQueryId(SESSION_MANAGER.requestStatementId(sessionId), true);
+    //      // create and cache dataset
+    //      ExecutionResult result =
+    //          COORDINATOR.execute(
+    //              s,
+    //              queryId,
+    //              SESSION_MANAGER.getSessionInfo(sessionId),
+    //              req.queryBody,
+    //              PARTITION_FETCHER,
+    //              SCHEMA_FETCHER,
+    //              req.getTimeout());
+    //
+    //      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+    //          && result.status.code != TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
+    //        return result.status;
+    //      }
+    //
+    //      IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+    //
+    //      try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
+    //        if (queryExecution != null) {
+    //          // consume up all the result
+    //          while (true) {
+    //            Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult();
+    //            if (!optionalTsBlock.isPresent()) {
+    //              break;
+    //            }
+    //          }
+    //        }
+    //        return result.status;
+    //      }
+    //    } catch (Exception e) {
+    //      // TODO call the coordinator to release query resource
+    //      return onQueryException(e, "\"" + req.queryBody + "\". " +
+    // OperationType.EXECUTE_STATEMENT);
+    //    } finally {
+    //      SESSION_MANAGER.releaseSessionResource(sessionId, this::cleanupQueryExecution);
+    //      SESSION_MANAGER.closeSession(sessionId);
+    //    }
   }
 
   private void cleanupQueryExecution(Long queryId) {