You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2019/05/29 08:44:22 UTC

[incubator-iotdb] branch add_batch_insert updated: group by storageGroup in batch insert execute

This is an automated email from the ASF dual-hosted git repository.

suyue pushed a commit to branch add_batch_insert
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/add_batch_insert by this push:
     new 8524023  group by storageGroup in batch insert execute
8524023 is described below

commit 8524023452e5d551c47fede937fbe72a62bb70c5
Author: suyue <23...@qq.com>
AuthorDate: Wed May 29 16:44:05 2019 +0800

    group by storageGroup in batch insert execute
---
 .../iotdb/db/engine/filenode/FileNodeManager.java  | 67 +++++++++++++++++++---
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  8 +--
 2 files changed, 63 insertions(+), 12 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
index 1b5d65c..35cbaaa 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -214,6 +215,12 @@ public class FileNodeManager implements IStatistic, IService {
       LOGGER.error("MManager get filenode name error, seriesPath is {}", path);
       throw new FileNodeManagerException(e);
     }
+    FileNodeProcessor processor = getProcessorByName(filenodeName, isWriteLock);
+    return processor;
+  }
+
+  private FileNodeProcessor getProcessorByName(String filenodeName, boolean isWriteLock)
+      throws FileNodeManagerException {
     FileNodeProcessor processor;
     processor = processorMap.get(filenodeName);
     if (processor != null) {
@@ -330,8 +337,7 @@ public class FileNodeManager implements IStatistic, IService {
    */
   public Pair<List<Integer>, String> insertBatch(TSRecord[] tsRecords, List<Integer> partialResult,
       String message, boolean isMonitor) {
-    Map<String, List<Integer>> fileNode2RecordIndexes = new HashMap<>();
-    Map<String, List<TSRecord>> fileNode2Records = new HashMap<>();
+    Map<String, Map<TSRecord, Integer>> fileNode2RecordIndexes = new HashMap<>();
     for (int i = 0; i < tsRecords.length; i++) {
       if (partialResult.get(i) == Statement.EXECUTE_FAILED) {
         continue;
@@ -341,16 +347,59 @@ public class FileNodeManager implements IStatistic, IService {
         checkTimestamp(record);
         updateStat(isMonitor, record);
         String fileNode = MManager.getInstance().getFileNameByPath(record.deviceId);
-        fileNode2RecordIndexes.computeIfAbsent(fileNode, f -> new ArrayList<>()).add(i);
-        fileNode2Records.computeIfAbsent(fileNode, f -> new ArrayList<>()).add(record);
+        fileNode2RecordIndexes.computeIfAbsent(fileNode, f -> new HashMap<>()).put(record, i);
       } catch (Exception e) {
         e.printStackTrace();
         partialResult.set(i, Statement.EXECUTE_FAILED);
         message = e.getMessage();
       }
     }
+    for (Entry<String, Map<TSRecord, Integer>> entry : fileNode2RecordIndexes.entrySet()) {
+      FileNodeProcessor fileNodeProcessor = null;
 
-    return null;
+      try {
+        fileNodeProcessor = getProcessorByName(entry.getKey(), true);
+      } catch (FileNodeManagerException e) {
+        // get file node processor, all insert sql in this batch will fail.
+        LOGGER.error(String.format("Encounter an error when getting the file node processor %s.",
+            entry.getKey()), e);
+        entry.getValue().forEach((tsRecord, index) -> {
+          partialResult.set(index, Statement.EXECUTE_FAILED);
+        });
+        if (fileNodeProcessor != null) {
+          fileNodeProcessor.writeUnlock();
+        }
+        message = e.getMessage();
+        continue;
+      }
+      int index = 0;
+      try {
+        for (Entry<TSRecord, Integer> recordIndexEntry : entry.getValue().entrySet()) {
+          TSRecord tsRecord = recordIndexEntry.getKey();
+          long timestamp = tsRecord.time;
+          String deviceId = tsRecord.deviceId;
+          index = recordIndexEntry.getValue();
+          long lastUpdateTime = fileNodeProcessor.getFlushLastUpdateTime(deviceId);
+          if (timestamp < lastUpdateTime) {
+            insertOverflow(fileNodeProcessor, timestamp, tsRecord, isMonitor, deviceId);
+          } else {
+            insertBufferWrite(fileNodeProcessor, timestamp, isMonitor, tsRecord, deviceId);
+          }
+          partialResult.set(index, Statement.SUCCESS_NO_INFO);
+        }
+      } catch (FileNodeProcessorException | FileNodeManagerException e) {
+        LOGGER.error(
+            String.format("Encounter an error when insert a tsRecord, the processor name is %s.",
+                fileNodeProcessor.getProcessorName()), e);
+        partialResult.set(index, Statement.EXECUTE_FAILED);
+        message = e.getMessage();
+      } finally {
+        if (fileNodeProcessor != null) {
+          fileNodeProcessor.writeUnlock();
+        }
+      }
+    }
+    return new Pair<>(partialResult, message);
   }
 
 
@@ -360,7 +409,7 @@ public class FileNodeManager implements IStatistic, IService {
       if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
         String[] measurementList = new String[tsRecord.dataPointList.size()];
         String[] insertValues = new String[tsRecord.dataPointList.size()];
-        int i=0;
+        int i = 0;
         for (DataPoint dp : tsRecord.dataPointList) {
           measurementList[i] = dp.getMeasurementId();
           insertValues[i] = dp.getValue().toString();
@@ -445,7 +494,8 @@ public class FileNodeManager implements IStatistic, IService {
       String bufferwriteBaseDir = bufferWriteProcessor.getBaseDir();
       String bufferwriteRelativePath = bufferWriteProcessor.getFileRelativePath();
       try {
-        fileNodeProcessor.addIntervalFileNode(new File(new File(bufferwriteBaseDir), bufferwriteRelativePath));
+        fileNodeProcessor
+            .addIntervalFileNode(new File(new File(bufferwriteBaseDir), bufferwriteRelativePath));
       } catch (Exception e) {
         if (!isMonitor) {
           updateStatHashMapWhenFail(tsRecord);
@@ -763,7 +813,8 @@ public class FileNodeManager implements IStatistic, IService {
       // append file to storage group.
       fileNodeProcessor.appendFile(appendFile, appendFilePath);
     } catch (FileNodeProcessorException e) {
-      LOGGER.error("Cannot append the file {} to {}", appendFile.getFile().getAbsolutePath(), fileNodeName, e);
+      LOGGER.error("Cannot append the file {} to {}", appendFile.getFile().getAbsolutePath(),
+          fileNodeName, e);
       throw new FileNodeManagerException(e);
     } finally {
       fileNodeProcessor.writeUnlock();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index c89e41a..0c4488a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -450,7 +450,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       boolean allInsert = true;
 
       for (int i = 0; i < physicalPlans.length; i++) {
-        PhysicalPlan physicalPlan = processor.parseSQLToPhysicalPlan(statements.get(i), zoneIds.get());
+        PhysicalPlan physicalPlan = processor
+            .parseSQLToPhysicalPlan(statements.get(i), zoneIds.get());
         physicalPlan.setProposer(username.get());
         physicalPlans[i] = physicalPlan;
         if (!(physicalPlan instanceof InsertPlan)) {
@@ -460,12 +461,12 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
 
       if (allInsert) {
         // execute batch insert
-        Pair<List<Integer>, String> pair = executeBatchInsert((InsertPlan[])physicalPlans);
+        Pair<List<Integer>, String> pair = executeBatchInsert((InsertPlan[]) physicalPlans);
         result = pair.left;
         // only used when having failure
         batchErrorMessage = pair.right;
         if (batchErrorMessage != null) {
-            isAllSuccessful = false;
+          isAllSuccessful = false;
         }
       } else {
         // execute one by one
@@ -509,7 +510,6 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   }
 
   /**
-   * @param insertPlans
    * @return a list of return code and message
    */
   private Pair<List<Integer>, String> executeBatchInsert(InsertPlan[] insertPlans) {