You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by yu...@apache.org on 2020/07/12 10:05:45 UTC

[incubator-iotdb] branch kyy2 created (now 693b145)

This is an automated email from the ASF dual-hosted git repository.

yuyuankang pushed a change to branch kyy2
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at 693b145  remove backip

This branch includes the following new commits:

     new 693b145  remove backip

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: remove backip

Posted by yu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yuyuankang pushed a commit to branch kyy2
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 693b1450ac6f0e7fd72268eac9fdef757fd3d60d
Author: Ring-k <yu...@hotmail.com>
AuthorDate: Sun Jul 12 18:00:03 2020 +0800

    remove backip
---
 .../cluster/server/member/MetaGroupMember.java     | 25 +++++++-----
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  | 34 +++++++++++++---
 .../iotdb/db/qp/physical/crud/InsertPlan.java      | 45 ++++++++++++++++++++--
 .../iotdb/db/qp/physical/crud/InsertRowPlan.java   | 16 ++++++++
 .../db/qp/physical/crud/InsertTabletPlan.java      | 19 +++++++++
 5 files changed, 119 insertions(+), 20 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 99600b9..a36ca6d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -1454,9 +1454,9 @@ public class MetaGroupMember extends RaftMember {
               setStorageGroupResult.getCode(), storageGroupName)
       );
     }
-    if(plan instanceof InsertRowPlan){
+    if(plan instanceof InsertPlan){
       // try to create timeseries
-      boolean isAutoCreateTimeseriesSuccess = autoCreateTimeseries((InsertRowPlan) plan);
+      boolean isAutoCreateTimeseriesSuccess = autoCreateTimeseries((InsertPlan) plan);
       if (!isAutoCreateTimeseriesSuccess) {
         throw new MetadataException(
             "Failed to create timeseries from InsertPlan automatically."
@@ -1473,10 +1473,10 @@ public class MetaGroupMember extends RaftMember {
    * @return
    */
   TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap, PhysicalPlan plan) {
-    InsertRowPlan backup = null;
-    if (plan instanceof InsertRowPlan) {
-      backup = (InsertRowPlan) ((InsertRowPlan) plan).clone();
-    }
+//    InsertRowPlan backup = null;
+//    if (plan instanceof InsertRowPlan) {
+//      backup = (InsertRowPlan) ((InsertRowPlan) plan).clone();
+//    }
     // the error codes from the groups that cannot execute the plan
     TSStatus status;
     if (planGroupMap.size() == 1) {
@@ -1492,9 +1492,12 @@ public class MetaGroupMember extends RaftMember {
         && status.getCode() == TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()
         && ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema()) {
       // try to create timeseries
-      boolean hasCreate = autoCreateTimeseries(backup);
+      if(((InsertPlan)plan).getFailedMeasurements() != null){
+        ((InsertPlan)plan).transform();
+      }
+      boolean hasCreate = autoCreateTimeseries((InsertPlan) plan);
       if (hasCreate) {
-        status = forwardPlan(planGroupMap, backup);
+        status = forwardPlan(planGroupMap, plan);
       } else {
         logger.error("{}, Cannot auto create timeseries.", thisNode);
       }
@@ -1629,7 +1632,7 @@ public class MetaGroupMember extends RaftMember {
    * @param insertPlan, some of the timeseries in it are not created yet
    * @return true of all uncreated timeseries are created
    */
-  boolean autoCreateTimeseries(InsertRowPlan insertPlan) {
+  boolean autoCreateTimeseries(InsertPlan insertPlan) {
     List<String> seriesList = new ArrayList<>();
     String deviceId = insertPlan.getDeviceId();
     String storageGroupName;
@@ -1651,7 +1654,9 @@ public class MetaGroupMember extends RaftMember {
     for (String seriesPath : unregisteredSeriesList) {
       int index = seriesList.indexOf(seriesPath);
       TSDataType dataType = TypeInferenceUtils
-          .getPredictedDataType(insertPlan.getValues()[index], true);
+          .getPredictedDataType(insertPlan instanceof InsertTabletPlan
+              ? ((Object[]) ((InsertTabletPlan) insertPlan).getColumns()[index])[0]
+              : ((InsertRowPlan) insertPlan).getValues()[index], true);
       TSEncoding encoding = getDefaultEncoding(dataType);
       CompressionType compressionType = TSFileDescriptor.getInstance().getConfig().getCompressor();
       CreateTimeSeriesPlan createTimeSeriesPlan = new CreateTimeSeriesPlan(new Path(seriesPath),
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index c3be451..1395ece 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -71,6 +71,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.DeleteFailedException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.MManager;
@@ -262,7 +263,8 @@ public class PlanExecutor implements IPlanExecutor {
             (storageGroupName, partitionId) ->
                 storageGroupName.equals(((DeletePartitionPlan) plan).getStorageGroupName())
                     && p.getPartitionId().contains(partitionId);
-        StorageEngine.getInstance().removePartitions(((DeletePartitionPlan) plan).getStorageGroupName(), filter);
+        StorageEngine.getInstance()
+            .removePartitions(((DeletePartitionPlan) plan).getStorageGroupName(), filter);
         return true;
       case CREATE_SCHEMA_SNAPSHOT:
         operateCreateSnapshot();
@@ -849,8 +851,10 @@ public class PlanExecutor implements IPlanExecutor {
   }
 
   protected MeasurementSchema[] getSeriesSchemas(InsertPlan insertPlan)
-    throws MetadataException {
-    return mManager.getSeriesSchemasAndReadLockDevice(insertPlan.getDeviceId(), insertPlan.getMeasurements(), insertPlan);
+      throws MetadataException {
+    return mManager
+        .getSeriesSchemasAndReadLockDevice(insertPlan.getDeviceId(), insertPlan.getMeasurements(),
+            insertPlan);
   }
 
   @Override
@@ -860,15 +864,33 @@ public class PlanExecutor implements IPlanExecutor {
       insertRowPlan.setSchemasAndTransferType(schemas);
       StorageEngine.getInstance().insert(insertRowPlan);
       if (insertRowPlan.getFailedMeasurements() != null) {
-        throw new StorageEngineException(
-            "failed to insert measurements " + insertRowPlan.getFailedMeasurements());
+        // check if all path not exist exceptions
+        List<String> failedPaths = insertRowPlan.getFailedMeasurements();
+        List<Exception> exceptions = insertRowPlan.getFailedExceptions();
+        boolean isPathNotExistException = true;
+        for (Exception e : exceptions) {
+          Throwable curException = e;
+          while (curException.getCause() != null) {
+            curException = curException.getCause();
+          }
+          if (!(curException instanceof PathNotExistException)) {
+            isPathNotExistException = false;
+            break;
+          }
+        }
+        if (isPathNotExistException) {
+          throw new PathNotExistException(failedPaths);
+        } else {
+          throw new StorageEngineException(
+              "failed to insert points " + insertRowPlan.getFailedMeasurements());
+        }
       }
     } catch (StorageEngineException | MetadataException e) {
       throw new QueryProcessException(e);
     } finally {
       // TODO: put lock and unlock in the same block
       mManager.unlockDeviceReadLock(insertRowPlan.getDeviceId());
-  }
+    }
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
index 3b4222c..ba89656 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
@@ -19,8 +19,12 @@
 
 package org.apache.iotdb.db.qp.physical.crud;
 
+import io.jsonwebtoken.lang.Collections;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import org.apache.iotdb.db.cost.statistic.Measurement;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -35,7 +39,9 @@ public abstract class InsertPlan extends PhysicalPlan {
 
 
   // record the failed measurements
-  Map<String, Exception> failedMeasurements;
+  List<String> failedMeasurements;
+  List<Exception> failedExceptions;
+  List<Integer> failedIndices;
 
   public InsertPlan(Operator.OperatorType operatorType) {
     super(false, operatorType);
@@ -74,10 +80,14 @@ public abstract class InsertPlan extends PhysicalPlan {
     this.schemas = schemas;
   }
 
-  public Map<String, Exception> getFailedMeasurements() {
+  public List<String> getFailedMeasurements() {
     return failedMeasurements;
   }
 
+  public List<Exception> getFailedExceptions() {
+    return failedExceptions;
+  }
+
   public int getFailedMeasurementNumber() {
     return failedMeasurements == null ? 0 : failedMeasurements.size();
   }
@@ -88,11 +98,38 @@ public abstract class InsertPlan extends PhysicalPlan {
    */
   public void markFailedMeasurementInsertion(int index, Exception e) {
     if (failedMeasurements == null) {
-      failedMeasurements = new HashMap<>();
+      failedMeasurements = new ArrayList<>();
+      failedExceptions = new ArrayList<>();
+      failedIndices = new ArrayList<>();
     }
-    failedMeasurements.put(measurements[index], e);
+    failedMeasurements.add(measurements[index]);
+    failedExceptions.add(e);
+    failedIndices.add(index);
     measurements[index] = null;
     dataTypes[index] = null;
   }
 
+  public InsertPlan transform() {
+    if (failedMeasurements == null) {
+      return null;
+    }
+    measurements = failedMeasurements.toArray(new String[0]);
+    failedMeasurements = null;
+    if(dataTypes != null){
+      TSDataType[] temp = dataTypes.clone();
+      dataTypes = new TSDataType[failedIndices.size()];
+      for(int i = 0; i < failedIndices.size(); i++){
+        dataTypes[i] = temp[failedIndices.get(i)];
+      }
+    }
+    if(schemas != null){
+      MeasurementSchema[] temp = schemas.clone();
+      schemas = new MeasurementSchema[failedIndices.size()];
+      for(int i = 0; i < failedIndices.size(); i++){
+        schemas[i] = temp[failedIndices.get(i)];
+      }
+    }
+    return this;
+  }
+
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
index 99b179e..82acd24 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
@@ -57,6 +57,8 @@ public class InsertRowPlan extends InsertPlan {
   // if values is object[], we could use the raw type of them, and we should set this to false
   private boolean isNeedInferType = false;
 
+  private List<Object> failedValues;
+
   public InsertRowPlan() {
     super(OperatorType.INSERT);
   }
@@ -190,6 +192,10 @@ public class InsertRowPlan extends InsertPlan {
   @Override
   public void markFailedMeasurementInsertion(int index, Exception e) {
     super.markFailedMeasurementInsertion(index, e);
+    if (failedValues == null) {
+      failedValues = new ArrayList<>();
+    }
+    failedValues.add(values[index]);
     values[index] = null;
   }
 
@@ -452,4 +458,14 @@ public class InsertRowPlan extends InsertPlan {
     System.arraycopy(this.dataTypes, 0, typesClone, 0, typesClone.length);
     return new InsertRowPlan(deviceIdClone, timeClone, measurementsClone, typesClone, valuesClone);
   }
+
+  @Override
+  public InsertPlan transform() {
+    if (super.transform() == null) {
+      return null;
+    }
+    values = failedValues.toArray(new Object[0]);
+    failedValues = null;
+    return this;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
index 3780365..da4347b 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
@@ -62,6 +62,8 @@ public class InsertTabletPlan extends InsertPlan {
   private int end;
   private List<Integer> range;
 
+  private List<Object> failedColumns;
+
 
   public InsertTabletPlan() {
     super(OperatorType.BATCHINSERT);
@@ -501,9 +503,25 @@ public class InsertTabletPlan extends InsertPlan {
 
   public void markFailedMeasurementInsertion(int index, Exception e) {
     super.markFailedMeasurementInsertion(index, e);
+    if (failedColumns == null) {
+      failedColumns = new ArrayList<>();
+    }
+    failedColumns.add(columns[index]);
     columns[index] = null;
   }
 
+
+  @Override
+  public InsertPlan transform() {
+    if (super.transform() == null) {
+      return null;
+    }
+    // TODO anything else?
+    columns = failedColumns.toArray(new Object[0]);
+    failedColumns = null;
+    return this;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -532,4 +550,5 @@ public class InsertTabletPlan extends InsertPlan {
     result = 31 * result + Arrays.hashCode(times);
     return result;
   }
+
 }