You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by yu...@apache.org on 2020/07/12 10:05:46 UTC
[incubator-iotdb] 01/01: remove backip
This is an automated email from the ASF dual-hosted git repository.
yuyuankang pushed a commit to branch kyy2
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 693b1450ac6f0e7fd72268eac9fdef757fd3d60d
Author: Ring-k <yu...@hotmail.com>
AuthorDate: Sun Jul 12 18:00:03 2020 +0800
remove backip
---
.../cluster/server/member/MetaGroupMember.java | 25 +++++++-----
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 34 +++++++++++++---
.../iotdb/db/qp/physical/crud/InsertPlan.java | 45 ++++++++++++++++++++--
.../iotdb/db/qp/physical/crud/InsertRowPlan.java | 16 ++++++++
.../db/qp/physical/crud/InsertTabletPlan.java | 19 +++++++++
5 files changed, 119 insertions(+), 20 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 99600b9..a36ca6d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -1454,9 +1454,9 @@ public class MetaGroupMember extends RaftMember {
setStorageGroupResult.getCode(), storageGroupName)
);
}
- if(plan instanceof InsertRowPlan){
+ if(plan instanceof InsertPlan){
// try to create timeseries
- boolean isAutoCreateTimeseriesSuccess = autoCreateTimeseries((InsertRowPlan) plan);
+ boolean isAutoCreateTimeseriesSuccess = autoCreateTimeseries((InsertPlan) plan);
if (!isAutoCreateTimeseriesSuccess) {
throw new MetadataException(
"Failed to create timeseries from InsertPlan automatically."
@@ -1473,10 +1473,10 @@ public class MetaGroupMember extends RaftMember {
* @return
*/
TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap, PhysicalPlan plan) {
- InsertRowPlan backup = null;
- if (plan instanceof InsertRowPlan) {
- backup = (InsertRowPlan) ((InsertRowPlan) plan).clone();
- }
+// InsertRowPlan backup = null;
+// if (plan instanceof InsertRowPlan) {
+// backup = (InsertRowPlan) ((InsertRowPlan) plan).clone();
+// }
// the error codes from the groups that cannot execute the plan
TSStatus status;
if (planGroupMap.size() == 1) {
@@ -1492,9 +1492,12 @@ public class MetaGroupMember extends RaftMember {
&& status.getCode() == TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()
&& ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema()) {
// try to create timeseries
- boolean hasCreate = autoCreateTimeseries(backup);
+ if(((InsertPlan)plan).getFailedMeasurements() != null){
+ ((InsertPlan)plan).transform();
+ }
+ boolean hasCreate = autoCreateTimeseries((InsertPlan) plan);
if (hasCreate) {
- status = forwardPlan(planGroupMap, backup);
+ status = forwardPlan(planGroupMap, plan);
} else {
logger.error("{}, Cannot auto create timeseries.", thisNode);
}
@@ -1629,7 +1632,7 @@ public class MetaGroupMember extends RaftMember {
* @param insertPlan, some of the timeseries in it are not created yet
* @return true if all uncreated timeseries are created
*/
- boolean autoCreateTimeseries(InsertRowPlan insertPlan) {
+ boolean autoCreateTimeseries(InsertPlan insertPlan) {
List<String> seriesList = new ArrayList<>();
String deviceId = insertPlan.getDeviceId();
String storageGroupName;
@@ -1651,7 +1654,9 @@ public class MetaGroupMember extends RaftMember {
for (String seriesPath : unregisteredSeriesList) {
int index = seriesList.indexOf(seriesPath);
TSDataType dataType = TypeInferenceUtils
- .getPredictedDataType(insertPlan.getValues()[index], true);
+ .getPredictedDataType(insertPlan instanceof InsertTabletPlan
+ ? ((Object[]) ((InsertTabletPlan) insertPlan).getColumns()[index])[0]
+ : ((InsertRowPlan) insertPlan).getValues()[index], true);
TSEncoding encoding = getDefaultEncoding(dataType);
CompressionType compressionType = TSFileDescriptor.getInstance().getConfig().getCompressor();
CreateTimeSeriesPlan createTimeSeriesPlan = new CreateTimeSeriesPlan(new Path(seriesPath),
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index c3be451..1395ece 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -71,6 +71,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.DeleteFailedException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.MManager;
@@ -262,7 +263,8 @@ public class PlanExecutor implements IPlanExecutor {
(storageGroupName, partitionId) ->
storageGroupName.equals(((DeletePartitionPlan) plan).getStorageGroupName())
&& p.getPartitionId().contains(partitionId);
- StorageEngine.getInstance().removePartitions(((DeletePartitionPlan) plan).getStorageGroupName(), filter);
+ StorageEngine.getInstance()
+ .removePartitions(((DeletePartitionPlan) plan).getStorageGroupName(), filter);
return true;
case CREATE_SCHEMA_SNAPSHOT:
operateCreateSnapshot();
@@ -849,8 +851,10 @@ public class PlanExecutor implements IPlanExecutor {
}
protected MeasurementSchema[] getSeriesSchemas(InsertPlan insertPlan)
- throws MetadataException {
- return mManager.getSeriesSchemasAndReadLockDevice(insertPlan.getDeviceId(), insertPlan.getMeasurements(), insertPlan);
+ throws MetadataException {
+ return mManager
+ .getSeriesSchemasAndReadLockDevice(insertPlan.getDeviceId(), insertPlan.getMeasurements(),
+ insertPlan);
}
@Override
@@ -860,15 +864,33 @@ public class PlanExecutor implements IPlanExecutor {
insertRowPlan.setSchemasAndTransferType(schemas);
StorageEngine.getInstance().insert(insertRowPlan);
if (insertRowPlan.getFailedMeasurements() != null) {
- throw new StorageEngineException(
- "failed to insert measurements " + insertRowPlan.getFailedMeasurements());
+ // check whether the root cause of every failure is a PathNotExistException
+ List<String> failedPaths = insertRowPlan.getFailedMeasurements();
+ List<Exception> exceptions = insertRowPlan.getFailedExceptions();
+ boolean isPathNotExistException = true;
+ for (Exception e : exceptions) {
+ Throwable curException = e;
+ while (curException.getCause() != null) {
+ curException = curException.getCause();
+ }
+ if (!(curException instanceof PathNotExistException)) {
+ isPathNotExistException = false;
+ break;
+ }
+ }
+ if (isPathNotExistException) {
+ throw new PathNotExistException(failedPaths);
+ } else {
+ throw new StorageEngineException(
+ "failed to insert points " + insertRowPlan.getFailedMeasurements());
+ }
}
} catch (StorageEngineException | MetadataException e) {
throw new QueryProcessException(e);
} finally {
// TODO: put lock and unlock in the same block
mManager.unlockDeviceReadLock(insertRowPlan.getDeviceId());
- }
+ }
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
index 3b4222c..ba89656 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
@@ -19,8 +19,12 @@
package org.apache.iotdb.db.qp.physical.crud;
+import io.jsonwebtoken.lang.Collections;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import org.apache.iotdb.db.cost.statistic.Measurement;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -35,7 +39,9 @@ public abstract class InsertPlan extends PhysicalPlan {
// record the failed measurements
- Map<String, Exception> failedMeasurements;
+ List<String> failedMeasurements;
+ List<Exception> failedExceptions;
+ List<Integer> failedIndices;
public InsertPlan(Operator.OperatorType operatorType) {
super(false, operatorType);
@@ -74,10 +80,14 @@ public abstract class InsertPlan extends PhysicalPlan {
this.schemas = schemas;
}
- public Map<String, Exception> getFailedMeasurements() {
+ public List<String> getFailedMeasurements() {
return failedMeasurements;
}
+ public List<Exception> getFailedExceptions() {
+ return failedExceptions;
+ }
+
public int getFailedMeasurementNumber() {
return failedMeasurements == null ? 0 : failedMeasurements.size();
}
@@ -88,11 +98,38 @@ public abstract class InsertPlan extends PhysicalPlan {
*/
public void markFailedMeasurementInsertion(int index, Exception e) {
if (failedMeasurements == null) {
- failedMeasurements = new HashMap<>();
+ failedMeasurements = new ArrayList<>();
+ failedExceptions = new ArrayList<>();
+ failedIndices = new ArrayList<>();
}
- failedMeasurements.put(measurements[index], e);
+ failedMeasurements.add(measurements[index]);
+ failedExceptions.add(e);
+ failedIndices.add(index);
measurements[index] = null;
dataTypes[index] = null;
}
+ public InsertPlan transform() {
+ if (failedMeasurements == null) {
+ return null;
+ }
+ measurements = failedMeasurements.toArray(new String[0]);
+ failedMeasurements = null;
+ if(dataTypes != null){
+ TSDataType[] temp = dataTypes.clone();
+ dataTypes = new TSDataType[failedIndices.size()];
+ for(int i = 0; i < failedIndices.size(); i++){
+ dataTypes[i] = temp[failedIndices.get(i)];
+ }
+ }
+ if(schemas != null){
+ MeasurementSchema[] temp = schemas.clone();
+ schemas = new MeasurementSchema[failedIndices.size()];
+ for(int i = 0; i < failedIndices.size(); i++){
+ schemas[i] = temp[failedIndices.get(i)];
+ }
+ }
+ return this;
+ }
+
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
index 99b179e..82acd24 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
@@ -57,6 +57,8 @@ public class InsertRowPlan extends InsertPlan {
// if values is object[], we could use the raw type of them, and we should set this to false
private boolean isNeedInferType = false;
+ private List<Object> failedValues;
+
public InsertRowPlan() {
super(OperatorType.INSERT);
}
@@ -190,6 +192,10 @@ public class InsertRowPlan extends InsertPlan {
@Override
public void markFailedMeasurementInsertion(int index, Exception e) {
super.markFailedMeasurementInsertion(index, e);
+ if (failedValues == null) {
+ failedValues = new ArrayList<>();
+ }
+ failedValues.add(values[index]);
values[index] = null;
}
@@ -452,4 +458,14 @@ public class InsertRowPlan extends InsertPlan {
System.arraycopy(this.dataTypes, 0, typesClone, 0, typesClone.length);
return new InsertRowPlan(deviceIdClone, timeClone, measurementsClone, typesClone, valuesClone);
}
+
+ @Override
+ public InsertPlan transform() {
+ if (super.transform() == null) {
+ return null;
+ }
+ values = failedValues.toArray(new Object[0]);
+ failedValues = null;
+ return this;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
index 3780365..da4347b 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
@@ -62,6 +62,8 @@ public class InsertTabletPlan extends InsertPlan {
private int end;
private List<Integer> range;
+ private List<Object> failedColumns;
+
public InsertTabletPlan() {
super(OperatorType.BATCHINSERT);
@@ -501,9 +503,25 @@ public class InsertTabletPlan extends InsertPlan {
public void markFailedMeasurementInsertion(int index, Exception e) {
super.markFailedMeasurementInsertion(index, e);
+ if (failedColumns == null) {
+ failedColumns = new ArrayList<>();
+ }
+ failedColumns.add(columns[index]);
columns[index] = null;
}
+
+ @Override
+ public InsertPlan transform() {
+ if (super.transform() == null) {
+ return null;
+ }
+ // TODO: verify whether any other tablet state needs to be rebuilt for the retained columns
+ columns = failedColumns.toArray(new Object[0]);
+ failedColumns = null;
+ return this;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -532,4 +550,5 @@ public class InsertTabletPlan extends InsertPlan {
result = 31 * result + Arrays.hashCode(times);
return result;
}
+
}