You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2021/01/06 09:10:02 UTC
[iotdb] 01/01: init
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch selector_queue
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2e6bf05b603db2fffeb9d87eb7258ecb1a465d2f
Author: LebronAl <TX...@gmail.com>
AuthorDate: Wed Jan 6 17:09:30 2021 +0800
init
---
.../org/apache/iotdb/db/engine/StorageEngine.java | 2 +-
.../engine/storagegroup/StorageGroupProcessor.java | 6 ++--
.../org/apache/iotdb/db/service/TSServiceImpl.java | 9 +++--
.../java/org/apache/iotdb/db/service/Test.java | 42 ++++++++++++++++++++++
4 files changed, 54 insertions(+), 5 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 388ff50..fb6db13 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -104,7 +104,7 @@ public class StorageEngine implements IService {
/**
* storage group name -> storage group processor
*/
- private final ConcurrentHashMap<PartialPath, StorageGroupProcessor> processorMap = new ConcurrentHashMap<>();
+ private final HashMap<PartialPath, StorageGroupProcessor> processorMap = new HashMap<>();
private static final ExecutorService recoveryThreadPool = IoTDBThreadPoolFactory
.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), "Recovery-Thread-Pool");
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index b338e00..8cada8b 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -711,7 +711,7 @@ public class StorageGroupProcessor {
}
}
- writeLock();
+// writeLock();
try {
TSStatus[] results = new TSStatus[insertTabletPlan.getRowCount()];
Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
@@ -798,8 +798,10 @@ public class StorageGroupProcessor {
if (!noFailure) {
throw new BatchProcessException(results);
}
+// } finally {
+// writeUnlock();
} finally {
- writeUnlock();
+
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 2144f5c..4bd318e 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -32,7 +32,9 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -77,9 +79,9 @@ import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.UDFPlan;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
-import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
@@ -178,6 +180,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
private AtomicLong sessionIdGenerator = new AtomicLong();
// The statementId is unique in one IoTDB instance.
private AtomicLong statementIdGenerator = new AtomicLong();
+ private ExecutorService e = Executors.newSingleThreadExecutor();
// (sessionId -> Set(statementId))
private Map<Long, Set<Long>> sessionId2StatementId = new ConcurrentHashMap<>();
@@ -1514,7 +1517,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return status;
}
- return executeNonQueryPlan(insertTabletPlan);
+ Future<TSStatus> result = e.submit(new Test(insertTabletPlan, this));
+
+ return result.get();
} catch (NullPointerException e) {
logger.error("{}: error occurs when executing statements", IoTDBConstant.GLOBAL_DB_NAME, e);
return RpcUtils
diff --git a/server/src/main/java/org/apache/iotdb/db/service/Test.java b/server/src/main/java/org/apache/iotdb/db/service/Test.java
new file mode 100644
index 0000000..54197ae
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/service/Test.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service;
+
+import java.util.concurrent.Callable;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+public class Test implements Callable<TSStatus> { // Callable that runs one PhysicalPlan through a TSServiceImpl
+ // Plan passed to impl.executeNonQueryPlan when call() is invoked.
+ private PhysicalPlan plan;
+ private TSServiceImpl impl; // service instance whose executeNonQueryPlan is called
+ // Stores both arguments as given; no null checks or defensive copies are performed.
+ public Test(PhysicalPlan plan, TSServiceImpl impl) {
+ this.plan = plan;
+ this.impl = impl;
+
+ }
+ // Delegates to impl.executeNonQueryPlan(plan) and returns its TSStatus; exceptions propagate to the caller.
+ @Override
+ public TSStatus call() throws Exception {
+ return impl.executeNonQueryPlan(plan);
+ }
+
+}