You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2021/01/06 09:10:02 UTC

[iotdb] 01/01: init

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch selector_queue
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2e6bf05b603db2fffeb9d87eb7258ecb1a465d2f
Author: LebronAl <TX...@gmail.com>
AuthorDate: Wed Jan 6 17:09:30 2021 +0800

    init
---
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  2 +-
 .../engine/storagegroup/StorageGroupProcessor.java |  6 ++--
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  9 +++--
 .../java/org/apache/iotdb/db/service/Test.java     | 42 ++++++++++++++++++++++
 4 files changed, 54 insertions(+), 5 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 388ff50..fb6db13 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -104,7 +104,7 @@ public class StorageEngine implements IService {
   /**
    * storage group name -> storage group processor
    */
-  private final ConcurrentHashMap<PartialPath, StorageGroupProcessor> processorMap = new ConcurrentHashMap<>();
+  private final HashMap<PartialPath, StorageGroupProcessor> processorMap = new HashMap<>();
 
   private static final ExecutorService recoveryThreadPool = IoTDBThreadPoolFactory
       .newFixedThreadPool(Runtime.getRuntime().availableProcessors(), "Recovery-Thread-Pool");
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index b338e00..8cada8b 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -711,7 +711,7 @@ public class StorageGroupProcessor {
       }
     }
 
-    writeLock();
+//    writeLock();
     try {
       TSStatus[] results = new TSStatus[insertTabletPlan.getRowCount()];
       Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
@@ -798,8 +798,10 @@ public class StorageGroupProcessor {
       if (!noFailure) {
         throw new BatchProcessException(results);
       }
+//    } finally {
+//      writeUnlock();
     } finally {
-      writeUnlock();
+
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 2144f5c..4bd318e 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -32,7 +32,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -77,9 +79,9 @@ import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.UDFPlan;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
-import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
 import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
@@ -178,6 +180,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   private AtomicLong sessionIdGenerator = new AtomicLong();
   // The statementId is unique in one IoTDB instance.
   private AtomicLong statementIdGenerator = new AtomicLong();
+  private ExecutorService e = Executors.newSingleThreadExecutor();
 
   // (sessionId -> Set(statementId))
   private Map<Long, Set<Long>> sessionId2StatementId = new ConcurrentHashMap<>();
@@ -1514,7 +1517,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         return status;
       }
 
-      return executeNonQueryPlan(insertTabletPlan);
+      Future<TSStatus> result = e.submit(new Test(insertTabletPlan, this));
+
+      return result.get();
     } catch (NullPointerException e) {
       logger.error("{}: error occurs when executing statements", IoTDBConstant.GLOBAL_DB_NAME, e);
       return RpcUtils
diff --git a/server/src/main/java/org/apache/iotdb/db/service/Test.java b/server/src/main/java/org/apache/iotdb/db/service/Test.java
new file mode 100644
index 0000000..54197ae
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/service/Test.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service;
+
+import java.util.concurrent.Callable;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+public class Test implements Callable<TSStatus> { // Callable that runs one PhysicalPlan through a TSServiceImpl
+  // Plan to execute and the service that executes it; assigned only in the constructor (could be final).
+  private PhysicalPlan plan;
+  private TSServiceImpl impl;
+  /** Stores the given plan and service instance as-is; no validation or copying is done here. */
+  public Test(PhysicalPlan plan, TSServiceImpl impl) {
+    this.plan = plan;
+    this.impl = impl;
+
+  }
+  /** Delegates to {@code impl.executeNonQueryPlan(plan)} and returns its TSStatus unchanged. */
+  @Override
+  public TSStatus call() throws Exception {
+    return impl.executeNonQueryPlan(plan);
+  }
+
+}