Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2020/06/19 02:51:13 UTC

[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #1387: Create schema automatically

jt2594838 commented on a change in pull request #1387:
URL: https://github.com/apache/incubator-iotdb/pull/1387#discussion_r442598352



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
##########
@@ -1551,6 +1584,20 @@ TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap) {
         subStatus = forwardPlan(entry.getKey(), entry.getValue());
       }
       if (subStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        if (entry.getKey() instanceof InsertPlan
+            && subStatus.getCode() == TSStatusCode.STORAGE_ENGINE_ERROR.getStatusCode()
+            && IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) {
+          // try to create timeseries
+          boolean hasCreate = autoCreateTimeseries((InsertPlan) entry.getKey(), entry.getValue());

Review comment:
       "entry.getValue()" is the data group you are going to send the plan to, and it may not be the group that should hold the metadata, so create-timeseries plans should be forwarded to the group that should hold the metadata.

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
##########
@@ -1520,12 +1534,31 @@ private TSStatus processPartitionedPlan(PhysicalPlan plan) throws UnsupportedPla
     } catch (MetadataException e) {
       logger.error("Cannot route plan {}", plan, e);
     }
-    // the storage group is not found locally, forward it to the leader
+    // the storage group is not found locally
     if (planGroupMap == null || planGroupMap.isEmpty()) {
-      logger.debug("{}: Cannot found storage groups for {}", name, plan);
+      if (plan instanceof InsertPlan && IoTDBDescriptor.getInstance().getConfig()
+          .isAutoCreateSchemaEnabled()) {
+        // try to set storage group
+        String deviceId = ((InsertPlan) plan).getDeviceId();
+        try {
+          String storageGroupName = MetaUtils
+              .getStorageGroupNameByLevel(deviceId, IoTDBDescriptor.getInstance()
+                  .getConfig().getDefaultStorageGroupLevel());
+          SetStorageGroupPlan setStorageGroupPlan = new SetStorageGroupPlan(
+              new Path(storageGroupName));
+          TSStatus setStorageGroupResult = executeNonQuery(setStorageGroupPlan);
+          if (setStorageGroupResult.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+            throw new MetadataException("Failed to set storage group " + storageGroupName);
+          }
+          return executeNonQuery(plan);
+        } catch (MetadataException e) {
+          logger.info("Failed to set storage group of device id {}", deviceId);
+        }
+      }
+      logger.error("{}: Cannot found storage groups for {}", name, plan);
       return StatusUtils.NO_STORAGE_GROUP;
     }
-    logger.debug("{}: The data groups of {} are {}", name, plan, planGroupMap);
+    logger.error("{}: The data groups of {} are {}", name, plan, planGroupMap);

Review comment:
       This should still be debug.

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
##########
@@ -1569,6 +1616,70 @@ TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap) {
     return status;
   }
 
+  /**
+   * Create timeseries automatically
+   * @param insertPlan, some of the timeseries in it are not created yet
+   * @param partitionGroup
+   * @return true of all uncreated timeseries are created
+   */
+  boolean autoCreateTimeseries(InsertPlan insertPlan, PartitionGroup partitionGroup) {
+    List<String> seriesList = new ArrayList<>();
+    String deviceId = insertPlan.getDeviceId();
+    for (String measurementId : insertPlan.getMeasurements()) {
+      seriesList.add(
+          new StringContainer(new String[]{deviceId, measurementId}, TsFileConstant.PATH_SEPARATOR)
+              .toString());
+    }
+    List<String> unregisteredSeriesList = getUnregisteredSeriesList(seriesList, partitionGroup);
+    for (String seriesPath : unregisteredSeriesList) {
+      int index = seriesList.indexOf(seriesPath);
+      TSDataType dataType = TypeInferenceUtils
+          .getPredictedDataType(insertPlan.getValues()[index], true);
+      TSEncoding encoding = getDefaultEncoding(dataType);
+      CompressionType compressionType = TSFileDescriptor.getInstance().getConfig().getCompressor();
+      CreateTimeSeriesPlan createTimeSeriesPlan = new CreateTimeSeriesPlan(new Path(seriesPath),
+          dataType, encoding, compressionType, null, null, null, null);
+      TSStatus result = executeNonQuery(createTimeSeriesPlan);
+      if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        logger.error("{} failed to execute create timeseries {}", thisNode, seriesPath);
+        return false;

Review comment:
       The schema may be auto-created concurrently, so I think the status code should be carefully checked, and if it says that the timeseries are already created, we should also return true.
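
   A minimal sketch of the check described above, not the actual implementation. The `CreateStatus` enum and both method names are hypothetical stand-ins; the real code would compare against whichever `TSStatusCode` value means "timeseries already exists".

```java
import java.util.List;

class AutoCreateCheck {
  // Hypothetical stand-in for the status codes returned by executeNonQuery().
  enum CreateStatus { SUCCESS, ALREADY_EXISTS, OTHER_ERROR }

  // A series is usable afterwards if we created it or if a concurrent writer did.
  static boolean seriesExistsAfter(CreateStatus status) {
    return status == CreateStatus.SUCCESS || status == CreateStatus.ALREADY_EXISTS;
  }

  // Mirrors the loop in autoCreateTimeseries: fail only on a genuine error.
  static boolean allCreated(List<CreateStatus> results) {
    for (CreateStatus status : results) {
      if (!seriesExistsAfter(status)) {
        return false;
      }
    }
    return true;
  }
}
```

   With this shape, two nodes racing to create the same timeseries both end up returning true, and only a real failure aborts the insert.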

##########
File path: service-rpc/src/main/thrift/cluster.thrift
##########
@@ -350,6 +350,8 @@ service TSDataService extends RaftService {
 
   list<binary> getAggrResult(1:GetAggrResultRequest request)
 
+  map<string, bool> isMeasurementsRegistered(1: Node header, 2: list<string> measurements)

Review comment:
       A suggestion about the interface design: I think the return type can be a list<bool> with the same length as `measurements`, because the sender clearly knows which series it queried and the receiver need not send them back.
   
   Another thing: I think "measurement" is not the proper term here; better to just call them timeseries.
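
   To illustrate the list<bool> suggestion, here is a rough sketch of how the caller could map a positional reply back to paths. The class and method names are made up for illustration; only the idea that reply element i answers for query element i comes from the suggestion.

```java
import java.util.ArrayList;
import java.util.List;

class PositionalRegistration {
  // registered.get(i) is assumed to answer for queried.get(i).
  static List<String> unregistered(List<String> queried, List<Boolean> registered) {
    if (queried.size() != registered.size()) {
      throw new IllegalArgumentException("reply length does not match the query");
    }
    List<String> result = new ArrayList<>();
    for (int i = 0; i < queried.size(); i++) {
      if (!registered.get(i)) {
        result.add(queried.get(i));
      }
    }
    return result;
  }
}
```

   The length check matters here: with a map, a missing key is visible, while with a positional list a short reply would silently misalign the answers.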

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
##########
@@ -1551,6 +1584,20 @@ TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap) {
         subStatus = forwardPlan(entry.getKey(), entry.getValue());
       }
       if (subStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        if (entry.getKey() instanceof InsertPlan
+            && subStatus.getCode() == TSStatusCode.STORAGE_ENGINE_ERROR.getStatusCode()

Review comment:
       I think we should add a specific status code like "TSStatusCode.NO_TIMESERIES", as this one is too abstract and we are not sure whether it is because of no metadata.

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
##########
@@ -1569,6 +1616,70 @@ TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap) {
     return status;
   }
 
+  /**
+   * Create timeseries automatically
+   * @param insertPlan, some of the timeseries in it are not created yet
+   * @param partitionGroup
+   * @return true of all uncreated timeseries are created
+   */
+  boolean autoCreateTimeseries(InsertPlan insertPlan, PartitionGroup partitionGroup) {
+    List<String> seriesList = new ArrayList<>();
+    String deviceId = insertPlan.getDeviceId();
+    for (String measurementId : insertPlan.getMeasurements()) {
+      seriesList.add(
+          new StringContainer(new String[]{deviceId, measurementId}, TsFileConstant.PATH_SEPARATOR)
+              .toString());
+    }
+    List<String> unregisteredSeriesList = getUnregisteredSeriesList(seriesList, partitionGroup);
+    for (String seriesPath : unregisteredSeriesList) {
+      int index = seriesList.indexOf(seriesPath);
+      TSDataType dataType = TypeInferenceUtils
+          .getPredictedDataType(insertPlan.getValues()[index], true);
+      TSEncoding encoding = getDefaultEncoding(dataType);
+      CompressionType compressionType = TSFileDescriptor.getInstance().getConfig().getCompressor();
+      CreateTimeSeriesPlan createTimeSeriesPlan = new CreateTimeSeriesPlan(new Path(seriesPath),
+          dataType, encoding, compressionType, null, null, null, null);
+      TSStatus result = executeNonQuery(createTimeSeriesPlan);

Review comment:
       Please add a TODO here: "TODO-Cluster: add executeNonQueryBatch()", so we can cut the number of communications later.

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
##########
@@ -1569,6 +1616,70 @@ TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap) {
     return status;
   }
 
+  /**
+   * Create timeseries automatically
+   * @param insertPlan, some of the timeseries in it are not created yet
+   * @param partitionGroup
+   * @return true of all uncreated timeseries are created
+   */
+  boolean autoCreateTimeseries(InsertPlan insertPlan, PartitionGroup partitionGroup) {
+    List<String> seriesList = new ArrayList<>();
+    String deviceId = insertPlan.getDeviceId();
+    for (String measurementId : insertPlan.getMeasurements()) {
+      seriesList.add(
+          new StringContainer(new String[]{deviceId, measurementId}, TsFileConstant.PATH_SEPARATOR)
+              .toString());
+    }
+    List<String> unregisteredSeriesList = getUnregisteredSeriesList(seriesList, partitionGroup);
+    for (String seriesPath : unregisteredSeriesList) {
+      int index = seriesList.indexOf(seriesPath);
+      TSDataType dataType = TypeInferenceUtils
+          .getPredictedDataType(insertPlan.getValues()[index], true);
+      TSEncoding encoding = getDefaultEncoding(dataType);
+      CompressionType compressionType = TSFileDescriptor.getInstance().getConfig().getCompressor();
+      CreateTimeSeriesPlan createTimeSeriesPlan = new CreateTimeSeriesPlan(new Path(seriesPath),
+          dataType, encoding, compressionType, null, null, null, null);
+      TSStatus result = executeNonQuery(createTimeSeriesPlan);
+      if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        logger.error("{} failed to execute create timeseries {}", thisNode, seriesPath);
+        return false;
+      }
+    }
+    return true;
+  }
+
+
+
+  /**
+   * To check which timeseries in the input list is unregistered
+   * @param seriesList
+   * @param partitionGroup
+   * @return
+   */
+  List<String> getUnregisteredSeriesList(List<String> seriesList, PartitionGroup partitionGroup) {
+    Set<String> unregistered = new HashSet<>();
+    for (Node node : partitionGroup) {
+      try {
+        DataClient client = getDataClient(node);
+        Map<String, Boolean> result = SyncClientAdaptor
+            .getUnregisteredMeasurements(client, partitionGroup.getHeader(), seriesList);
+        for (Map.Entry<String, Boolean> entry : result.entrySet()) {
+          if (!entry.getValue()) {
+            unregistered.add(entry.getKey());
+          }
+        }
+      } catch (TException | IOException e) {
+        logger.error("{}: cannot getting unregistered series list {} from {}", name,

Review comment:
       getting -> get
   I would suggest printing something like `"{} and other {} paths", seriesList.get(0), seriesList.size() - 1` instead of `"{}", seriesList`, in case the list is too long.
   And if you want to print a list in a log, you can pass the list itself as a parameter; there is no need to wrap it with "Arrays.toString(seriesList.toArray(new String[0]))".
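
   A small sketch of the summarising idea, written as a plain helper so it does not depend on the logger. The `summarize` name and the handling of empty and single-element lists are my own additions, not something in the PR.

```java
import java.util.List;

class SeriesLogSummary {
  // Builds a short description of a possibly long list of series paths.
  static String summarize(List<String> seriesList) {
    if (seriesList.isEmpty()) {
      return "no paths";
    }
    if (seriesList.size() == 1) {
      return seriesList.get(0);
    }
    return seriesList.get(0) + " and other " + (seriesList.size() - 1) + " paths";
  }
}
```

   The result can then be passed as a single `{}` argument to the existing log call, which also guards against calling `get(0)` on an empty list.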



