You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/10/19 11:13:11 UTC

[iotdb] 10/12: finish execute insertMultiTabletsStatement

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/intoOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a8d74daf90583ce962593733c2243d7b4ef10ada
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Oct 19 16:17:42 2022 +0800

    finish execute insertMultiTabletsStatement
---
 .../iotdb/db/client/DataNodeInternalClient.java    | 96 ++++++++++++++++++++++
 .../iotdb/db/exception/IntoProcessException.java   | 27 ++++++
 .../operator/process/AbstractIntoOperator.java     | 20 ++++-
 .../operator/process/DeviceViewIntoOperator.java   |  3 +-
 .../iotdb/db/query/control/SessionManager.java     |  5 ++
 5 files changed, 149 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java b/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java
new file mode 100644
index 0000000000..43f7271bd2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.client;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.auth.AuthorityChecker;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.OperationType;
+import org.apache.iotdb.db.mpp.plan.Coordinator;
+import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
+import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
+
+/** Runs insert statements for server-internal callers by handing them to the Coordinator. */
+public class DataNodeInternalClient {
+
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static final Coordinator COORDINATOR = Coordinator.getInstance();
+  private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
+
+  private final IPartitionFetcher partitionFetcher;
+  private final ISchemaFetcher schemaFetcher;
+  /** Session id obtained from SessionManager#requestInternalSessionId in the constructor. */
+  private final long sessionId;
+
+  /** Selects cluster or standalone fetchers from the config, then requests an internal session. */
+  public DataNodeInternalClient() {
+    if (config.isClusterMode()) {
+      partitionFetcher = ClusterPartitionFetcher.getInstance();
+      schemaFetcher = ClusterSchemaFetcher.getInstance();
+    } else {
+      partitionFetcher = StandalonePartitionFetcher.getInstance();
+      schemaFetcher = StandaloneSchemaFetcher.getInstance();
+    }
+    sessionId = SESSION_MANAGER.requestInternalSessionId();
+  }
+
+  /** Executes the statement via the coordinator; caught exceptions are returned as a status. */
+  public TSStatus insertTablets(InsertMultiTabletsStatement statement) {
+    try {
+      if (statement.isEmpty()) {
+        // Nothing to insert: report success without involving the coordinator.
+        return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+      }
+
+      // Permission check: return the checker's status unchanged when it is not a success.
+      TSStatus status = AuthorityChecker.checkAuthority(statement, sessionId);
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return status;
+      }
+
+      // Request a query id and let the coordinator execute the statement.
+      long queryId = SESSION_MANAGER.requestQueryId(false);
+      ExecutionResult result =
+          COORDINATOR.execute(
+              statement,
+              queryId,
+              SESSION_MANAGER.getSessionInfo(sessionId),
+              "",
+              partitionFetcher,
+              schemaFetcher);
+      return result.status;
+    } catch (Exception e) {
+      return onNPEOrUnexpectedException(
+          e, OperationType.INSERT_TABLETS, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/IntoProcessException.java b/server/src/main/java/org/apache/iotdb/db/exception/IntoProcessException.java
new file mode 100644
index 0000000000..a19b049152
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/exception/IntoProcessException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.exception;
+
+/** Unchecked exception raised when a SELECT INTO operator fails to insert its tablets. */
+public class IntoProcessException extends RuntimeException {
+  public IntoProcessException(String message) {
+    super(message);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
index 28b90daae7..21cf6951fa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
@@ -19,13 +19,17 @@
 
 package org.apache.iotdb.db.mpp.execution.operator.process;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.client.DataNodeInternalClient;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.IntoProcessException;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -34,6 +38,8 @@ import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BitMap;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -44,6 +50,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 public abstract class AbstractIntoOperator implements ProcessOperator {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIntoOperator.class);
+
   protected final OperatorContext operatorContext;
   protected final Operator child;
 
@@ -51,6 +59,8 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
 
   protected final Map<String, InputLocation> sourceColumnToInputLocationMap;
 
+  private final DataNodeInternalClient client = new DataNodeInternalClient();
+
   public AbstractIntoOperator(
       OperatorContext operatorContext,
       Operator child,
@@ -95,7 +105,15 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
 
     InsertMultiTabletsStatement insertMultiTabletsStatement = new InsertMultiTabletsStatement();
     insertMultiTabletsStatement.setInsertTabletStatementList(insertTabletStatementList);
-    // TODO: execute insertMultiTabletsStatement
+    TSStatus executionStatus = client.insertTablets(insertMultiTabletsStatement);
+    if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      String message =
+          String.format(
+              "Error occurred while inserting tablets in SELECT INTO. %s",
+              executionStatus.getMessage());
+      LOGGER.error(message);
+      throw new IntoProcessException(message);
+    }
 
     for (IntoOperator.InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
       generator.reset();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
index f0eab0b71a..201aaaef3b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.mpp.execution.operator.process;
 
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.IntoProcessException;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
@@ -74,7 +75,7 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator {
   }
 
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws IntoProcessException {
     TsBlock inputTsBlock = child.next();
     if (inputTsBlock != null) {
       String device = String.valueOf(inputTsBlock.getValueColumns()[0].getBinary(0));
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
index 53f140d317..1ee80d5df2 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
@@ -237,6 +237,11 @@ public class SessionManager {
     return sessionId;
   }
 
+  public long requestInternalSessionId() {
+    return requestSessionId(
+        "__internal", ZoneId.systemDefault().getId(), IoTDBConstant.ClientVersion.V_0_13);
+  }
+
   public boolean releaseSessionResource(long sessionId) {
     return releaseSessionResource(sessionId, this::releaseQueryResourceNoExceptions);
   }