You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/10/19 11:13:11 UTC
[iotdb] 10/12: finish execute insertMultiTabletsStatement
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/intoOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a8d74daf90583ce962593733c2243d7b4ef10ada
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Oct 19 16:17:42 2022 +0800
finish execute insertMultiTabletsStatement
---
.../iotdb/db/client/DataNodeInternalClient.java | 96 ++++++++++++++++++++++
.../iotdb/db/exception/IntoProcessException.java | 27 ++++++
.../operator/process/AbstractIntoOperator.java | 20 ++++-
.../operator/process/DeviceViewIntoOperator.java | 3 +-
.../iotdb/db/query/control/SessionManager.java | 5 ++
5 files changed, 149 insertions(+), 2 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java b/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java
new file mode 100644
index 0000000000..43f7271bd2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.client;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.auth.AuthorityChecker;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.OperationType;
+import org.apache.iotdb.db.mpp.plan.Coordinator;
+import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
+import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
+
+public class DataNodeInternalClient { // runs insert statements through the Coordinator on its own session
+
+ private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+ private static final Coordinator COORDINATOR = Coordinator.getInstance();
+
+ private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
+
+ private final IPartitionFetcher PARTITION_FETCHER; // instance field despite the constant-style name; assigned in the constructor
+
+ private final ISchemaFetcher SCHEMA_FETCHER; // instance field despite the constant-style name; assigned in the constructor
+
+ private final long sessionId; // id obtained from SessionManager.requestInternalSessionId() at construction
+
+ public DataNodeInternalClient() {
+ if (config.isClusterMode()) { // choose the cluster or standalone fetcher singletons
+ PARTITION_FETCHER = ClusterPartitionFetcher.getInstance();
+ SCHEMA_FETCHER = ClusterSchemaFetcher.getInstance();
+ } else {
+ PARTITION_FETCHER = StandalonePartitionFetcher.getInstance();
+ SCHEMA_FETCHER = StandaloneSchemaFetcher.getInstance();
+ }
+ sessionId = SESSION_MANAGER.requestInternalSessionId(); // NOTE(review): no release of this session id is visible in this class
+ }
+
+ public TSStatus insertTablets(InsertMultiTabletsStatement statement) { // returns the execution status; exceptions are converted to a status below
+ try {
+ if (statement.isEmpty()) {
+ // An empty statement has nothing to execute, so report success without calling the coordinator.
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ }
+
+ // Check authority for this client's session; a non-success status is returned to the caller as-is.
+ TSStatus status = AuthorityChecker.checkAuthority(statement, sessionId);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return status;
+ }
+
+ // Request a query id and hand the statement to the coordinator for execution.
+ long queryId = SESSION_MANAGER.requestQueryId(false);
+ ExecutionResult result =
+ COORDINATOR.execute(
+ statement,
+ queryId,
+ SESSION_MANAGER.getSessionInfo(sessionId),
+ "", // NOTE(review): presumably the SQL text, empty for an internal call; confirm against Coordinator.execute
+ PARTITION_FETCHER,
+ SCHEMA_FETCHER);
+ return result.status;
+ } catch (Exception e) { // any exception is turned into a TSStatus by the helper instead of propagating
+ return onNPEOrUnexpectedException(
+ e, OperationType.INSERT_TABLETS, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/IntoProcessException.java b/server/src/main/java/org/apache/iotdb/db/exception/IntoProcessException.java
new file mode 100644
index 0000000000..a19b049152
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/exception/IntoProcessException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.exception;
+
+public class IntoProcessException extends RuntimeException { // unchecked; thrown by AbstractIntoOperator when inserting tablets returns a non-success status
+
+ public IntoProcessException(String message) { // message is passed unchanged to RuntimeException
+ super(message);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
index 28b90daae7..21cf6951fa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
@@ -19,13 +19,17 @@
package org.apache.iotdb.db.mpp.execution.operator.process;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.client.DataNodeInternalClient;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.IntoProcessException;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -34,6 +38,8 @@ import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BitMap;
import com.google.common.util.concurrent.ListenableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
@@ -44,6 +50,8 @@ import java.util.concurrent.atomic.AtomicInteger;
public abstract class AbstractIntoOperator implements ProcessOperator {
+ private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIntoOperator.class);
+
protected final OperatorContext operatorContext;
protected final Operator child;
@@ -51,6 +59,8 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
protected final Map<String, InputLocation> sourceColumnToInputLocationMap;
+ private final DataNodeInternalClient client = new DataNodeInternalClient();
+
public AbstractIntoOperator(
OperatorContext operatorContext,
Operator child,
@@ -95,7 +105,15 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
InsertMultiTabletsStatement insertMultiTabletsStatement = new InsertMultiTabletsStatement();
insertMultiTabletsStatement.setInsertTabletStatementList(insertTabletStatementList);
- // TODO: execute insertMultiTabletsStatement
+ TSStatus executionStatus = client.insertTablets(insertMultiTabletsStatement);
+ if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ String message =
+ String.format(
+ "Error occurred while inserting tablets in SELECT INTO. %s",
+ executionStatus.getMessage());
+ LOGGER.error(message);
+ throw new IntoProcessException(message);
+ }
for (IntoOperator.InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
generator.reset();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
index f0eab0b71a..201aaaef3b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.mpp.execution.operator.process;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.IntoProcessException;
import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
@@ -74,7 +75,7 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator {
}
@Override
- public TsBlock next() {
+ public TsBlock next() throws IntoProcessException {
TsBlock inputTsBlock = child.next();
if (inputTsBlock != null) {
String device = String.valueOf(inputTsBlock.getValueColumns()[0].getBinary(0));
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
index 53f140d317..1ee80d5df2 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
@@ -237,6 +237,11 @@ public class SessionManager {
return sessionId;
}
+ public long requestInternalSessionId() { // delegates to requestSessionId with "__internal" (presumably the user name), the system default zone id and V_0_13
+ return requestSessionId(
+ "__internal", ZoneId.systemDefault().getId(), IoTDBConstant.ClientVersion.V_0_13);
+ }
+
public boolean releaseSessionResource(long sessionId) {
return releaseSessionResource(sessionId, this::releaseQueryResourceNoExceptions);
}