You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by yu...@apache.org on 2020/07/12 10:05:46 UTC

[incubator-iotdb] 01/01: remove backip

This is an automated email from the ASF dual-hosted git repository.

yuyuankang pushed a commit to branch kyy2
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 693b1450ac6f0e7fd72268eac9fdef757fd3d60d
Author: Ring-k <yu...@hotmail.com>
AuthorDate: Sun Jul 12 18:00:03 2020 +0800

    remove backip
---
 .../cluster/server/member/MetaGroupMember.java     | 25 +++++++-----
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  | 34 +++++++++++++---
 .../iotdb/db/qp/physical/crud/InsertPlan.java      | 45 ++++++++++++++++++++--
 .../iotdb/db/qp/physical/crud/InsertRowPlan.java   | 16 ++++++++
 .../db/qp/physical/crud/InsertTabletPlan.java      | 19 +++++++++
 5 files changed, 119 insertions(+), 20 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 99600b9..a36ca6d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -1454,9 +1454,9 @@ public class MetaGroupMember extends RaftMember {
               setStorageGroupResult.getCode(), storageGroupName)
       );
     }
-    if(plan instanceof InsertRowPlan){
+    if(plan instanceof InsertPlan){
       // try to create timeseries
-      boolean isAutoCreateTimeseriesSuccess = autoCreateTimeseries((InsertRowPlan) plan);
+      boolean isAutoCreateTimeseriesSuccess = autoCreateTimeseries((InsertPlan) plan);
       if (!isAutoCreateTimeseriesSuccess) {
         throw new MetadataException(
             "Failed to create timeseries from InsertPlan automatically."
@@ -1473,10 +1473,10 @@ public class MetaGroupMember extends RaftMember {
    * @return
    */
   TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap, PhysicalPlan plan) {
-    InsertRowPlan backup = null;
-    if (plan instanceof InsertRowPlan) {
-      backup = (InsertRowPlan) ((InsertRowPlan) plan).clone();
-    }
+//    InsertRowPlan backup = null;
+//    if (plan instanceof InsertRowPlan) {
+//      backup = (InsertRowPlan) ((InsertRowPlan) plan).clone();
+//    }
     // the error codes from the groups that cannot execute the plan
     TSStatus status;
     if (planGroupMap.size() == 1) {
@@ -1492,9 +1492,12 @@ public class MetaGroupMember extends RaftMember {
         && status.getCode() == TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()
         && ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema()) {
       // try to create timeseries
-      boolean hasCreate = autoCreateTimeseries(backup);
+      if(((InsertPlan)plan).getFailedMeasurements() != null){
+        ((InsertPlan)plan).transform();
+      }
+      boolean hasCreate = autoCreateTimeseries((InsertPlan) plan);
       if (hasCreate) {
-        status = forwardPlan(planGroupMap, backup);
+        status = forwardPlan(planGroupMap, plan);
       } else {
         logger.error("{}, Cannot auto create timeseries.", thisNode);
       }
@@ -1629,7 +1632,7 @@ public class MetaGroupMember extends RaftMember {
    * @param insertPlan, some of the timeseries in it are not created yet
    * @return true of all uncreated timeseries are created
    */
-  boolean autoCreateTimeseries(InsertRowPlan insertPlan) {
+  boolean autoCreateTimeseries(InsertPlan insertPlan) {
     List<String> seriesList = new ArrayList<>();
     String deviceId = insertPlan.getDeviceId();
     String storageGroupName;
@@ -1651,7 +1654,9 @@ public class MetaGroupMember extends RaftMember {
     for (String seriesPath : unregisteredSeriesList) {
       int index = seriesList.indexOf(seriesPath);
       TSDataType dataType = TypeInferenceUtils
-          .getPredictedDataType(insertPlan.getValues()[index], true);
+          .getPredictedDataType(insertPlan instanceof InsertTabletPlan
+              ? ((Object[]) ((InsertTabletPlan) insertPlan).getColumns()[index])[0]
+              : ((InsertRowPlan) insertPlan).getValues()[index], true);
       TSEncoding encoding = getDefaultEncoding(dataType);
       CompressionType compressionType = TSFileDescriptor.getInstance().getConfig().getCompressor();
       CreateTimeSeriesPlan createTimeSeriesPlan = new CreateTimeSeriesPlan(new Path(seriesPath),
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index c3be451..1395ece 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -71,6 +71,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.DeleteFailedException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.MManager;
@@ -262,7 +263,8 @@ public class PlanExecutor implements IPlanExecutor {
             (storageGroupName, partitionId) ->
                 storageGroupName.equals(((DeletePartitionPlan) plan).getStorageGroupName())
                     && p.getPartitionId().contains(partitionId);
-        StorageEngine.getInstance().removePartitions(((DeletePartitionPlan) plan).getStorageGroupName(), filter);
+        StorageEngine.getInstance()
+            .removePartitions(((DeletePartitionPlan) plan).getStorageGroupName(), filter);
         return true;
       case CREATE_SCHEMA_SNAPSHOT:
         operateCreateSnapshot();
@@ -849,8 +851,10 @@ public class PlanExecutor implements IPlanExecutor {
   }
 
   protected MeasurementSchema[] getSeriesSchemas(InsertPlan insertPlan)
-    throws MetadataException {
-    return mManager.getSeriesSchemasAndReadLockDevice(insertPlan.getDeviceId(), insertPlan.getMeasurements(), insertPlan);
+      throws MetadataException {
+    return mManager
+        .getSeriesSchemasAndReadLockDevice(insertPlan.getDeviceId(), insertPlan.getMeasurements(),
+            insertPlan);
   }
 
   @Override
@@ -860,15 +864,33 @@ public class PlanExecutor implements IPlanExecutor {
       insertRowPlan.setSchemasAndTransferType(schemas);
       StorageEngine.getInstance().insert(insertRowPlan);
       if (insertRowPlan.getFailedMeasurements() != null) {
-        throw new StorageEngineException(
-            "failed to insert measurements " + insertRowPlan.getFailedMeasurements());
+        // check if all path not exist exceptions
+        List<String> failedPaths = insertRowPlan.getFailedMeasurements();
+        List<Exception> exceptions = insertRowPlan.getFailedExceptions();
+        boolean isPathNotExistException = true;
+        for (Exception e : exceptions) {
+          Throwable curException = e;
+          while (curException.getCause() != null) {
+            curException = curException.getCause();
+          }
+          if (!(curException instanceof PathNotExistException)) {
+            isPathNotExistException = false;
+            break;
+          }
+        }
+        if (isPathNotExistException) {
+          throw new PathNotExistException(failedPaths);
+        } else {
+          throw new StorageEngineException(
+              "failed to insert points " + insertRowPlan.getFailedMeasurements());
+        }
       }
     } catch (StorageEngineException | MetadataException e) {
       throw new QueryProcessException(e);
     } finally {
       // TODO: put lock and unlock in the same block
       mManager.unlockDeviceReadLock(insertRowPlan.getDeviceId());
-  }
+    }
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
index 3b4222c..ba89656 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
@@ -19,8 +19,12 @@
 
 package org.apache.iotdb.db.qp.physical.crud;
 
+import io.jsonwebtoken.lang.Collections;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import org.apache.iotdb.db.cost.statistic.Measurement;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -35,7 +39,9 @@ public abstract class InsertPlan extends PhysicalPlan {
 
 
   // record the failed measurements
-  Map<String, Exception> failedMeasurements;
+  List<String> failedMeasurements;
+  List<Exception> failedExceptions;
+  List<Integer> failedIndices;
 
   public InsertPlan(Operator.OperatorType operatorType) {
     super(false, operatorType);
@@ -74,10 +80,14 @@ public abstract class InsertPlan extends PhysicalPlan {
     this.schemas = schemas;
   }
 
-  public Map<String, Exception> getFailedMeasurements() {
+  public List<String> getFailedMeasurements() {
     return failedMeasurements;
   }
 
+  public List<Exception> getFailedExceptions() {
+    return failedExceptions;
+  }
+
   public int getFailedMeasurementNumber() {
     return failedMeasurements == null ? 0 : failedMeasurements.size();
   }
@@ -88,11 +98,38 @@ public abstract class InsertPlan extends PhysicalPlan {
    */
   public void markFailedMeasurementInsertion(int index, Exception e) {
     if (failedMeasurements == null) {
-      failedMeasurements = new HashMap<>();
+      failedMeasurements = new ArrayList<>();
+      failedExceptions = new ArrayList<>();
+      failedIndices = new ArrayList<>();
     }
-    failedMeasurements.put(measurements[index], e);
+    failedMeasurements.add(measurements[index]);
+    failedExceptions.add(e);
+    failedIndices.add(index);
     measurements[index] = null;
     dataTypes[index] = null;
   }
 
+  public InsertPlan transform() {
+    if (failedMeasurements == null) {
+      return null;
+    }
+    measurements = failedMeasurements.toArray(new String[0]);
+    failedMeasurements = null;
+    if(dataTypes != null){
+      TSDataType[] temp = dataTypes.clone();
+      dataTypes = new TSDataType[failedIndices.size()];
+      for(int i = 0; i < failedIndices.size(); i++){
+        dataTypes[i] = temp[failedIndices.get(i)];
+      }
+    }
+    if(schemas != null){
+      MeasurementSchema[] temp = schemas.clone();
+      schemas = new MeasurementSchema[failedIndices.size()];
+      for(int i = 0; i < failedIndices.size(); i++){
+        schemas[i] = temp[failedIndices.get(i)];
+      }
+    }
+    return this;
+  }
+
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
index 99b179e..82acd24 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
@@ -57,6 +57,8 @@ public class InsertRowPlan extends InsertPlan {
   // if values is object[], we could use the raw type of them, and we should set this to false
   private boolean isNeedInferType = false;
 
+  private List<Object> failedValues;
+
   public InsertRowPlan() {
     super(OperatorType.INSERT);
   }
@@ -190,6 +192,10 @@ public class InsertRowPlan extends InsertPlan {
   @Override
   public void markFailedMeasurementInsertion(int index, Exception e) {
     super.markFailedMeasurementInsertion(index, e);
+    if (failedValues == null) {
+      failedValues = new ArrayList<>();
+    }
+    failedValues.add(values[index]);
     values[index] = null;
   }
 
@@ -452,4 +458,14 @@ public class InsertRowPlan extends InsertPlan {
     System.arraycopy(this.dataTypes, 0, typesClone, 0, typesClone.length);
     return new InsertRowPlan(deviceIdClone, timeClone, measurementsClone, typesClone, valuesClone);
   }
+
+  @Override
+  public InsertPlan transform() {
+    if (super.transform() == null) {
+      return null;
+    }
+    values = failedValues.toArray(new Object[0]);
+    failedValues = null;
+    return this;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
index 3780365..da4347b 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
@@ -62,6 +62,8 @@ public class InsertTabletPlan extends InsertPlan {
   private int end;
   private List<Integer> range;
 
+  private List<Object> failedColumns;
+
 
   public InsertTabletPlan() {
     super(OperatorType.BATCHINSERT);
@@ -501,9 +503,25 @@ public class InsertTabletPlan extends InsertPlan {
 
   public void markFailedMeasurementInsertion(int index, Exception e) {
     super.markFailedMeasurementInsertion(index, e);
+    if (failedColumns == null) {
+      failedColumns = new ArrayList<>();
+    }
+    failedColumns.add(columns[index]);
     columns[index] = null;
   }
 
+
+  @Override
+  public InsertPlan transform() {
+    if (super.transform() == null) {
+      return null;
+    }
+    // TODO anything else?
+    columns = failedColumns.toArray(new Object[0]);
+    failedColumns = null;
+    return this;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -532,4 +550,5 @@ public class InsertTabletPlan extends InsertPlan {
     result = 31 * result + Arrays.hashCode(times);
     return result;
   }
+
 }