You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/11/24 07:53:19 UTC

[iotdb] branch xianyi updated (ea222e2 -> d8a9170)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a change to branch xianyi
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


    from ea222e2  add UDTFEn
     new 46d8f74  xianyi in p
     new 048b4ce  p selectinto
     new d8a9170  optimize performance

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../main/java/org/apache/iotdb/SessionExample.java |  67 ++----
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 237 +++++++++++++++++++--
 2 files changed, 240 insertions(+), 64 deletions(-)

[iotdb] 02/03: p selectinto

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch xianyi
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 048b4cea7d86990326ee2fc18eae047b0b5e3687
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Wed Nov 24 11:34:59 2021 +0800

    p selectinto
---
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 77 ++++++++++++++++++++++
 1 file changed, 77 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index b8a0a6b..9ca4fd0 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -132,6 +132,14 @@ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
+import org.apache.iotdb.tsfile.read.filter.operator.Gt;
+import org.apache.iotdb.tsfile.read.filter.operator.GtEq;
+import org.apache.iotdb.tsfile.read.filter.operator.Lt;
+import org.apache.iotdb.tsfile.read.filter.operator.LtEq;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
 import java.io.IOException;
@@ -1011,6 +1019,75 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
 
     String prefix = statement.split("where")[0] + " where ";
 
+    IExpression iExpression = udtfPlan.getExpression();
+    if (iExpression instanceof GlobalTimeExpression) {
+      GlobalTimeExpression globalTimeExpression = (GlobalTimeExpression) iExpression;
+      if (globalTimeExpression.getFilter() instanceof AndFilter) {
+        AndFilter andFilter = (AndFilter) globalTimeExpression.getFilter();
+
+        if ((andFilter.getLeft() instanceof Lt || andFilter.getLeft() instanceof LtEq)
+            && (andFilter.getRight() instanceof Gt || andFilter.getRight() instanceof GtEq)) {
+          Filter filter = andFilter.getLeft();
+          andFilter.setLeft(andFilter.getRight());
+          andFilter.setRight(filter);
+        }
+
+        final long dayTimeDelta = 24 * 60 * 60 * 1000;
+
+        if (andFilter.getLeft() instanceof Gt && andFilter.getRight() instanceof Lt) {
+          long left = (Long) ((Gt) andFilter.getLeft()).getValue();
+          long right = (Long) ((Lt) andFilter.getRight()).getValue();
+          boolean first = true;
+
+          while (left < right) {
+            if (first) {
+              first = false;
+              statements.add(prefix + " time>" + left + " and time<" + (left + dayTimeDelta));
+            } else {
+              statements.add(prefix + " time>=" + left + " and time<" + (left + dayTimeDelta));
+            }
+            left += dayTimeDelta;
+          }
+        }
+
+        if (andFilter.getLeft() instanceof GtEq && andFilter.getRight() instanceof Lt) {
+          long left = (Long) ((GtEq) andFilter.getLeft()).getValue();
+          long right = (Long) ((Lt) andFilter.getRight()).getValue();
+
+          while (left < right) {
+            statements.add(prefix + " time>=" + left + " and time<" + (left + dayTimeDelta));
+            left += dayTimeDelta;
+          }
+        }
+
+        if (andFilter.getLeft() instanceof Gt && andFilter.getRight() instanceof LtEq) {
+          long left = (Long) ((Gt) andFilter.getLeft()).getValue();
+          long right = (Long) ((LtEq) andFilter.getRight()).getValue();
+
+          while (left <= right) {
+            statements.add(prefix + " time>" + left + " and time<=" + (left + dayTimeDelta));
+            left += dayTimeDelta;
+          }
+        }
+
+        if (andFilter.getLeft() instanceof GtEq && andFilter.getRight() instanceof LtEq) {
+          long left = (Long) ((GtEq) andFilter.getLeft()).getValue();
+          long right = (Long) ((LtEq) andFilter.getRight()).getValue();
+          boolean first = true;
+
+          while (left < right) {
+            if (first) {
+              first = false;
+              statements.add(prefix + " time>=" + left + " and time<=" + (left + dayTimeDelta));
+            } else {
+              statements.add(prefix + " time>" + left + " and time<=" + (left + dayTimeDelta));
+            }
+            left += dayTimeDelta;
+          }
+        }
+      }
+    }
+
     return statements;
   }
 

[iotdb] 03/03: optimize performance

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch xianyi
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d8a91709305490925ed80562ee79a90f9910e8e9
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Wed Nov 24 15:52:05 2021 +0800

    optimize performance
---
 .../main/java/org/apache/iotdb/SessionExample.java | 67 +++++++---------------
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 59 +++++++++++++++++--
 2 files changed, 75 insertions(+), 51 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index cab736d..ba80fac 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -21,7 +21,6 @@ package org.apache.iotdb;
 
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.session.Session;
 import org.apache.iotdb.session.SessionDataSet;
 import org.apache.iotdb.session.SessionDataSet.DataIterator;
@@ -62,18 +61,18 @@ public class SessionExample {
     // set session fetchSize
     session.setFetchSize(10000);
 
-    try {
-      session.setStorageGroup("root.sg1");
-    } catch (StatementExecutionException e) {
-      if (e.getStatusCode() != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()) {
-        throw e;
-      }
-    }
+    //    try {
+    //      session.setStorageGroup("root.sg1");
+    //    } catch (StatementExecutionException e) {
+    //      if (e.getStatusCode() != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()) {
+    //        throw e;
+    //      }
+    //    }
 
     // createTemplate();
-    createTimeseries();
-    createMultiTimeseries();
-    insertRecord();
+    //    createTimeseries();
+    //    createMultiTimeseries();
+    //    insertRecord();
     insertTablet();
     //    insertTabletWithNullValues();
     //    insertTablets();
@@ -90,16 +89,16 @@ public class SessionExample {
     //    deleteTimeseries();
     //    setTimeout();
 
-    sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root");
-    sessionEnableRedirect.setEnableQueryRedirection(true);
-    sessionEnableRedirect.open(false);
-
-    // set session fetchSize
-    sessionEnableRedirect.setFetchSize(10000);
-
-    insertRecord4Redirect();
-    query4Redirect();
-    sessionEnableRedirect.close();
+    //    sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root");
+    //    sessionEnableRedirect.setEnableQueryRedirection(true);
+    //    sessionEnableRedirect.open(false);
+    //
+    //    // set session fetchSize
+    //    sessionEnableRedirect.setFetchSize(10000);
+    //
+    //    insertRecord4Redirect();
+    //    query4Redirect();
+    //    sessionEnableRedirect.close();
     session.close();
   }
 
@@ -393,7 +392,7 @@ public class SessionExample {
     // Method 1 to add tablet data
     long timestamp = System.currentTimeMillis();
 
-    for (long row = 0; row < 100; row++) {
+    for (long row = 0; row < 1000; row++) {
       int rowIndex = tablet.rowSize++;
       tablet.addTimestamp(rowIndex, timestamp);
       for (int s = 0; s < 3; s++) {
@@ -404,29 +403,7 @@ public class SessionExample {
         session.insertTablet(tablet, true);
         tablet.reset();
       }
-      timestamp++;
-    }
-
-    if (tablet.rowSize != 0) {
-      session.insertTablet(tablet);
-      tablet.reset();
-    }
-
-    // Method 2 to add tablet data
-    long[] timestamps = tablet.timestamps;
-    Object[] values = tablet.values;
-
-    for (long time = 0; time < 100; time++) {
-      int row = tablet.rowSize++;
-      timestamps[row] = time;
-      for (int i = 0; i < 3; i++) {
-        long[] sensor = (long[]) values[i];
-        sensor[row] = i;
-      }
-      if (tablet.rowSize == tablet.getMaxRowNumber()) {
-        session.insertTablet(tablet, true);
-        tablet.reset();
-      }
+      timestamp += 3600 * 1000;
     }
 
     if (tablet.rowSize != 0) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 9ca4fd0..93fe754 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -950,12 +950,7 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
     final SelectIntoPlan selectIntoPlan = (SelectIntoPlan) physicalPlan;
     final QueryPlan queryPlan = selectIntoPlan.getQueryPlan();
 
-    if (queryPlan instanceof UDTFPlan
-        && queryPlan
-            .getResultColumns()
-            .get(0)
-            .getExpression()
-            .isTimeSeriesGeneratingFunctionExpression()
+    if (shouldSplit(queryPlan)
     //        && ((FunctionExpression) queryPlan.getResultColumns().get(0).getExpression())
     //            .getFunctionName()
     //            .equalsIgnoreCase("en")
@@ -991,6 +986,58 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
     }
   }
 
+  /**
+   * Decides whether a select-into query is routed to the split execution path.
+   *
+   * <p>Returns {@code true} only when all of the following hold:
+   *
+   * <ul>
+   *   <li>{@code queryPlan} is a {@link UDTFPlan};
+   *   <li>the expression of its first result column is a time series generating function
+   *       expression;
+   *   <li>the plan's expression is a {@link GlobalTimeExpression} whose filter is an {@link
+   *       AndFilter} combining one lower bound ({@code Gt} or {@code GtEq}) with one upper bound
+   *       ({@code Lt} or {@code LtEq}), in either order;
+   *   <li>the upper bound exceeds the lower bound by more than {@code 24 * 60 * 60 * 1000} time
+   *       units (one day, assuming timestamps are in milliseconds).
+   * </ul>
+   *
+   * <p>Side effect: when the filter is written as "upper bound AND lower bound", its two operands
+   * are swapped in place so that the lower bound ends up on the left.
+   *
+   * @param queryPlan the query part of the select-into plan
+   * @return {@code true} if the query should be split, {@code false} otherwise
+   */
+  boolean shouldSplit(QueryPlan queryPlan) {
+    if (!(queryPlan instanceof UDTFPlan)) {
+      return false;
+    }
+
+    if (!queryPlan
+        .getResultColumns()
+        .get(0)
+        .getExpression()
+        .isTimeSeriesGeneratingFunctionExpression()) {
+      return false;
+    }
+
+    IExpression iExpression = ((UDTFPlan) queryPlan).getExpression();
+    if (!(iExpression instanceof GlobalTimeExpression)) {
+      return false;
+    }
+
+    Filter timeFilter = ((GlobalTimeExpression) iExpression).getFilter();
+    if (!(timeFilter instanceof AndFilter)) {
+      return false;
+    }
+    AndFilter andFilter = (AndFilter) timeFilter;
+
+    // Normalise "time < b and time > a" into "time > a and time < b".
+    if ((andFilter.getLeft() instanceof Lt || andFilter.getLeft() instanceof LtEq)
+        && (andFilter.getRight() instanceof Gt || andFilter.getRight() instanceof GtEq)) {
+      Filter filter = andFilter.getLeft();
+      andFilter.setLeft(andFilter.getRight());
+      andFilter.setRight(filter);
+    }
+
+    Filter lowerBound = andFilter.getLeft();
+    Filter upperBound = andFilter.getRight();
+
+    // Both bounds must be recognised. Falling back to 0 for a missing bound would make a filter
+    // such as "time < a and time < b" look like a huge range and select the split path, which
+    // cannot produce any sub-statement for it.
+    long left;
+    if (lowerBound instanceof Gt) {
+      left = (long) ((Gt) lowerBound).getValue();
+    } else if (lowerBound instanceof GtEq) {
+      left = (long) ((GtEq) lowerBound).getValue();
+    } else {
+      return false;
+    }
+
+    long right;
+    if (upperBound instanceof Lt) {
+      right = (long) ((Lt) upperBound).getValue();
+    } else if (upperBound instanceof LtEq) {
+      right = (long) ((LtEq) upperBound).getValue();
+    } else {
+      return false;
+    }
+
+    final long dayTimeDelta = 24 * 60 * 60 * 1000;
+    return right - left > dayTimeDelta;
+  }
+
   private TSExecuteStatementResp executeSelectIntoStatementXianyi(
       UDTFPlan udtfPlan,
       TSServiceImpl tsService,

[iotdb] 01/03: xianyi in p

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch xianyi
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 46d8f741e1eeab9b11b53e2e16c192490ed2ed13
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Wed Nov 24 11:20:49 2021 +0800

    xianyi in p
---
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 113 +++++++++++++++++----
 1 file changed, 94 insertions(+), 19 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 505c5a9..b8a0a6b 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -54,6 +54,7 @@ import org.apache.iotdb.db.qp.physical.crud.MeasurementInfo;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.SelectIntoPlan;
 import org.apache.iotdb.db.qp.physical.crud.UDFPlan;
+import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.qp.physical.sys.AppendTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
@@ -133,11 +134,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
-import com.google.common.primitives.Bytes;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.sql.SQLException;
@@ -151,8 +147,16 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.RecursiveTask;
 import java.util.stream.Collectors;
 
+import com.google.common.primitives.Bytes;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException;
 import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
 import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNonQueryException;
@@ -935,22 +939,29 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
       return new TSExecuteStatementResp(status);
     }
 
-    final long startTime = System.currentTimeMillis();
-    final long queryId = sessionManager.requestQueryId(statementId, true);
-    QueryContext context =
-        genQueryContext(queryId, physicalPlan.isDebug(), startTime, statement, timeout);
     final SelectIntoPlan selectIntoPlan = (SelectIntoPlan) physicalPlan;
     final QueryPlan queryPlan = selectIntoPlan.getQueryPlan();
 
-    queryFrequencyRecorder.incrementAndGet();
-    AUDIT_LOGGER.debug(
-        "Session {} execute select into: {}", sessionManager.getCurrSessionId(), statement);
-    if (physicalPlan instanceof QueryPlan && ((QueryPlan) physicalPlan).isEnableTracing()) {
-      tracingManager.setSeriesPathNum(queryId, queryPlan.getPaths().size());
+    if (queryPlan instanceof UDTFPlan
+        && queryPlan
+            .getResultColumns()
+            .get(0)
+            .getExpression()
+            .isTimeSeriesGeneratingFunctionExpression()
+    //        && ((FunctionExpression) queryPlan.getResultColumns().get(0).getExpression())
+    //            .getFunctionName()
+    //            .equalsIgnoreCase("en")
+    ) {
+      return executeSelectIntoStatementXianyi(
+          (UDTFPlan) queryPlan, this, statement, statementId, timeout, fetchSize, sessionId);
     }
 
-    try {
+    final long startTime = System.currentTimeMillis();
+    final long queryId = sessionManager.requestQueryId(statementId, true);
+    QueryContext context =
+        genQueryContext(queryId, physicalPlan.isDebug(), startTime, statement, timeout);
 
+    try {
       InsertTabletPlansIterator insertTabletPlansIterator =
           new InsertTabletPlansIterator(
               queryPlan,
@@ -969,10 +980,74 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
       return RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS).setQueryId(queryId);
     } finally {
       sessionManager.releaseQueryResourceNoExceptions(queryId);
-      Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_SELECT_INTO, startTime);
-      long costTime = System.currentTimeMillis() - startTime;
-      if (costTime >= CONFIG.getSlowQueryThreshold()) {
-        SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, statement);
+    }
+  }
+
+  /**
+   * Executes a select-into statement by running the sub-statements returned by {@code split} in
+   * parallel and waiting for all of them.
+   *
+   * <p>Each sub-statement is wrapped in an {@code InsertTabletPlanTask} and submitted to a
+   * dedicated {@link ForkJoinPool} whose parallelism is twice the number of available processors.
+   *
+   * @param udtfPlan the query plan of the select-into statement, passed to {@code split}
+   * @param tsService the service instance each task uses to execute its sub-statement
+   * @param statement the original statement text, passed to {@code split}
+   * @param statementId forwarded unchanged to every task
+   * @param timeout forwarded unchanged to every task
+   * @param fetchSize forwarded unchanged to every task
+   * @param sessionId forwarded unchanged to every task
+   * @return a response carrying {@code SUCCESS_STATUS} once every task has been joined
+   * @throws RuntimeException if a task failed; rethrown by {@link ForkJoinTask#join()}
+   */
+  private TSExecuteStatementResp executeSelectIntoStatementXianyi(
+      UDTFPlan udtfPlan,
+      TSServiceImpl tsService,
+      String statement,
+      long statementId,
+      long timeout,
+      int fetchSize,
+      long sessionId) {
+    ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() << 1);
+    try {
+      List<ForkJoinTask<Void>> futures = new ArrayList<>();
+      for (String subStatement : split(udtfPlan, statement)) {
+        futures.add(
+            forkJoinPool.submit(
+                new InsertTabletPlanTask(
+                    tsService, statementId, timeout, fetchSize, sessionId, subStatement)));
+      }
+      for (ForkJoinTask<Void> v : futures) {
+        v.join();
+      }
+    } finally {
+      // join() rethrows the exception of a failed task, so the pool has to be shut down here;
+      // otherwise a single failing sub-statement would leave the pool running.
+      forkJoinPool.shutdown();
+    }
+    return RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
+  }
+
+  private List<String> split(UDTFPlan udtfPlan, String statement) { // returns the sub-statements
+    List<String> statements = new ArrayList<>();
+
+    String prefix = statement.split("where")[0] + " where "; // text before the first "where"
+
+    return statements; // nothing is added in this revision, so the list is always empty
+  }
+
+  /**
+   * Fork/join task that executes one select-into sub-statement.
+   *
+   * <p>The task parses its statement into a physical plan and hands it to {@code
+   * executeSelectIntoStatement} of the given service. It yields no result ({@code null}).
+   */
+  private class InsertTabletPlanTask extends RecursiveTask<Void> {
+
+    private final TSServiceImpl tsService;
+    private final long statementId;
+    private final long timeout;
+    private final int fetchSize;
+    private final long sessionId;
+    private final String statement;
+
+    /**
+     * @param tsService the service used to execute the parsed plan
+     * @param statementId passed through to {@code executeSelectIntoStatement}
+     * @param timeout passed through to {@code executeSelectIntoStatement}
+     * @param fetchSize passed through to {@code executeSelectIntoStatement}
+     * @param sessionId used to look up the session's zone id for parsing, and passed through
+     * @param statement the sub-statement text to parse and execute
+     */
+    InsertTabletPlanTask(
+        TSServiceImpl tsService,
+        long statementId,
+        long timeout,
+        int fetchSize,
+        long sessionId,
+        String statement) {
+      this.tsService = tsService;
+      this.statementId = statementId;
+      this.timeout = timeout;
+      this.fetchSize = fetchSize;
+      this.sessionId = sessionId;
+      this.statement = statement;
+    }
+
+    /**
+     * Parses the sub-statement and executes it as a select-into statement.
+     *
+     * @return always {@code null}
+     * @throws RuntimeException wrapping any exception raised while parsing or executing
+     */
+    @Override
+    protected Void compute() {
+      try {
+        PhysicalPlan physicalPlan =
+            processor.parseSQLToPhysicalPlan(statement, sessionManager.getZoneId(sessionId));
+        tsService.executeSelectIntoStatement(
+            statement, statementId, physicalPlan, fetchSize, timeout, sessionId);
+        return null;
+      } catch (Exception e) {
+        // Keep the original exception as the cause so its type and stack trace are not lost.
+        throw new RuntimeException(e.getMessage(), e);
       }
     }
   }