You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/06/28 03:51:57 UTC

[incubator-iotdb] branch master updated: [IOTDB-775] Add SQL of creating snapshot manually (#1413)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 7895841  [IOTDB-775] Add SQL of creating snapshot manually (#1413)
7895841 is described below

commit 7895841a21fa7bbbbbc09c6de25bd04277b5aa13
Author: Zesong Sun <sz...@mails.tsinghua.edu.cn>
AuthorDate: Sun Jun 28 11:51:45 2020 +0800

    [IOTDB-775] Add SQL of creating snapshot manually (#1413)
    
    * [IOTDB-775] Add SQL of creating snapshot manually
---
 .../org/apache/iotdb/db/qp/strategy/SqlBase.g4     |  13 ++
 .../org/apache/iotdb/db/metadata/MManager.java     |   6 +-
 .../main/java/org/apache/iotdb/db/qp/Planner.java  |   1 +
 .../apache/iotdb/db/qp/constant/SQLConstant.java   |   2 +
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |  15 +-
 .../org/apache/iotdb/db/qp/logical/Operator.java   |   2 +-
 .../db/qp/logical/sys/CreateSnapshotOperator.java  |  30 ++++
 .../db/qp/physical/sys/CreateSnapshotPlan.java     |  38 +++++
 .../iotdb/db/qp/strategy/LogicalGenerator.java     |  13 +-
 .../iotdb/db/qp/strategy/PhysicalGenerator.java    |   3 +
 .../db/integration/IoTDBCreateSnapshotIT.java      | 160 +++++++++++++++++++++
 11 files changed, 271 insertions(+), 12 deletions(-)

diff --git a/server/src/main/antlr4/org/apache/iotdb/db/qp/strategy/SqlBase.g4 b/server/src/main/antlr4/org/apache/iotdb/db/qp/strategy/SqlBase.g4
index 8170130..f78e854 100644
--- a/server/src/main/antlr4/org/apache/iotdb/db/qp/strategy/SqlBase.g4
+++ b/server/src/main/antlr4/org/apache/iotdb/db/qp/strategy/SqlBase.g4
@@ -80,6 +80,7 @@ statement
     | LOAD STRING_LITERAL autoCreateSchema? #loadFiles
     | REMOVE STRING_LITERAL #removeFile
     | MOVE STRING_LITERAL STRING_LITERAL #moveFile
+    | CREATE SNAPSHOT FOR SCHEMA #createSnapshot
     | SELECT INDEX func=ID //not support yet
     LR_BRACKET
     p1=fullPath COMMA p2=fullPath COMMA n1=timeValue COMMA n2=timeValue COMMA
@@ -883,6 +884,18 @@ LATEST
     : L A T E S T
     ;
 
+SNAPSHOT // keyword token; used by the "CREATE SNAPSHOT FOR SCHEMA" (#createSnapshot) statement
+    : S N A P S H O T // letter rules live outside this hunk; presumably case-insensitive, confirm
+    ;
+
+FOR // keyword token; used by the same #createSnapshot alternative
+    : F O R
+    ;
+
+SCHEMA // keyword token; used by the same #createSnapshot alternative
+    : S C H E M A
+    ;
+
 //============================
 // End of the keywords list
 //============================
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 79f60c1..d247342 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -1805,10 +1805,10 @@ public class MManager {
     if (System.currentTimeMillis() - logFile.lastModified() >= mtreeSnapshotThresholdTime
         && logWriter.getLineNumber() > lastSnapshotLogLineNumber) {
       logger.info("Start creating MTree snapshot, because {} ms elaspse.", System.currentTimeMillis() - logFile.lastModified());
-      createSnapshot();
+      createMTreeSnapshot();
     } else if (logWriter.getLineNumber() - lastSnapshotLogLineNumber >= mtreeSnapshotInterval) {
       logger.info("Start creating MTree snapshot, because of {} new lines are added.", logWriter.getLineNumber() - lastSnapshotLogLineNumber);
-      createSnapshot();
+      createMTreeSnapshot();
     } else {
       if (logger.isDebugEnabled()) {
         logger.debug(
@@ -1819,7 +1819,7 @@ public class MManager {
     }
   }
 
-  private void createSnapshot() {
+  public void createMTreeSnapshot() {
     lock.readLock().lock();
     long time = System.currentTimeMillis();
     try {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/Planner.java b/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
index 3b31bbe..1a39a74 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
@@ -100,6 +100,7 @@ public class Planner {
       case CLEAR_CACHE:
       case NULL:
       case SHOW_MERGE_STATUS:
+      case CREATE_SCHEMA_SNAPSHOT:
         return operator;
       case QUERY:
       case UPDATE:
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
index 1acf1a7..33d0dbe 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
@@ -152,6 +152,8 @@ public class SQLConstant {
 
   public static final int TOK_SHOW_MERGE_STATUS = 87;
 
+  public static final int TOK_CREATE_SCHEMA_SNAPSHOT = 88;
+
   public static final Map<Integer, String> tokenSymbol = new HashMap<>();
   public static final Map<Integer, String> tokenNames = new HashMap<>();
   public static final Map<Integer, Integer> reverseWords = new HashMap<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 3cf570e..dd34f20 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -261,6 +261,9 @@ public class PlanExecutor implements IPlanExecutor {
       case CLEAR_CACHE:
         operateClearCache((ClearCachePlan) plan);
         return true;
+      case CREATE_SCHEMA_SNAPSHOT:
+        operateCreateSnapshot();
+        return true;
       default:
         throw new UnsupportedOperationException(
             String.format("operation %s is not supported", plan.getOperatorType()));
@@ -282,6 +285,10 @@ public class PlanExecutor implements IPlanExecutor {
     TimeSeriesMetadataCache.getInstance().clear();
   }
 
+  private void operateCreateSnapshot() { // invoked for CREATE_SCHEMA_SNAPSHOT plans
+    mManager.createMTreeSnapshot(); // all work is delegated; nothing is returned to the caller
+  }
+
   private void operateFlush(FlushPlan plan) throws StorageGroupNotSetException {
     if (plan.getPaths() == null) {
       StorageEngine.getInstance().syncCloseAllProcessor();
@@ -1124,9 +1131,9 @@ public class PlanExecutor implements IPlanExecutor {
         if (measurementNode.getSchema().getType() != insertTabletPlan.getDataTypes()[i]) {
           if (!enablePartialInsert) {
             throw new QueryProcessException(String.format(
-              "Datatype mismatch, Insert measurement %s type %s, metadata tree type %s",
-              measurement, insertTabletPlan.getDataTypes()[i],
-              measurementNode.getSchema().getType()));
+                "Datatype mismatch, Insert measurement %s type %s, metadata tree type %s",
+                measurement, insertTabletPlan.getDataTypes()[i],
+                measurementNode.getSchema().getType()));
           } else {
             insertTabletPlan.markMeasurementInsertionFailed(i);
             continue;
@@ -1140,7 +1147,7 @@ public class PlanExecutor implements IPlanExecutor {
       StorageEngine.getInstance().insertTablet(insertTabletPlan);
       if (insertTabletPlan.getFailedMeasurements() != null) {
         throw new StorageEngineException(
-          "failed to insert measurements " + insertTabletPlan.getFailedMeasurements());
+            "failed to insert measurements " + insertTabletPlan.getFailedMeasurements());
       }
     } catch (StorageEngineException | MetadataException e) {
       throw new QueryProcessException(e);
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
index 746326c..704288e 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
@@ -76,6 +76,6 @@ public abstract class Operator {
     GRANT_WATERMARK_EMBEDDING, REVOKE_WATERMARK_EMBEDDING,
     TTL, DELETE_STORAGE_GROUP, LOAD_CONFIGURATION, SHOW, LOAD_FILES, REMOVE_FILE, MOVE_FILE, LAST, GROUP_BY_FILL,
     ALTER_TIMESERIES, FLUSH, MERGE, FULL_MERGE, CLEAR_CACHE,
-    SHOW_MERGE_STATUS
+    SHOW_MERGE_STATUS, CREATE_SCHEMA_SNAPSHOT
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/CreateSnapshotOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/CreateSnapshotOperator.java
new file mode 100644
index 0000000..c5c36ab
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/CreateSnapshotOperator.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.qp.logical.sys;
+
+import org.apache.iotdb.db.qp.logical.RootOperator;
+
+public class CreateSnapshotOperator extends RootOperator {
+  // Logical operator for the snapshot-creation statement; it adds no fields of its own.
+  public CreateSnapshotOperator(int tokenIntType) {
+    super(tokenIntType); // tokenIntType is passed through to RootOperator unchanged
+    operatorType = OperatorType.CREATE_SCHEMA_SNAPSHOT; // type tag the planner/executor switch on
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateSnapshotPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateSnapshotPlan.java
new file mode 100644
index 0000000..93f8d5d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateSnapshotPlan.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.qp.physical.sys;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.tsfile.read.common.Path;
+
+public class CreateSnapshotPlan extends PhysicalPlan {
+  // Physical plan for OperatorType.CREATE_SCHEMA_SNAPSHOT; it holds no state of its own.
+  public CreateSnapshotPlan() {
+    super(false, OperatorType.CREATE_SCHEMA_SNAPSHOT); // false presumably means non-query; confirm
+  }
+  // Returns a new, empty, mutable list on every call.
+  @Override
+  public List<Path> getPaths() {
+    return new ArrayList<>();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
index ebd41f0..2c37e4d 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
@@ -48,6 +48,7 @@ import org.apache.iotdb.db.qp.logical.sys.AuthorOperator;
 import org.apache.iotdb.db.qp.logical.sys.AuthorOperator.AuthorType;
 import org.apache.iotdb.db.qp.logical.sys.ClearCacheOperator;
 import org.apache.iotdb.db.qp.logical.sys.CountOperator;
+import org.apache.iotdb.db.qp.logical.sys.CreateSnapshotOperator;
 import org.apache.iotdb.db.qp.logical.sys.CreateTimeSeriesOperator;
 import org.apache.iotdb.db.qp.logical.sys.DataAuthOperator;
 import org.apache.iotdb.db.qp.logical.sys.DeleteStorageGroupOperator;
@@ -78,6 +79,7 @@ import org.apache.iotdb.db.qp.strategy.SqlBaseParser.ConstantContext;
 import org.apache.iotdb.db.qp.strategy.SqlBaseParser.CountNodesContext;
 import org.apache.iotdb.db.qp.strategy.SqlBaseParser.CountTimeseriesContext;
 import org.apache.iotdb.db.qp.strategy.SqlBaseParser.CreateRoleContext;
+import org.apache.iotdb.db.qp.strategy.SqlBaseParser.CreateSnapshotContext;
 import org.apache.iotdb.db.qp.strategy.SqlBaseParser.CreateTimeseriesContext;
 import org.apache.iotdb.db.qp.strategy.SqlBaseParser.CreateUserContext;
 import org.apache.iotdb.db.qp.strategy.SqlBaseParser.DateExpressionContext;
@@ -1481,10 +1483,7 @@ public class LogicalGenerator extends SqlBaseBaseListener {
 
   /**
    * parse time expression, which is addition and subtraction expression of duration time, now() or
-   * DataTimeFormat time.
-   * <p>
-   * eg. now() + 1d - 2h
-   * </p>
+   * DataTimeFormat time. <p> eg. now() + 1d - 2h </p>
    */
   private Long parseDateExpression(DateExpressionContext ctx) {
     long time;
@@ -1622,4 +1621,10 @@ public class LogicalGenerator extends SqlBaseBaseListener {
     super.enterShowMergeStatus(ctx);
     initializedOperator = new ShowMergeStatusOperator(SQLConstant.TOK_SHOW_MERGE_STATUS);
   }
+
+  @Override // parser-listener hook for the #createSnapshot grammar alternative
+  public void enterCreateSnapshot(CreateSnapshotContext ctx) {
+    super.enterCreateSnapshot(ctx); // ctx is not inspected: the statement takes no arguments
+    initializedOperator = new CreateSnapshotOperator(SQLConstant.TOK_CREATE_SCHEMA_SNAPSHOT);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index d050e5d..76289ce 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -76,6 +76,7 @@ import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
 import org.apache.iotdb.db.qp.physical.sys.ClearCachePlan;
 import org.apache.iotdb.db.qp.physical.sys.CountPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateSnapshotPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.DataAuthPlan;
 import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
@@ -258,6 +259,8 @@ public class PhysicalGenerator {
         return new ClearCachePlan();
       case SHOW_MERGE_STATUS:
         return new ShowMergeStatusPlan();
+      case CREATE_SCHEMA_SNAPSHOT:
+        return new CreateSnapshotPlan();
       default:
         throw new LogicalOperatorException(operator.getType().toString(), "");
     }
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCreateSnapshotIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCreateSnapshotIT.java
new file mode 100644
index 0000000..e49dcdd
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCreateSnapshotIT.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.integration;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class IoTDBCreateSnapshotIT {
+  // Integration test: issues CREATE SNAPSHOT FOR SCHEMA over JDBC and inspects mtree.snapshot.
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+  /** Sets up the environment via EnvironmentUtils, loads the JDBC driver, creates the schema. */
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvironmentUtils.closeStatMonitor();
+    EnvironmentUtils.envSetUp();
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    prepareData();
+  }
+
+  /** Calls EnvironmentUtils.cleanEnv() once after the test in this class has run. */
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvironmentUtils.cleanEnv();
+  }
+
+  /** Creates a snapshot via SQL, verifies its content, then re-checks the schema after restart. */
+  @Test
+  public void createSnapshotTest() throws ClassNotFoundException {
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection = DriverManager
+        .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      // test before creating snapshot
+      checkShowTimeseries(statement);
+
+      // create snapshot
+      statement.execute("CREATE SNAPSHOT FOR SCHEMA");
+      File snapshotFile = new File(config.getSchemaDir() + File.separator + "mtree.snapshot");
+
+      // test snapshot file exists
+      Assert.assertTrue(snapshotFile.exists());
+
+      // test snapshot content correct
+      String[] exp = new String[]{
+          "10",
+          "2,s0,,1,2,1,,-1,0",
+          "2,s1,,2,2,1,,-1,0",
+          "2,s2,,3,2,1,,-1,0",
+          "2,s3,,5,0,1,,-1,0",
+          "2,s4,,0,0,1,,-1,0",
+          "1,d0,9223372036854775807,5",
+          "2,s0,,1,2,1,,-1,0",
+          "2,s1,,5,0,1,,-1,0",
+          "2,s2,,0,0,1,,-1,0",
+          "1,d1,9223372036854775807,3",
+          "0,vehicle,2",
+          "0,root,1"
+      };
+
+      try (BufferedReader br = new BufferedReader(new FileReader(snapshotFile))) {
+        for (String line : exp) {
+          Assert.assertEquals(line, br.readLine());
+        }
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+    // test restart
+    try {
+      EnvironmentUtils.restartDaemon();
+    } catch (Exception e) {
+      Assert.fail("Failed to restart the daemon: " + e); // keep the cause visible in the report
+    }
+
+    try (Connection connection = DriverManager
+        .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      checkShowTimeseries(statement);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /** Creates the storage groups and timeseries; any SQL failure propagates and aborts setUp. */
+  private static void prepareData() throws SQLException {
+    try (Connection connection = DriverManager
+        .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      String[] creationSqls = new String[]{
+          "SET STORAGE GROUP TO root.vehicle.d0",
+          "SET STORAGE GROUP TO root.vehicle.d1",
+          "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE",
+          "CREATE TIMESERIES root.vehicle.d0.s1 WITH DATATYPE=INT64, ENCODING=RLE",
+          "CREATE TIMESERIES root.vehicle.d0.s2 WITH DATATYPE=FLOAT, ENCODING=RLE",
+          "CREATE TIMESERIES root.vehicle.d0.s3 WITH DATATYPE=TEXT, ENCODING=PLAIN",
+          "CREATE TIMESERIES root.vehicle.d0.s4 WITH DATATYPE=BOOLEAN, ENCODING=PLAIN",
+          "CREATE TIMESERIES root.vehicle.d1.s0 WITH DATATYPE=INT32, ENCODING=RLE",
+          "CREATE TIMESERIES root.vehicle.d1.s1 WITH DATATYPE=TEXT, ENCODING=PLAIN",
+          "CREATE TIMESERIES root.vehicle.d1.s2 WITH DATATYPE=BOOLEAN, ENCODING=PLAIN"
+      };
+
+      for (String sql : creationSqls) {
+        statement.execute(sql);
+      }
+    }
+  }
+
+  /** Asserts that SHOW TIMESERIES yields a result set with exactly the eight created series. */
+  private void checkShowTimeseries(Statement statement) throws SQLException {
+    boolean hasResultSet = statement.execute("SHOW TIMESERIES");
+    assertTrue(hasResultSet);
+
+    try (ResultSet resultSet = statement.getResultSet()) {
+      int cnt = 0;
+      while (resultSet.next()) {
+        cnt++;
+      }
+      Assert.assertEquals(8, cnt); // 5 series under root.vehicle.d0 + 3 under root.vehicle.d1
+    }
+  }
+}