You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xu...@apache.org on 2021/01/04 07:45:24 UTC
[iotdb] branch xkf_tpc_test updated: improve
This is an automated email from the ASF dual-hosted git repository.
xuekaifeng pushed a commit to branch xkf_tpc_test
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/xkf_tpc_test by this push:
new 9f1ab45 improve
9f1ab45 is described below
commit 9f1ab45b7c6f37f9cdf073fb9de6ba17dcce08a7
Author: 151250176 <15...@smail.nju.edu.cn>
AuthorDate: Mon Jan 4 15:44:55 2021 +0800
improve
---
.../apache/iotdb/db/service/AsyncInsertPool.java | 22 +++++++++++++++++++-
.../org/apache/iotdb/db/service/TSServiceImpl.java | 24 ++--------------------
2 files changed, 23 insertions(+), 23 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/service/AsyncInsertPool.java b/server/src/main/java/org/apache/iotdb/db/service/AsyncInsertPool.java
index 4fb3924..a245f52 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/AsyncInsertPool.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/AsyncInsertPool.java
@@ -18,6 +18,9 @@
*/
package org.apache.iotdb.db.service;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.serviceSession.pool.SessionPool;
@@ -45,8 +48,25 @@ public class AsyncInsertPool {
pool.submit(new Runnable() {
@Override
public void run() {
+ TSInsertTabletsReq transferReq = new TSInsertTabletsReq();
+ transferReq.deviceIds = req.deviceIds;
+ transferReq.setIsFinal(true);
+ transferReq.measurementsList = req.measurementsList;
+ transferReq.typesList = req.typesList;
+ transferReq.sizeList = new ArrayList<>(req.sizeList);
+ List<ByteBuffer> valueBuffer = new ArrayList<>(req.valuesList.size());
+ for(ByteBuffer byteBuffer : req.valuesList){
+ valueBuffer.add(ByteBuffer.wrap(byteBuffer.array()));
+ }
+ List<ByteBuffer> timeBuffer = new ArrayList<>(req.timestampsList.size());
+ for(ByteBuffer byteBuffer : req.timestampsList){
+ timeBuffer.add(ByteBuffer.wrap(byteBuffer.array()));
+ }
+ transferReq.valuesList = valueBuffer;
+ transferReq.timestampsList = timeBuffer;
+
try {
- sessionPool.insertTablets(req);
+ sessionPool.insertTablets(transferReq);
} catch (IoTDBConnectionException | StatementExecutionException e) {
logger.error("transfer request failed", e);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index d0d23b7..a6de0f2 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -76,9 +76,9 @@ import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.UDFPlan;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
-import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
@@ -139,7 +139,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.thrift.TException;
import org.apache.thrift.server.ServerContext;
import org.slf4j.Logger;
@@ -1500,26 +1499,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
public TSStatus insertTablets(TSInsertTabletsReq req) {
// transfer to another
if(!req.isFinal){
- TSInsertTabletsReq transferReq = new TSInsertTabletsReq();
- transferReq.deviceIds = req.deviceIds;
- transferReq.setIsFinal(true);
- transferReq.measurementsList = req.measurementsList;
- transferReq.typesList = req.typesList;
- transferReq.sizeList = new ArrayList<>(req.sizeList);
- List<ByteBuffer> valueBuffer = new ArrayList<>(req.valuesList.size());
- for(ByteBuffer byteBuffer : req.valuesList){
- valueBuffer.add(ReadWriteIOUtils.clone(byteBuffer));
- byteBuffer.flip();
- }
- List<ByteBuffer> timeBuffer = new ArrayList<>(req.timestampsList.size());
- for(ByteBuffer byteBuffer : req.timestampsList){
- timeBuffer.add(ReadWriteIOUtils.clone(byteBuffer));
- byteBuffer.flip();
- }
- transferReq.valuesList = valueBuffer;
- transferReq.timestampsList = timeBuffer;
-
- AsyncInsertPool.getInstance().submit(transferReq);
+ AsyncInsertPool.getInstance().submit(req);
}
//