You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2021/01/06 09:10:01 UTC

[iotdb] branch selector_queue created (now 2e6bf05)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a change to branch selector_queue
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 2e6bf05  init

This branch includes the following new commits:

     new 2e6bf05  init

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: init

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch selector_queue
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2e6bf05b603db2fffeb9d87eb7258ecb1a465d2f
Author: LebronAl <TX...@gmail.com>
AuthorDate: Wed Jan 6 17:09:30 2021 +0800

    init
---
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  2 +-
 .../engine/storagegroup/StorageGroupProcessor.java |  6 ++--
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  9 +++--
 .../java/org/apache/iotdb/db/service/Test.java     | 42 ++++++++++++++++++++++
 4 files changed, 54 insertions(+), 5 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 388ff50..fb6db13 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -104,7 +104,7 @@ public class StorageEngine implements IService {
   /**
    * storage group name -> storage group processor
    */
-  private final ConcurrentHashMap<PartialPath, StorageGroupProcessor> processorMap = new ConcurrentHashMap<>();
+  private final HashMap<PartialPath, StorageGroupProcessor> processorMap = new HashMap<>();
 
   private static final ExecutorService recoveryThreadPool = IoTDBThreadPoolFactory
       .newFixedThreadPool(Runtime.getRuntime().availableProcessors(), "Recovery-Thread-Pool");
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index b338e00..8cada8b 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -711,7 +711,7 @@ public class StorageGroupProcessor {
       }
     }
 
-    writeLock();
+//    writeLock();
     try {
       TSStatus[] results = new TSStatus[insertTabletPlan.getRowCount()];
       Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
@@ -798,8 +798,10 @@ public class StorageGroupProcessor {
       if (!noFailure) {
         throw new BatchProcessException(results);
       }
+//    } finally {
+//      writeUnlock();
     } finally {
-      writeUnlock();
+
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 2144f5c..4bd318e 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -32,7 +32,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -77,9 +79,9 @@ import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.UDFPlan;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
-import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
 import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
@@ -178,6 +180,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   private AtomicLong sessionIdGenerator = new AtomicLong();
   // The statementId is unique in one IoTDB instance.
   private AtomicLong statementIdGenerator = new AtomicLong();
+  private ExecutorService e = Executors.newSingleThreadExecutor();
 
   // (sessionId -> Set(statementId))
   private Map<Long, Set<Long>> sessionId2StatementId = new ConcurrentHashMap<>();
@@ -1514,7 +1517,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         return status;
       }
 
-      return executeNonQueryPlan(insertTabletPlan);
+      Future<TSStatus> result = e.submit(new Test(insertTabletPlan, this));
+
+      return result.get();
     } catch (NullPointerException e) {
       logger.error("{}: error occurs when executing statements", IoTDBConstant.GLOBAL_DB_NAME, e);
       return RpcUtils
diff --git a/server/src/main/java/org/apache/iotdb/db/service/Test.java b/server/src/main/java/org/apache/iotdb/db/service/Test.java
new file mode 100644
index 0000000..54197ae
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/service/Test.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service;
+
+import java.util.concurrent.Callable;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+public class Test implements Callable<TSStatus> {
+
+  private PhysicalPlan plan;
+  private TSServiceImpl impl;
+
+  public Test(PhysicalPlan plan, TSServiceImpl impl) {
+    this.plan = plan;
+    this.impl = impl;
+
+  }
+
+  @Override
+  public TSStatus call() throws Exception {
+    return impl.executeNonQueryPlan(plan);
+  }
+
+}