Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2022/10/20 02:28:40 UTC

[GitHub] [iotdb] liuminghui233 opened a new pull request, #7665: [IOTDB-4660] Implement IntoOperator

liuminghui233 opened a new pull request, #7665:
URL: https://github.com/apache/iotdb/pull/7665

   Related Documents:
   - [User guide](https://apache-iotdb.feishu.cn/docx/doxcnqLcGFbsTBgUXG3eF8hFZxf)
   - [Design doc](https://apache-iotdb.feishu.cn/docx/doxcnEPF2Sk9dyFzHdHPYIDehvc)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [iotdb] liuminghui233 commented on a diff in pull request #7665: [IOTDB-4660] Implement IntoOperator & ITs

Posted by GitBox <gi...@apache.org>.
liuminghui233 commented on code in PR #7665:
URL: https://github.com/apache/iotdb/pull/7665#discussion_r1005142400


##########
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.IntoProcessException;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+public class DeviceViewIntoOperator extends AbstractIntoOperator {
+
+  private final Map<String, Map<PartialPath, Map<String, InputLocation>>>
+      deviceToTargetPathSourceInputLocationMap;
+  private final Map<String, Map<PartialPath, Map<String, TSDataType>>>
+      deviceToTargetPathDataTypeMap;
+  private final Map<String, Boolean> targetDeviceToAlignedMap;
+  private final Map<String, List<Pair<String, PartialPath>>> deviceToSourceTargetPathPairListMap;
+
+  private String currentDevice;
+
+  private final TsBlockBuilder resultTsBlockBuilder;
+
+  public DeviceViewIntoOperator(
+      OperatorContext operatorContext,
+      Operator child,
+      Map<String, Map<PartialPath, Map<String, InputLocation>>>
+          deviceToTargetPathSourceInputLocationMap,
+      Map<String, Map<PartialPath, Map<String, TSDataType>>> deviceToTargetPathDataTypeMap,
+      Map<String, Boolean> targetDeviceToAlignedMap,
+      Map<String, List<Pair<String, PartialPath>>> deviceToSourceTargetPathPairListMap,
+      Map<String, InputLocation> sourceColumnToInputLocationMap) {
+    super(operatorContext, child, null, sourceColumnToInputLocationMap);
+    this.deviceToTargetPathSourceInputLocationMap = deviceToTargetPathSourceInputLocationMap;
+    this.deviceToTargetPathDataTypeMap = deviceToTargetPathDataTypeMap;
+    this.targetDeviceToAlignedMap = targetDeviceToAlignedMap;
+    this.deviceToSourceTargetPathPairListMap = deviceToSourceTargetPathPairListMap;
+
+    List<TSDataType> outputDataTypes =
+        ColumnHeaderConstant.selectIntoAlignByDeviceColumnHeaders.stream()
+            .map(ColumnHeader::getColumnType)
+            .collect(Collectors.toList());
+    this.resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes);
+  }
+
+  @Override
+  public TsBlock next() throws IntoProcessException {
+    TsBlock inputTsBlock = child.next();
+    if (inputTsBlock != null) {
+      String device = String.valueOf(inputTsBlock.getValueColumns()[0].getBinary(0));
+      if (!Objects.equals(device, currentDevice)) {
+        insertMultiTabletsInternally(false);
+        updateResultTsBlock();
+
+        insertTabletStatementGenerators = constructInsertTabletStatementGeneratorsByDevice(device);
+        currentDevice = device;
+      }
+      int readIndex = 0;
+      while (readIndex < inputTsBlock.getPositionCount()) {
+        int lastReadIndex = readIndex;
+        for (IntoOperator.InsertTabletStatementGenerator generator :
+            insertTabletStatementGenerators) {
+          lastReadIndex =
+              Math.max(lastReadIndex, generator.processTsBlock(inputTsBlock, readIndex));
+        }
+        readIndex = lastReadIndex;
+        insertMultiTabletsInternally(true);
+      }
+    }
+
+    if (child.hasNext()) {

Review Comment:
   fixed





[GitHub] [iotdb] liuminghui233 commented on a diff in pull request #7665: [IOTDB-4660] Implement IntoOperator & ITs

Posted by GitBox <gi...@apache.org>.
liuminghui233 commented on code in PR #7665:
URL: https://github.com/apache/iotdb/pull/7665#discussion_r1005142122


##########
server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.client;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.auth.AuthorityChecker;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.OperationType;
+import org.apache.iotdb.db.exception.IntoProcessException;
+import org.apache.iotdb.db.mpp.common.SessionInfo;
+import org.apache.iotdb.db.mpp.plan.Coordinator;
+import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
+import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.db.utils.SetThreadName;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
+
+public class DataNodeInternalClient {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeInternalClient.class);
+
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+  private static final Coordinator COORDINATOR = Coordinator.getInstance();
+
+  private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
+
+  private final IPartitionFetcher PARTITION_FETCHER;
+
+  private final ISchemaFetcher SCHEMA_FETCHER;
+
+  private final long sessionId;
+
+  public DataNodeInternalClient(SessionInfo sessionInfo) {
+    if (config.isClusterMode()) {
+      PARTITION_FETCHER = ClusterPartitionFetcher.getInstance();
+      SCHEMA_FETCHER = ClusterSchemaFetcher.getInstance();
+    } else {
+      PARTITION_FETCHER = StandalonePartitionFetcher.getInstance();
+      SCHEMA_FETCHER = StandaloneSchemaFetcher.getInstance();
+    }
+
+    try {
+      sessionId =
+          SESSION_MANAGER.requestSessionId(
+              sessionInfo.getUserName(),
+              sessionInfo.getZoneId(),
+              IoTDBConstant.ClientVersion.V_0_13);
+
+      LOGGER.info("User: {}, opens internal Session-{}.", sessionInfo.getUserName(), sessionId);
+    } catch (Exception e) {
+      LOGGER.info("User {} opens internal Session failed.", sessionInfo.getUserName());
+      throw new IntoProcessException(
+          String.format("User %s opens internal Session failed.", sessionInfo.getUserName()));
+    }
+  }
+
+  public TSStatus insertTablets(InsertMultiTabletsStatement statement) {
+    try {
+      // permission check
+      TSStatus status = AuthorityChecker.checkAuthority(statement, sessionId);
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return status;
+      }
+
+      // call the coordinator
+      long queryId = SESSION_MANAGER.requestQueryId(false);
+      ExecutionResult result =
+          COORDINATOR.execute(
+              statement,
+              queryId,
+              SESSION_MANAGER.getSessionInfo(sessionId),
+              "",
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER);
+      return result.status;
+    } catch (Exception e) {
+      return onNPEOrUnexpectedException(
+          e, OperationType.INSERT_TABLETS, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+    }
+  }
+
+  public void close() {
+    SESSION_MANAGER.releaseSessionResource(sessionId, this::cleanupQueryExecution);
+    SESSION_MANAGER.closeSession(sessionId);
+  }
+
+  private void cleanupQueryExecution(Long queryId) {
+    IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+    if (queryExecution != null) {
+      try (SetThreadName threadName = new SetThreadName(queryExecution.getQueryId())) {
+        LOGGER.info("[CleanUpQuery]]");

Review Comment:
   fixed





[GitHub] [iotdb] liuminghui233 commented on pull request #7665: [IOTDB-4660] Implement IntoOperator & ITs

Posted by GitBox <gi...@apache.org>.
liuminghui233 commented on PR #7665:
URL: https://github.com/apache/iotdb/pull/7665#issuecomment-1288350656

   > Remember to change the IT's @Before to @BeforeClass
   
   fixed




[GitHub] [iotdb] liuminghui233 commented on a diff in pull request #7665: [IOTDB-4660] Implement IntoOperator & ITs

Posted by GitBox <gi...@apache.org>.
liuminghui233 commented on code in PR #7665:
URL: https://github.com/apache/iotdb/pull/7665#discussion_r1005141713


##########
server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.client;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.auth.AuthorityChecker;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.OperationType;
+import org.apache.iotdb.db.exception.IntoProcessException;
+import org.apache.iotdb.db.mpp.common.SessionInfo;
+import org.apache.iotdb.db.mpp.plan.Coordinator;
+import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
+import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.db.utils.SetThreadName;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
+
+public class DataNodeInternalClient {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeInternalClient.class);
+
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+  private static final Coordinator COORDINATOR = Coordinator.getInstance();
+
+  private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
+
+  private final IPartitionFetcher PARTITION_FETCHER;
+
+  private final ISchemaFetcher SCHEMA_FETCHER;
+
+  private final long sessionId;
+
+  public DataNodeInternalClient(SessionInfo sessionInfo) {
+    if (config.isClusterMode()) {
+      PARTITION_FETCHER = ClusterPartitionFetcher.getInstance();
+      SCHEMA_FETCHER = ClusterSchemaFetcher.getInstance();
+    } else {
+      PARTITION_FETCHER = StandalonePartitionFetcher.getInstance();
+      SCHEMA_FETCHER = StandaloneSchemaFetcher.getInstance();
+    }
+
+    try {
+      sessionId =
+          SESSION_MANAGER.requestSessionId(
+              sessionInfo.getUserName(),
+              sessionInfo.getZoneId(),
+              IoTDBConstant.ClientVersion.V_0_13);
+
+      LOGGER.info("User: {}, opens internal Session-{}.", sessionInfo.getUserName(), sessionId);
+    } catch (Exception e) {
+      LOGGER.info("User {} opens internal Session failed.", sessionInfo.getUserName());

Review Comment:
   fixed





[GitHub] [iotdb] yifuzhou commented on a diff in pull request #7665: [IOTDB-4660] Implement IntoOperator & ITs

Posted by GitBox <gi...@apache.org>.
yifuzhou commented on code in PR #7665:
URL: https://github.com/apache/iotdb/pull/7665#discussion_r1003946031


##########
server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.client;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.auth.AuthorityChecker;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.OperationType;
+import org.apache.iotdb.db.exception.IntoProcessException;
+import org.apache.iotdb.db.mpp.common.SessionInfo;
+import org.apache.iotdb.db.mpp.plan.Coordinator;
+import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
+import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.db.utils.SetThreadName;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
+
+public class DataNodeInternalClient {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeInternalClient.class);
+
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+  private static final Coordinator COORDINATOR = Coordinator.getInstance();
+
+  private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
+
+  private final IPartitionFetcher PARTITION_FETCHER;
+
+  private final ISchemaFetcher SCHEMA_FETCHER;
+
+  private final long sessionId;
+
+  public DataNodeInternalClient(SessionInfo sessionInfo) {
+    if (config.isClusterMode()) {
+      PARTITION_FETCHER = ClusterPartitionFetcher.getInstance();
+      SCHEMA_FETCHER = ClusterSchemaFetcher.getInstance();
+    } else {
+      PARTITION_FETCHER = StandalonePartitionFetcher.getInstance();
+      SCHEMA_FETCHER = StandaloneSchemaFetcher.getInstance();
+    }
+
+    try {
+      sessionId =
+          SESSION_MANAGER.requestSessionId(
+              sessionInfo.getUserName(),
+              sessionInfo.getZoneId(),
+              IoTDBConstant.ClientVersion.V_0_13);
+
+      LOGGER.info("User: {}, opens internal Session-{}.", sessionInfo.getUserName(), sessionId);
+    } catch (Exception e) {
+      LOGGER.info("User {} opens internal Session failed.", sessionInfo.getUserName());
+      throw new IntoProcessException(
+          String.format("User %s opens internal Session failed.", sessionInfo.getUserName()));
+    }
+  }
+
+  public TSStatus insertTablets(InsertMultiTabletsStatement statement) {
+    try {
+      // permission check
+      TSStatus status = AuthorityChecker.checkAuthority(statement, sessionId);
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return status;
+      }
+
+      // call the coordinator
+      long queryId = SESSION_MANAGER.requestQueryId(false);
+      ExecutionResult result =
+          COORDINATOR.execute(
+              statement,
+              queryId,
+              SESSION_MANAGER.getSessionInfo(sessionId),
+              "",
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER);
+      return result.status;
+    } catch (Exception e) {
+      return onNPEOrUnexpectedException(
+          e, OperationType.INSERT_TABLETS, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+    }
+  }
+
+  public void close() {
+    SESSION_MANAGER.releaseSessionResource(sessionId, this::cleanupQueryExecution);
+    SESSION_MANAGER.closeSession(sessionId);
+  }
+
+  private void cleanupQueryExecution(Long queryId) {
+    IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+    if (queryExecution != null) {
+      try (SetThreadName threadName = new SetThreadName(queryExecution.getQueryId())) {
+        LOGGER.info("[CleanUpQuery]]");

Review Comment:
   The log message format looks odd: `[CleanUpQuery]]` has an extra closing bracket.



##########
server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.client;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.auth.AuthorityChecker;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.OperationType;
+import org.apache.iotdb.db.exception.IntoProcessException;
+import org.apache.iotdb.db.mpp.common.SessionInfo;
+import org.apache.iotdb.db.mpp.plan.Coordinator;
+import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
+import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.db.utils.SetThreadName;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
+
+public class DataNodeInternalClient {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeInternalClient.class);
+
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+  private static final Coordinator COORDINATOR = Coordinator.getInstance();
+
+  private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
+
+  private final IPartitionFetcher PARTITION_FETCHER;
+
+  private final ISchemaFetcher SCHEMA_FETCHER;
+
+  private final long sessionId;
+
+  public DataNodeInternalClient(SessionInfo sessionInfo) {
+    if (config.isClusterMode()) {
+      PARTITION_FETCHER = ClusterPartitionFetcher.getInstance();
+      SCHEMA_FETCHER = ClusterSchemaFetcher.getInstance();
+    } else {
+      PARTITION_FETCHER = StandalonePartitionFetcher.getInstance();
+      SCHEMA_FETCHER = StandaloneSchemaFetcher.getInstance();
+    }
+
+    try {
+      sessionId =
+          SESSION_MANAGER.requestSessionId(
+              sessionInfo.getUserName(),
+              sessionInfo.getZoneId(),
+              IoTDBConstant.ClientVersion.V_0_13);
+
+      LOGGER.info("User: {}, opens internal Session-{}.", sessionInfo.getUserName(), sessionId);
+    } catch (Exception e) {
+      LOGGER.info("User {} opens internal Session failed.", sessionInfo.getUserName());

Review Comment:
   Is this exception acceptable? I think the log level should be error or warn, and the exception `e` should be included in the log call.
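
   A minimal sketch of the change suggested above, not the PR's actual fix. To stay self-contained it uses `java.util.logging` rather than SLF4J; with SLF4J the equivalent call would be `LOGGER.warn("...", userName, e)`, since a trailing `Throwable` argument is logged with its stack trace. The class name, `requestSessionId`, and `openSession` are hypothetical stand-ins, and the exception types are placeholders for whatever the real code throws.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for the session-opening code under review.
class SessionOpenLoggingSketch {

  private static final Logger LOGGER =
      Logger.getLogger(SessionOpenLoggingSketch.class.getName());

  /** Hypothetical helper that always fails, so the catch block is exercised. */
  static long requestSessionId(String userName) {
    throw new UnsupportedOperationException("session manager unavailable");
  }

  /** Logs the failure at WARNING with the exception attached, then rethrows with the cause. */
  static long openSession(String userName) {
    try {
      return requestSessionId(userName);
    } catch (Exception e) {
      String message =
          String.format("User %s failed to open an internal session.", userName);
      // Passing e keeps the stack trace in the log instead of dropping it.
      LOGGER.log(Level.WARNING, message, e);
      // Chaining e as the cause keeps the root failure visible to callers.
      throw new IllegalStateException(message, e);
    }
  }
}
```

   Attaching the exception to both the log record and the rethrown exception means the root cause is not lost, whichever of the two a maintainer looks at first.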



##########
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.IntoProcessException;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+public class DeviceViewIntoOperator extends AbstractIntoOperator {
+
+  private final Map<String, Map<PartialPath, Map<String, InputLocation>>>
+      deviceToTargetPathSourceInputLocationMap;
+  private final Map<String, Map<PartialPath, Map<String, TSDataType>>>
+      deviceToTargetPathDataTypeMap;
+  private final Map<String, Boolean> targetDeviceToAlignedMap;
+  private final Map<String, List<Pair<String, PartialPath>>> deviceToSourceTargetPathPairListMap;
+
+  private String currentDevice;
+
+  private final TsBlockBuilder resultTsBlockBuilder;
+
+  public DeviceViewIntoOperator(
+      OperatorContext operatorContext,
+      Operator child,
+      Map<String, Map<PartialPath, Map<String, InputLocation>>>
+          deviceToTargetPathSourceInputLocationMap,
+      Map<String, Map<PartialPath, Map<String, TSDataType>>> deviceToTargetPathDataTypeMap,
+      Map<String, Boolean> targetDeviceToAlignedMap,
+      Map<String, List<Pair<String, PartialPath>>> deviceToSourceTargetPathPairListMap,
+      Map<String, InputLocation> sourceColumnToInputLocationMap) {
+    super(operatorContext, child, null, sourceColumnToInputLocationMap);
+    this.deviceToTargetPathSourceInputLocationMap = deviceToTargetPathSourceInputLocationMap;
+    this.deviceToTargetPathDataTypeMap = deviceToTargetPathDataTypeMap;
+    this.targetDeviceToAlignedMap = targetDeviceToAlignedMap;
+    this.deviceToSourceTargetPathPairListMap = deviceToSourceTargetPathPairListMap;
+
+    List<TSDataType> outputDataTypes =
+        ColumnHeaderConstant.selectIntoAlignByDeviceColumnHeaders.stream()
+            .map(ColumnHeader::getColumnType)
+            .collect(Collectors.toList());
+    this.resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes);
+  }
+
+  @Override
+  public TsBlock next() throws IntoProcessException {
+    TsBlock inputTsBlock = child.next();
+    if (inputTsBlock != null) {
+      String device = String.valueOf(inputTsBlock.getValueColumns()[0].getBinary(0));
+      if (!Objects.equals(device, currentDevice)) {
+        insertMultiTabletsInternally(false);
+        updateResultTsBlock();
+
+        insertTabletStatementGenerators = constructInsertTabletStatementGeneratorsByDevice(device);
+        currentDevice = device;
+      }
+      int readIndex = 0;
+      while (readIndex < inputTsBlock.getPositionCount()) {
+        int lastReadIndex = readIndex;
+        for (IntoOperator.InsertTabletStatementGenerator generator :
+            insertTabletStatementGenerators) {
+          lastReadIndex =
+              Math.max(lastReadIndex, generator.processTsBlock(inputTsBlock, readIndex));
+        }
+        readIndex = lastReadIndex;
+        insertMultiTabletsInternally(true);
+      }
+    }
+
+    if (child.hasNext()) {

Review Comment:
   This is hard to follow: why do we check child.hasNext() at the end of the next() method? I think hasNext() should be checked by the caller, and next() should be called only when hasNext() returns true.
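
   The convention described above can be sketched as follows. This is an illustration only: `SimpleOperator`, `ListOperator`, `UpperCaseOperator`, and `drain` are hypothetical names, not IoTDB's `Operator` API, and the sketch does not cover an operator that must emit a final summary block after its child is exhausted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class HasNextContractSketch {

  /** Hypothetical, simplified operator interface (not IoTDB's Operator). */
  interface SimpleOperator {
    boolean hasNext();

    String next();
  }

  /** Leaf operator that hands out pre-built blocks one at a time. */
  static final class ListOperator implements SimpleOperator {
    private final Deque<String> blocks;

    ListOperator(List<String> blocks) {
      this.blocks = new ArrayDeque<>(blocks);
    }

    @Override
    public boolean hasNext() {
      return !blocks.isEmpty();
    }

    @Override
    public String next() {
      return blocks.poll();
    }
  }

  /** Parent operator: next() only transforms; it never asks the child hasNext(). */
  static final class UpperCaseOperator implements SimpleOperator {
    private final SimpleOperator child;

    UpperCaseOperator(SimpleOperator child) {
      this.child = child;
    }

    @Override
    public boolean hasNext() {
      return child.hasNext();
    }

    @Override
    public String next() {
      return child.next().toUpperCase();
    }
  }

  /** The caller owns the loop: check hasNext() first, then call next(). */
  static List<String> drain(SimpleOperator operator) {
    List<String> out = new ArrayList<>();
    while (operator.hasNext()) {
      out.add(operator.next());
    }
    return out;
  }
}
```

   Keeping the exhaustion check in the caller's loop means each `next()` has one job, which is the readability concern raised in this comment.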





[GitHub] [iotdb] JackieTien97 commented on a diff in pull request #7665: [IOTDB-4660] Implement IntoOperator & ITs

Posted by GitBox <gi...@apache.org>.
JackieTien97 commented on code in PR #7665:
URL: https://github.com/apache/iotdb/pull/7665#discussion_r1002829432


##########
server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.client;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.auth.AuthorityChecker;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.OperationType;
+import org.apache.iotdb.db.exception.IntoProcessException;
+import org.apache.iotdb.db.mpp.common.SessionInfo;
+import org.apache.iotdb.db.mpp.plan.Coordinator;
+import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
+import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.db.query.control.SessionTimeoutManager;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+
+import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
+
+public class DataNodeInternalClient {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeInternalClient.class);
+
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+  private static final Coordinator COORDINATOR = Coordinator.getInstance();
+
+  private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
+
+  private final IPartitionFetcher PARTITION_FETCHER;
+
+  private final ISchemaFetcher SCHEMA_FETCHER;
+
+  private final long sessionId;
+  private boolean isNewSession = false;
+
+  public DataNodeInternalClient(SessionInfo sessionInfo) {
+    if (config.isClusterMode()) {
+      PARTITION_FETCHER = ClusterPartitionFetcher.getInstance();
+      SCHEMA_FETCHER = ClusterSchemaFetcher.getInstance();
+    } else {
+      PARTITION_FETCHER = StandalonePartitionFetcher.getInstance();
+      SCHEMA_FETCHER = StandaloneSchemaFetcher.getInstance();
+    }
+    if (!SESSION_MANAGER.checkLogin(sessionInfo.getSessionId())
+        || !Objects.equals(
+            SESSION_MANAGER.getUsername(sessionInfo.getSessionId()), sessionInfo.getUserName())) {
+      try {
+        this.sessionId =
+            SESSION_MANAGER.requestSessionId(
+                sessionInfo.getUserName(),
+                sessionInfo.getZoneId(),
+                IoTDBConstant.ClientVersion.V_0_13);
+        SessionTimeoutManager.getInstance().register(sessionId);
+        this.isNewSession = true;
+
+        LOGGER.info(
+            "User: {}, opens internal Session-{} in SELECT INTO",
+            sessionInfo.getUserName(),
+            sessionId);
+      } catch (Exception e) {
+        LOGGER.info(
+            "User {} opens internal Session failed in SELECT INTO", sessionInfo.getUserName());
+        throw new IntoProcessException(
+            String.format(
+                "User %s opens internal Session failed in SELECT INTO", sessionInfo.getUserName()));
+      }
+    } else {
+      this.sessionId = sessionInfo.getSessionId();
+    }
+  }
+
+  public TSStatus insertTablets(InsertMultiTabletsStatement statement) {
+    try {
+      // permission check
+      TSStatus status = AuthorityChecker.checkAuthority(statement, sessionId);
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return status;
+      }
+
+      // call the coordinator
+      long queryId = SESSION_MANAGER.requestQueryId(false);
+      ExecutionResult result =
+          COORDINATOR.execute(
+              statement,
+              queryId,
+              SESSION_MANAGER.getSessionInfo(sessionId),
+              "",
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER);
+      return result.status;
+    } catch (Exception e) {
+      return onNPEOrUnexpectedException(
+          e, OperationType.INSERT_TABLETS, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+    }
+  }
+
+  public void close() {
+    if (isNewSession) {

Review Comment:
   Whether or not this is a new session, you should close the current queryId. See `ClientRPCServiceImpl` for reference:
   ```
     @Override
     public TSStatus closeOperation(TSCloseOperationReq req) {
       return SESSION_MANAGER.closeOperation(
           req.sessionId,
           req.queryId,
           req.statementId,
           req.isSetStatementId(),
           req.isSetQueryId(),
           this::cleanupQueryExecution);
     }
   ```
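
A minimal sketch of the point above, under stated assumptions: the query id is released in a `finally` block on every call, while the session itself is closed only when the client opened it. `FakeSessionManager` and `InternalClientSketch` are hypothetical classes invented for illustration; they do not reproduce the real `SessionManager` or `DataNodeInternalClient` signatures.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for a session manager; not the IoTDB SessionManager.
final class FakeSessionManager {
  private long nextQueryId = 0;
  final Set<Long> openQueries = new HashSet<>();
  final Set<Long> openSessions = new HashSet<>();

  long requestQueryId() {
    long id = nextQueryId++;
    openQueries.add(id);
    return id;
  }

  void closeQuery(long queryId) {
    openQueries.remove(queryId);
  }

  void closeSession(long sessionId) {
    openSessions.remove(sessionId);
  }
}

final class InternalClientSketch {
  private final FakeSessionManager manager;
  private final long sessionId;
  private final boolean isNewSession;

  InternalClientSketch(FakeSessionManager manager, long sessionId, boolean isNewSession) {
    this.manager = manager;
    this.sessionId = sessionId;
    this.isNewSession = isNewSession;
    manager.openSessions.add(sessionId);
  }

  // Runs one unit of work under its own query id and always releases that id.
  boolean insert(Runnable work) {
    long queryId = manager.requestQueryId();
    try {
      work.run();
      return true;
    } catch (RuntimeException e) {
      return false; // the real code would map this to an error status
    } finally {
      manager.closeQuery(queryId); // released whether or not the session is new
    }
  }

  // Only a session opened by this client is closed here.
  void close() {
    if (isNewSession) {
      manager.closeSession(sessionId);
    }
  }
}
```

The design choice is to tie query cleanup to the scope of each call rather than to close(), so a reused session cannot accumulate unreleased query ids.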





[GitHub] [iotdb] JackieTien97 merged pull request #7665: [IOTDB-4660] Implement IntoOperator & ITs

Posted by GitBox <gi...@apache.org>.
JackieTien97 merged PR #7665:
URL: https://github.com/apache/iotdb/pull/7665

