Posted to commits@iotdb.apache.org by ja...@apache.org on 2020/01/14 08:41:22 UTC

[incubator-iotdb] branch DisableAlign created (now 3ee98df)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch DisableAlign
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at 3ee98df  Disable align

This branch includes the following new commits:

     new be4b8af  s
     new a52c63d  s
     new 8f8d570  complete
     new 3ee98df  Disable align

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/04: s

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch DisableAlign
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit be4b8af0d06f2cd58c162b3f147e9f9f0237309a
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Tue Jan 14 15:56:03 2020 +0800

    s
---
 .../main/java/org/apache/iotdb/JDBCExample.java    |  47 +----
 .../db/query/dataset/NonAlignEngineDataSet.java    | 226 ++++++++++++---------
 2 files changed, 137 insertions(+), 136 deletions(-)

diff --git a/example/jdbc/src/main/java/org/apache/iotdb/JDBCExample.java b/example/jdbc/src/main/java/org/apache/iotdb/JDBCExample.java
index 99a6193..cffdd32 100644
--- a/example/jdbc/src/main/java/org/apache/iotdb/JDBCExample.java
+++ b/example/jdbc/src/main/java/org/apache/iotdb/JDBCExample.java
@@ -1,21 +1,3 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
 package org.apache.iotdb;
 
 import java.sql.Connection;
@@ -31,23 +13,11 @@ public class JDBCExample {
     Class.forName("org.apache.iotdb.jdbc.IoTDBDriver");
     try (Connection connection = DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
          Statement statement = connection.createStatement()) {
-      statement.execute("SET STORAGE GROUP TO root.sg1");
-      statement.execute("CREATE TIMESERIES root.sg1.d1.s1 WITH DATATYPE=INT64, ENCODING=RLE");
-      statement.execute("CREATE TIMESERIES root.sg1.d1.s2 WITH DATATYPE=INT64, ENCODING=RLE");
-      statement.execute("CREATE TIMESERIES root.sg1.d1.s3 WITH DATATYPE=INT64, ENCODING=RLE");
-
-      for (int i = 0; i <= 100; i++) {
-        statement.addBatch("insert into root.sg1.d1(timestamp, s1, s2, s3) values("+ i + "," + 1 + "," + 1 + "," + 1 + ")");
-      }
-      statement.executeBatch();
-      statement.clearBatch();
-
-      ResultSet resultSet = statement.executeQuery("select * from root where time <= 10");
-      outputResult(resultSet);
-      resultSet = statement.executeQuery("select count(*) from root");
-      outputResult(resultSet);
-      resultSet = statement.executeQuery("select count(*) from root where time >= 1 and time <= 100 group by ([0, 100], 20ms, 20ms)");
+      long startTime = System.currentTimeMillis();
+      ResultSet resultSet = statement.executeQuery("select * from root where time < 30000000");
       outputResult(resultSet);
+      long endTime = System.currentTimeMillis();
+      System.out.println("Cost Time: " + (endTime - startTime));
     }
   }
 
@@ -61,15 +31,6 @@ public class JDBCExample {
       }
       System.out.println();
       while (resultSet.next()) {
-        for (int i = 1; ; i++) {
-          System.out.print(resultSet.getString(i));
-          if (i < columnCount) {
-            System.out.print(", ");
-          } else {
-            System.out.println();
-            break;
-          }
-        }
       }
       System.out.println("--------------------------\n");
     }
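
Note: the new example body simply times a raw-data scan over JDBC. A self-contained version of that pattern, given here as a minimal sketch that assumes an IoTDB server on 127.0.0.1:6667 with the default root/root account (the same connection settings used in the example above), could look like this:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class TimedQuerySketch {
      public static void main(String[] args) throws Exception {
        // register the IoTDB JDBC driver, as in JDBCExample
        Class.forName("org.apache.iotdb.jdbc.IoTDBDriver");
        try (Connection connection = DriverManager.getConnection(
                "jdbc:iotdb://127.0.0.1:6667/", "root", "root");
             Statement statement = connection.createStatement()) {
          long startTime = System.currentTimeMillis();
          try (ResultSet resultSet = statement.executeQuery(
                  "select * from root where time < 30000000")) {
            int rows = 0;
            while (resultSet.next()) {
              rows++; // consume every row so the timing covers the full fetch
            }
            System.out.println("Fetched " + rows + " rows");
          }
          long endTime = System.currentTimeMillis();
          System.out.println("Cost Time: " + (endTime - startTime));
        }
      }
    }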
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
index 1474bba..c2e72e5 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
@@ -19,13 +19,6 @@
 
 package org.apache.iotdb.db.query.dataset;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
 import org.apache.iotdb.db.query.pool.QueryTaskPoolManager;
 import org.apache.iotdb.db.query.reader.ManagedSeriesReader;
 import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
@@ -42,28 +35,33 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
 public class NonAlignEngineDataSet extends QueryDataSet {
-  
+
   private static class ReadTask implements Runnable {
-    
+
     private final ManagedSeriesReader reader;
-    private BlockingQueue<Pair<PublicBAOS, PublicBAOS>> blockingQueue;
+    private BlockingQueue<Pair<ByteBuffer, ByteBuffer>> blockingQueue;
     private WatermarkEncoder encoder;
-    private int rowOffset;
-    private int fetchSize;
-    private int rowLimit;
-    private int alreadyReturnedRowNum = 0;
-    
-    public ReadTask(ManagedSeriesReader reader, 
-        BlockingQueue<Pair<PublicBAOS, PublicBAOS>> blockingQueue,
-        WatermarkEncoder encoder,
-        int rowOffset, int rowLimit, int fetchSize) {
+    NonAlignEngineDataSet dataSet;
+    private int index;
+
+
+    public ReadTask(ManagedSeriesReader reader,
+                    BlockingQueue<Pair<ByteBuffer, ByteBuffer>> blockingQueue,
+                    WatermarkEncoder encoder, NonAlignEngineDataSet dataSet, int index) {
       this.reader = reader;
       this.blockingQueue = blockingQueue;
       this.encoder = encoder;
-      this.rowOffset = rowOffset;
-      this.rowLimit = rowLimit;
-      this.fetchSize = fetchSize;
+      this.dataSet = dataSet;
+      this.index = index;
     }
 
     @Override
@@ -75,18 +73,23 @@ public class NonAlignEngineDataSet extends QueryDataSet {
           // if the task is submitted, there must be free space in the queue
           // so here we don't need to check whether the queue has free space
           // the reader has next batch
-          if (reader.hasNextBatch()) {
-            BatchData batchData = reader.nextBatch();
-            
+          if ((dataSet.cachedBatchData[index] != null && dataSet.cachedBatchData[index].hasCurrent())
+                  || reader.hasNextBatch()) {
+            BatchData batchData;
+            if (dataSet.cachedBatchData[index] != null && dataSet.cachedBatchData[index].hasCurrent())
+              batchData = dataSet.cachedBatchData[index];
+            else
+              batchData = reader.nextBatch();
+
             int rowCount = 0;
-            while (rowCount < fetchSize) {
-              
-              if ((rowLimit > 0 && alreadyReturnedRowNum >= rowLimit)) {
+            while (rowCount < dataSet.fetchSize) {
+
+              if ((dataSet.limit > 0 && dataSet.alreadyReturnedRowNumArray[index] >= dataSet.limit)) {
                 break;
               }
-              
+
               if (batchData != null && batchData.hasCurrent()) {
-                if (rowOffset == 0) {
+                if (dataSet.offsetArray[index] == 0) {
                   long time = batchData.currentTime();
                   ReadWriteIOUtils.write(time, timeBAOS);
                   TSDataType type = batchData.getDataType();
@@ -136,20 +139,42 @@ public class NonAlignEngineDataSet extends QueryDataSet {
                 batchData.next();
               }
               else {
-                break;
+                if (reader.hasNextBatch()) {
+                  batchData = reader.nextBatch();
+                  dataSet.cachedBatchData[index] = batchData;
+                  continue;
+                }
+                else
+                  break;
               }
-              if (rowOffset == 0) {
+              if (dataSet.offsetArray[index] == 0) {
                 rowCount++;
-                if (rowLimit > 0) {
-                  alreadyReturnedRowNum++;
+                if (dataSet.limit > 0) {
+                  dataSet.alreadyReturnedRowNumArray[index]++;
                 }
               } else {
-                rowOffset--;
+                dataSet.offsetArray[index]--;
               }
             }
-            
-            Pair<PublicBAOS, PublicBAOS> timeValueBAOSPair = new Pair(timeBAOS, valueBAOS);
-            
+            if (rowCount == 0) {
+              blockingQueue.put(new Pair(null, null));
+              // set the hasRemaining field in reader to false
+              // tell the Consumer not to submit another task for this reader any more
+              reader.setHasRemaining(false);
+              // remove itself from the QueryTaskPoolManager
+              reader.setManagedByQueryManager(false);
+              return;
+            }
+
+            ByteBuffer timeBuffer = ByteBuffer.allocate(timeBAOS.size());
+            timeBuffer.put(timeBAOS.getBuf(), 0, timeBAOS.size());
+            timeBuffer.flip();
+            ByteBuffer valueBuffer = ByteBuffer.allocate(valueBAOS.size());
+            valueBuffer.put(valueBAOS.getBuf(), 0, valueBAOS.size());
+            valueBuffer.flip();
+
+            Pair<ByteBuffer, ByteBuffer> timeValueBAOSPair = new Pair(timeBuffer, valueBuffer);
+
             blockingQueue.put(timeValueBAOSPair);
             // if the queue also has free space, just submit another itself
             if (blockingQueue.remainingCapacity() > 0) {
@@ -162,7 +187,7 @@ public class NonAlignEngineDataSet extends QueryDataSet {
             }
             return;
           }
-          blockingQueue.put(new Pair(timeBAOS, valueBAOS));
+          blockingQueue.put(new Pair(null, null));
           // set the hasRemaining field in reader to false
           // tell the Consumer not to submit another task for this reader any more
           reader.setHasRemaining(false);
@@ -176,19 +201,37 @@ public class NonAlignEngineDataSet extends QueryDataSet {
       } catch (Exception e) {
         LOGGER.error("Something gets wrong: ", e);
       }
-      
+
     }
-    
+
   }
 
-  
+
   private List<ManagedSeriesReader> seriesReaderWithoutValueFilterList;
-  
+
   // Blocking queue list for each time value buffer pair
-  private BlockingQueue<Pair<PublicBAOS, PublicBAOS>>[] blockingQueueArray;
-  
+  private BlockingQueue<Pair<ByteBuffer, ByteBuffer>>[] blockingQueueArray;
+
   private boolean initialized = false;
-  
+
+  private int[] offsetArray;
+
+  private int limit;
+
+  private int[] alreadyReturnedRowNumArray;
+
+  private BatchData[] cachedBatchData;
+
+  // indicate that there is no more batch data in the corresponding queue
+  // in case that the consumer thread is blocked on the queue and won't get runnable any more
+  // this field is not same as the `hasRemaining` in SeriesReaderWithoutValueFilter
+  // even though the `hasRemaining` in SeriesReaderWithoutValueFilter is false
+  // noMoreDataInQueue can still be true
+  // its usage is to tell the consumer thread not to call the take() method.
+  private boolean[] noMoreDataInQueueArray;
+
+  private int fetchSize;
+
   // indicate that there is no more batch data in the corresponding queue
   // in case that the consumer thread is blocked on the queue and won't get runnable any more
   // this field is not same as the `hasRemaining` in SeriesReaderWithoutValueFilter
@@ -202,7 +245,7 @@ public class NonAlignEngineDataSet extends QueryDataSet {
   private static final QueryTaskPoolManager pool = QueryTaskPoolManager.getInstance();
 
   private static final Logger LOGGER = LoggerFactory.getLogger(NonAlignEngineDataSet.class);
-  
+
   /**
    * constructor of EngineDataSet.
    *
@@ -211,102 +254,99 @@ public class NonAlignEngineDataSet extends QueryDataSet {
    * @param readers readers in List(IPointReader) structure
    */
   public NonAlignEngineDataSet(List<Path> paths, List<TSDataType> dataTypes,
-      List<ManagedSeriesReader> readers) throws InterruptedException {
+                               List<ManagedSeriesReader> readers) {
     super(paths, dataTypes);
     this.seriesReaderWithoutValueFilterList = readers;
     blockingQueueArray = new BlockingQueue[readers.size()];
+    noMoreDataInQueueArray = new boolean[readers.size()];
     for (int i = 0; i < seriesReaderWithoutValueFilterList.size(); i++) {
       blockingQueueArray[i] = new LinkedBlockingQueue<>(BLOCKING_QUEUE_CAPACITY);
     }
   }
-  
-  private void init(WatermarkEncoder encoder, int fetchSize) 
-      throws InterruptedException {
+
+  private void initLimit(int offset, int limit, int size) {
+    offsetArray = new int[size];
+    Arrays.fill(offsetArray, offset);
+    this.limit = limit;
+    alreadyReturnedRowNumArray = new int[size];
+    cachedBatchData = new BatchData[size];
+  }
+
+  private void init(WatermarkEncoder encoder, int fetchSize) {
+    initLimit(super.rowOffset, super.rowLimit, seriesReaderWithoutValueFilterList.size());
+    this.fetchSize = fetchSize;
     for (int i = 0; i < seriesReaderWithoutValueFilterList.size(); i++) {
       ManagedSeriesReader reader = seriesReaderWithoutValueFilterList.get(i);
       reader.setHasRemaining(true);
       reader.setManagedByQueryManager(true);
-      pool.submit(new ReadTask(reader, blockingQueueArray[i], encoder, rowOffset, rowLimit, fetchSize));
+      pool.submit(new ReadTask(reader, blockingQueueArray[i], encoder, this, i));
     }
     this.initialized = true;
   }
-  
+
   /**
    * for RPC in RawData query between client and server
    * fill time buffers and value buffers
    */
-  public TSQueryNonAlignDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) throws IOException, InterruptedException {
+  public TSQueryNonAlignDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) throws InterruptedException {
     if (!initialized) {
       init(encoder, fetchSize);
     }
     int seriesNum = seriesReaderWithoutValueFilterList.size();
     TSQueryNonAlignDataSet tsQueryNonAlignDataSet = new TSQueryNonAlignDataSet();
 
-    PublicBAOS[] timeBAOSList = new PublicBAOS[seriesNum];
-    PublicBAOS[] valueBAOSList = new PublicBAOS[seriesNum];
-    
+    List<ByteBuffer> timeBufferList = new ArrayList<>(seriesNum);
+    List<ByteBuffer> valueBufferList = new ArrayList<>(seriesNum);
+
     for (int seriesIndex = 0; seriesIndex < seriesNum; seriesIndex++) {
+      if (!noMoreDataInQueueArray[seriesIndex]) {
+        Pair<ByteBuffer, ByteBuffer> timeValueByteBufferPair = blockingQueueArray[seriesIndex].take();
+        if (timeValueByteBufferPair.left == null || timeValueByteBufferPair.right == null) {
+          noMoreDataInQueueArray[seriesIndex] = true;
+          timeValueByteBufferPair.left = ByteBuffer.allocate(0);
+          timeValueByteBufferPair.right = ByteBuffer.allocate(0);
+        }
+        timeBufferList.add(timeValueByteBufferPair.left);
+        valueBufferList.add(timeValueByteBufferPair.right);
+      }
+      else {
+        timeBufferList.add(ByteBuffer.allocate(0));
+        valueBufferList.add(ByteBuffer.allocate(0));
+        continue;
+      }
+
       synchronized (seriesReaderWithoutValueFilterList.get(seriesIndex)) {
         if (blockingQueueArray[seriesIndex].remainingCapacity() > 0) {
           ManagedSeriesReader reader = seriesReaderWithoutValueFilterList.get(seriesIndex);
           // if the reader isn't being managed and still has more data,
           // that means this read task leave the pool before because the queue has no more space
           // now we should submit it again
-          if (reader.isManagedByQueryManager() && reader.hasRemaining()) {
+          if (!reader.isManagedByQueryManager() && reader.hasRemaining()) {
             reader.setManagedByQueryManager(true);
-            pool.submit(new ReadTask(reader, blockingQueueArray[seriesIndex], 
-                encoder, rowOffset, rowLimit, fetchSize));
+            pool.submit(new ReadTask(reader, blockingQueueArray[seriesIndex],
+                    encoder, this, seriesIndex));
           }
         }
-        Pair<PublicBAOS, PublicBAOS> timevalueBAOSPair = blockingQueueArray[seriesIndex].poll();
-        if (timevalueBAOSPair != null) {
-          timeBAOSList[seriesIndex] = timevalueBAOSPair.left;
-          valueBAOSList[seriesIndex] = timevalueBAOSPair.right;
-        }
-        else {
-          timeBAOSList[seriesIndex] = new PublicBAOS();
-          valueBAOSList[seriesIndex] = new PublicBAOS();
-        }
       }
     }
 
-    List<ByteBuffer> timeBufferList = new ArrayList<>();
-    List<ByteBuffer> valueBufferList = new ArrayList<>();
-
-    for (int seriesIndex = 0; seriesIndex < seriesNum; seriesIndex++) {
-      // add time buffer of current series
-      putPBOSToBuffer(timeBAOSList, timeBufferList, seriesIndex);
-
-      // add value buffer of current series
-      putPBOSToBuffer(valueBAOSList, valueBufferList, seriesIndex);
-    }
-
     // set time buffers, value buffers and bitmap buffers
     tsQueryNonAlignDataSet.setTimeList(timeBufferList);
     tsQueryNonAlignDataSet.setValueList(valueBufferList);
 
     return tsQueryNonAlignDataSet;
   }
-  
-  private void putPBOSToBuffer(PublicBAOS[] aBAOSList, List<ByteBuffer> aBufferList,
-      int tsIndex) {
-    ByteBuffer aBuffer = ByteBuffer.allocate(aBAOSList[tsIndex].size());
-    aBuffer.put(aBAOSList[tsIndex].getBuf(), 0, aBAOSList[tsIndex].size());
-    aBuffer.flip();
-    aBufferList.add(aBuffer);
-  }
+
 
   @Override
-  protected boolean hasNextWithoutConstraint() throws IOException {
-    // TODO Auto-generated method stub
+  protected boolean hasNextWithoutConstraint() {
     return false;
   }
 
   @Override
-  protected RowRecord nextWithoutConstraint() throws IOException {
-    // TODO Auto-generated method stub
+  protected RowRecord nextWithoutConstraint() {
     return null;
   }
-  
+
 
 }
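Note: the ReadTask/fillBuffer pair in this commit follows a bounded producer-consumer pattern. Each series reader acts as a producer that serializes one batch into a pair of ByteBuffers and puts it on a LinkedBlockingQueue of capacity 5; a (null, null) pair marks the end of data, and the consumer resubmits the producer to the query pool whenever the queue has free capacity. The following is a minimal, self-contained sketch of that pattern with illustrative names only, not the IoTDB classes:

    import java.nio.ByteBuffer;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;

    public class BoundedProducerConsumerSketch {
      // sentinel object signalling "no more data", analogous to the (null, null) pair
      private static final ByteBuffer END_MARKER = ByteBuffer.allocate(0);

      public static void main(String[] args) throws InterruptedException {
        BlockingQueue<ByteBuffer> queue = new LinkedBlockingQueue<>(5); // bounded queue
        ExecutorService pool = Executors.newFixedThreadPool(2);

        // producer: serialize batches until the source is exhausted, then put the sentinel
        pool.submit(() -> {
          try {
            for (int batch = 0; batch < 10; batch++) {
              ByteBuffer buf = ByteBuffer.allocate(8);
              buf.putLong(batch);
              buf.flip();
              queue.put(buf); // blocks when the queue is full
            }
            queue.put(END_MARKER);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        });

        // consumer: take() until the sentinel is seen
        ByteBuffer next;
        while ((next = queue.take()) != END_MARKER) {
          System.out.println("got a batch of " + next.remaining() + " bytes");
        }
        pool.shutdown();
      }
    }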


[incubator-iotdb] 04/04: Disable align

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch DisableAlign
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 3ee98df2f6659a55dd407a1e28d1ebadddf304b5
Merge: 8f8d570 7c0f1d5
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Tue Jan 14 16:40:41 2020 +0800

    Disable align

 NOTICE                                             |   2 +-
 NOTICE-binary                                      |   2 +-
 RELEASE_NOTES.md                                   |  28 +
 client-py/compile.bat                              |   5 +-
 client-py/readme.md                                |   2 +-
 client-py/src/client_example.py                    |   2 +
 .../org/apache/iotdb/client/AbstractClient.java    |   2 +-
 .../UserGuide/3-Server/4-Config Manual.md          |  37 +-
 .../5-Operation Manual/4-SQL Reference.md          |  13 +-
 .../UserGuide/3-Server/4-Config Manual.md          |  39 +-
 .../4-Client/4-Programming - Other Languages.md    |  53 +-
 .../5-Operation Manual/4-SQL Reference.md          |  13 +-
 .../8-System Design (Developer)/1-Hierarchy.md     |   6 +-
 .../main/java/org/apache/iotdb/JDBCExample.java    |   2 +-
 server/pom.xml                                     |  12 +
 .../resources/conf/iotdb-engine.properties         |  15 +-
 server/src/assembly/resources/conf/iotdb-env.bat   |  31 +-
 server/src/assembly/resources/conf/iotdb-env.sh    |  31 +-
 server/src/assembly/resources/conf/logback.xml     |  12 +-
 .../src/assembly/resources/sbin/start-server.bat   |   6 +-
 server/src/assembly/resources/sbin/start-server.sh |   6 +-
 .../tsfileToolSet/print-tsfile-resource-files.sh   |   0
 .../tools/tsfileToolSet/print-tsfile-sketch.sh     |   0
 .../org/apache/iotdb/db/qp/strategy/SqlBase.g4     |   6 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 102 ++-
 .../org/apache/iotdb/db/conf/IoTDBConfigCheck.java |  48 +-
 .../org/apache/iotdb/db/conf/IoTDBConstant.java    |   7 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  39 +-
 .../db/conf/adapter/IoTDBConfigDynamicAdapter.java |   5 +-
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  39 +-
 .../iotdb/db/engine/flush/MemTableFlushTask.java   |  24 +-
 .../iotdb/db/engine/flush/TsFileFlushPolicy.java   |   2 +-
 .../iotdb/db/engine/memtable/AbstractMemTable.java |   8 +-
 .../apache/iotdb/db/engine/memtable/IMemTable.java |   5 +-
 .../db/engine/memtable/IWritableMemChunk.java      |  16 +-
 .../iotdb/db/engine/memtable/WritableMemChunk.java |  80 +-
 .../db/engine/merge/manage/MergeResource.java      |   3 +-
 .../db/engine/modification/ModificationFile.java   |   2 +-
 .../io/LocalTextModificationAccessor.java          |   2 +-
 .../engine/modification/io/ModificationWriter.java |   1 +
 .../engine/storagegroup/StorageGroupProcessor.java | 813 ++++++++++++++-------
 .../db/engine/storagegroup/TsFileProcessor.java    |  85 +--
 .../db/engine/storagegroup/TsFileResource.java     |   7 +-
 .../version/SimpleFileVersionController.java       |  51 +-
 .../java/org/apache/iotdb/db/metadata/MGraph.java  |   2 +-
 .../java/org/apache/iotdb/db/metadata/MTree.java   |   2 +-
 .../apache/iotdb/db/qp/constant/DatetimeUtils.java |   2 +-
 .../iotdb/db/qp/executor/QueryProcessExecutor.java |  52 +-
 .../iotdb/db/qp/physical/crud/BatchInsertPlan.java |  56 +-
 .../iotdb/db/qp/strategy/PhysicalGenerator.java    |  34 +-
 .../iotdb/db/query/context/QueryContext.java       |  19 +-
 .../db/query/dataset/NonAlignEngineDataSet.java    |  22 +-
 .../iotdb/db/query/executor/EngineExecutor.java    |   8 +-
 .../org/apache/iotdb/db/rescon/MemTablePool.java   |  24 +-
 .../java/org/apache/iotdb/db/service/IoTDB.java    |   6 +-
 .../org/apache/iotdb/db/service/JDBCService.java   |   9 +-
 .../apache/iotdb/db/service/MetricsService.java    |  62 +-
 .../apache/iotdb/db/service/RegisterManager.java   |   3 +
 .../iotdb/db/sync/conf/SyncSenderConfig.java       |  18 -
 .../iotdb/db/sync/receiver/SyncServerManager.java  |   9 +-
 .../iotdb/db/sync/receiver/load/FileLoader.java    |   2 +-
 .../db/sync/receiver/load/FileLoaderManager.java   |   9 +-
 .../db/sync/sender/manage/ISyncFileManager.java    |  10 +-
 .../db/sync/sender/manage/SyncFileManager.java     | 137 ++--
 .../iotdb/db/sync/sender/transfer/ISyncClient.java |   6 +-
 .../iotdb/db/sync/sender/transfer/SyncClient.java  | 120 +--
 .../FileUtils.java}                                |  46 +-
 .../java/org/apache/iotdb/db/utils/SyncUtils.java  |  10 +-
 .../iotdb/db/utils/datastructure/BinaryTVList.java |  30 +
 .../db/utils/datastructure/BooleanTVList.java      |  30 +
 .../iotdb/db/utils/datastructure/DoubleTVList.java |  30 +
 .../iotdb/db/utils/datastructure/FloatTVList.java  |  30 +
 .../iotdb/db/utils/datastructure/IntTVList.java    |  30 +
 .../iotdb/db/utils/datastructure/LongTVList.java   |  30 +
 .../iotdb/db/utils/datastructure/TVList.java       |  30 +-
 .../iotdb/db/writelog/recover/LogReplayer.java     |  13 +-
 .../writelog/recover/TsFileRecoverPerformer.java   |   2 +-
 .../db/conf/adapter/CompressionRatioTest.java      |   6 -
 .../adapter/IoTDBConfigDynamicAdapterTest.java     |   9 +-
 .../db/engine/cache/DeviceMetaDataCacheTest.java   |   5 +-
 .../iotdb/db/engine/merge/MergeTaskTest.java       |   9 +-
 .../apache/iotdb/db/engine/merge/MergeTest.java    |  10 +-
 .../engine/modification/DeletionFileNodeTest.java  |  56 +-
 .../engine/modification/ModificationFileTest.java  |   8 +-
 .../io/LocalTextModificationAccessorTest.java      |   4 +-
 .../storagegroup/StorageGroupProcessorTest.java    |  14 +-
 .../iotdb/db/engine/storagegroup/TTLTest.java      |  61 +-
 .../engine/storagegroup/TsFileProcessorTest.java   |   8 +-
 .../version/SimpleFileVersionControllerTest.java   |   4 +-
 .../iotdb/db/integration/IOTDBGroupByIT.java       |  29 +-
 .../integration/IOTDBGroupByInnerIntervalIT.java   |   6 -
 .../iotdb/db/integration/IoTDBAggregationIT.java   |  42 +-
 .../integration/IoTDBAggregationLargeDataIT.java   |   8 +-
 .../integration/IoTDBAggregationSmallDataIT.java   |   6 -
 .../iotdb/db/integration/IoTDBAuthorizationIT.java |  18 +-
 .../db/integration/IoTDBAutoCreateSchemaIT.java    |   6 -
 .../apache/iotdb/db/integration/IoTDBCloseIT.java  |   6 -
 .../iotdb/db/integration/IoTDBCompleteIT.java      |   6 -
 .../apache/iotdb/db/integration/IoTDBDaemonIT.java |   6 +-
 .../db/integration/IoTDBDeleteStorageGroupIT.java  |   6 -
 .../iotdb/db/integration/IoTDBDeletionIT.java      |  16 +-
 .../db/integration/IoTDBEngineTimeGeneratorIT.java |   5 -
 .../apache/iotdb/db/integration/IoTDBFillIT.java   |   6 -
 .../db/integration/IoTDBFloatPrecisionIT.java      |   6 -
 .../db/integration/IoTDBFlushQueryMergeTest.java   |  13 +-
 .../iotdb/db/integration/IoTDBGroupbyDeviceIT.java |   5 -
 .../iotdb/db/integration/IoTDBLargeDataIT.java     |   5 -
 .../iotdb/db/integration/IoTDBLimitSlimitIT.java   |   5 -
 .../integration/IoTDBLoadExternalTsfileTest.java   |  56 +-
 .../iotdb/db/integration/IoTDBMergeTest.java       |   6 -
 .../iotdb/db/integration/IoTDBMetadataFetchIT.java |   6 -
 .../iotdb/db/integration/IoTDBMultiSeriesIT.java   |  50 +-
 .../db/integration/IoTDBMultiStatementsIT.java     |   6 -
 .../iotdb/db/integration/IoTDBNumberPathIT.java    |   6 -
 .../iotdb/db/integration/IoTDBQueryDemoIT.java     |   4 -
 .../iotdb/db/integration/IoTDBQuotedPathIT.java    |   5 -
 ...IoTDBAggregationIT.java => IoTDBRecoverIT.java} | 317 ++------
 .../db/integration/IoTDBSequenceDataQueryIT.java   |   5 -
 .../iotdb/db/integration/IoTDBSeriesReaderIT.java  |   5 -
 .../iotdb/db/integration/IoTDBSimpleQueryTest.java |   5 -
 .../iotdb/db/integration/IoTDBTimeZoneIT.java      |   5 -
 .../apache/iotdb/db/integration/IoTDBTtlIT.java    |   6 -
 .../iotdb/db/integration/IoTDBVersionIT.java       |   6 -
 .../apache/iotdb/db/qp/plan/PhysicalPlanTest.java  |  59 +-
 .../fileRelated/UnSealedTsFileReaderTest.java      |  13 +-
 .../NewUnseqResourceMergeReaderTest.java           |  13 +-
 .../resourceRelated/SeqResourceReaderTest.java     |   7 +-
 .../resourceRelated/UnseqResourceReaderTest.java   |   5 +-
 .../db/sync/receiver/load/FileLoaderTest.java      |  55 +-
 .../recover/SyncReceiverLogAnalyzerTest.java       |   7 +-
 .../db/sync/sender/manage/SyncFileManagerTest.java | 186 +++--
 .../sender/recover/SyncSenderLogAnalyzerTest.java  |  64 +-
 .../apache/iotdb/db/tools/IoTDBWatermarkTest.java  |  16 +-
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |  59 +-
 .../iotdb/db/writelog/IoTDBLogFileSizeTest.java    |   7 -
 .../apache/iotdb/db/writelog/PerformanceTest.java  |   2 +
 .../db/writelog/recover/SeqTsFileRecoverTest.java  |   2 +-
 .../writelog/recover/UnseqTsFileRecoverTest.java   |   3 +-
 server/src/test/resources/logback.xml              |   2 +-
 service-rpc/src/pypi/setup.py                      |   2 +-
 session/pom.xml                                    |   9 +-
 .../java/org/apache/iotdb/session/Session.java     | 129 +++-
 .../org/apache/iotdb/session/IoTDBSessionIT.java   | 335 ++++++++-
 .../iotdb/session/utils/EnvironmentUtils.java      | 190 -----
 .../iotdb/tsfile/file/metadata/ChunkMetaData.java  |   3 +-
 .../tsfile/fileSystem/fsFactory/FSFactory.java     |  67 ++
 .../tsfile/fileSystem/fsFactory/HDFSFactory.java   |  15 +
 .../fileSystem/fsFactory/LocalFSFactory.java       |   9 +
 148 files changed, 2871 insertions(+), 1827 deletions(-)

diff --cc client/src/main/java/org/apache/iotdb/client/AbstractClient.java
index 0531ba0,23c2440..a5a85eb
--- a/client/src/main/java/org/apache/iotdb/client/AbstractClient.java
+++ b/client/src/main/java/org/apache/iotdb/client/AbstractClient.java
@@@ -527,33 -485,16 +527,33 @@@ public abstract class AbstractClient 
          }
          maxValueLength = tmp;
        }
 -      for (int i = 2; i <= colCount; i++) {
 -        if (i == 2 && resultSetMetaData.getColumnName(2).equals(GROUPBY_DEVICE_COLUMN_NAME)) {
 -          blockLine.append(StringUtils.repeat('-', deviceColumnLength)).append("+");
 -        } else {
 -          blockLine.append(StringUtils.repeat('-', maxValueLength)).append("+");
 +      if (printTimestamp) {
 +        for (int i = 2; i <= colCount; i++) {
 +          if (i == 2 && resultSetMetaData.getColumnName(2).equals(GROUPBY_DEVICE_COLUMN_NAME)) {
 +            blockLine.append(StringUtils.repeat('-', deviceColumnLength)).append("+");
 +          } else {
 +            blockLine.append(StringUtils.repeat('-', maxValueLength)).append("+");
 +          }
 +        }
 +      } else {
 +        for (int i = 1; i <= colCount; i++) {
 +            blockLine.append(StringUtils.repeat('-', maxValueLength)).append("+");
          }
        }
 -    } else {
 -      blockLine.append("+");
 +    }
 +    // for disable align clause
 +    else {
 +      int tmp = Integer.MIN_VALUE;
        for (int i = 1; i <= colCount; i++) {
 +        int len = resultSetMetaData.getColumnLabel(i).length();
 +        tmp = Math.max(tmp, len);
 +      }
 +      maxValueLength = tmp;
 +      blockLine.append("+");
 +      for (int i = 2; i <= colCount / 2 + 1; i++) {
 +        if (printTimestamp) {
 +          blockLine.append(StringUtils.repeat('-', maxTimeLength)).append("+");
-         } 
++        }
          blockLine.append(StringUtils.repeat('-', maxValueLength)).append("+");
        }
      }
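
Note: for the DISABLE ALIGN case the separator width is derived from the longest column label rather than a fixed value width, and the line is built per Time/value column pair. A hypothetical helper showing that computation (illustrative only, not the client's exact code):

    public class BlockLineSketch {

      public static void main(String[] args) {
        String[] labels = {"Time", "root.sg1.d1.s1", "Time", "root.sg1.d1.s2"};
        System.out.println(buildBlockLine(labels, 24, true));
      }

      // In DISABLE ALIGN mode the result set interleaves Time/value column pairs,
      // so one separator segment is appended per pair and the value cell width
      // follows the longest column label.
      static String buildBlockLine(String[] columnLabels, int maxTimeLength,
          boolean printTimestamp) {
        int maxValueLength = Integer.MIN_VALUE;
        for (String label : columnLabels) {
          maxValueLength = Math.max(maxValueLength, label.length());
        }
        StringBuilder blockLine = new StringBuilder("+");
        for (int i = 0; i < columnLabels.length / 2; i++) {
          if (printTimestamp) {
            blockLine.append(repeat('-', maxTimeLength)).append('+');
          }
          blockLine.append(repeat('-', maxValueLength)).append('+');
        }
        return blockLine.toString();
      }

      static String repeat(char c, int n) {
        StringBuilder sb = new StringBuilder(n);
        for (int i = 0; i < n; i++) {
          sb.append(c);
        }
        return sb.toString();
      }
    }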
diff --cc example/jdbc/src/main/java/org/apache/iotdb/JDBCExample.java
index cffdd32,99a6193..8b4bc7b
--- a/example/jdbc/src/main/java/org/apache/iotdb/JDBCExample.java
+++ b/example/jdbc/src/main/java/org/apache/iotdb/JDBCExample.java
@@@ -13,11 -31,23 +13,11 @@@ public class JDBCExample 
      Class.forName("org.apache.iotdb.jdbc.IoTDBDriver");
      try (Connection connection = DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
           Statement statement = connection.createStatement()) {
 -      statement.execute("SET STORAGE GROUP TO root.sg1");
 -      statement.execute("CREATE TIMESERIES root.sg1.d1.s1 WITH DATATYPE=INT64, ENCODING=RLE");
 -      statement.execute("CREATE TIMESERIES root.sg1.d1.s2 WITH DATATYPE=INT64, ENCODING=RLE");
 -      statement.execute("CREATE TIMESERIES root.sg1.d1.s3 WITH DATATYPE=INT64, ENCODING=RLE");
 -
 -      for (int i = 0; i <= 100; i++) {
 -        statement.addBatch("insert into root.sg1.d1(timestamp, s1, s2, s3) values("+ i + "," + 1 + "," + 1 + "," + 1 + ")");
 -      }
 -      statement.executeBatch();
 -      statement.clearBatch();
 -
 -      ResultSet resultSet = statement.executeQuery("select * from root where time <= 10");
 -      outputResult(resultSet);
 -      resultSet = statement.executeQuery("select count(*) from root");
 -      outputResult(resultSet);
 -      resultSet = statement.executeQuery("select count(*) from root where time >= 1 and time <= 100 group by ([0, 100], 20ms, 20ms)");
 +      long startTime = System.currentTimeMillis();
-       ResultSet resultSet = statement.executeQuery("select * from root where time < 30000000");
++      ResultSet resultSet = statement.executeQuery("select * from root where time < 100000000 disable align");
        outputResult(resultSet);
 +      long endTime = System.currentTimeMillis();
 +      System.out.println("Cost Time: " + (endTime - startTime));
      }
    }
  
diff --cc server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfigCheck.java
index 09c648f,479849a..e4e8e31
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfigCheck.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfigCheck.java
@@@ -18,30 -18,31 +18,27 @@@
   */
  package org.apache.iotdb.db.conf;
  
--import java.io.File;
--import java.io.FileInputStream;
--import java.io.FileOutputStream;
--import java.io.IOException;
--import java.io.InputStreamReader;
--import java.util.Properties;
--
  import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
  import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
++import java.io.*;
++import java.util.Properties;
++
  public class IoTDBConfigCheck {
  
    // this file is located in data/system/schema/system_properties.
    // If user delete folder "data", system_properties can reset.
    public static final String PROPERTIES_FILE_NAME = "system.properties";
    public static final String SCHEMA_DIR =
--      IoTDBDescriptor.getInstance().getConfig().getSchemaDir();
++          IoTDBDescriptor.getInstance().getConfig().getSchemaDir();
    private static final IoTDBConfigCheck INSTANCE = new IoTDBConfigCheck();
    private static final Logger logger = LoggerFactory.getLogger(IoTDBDescriptor.class);
-   private Properties properties = new Properties();
    // this is a initial parameter.
    private static String TIMESTAMP_PRECISION = "ms";
+   private static long PARTITION_INTERVAL = 86400;
+   private Properties properties = new Properties();
  
    public static final IoTDBConfigCheck getInstance() {
      return IoTDBConfigCheck.INSTANCE;
@@@ -49,6 -50,24 +46,24 @@@
  
    public void checkConfig() {
      TIMESTAMP_PRECISION = IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision();
+ 
+     // check time stamp precision
+     if (!(TIMESTAMP_PRECISION.equals("ms") || TIMESTAMP_PRECISION.equals("us")
 -        || TIMESTAMP_PRECISION.equals("ns"))) {
++            || TIMESTAMP_PRECISION.equals("ns"))) {
+       logger.error("Wrong timestamp precision, please set as: ms, us or ns ! Current is: "
 -          + TIMESTAMP_PRECISION);
++              + TIMESTAMP_PRECISION);
+       System.exit(-1);
+     }
+ 
+     PARTITION_INTERVAL = IoTDBDescriptor.getInstance().getConfig()
 -        .getPartitionInterval();
++            .getPartitionInterval();
+ 
+     // check partition interval
+     if (PARTITION_INTERVAL <= 0) {
+       logger.error("Partition interval must larger than 0!");
+       System.exit(-1);
+     }
+ 
      createDir(SCHEMA_DIR);
      checkFile(SCHEMA_DIR);
      logger.info("System configuration is ok.");
@@@ -65,7 -84,8 +80,8 @@@
    private void checkFile(String filepath) {
      // create file : read timestamp precision from engine.properties, create system_properties.txt
      // use output stream to write timestamp precision to file.
-     File file = SystemFileFactory.INSTANCE.getFile(filepath + File.separator + PROPERTIES_FILE_NAME);
+     File file = SystemFileFactory.INSTANCE
 -        .getFile(filepath + File.separator + PROPERTIES_FILE_NAME);
++            .getFile(filepath + File.separator + PROPERTIES_FILE_NAME);
      try {
        if (!file.exists()) {
          file.createNewFile();
@@@ -79,12 -100,19 +96,19 @@@
        logger.error("Can not create {}.", file.getAbsolutePath(), e);
      }
      // get existed properties from system_properties.txt
-     File inputFile = SystemFileFactory.INSTANCE.getFile(filepath + File.separator + PROPERTIES_FILE_NAME);
+     File inputFile = SystemFileFactory.INSTANCE
 -        .getFile(filepath + File.separator + PROPERTIES_FILE_NAME);
++            .getFile(filepath + File.separator + PROPERTIES_FILE_NAME);
      try (FileInputStream inputStream = new FileInputStream(inputFile.toString())) {
        properties.load(new InputStreamReader(inputStream, TSFileConfig.STRING_CHARSET));
        if (!properties.getProperty("timestamp_precision").equals(TIMESTAMP_PRECISION)) {
          logger.error("Wrong timestamp precision, please set as: " + properties
--            .getProperty("timestamp_precision") + " !");
++                .getProperty("timestamp_precision") + " !");
+         System.exit(-1);
+       }
+       if (!(Long.parseLong(properties.getProperty("storage_group_time_range"))
 -          == PARTITION_INTERVAL)) {
++              == PARTITION_INTERVAL)) {
+         logger.error("Wrong storage group time range, please set as: " + properties
 -            .getProperty("storage_group_time_range") + " !");
++                .getProperty("storage_group_time_range") + " !");
          System.exit(-1);
        }
      } catch (IOException e) {
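
Note: the added checks compare the running configuration against the values persisted in system.properties and abort startup on mismatch. A stripped-down sketch of that consistency check, with a hypothetical file location and simplified charset/error handling:

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;
    import java.util.Properties;

    public class ConfigCheckSketch {
      public static void main(String[] args) throws IOException {
        String runningPrecision = "ms";          // value from the running configuration
        long runningPartitionInterval = 86400;   // must be larger than 0

        if (runningPartitionInterval <= 0) {
          System.err.println("Partition interval must be larger than 0!");
          System.exit(-1);
        }

        // hypothetical path; IoTDB keeps this file under data/system/schema
        File file = new File("data/system/schema/system.properties");
        Properties stored = new Properties();
        try (FileInputStream in = new FileInputStream(file)) {
          stored.load(new InputStreamReader(in, StandardCharsets.UTF_8));
        }

        if (!runningPrecision.equals(stored.getProperty("timestamp_precision"))) {
          System.err.println("Wrong timestamp precision, please set as: "
              + stored.getProperty("timestamp_precision") + " !");
          System.exit(-1);
        }
        if (Long.parseLong(stored.getProperty("storage_group_time_range"))
            != runningPartitionInterval) {
          System.err.println("Wrong storage group time range, please set as: "
              + stored.getProperty("storage_group_time_range") + " !");
          System.exit(-1);
        }
        System.out.println("System configuration is ok.");
      }
    }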
diff --cc server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
index c2e72e5,0000000..b024450
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
@@@ -1,352 -1,0 +1,354 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.iotdb.db.query.dataset;
 +
 +import org.apache.iotdb.db.query.pool.QueryTaskPoolManager;
 +import org.apache.iotdb.db.query.reader.ManagedSeriesReader;
 +import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
 +import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet;
 +import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 +import org.apache.iotdb.tsfile.read.common.BatchData;
 +import org.apache.iotdb.tsfile.read.common.Path;
 +import org.apache.iotdb.tsfile.read.common.RowRecord;
 +import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 +import org.apache.iotdb.tsfile.utils.Pair;
 +import org.apache.iotdb.tsfile.utils.PublicBAOS;
 +import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.List;
 +import java.util.concurrent.BlockingQueue;
 +import java.util.concurrent.LinkedBlockingQueue;
++import java.util.concurrent.atomic.AtomicIntegerArray;
 +
 +public class NonAlignEngineDataSet extends QueryDataSet {
 +
 +  private static class ReadTask implements Runnable {
 +
 +    private final ManagedSeriesReader reader;
 +    private BlockingQueue<Pair<ByteBuffer, ByteBuffer>> blockingQueue;
 +    private WatermarkEncoder encoder;
 +    NonAlignEngineDataSet dataSet;
 +    private int index;
 +
 +
 +    public ReadTask(ManagedSeriesReader reader,
 +                    BlockingQueue<Pair<ByteBuffer, ByteBuffer>> blockingQueue,
 +                    WatermarkEncoder encoder, NonAlignEngineDataSet dataSet, int index) {
 +      this.reader = reader;
 +      this.blockingQueue = blockingQueue;
 +      this.encoder = encoder;
 +      this.dataSet = dataSet;
 +      this.index = index;
 +    }
 +
 +    @Override
 +    public void run() {
 +      PublicBAOS timeBAOS = new PublicBAOS();
 +      PublicBAOS valueBAOS = new PublicBAOS();
 +      try {
 +        synchronized (reader) {
 +          // if the task is submitted, there must be free space in the queue
 +          // so here we don't need to check whether the queue has free space
 +          // the reader has next batch
 +          if ((dataSet.cachedBatchData[index] != null && dataSet.cachedBatchData[index].hasCurrent())
 +                  || reader.hasNextBatch()) {
 +            BatchData batchData;
 +            if (dataSet.cachedBatchData[index] != null && dataSet.cachedBatchData[index].hasCurrent())
 +              batchData = dataSet.cachedBatchData[index];
 +            else
 +              batchData = reader.nextBatch();
 +
 +            int rowCount = 0;
 +            while (rowCount < dataSet.fetchSize) {
 +
-               if ((dataSet.limit > 0 && dataSet.alreadyReturnedRowNumArray[index] >= dataSet.limit)) {
++              if ((dataSet.limit > 0 && dataSet.alreadyReturnedRowNumArray.get(index) >= dataSet.limit)) {
 +                break;
 +              }
 +
 +              if (batchData != null && batchData.hasCurrent()) {
-                 if (dataSet.offsetArray[index] == 0) {
++                if (dataSet.offsetArray.get(index) == 0) {
 +                  long time = batchData.currentTime();
 +                  ReadWriteIOUtils.write(time, timeBAOS);
 +                  TSDataType type = batchData.getDataType();
 +                  switch (type) {
 +                    case INT32:
 +                      int intValue = batchData.getInt();
 +                      if (encoder != null && encoder.needEncode(time)) {
 +                        intValue = encoder.encodeInt(intValue, time);
 +                      }
 +                      ReadWriteIOUtils.write(intValue, valueBAOS);
 +                      break;
 +                    case INT64:
 +                      long longValue = batchData.getLong();
 +                      if (encoder != null && encoder.needEncode(time)) {
 +                        longValue = encoder.encodeLong(longValue, time);
 +                      }
 +                      ReadWriteIOUtils.write(longValue, valueBAOS);
 +                      break;
 +                    case FLOAT:
 +                      float floatValue = batchData.getFloat();
 +                      if (encoder != null && encoder.needEncode(time)) {
 +                        floatValue = encoder.encodeFloat(floatValue, time);
 +                      }
 +                      ReadWriteIOUtils.write(floatValue, valueBAOS);
 +                      break;
 +                    case DOUBLE:
 +                      double doubleValue = batchData.getDouble();
 +                      if (encoder != null && encoder.needEncode(time)) {
 +                        doubleValue = encoder.encodeDouble(doubleValue, time);
 +                      }
 +                      ReadWriteIOUtils.write(doubleValue, valueBAOS);
 +                      break;
 +                    case BOOLEAN:
 +                      ReadWriteIOUtils.write(batchData.getBoolean(),
 +                              valueBAOS);
 +                      break;
 +                    case TEXT:
 +                      ReadWriteIOUtils
 +                              .write(batchData.getBinary(),
 +                                      valueBAOS);
 +                      break;
 +                    default:
 +                      throw new UnSupportedDataTypeException(
 +                              String.format("Data type %s is not supported.", type));
 +                  }
 +                }
 +                batchData.next();
 +              }
 +              else {
 +                if (reader.hasNextBatch()) {
 +                  batchData = reader.nextBatch();
 +                  dataSet.cachedBatchData[index] = batchData;
 +                  continue;
 +                }
 +                else
 +                  break;
 +              }
-               if (dataSet.offsetArray[index] == 0) {
++              if (dataSet.offsetArray.get(index) == 0) {
 +                rowCount++;
 +                if (dataSet.limit > 0) {
-                   dataSet.alreadyReturnedRowNumArray[index]++;
++                  dataSet.alreadyReturnedRowNumArray.incrementAndGet(index);
 +                }
 +              } else {
-                 dataSet.offsetArray[index]--;
++                dataSet.offsetArray.decrementAndGet(index);
 +              }
 +            }
 +            if (rowCount == 0) {
 +              blockingQueue.put(new Pair(null, null));
 +              // set the hasRemaining field in reader to false
 +              // tell the Consumer not to submit another task for this reader any more
 +              reader.setHasRemaining(false);
 +              // remove itself from the QueryTaskPoolManager
 +              reader.setManagedByQueryManager(false);
 +              return;
 +            }
 +
 +            ByteBuffer timeBuffer = ByteBuffer.allocate(timeBAOS.size());
 +            timeBuffer.put(timeBAOS.getBuf(), 0, timeBAOS.size());
 +            timeBuffer.flip();
 +            ByteBuffer valueBuffer = ByteBuffer.allocate(valueBAOS.size());
 +            valueBuffer.put(valueBAOS.getBuf(), 0, valueBAOS.size());
 +            valueBuffer.flip();
 +
 +            Pair<ByteBuffer, ByteBuffer> timeValueBAOSPair = new Pair(timeBuffer, valueBuffer);
 +
 +            blockingQueue.put(timeValueBAOSPair);
 +            // if the queue also has free space, just submit another itself
 +            if (blockingQueue.remainingCapacity() > 0) {
 +              pool.submit(this);
 +            }
 +            // the queue has no more space
 +            // remove itself from the QueryTaskPoolManager
 +            else {
 +              reader.setManagedByQueryManager(false);
 +            }
 +            return;
 +          }
 +          blockingQueue.put(new Pair(null, null));
 +          // set the hasRemaining field in reader to false
 +          // tell the Consumer not to submit another task for this reader any more
 +          reader.setHasRemaining(false);
 +          // remove itself from the QueryTaskPoolManager
 +          reader.setManagedByQueryManager(false);
 +        }
 +      } catch (InterruptedException e) {
 +        LOGGER.error("Interrupted while putting into the blocking queue: ", e);
 +      } catch (IOException e) {
 +        LOGGER.error("Something gets wrong while reading from the series reader: ", e);
 +      } catch (Exception e) {
 +        LOGGER.error("Something gets wrong: ", e);
 +      }
 +
 +    }
 +
 +  }
 +
 +
 +  private List<ManagedSeriesReader> seriesReaderWithoutValueFilterList;
 +
 +  // Blocking queue list for each time value buffer pair
 +  private BlockingQueue<Pair<ByteBuffer, ByteBuffer>>[] blockingQueueArray;
 +
 +  private boolean initialized = false;
 +
-   private int[] offsetArray;
++  private AtomicIntegerArray offsetArray;
 +
 +  private int limit;
 +
-   private int[] alreadyReturnedRowNumArray;
++  private AtomicIntegerArray alreadyReturnedRowNumArray;
 +
 +  private BatchData[] cachedBatchData;
 +
 +  // indicate that there is no more batch data in the corresponding queue
 +  // in case that the consumer thread is blocked on the queue and won't get runnable any more
 +  // this field is not same as the `hasRemaining` in SeriesReaderWithoutValueFilter
 +  // even though the `hasRemaining` in SeriesReaderWithoutValueFilter is false
 +  // noMoreDataInQueue can still be true
 +  // its usage is to tell the consumer thread not to call the take() method.
 +  private boolean[] noMoreDataInQueueArray;
 +
 +  private int fetchSize;
 +
 +  // indicate that there is no more batch data in the corresponding queue
 +  // in case that the consumer thread is blocked on the queue and won't get runnable any more
 +  // this field is not same as the `hasRemaining` in SeriesReaderWithoutValueFilter
 +  // even though the `hasRemaining` in SeriesReaderWithoutValueFilter is false
 +  // noMoreDataInQueue can still be true
 +  // its usage is to tell the consumer thread not to call the take() method.
 +
 +  // capacity for blocking queue
 +  private static final int BLOCKING_QUEUE_CAPACITY = 5;
 +
 +  private static final QueryTaskPoolManager pool = QueryTaskPoolManager.getInstance();
 +
 +  private static final Logger LOGGER = LoggerFactory.getLogger(NonAlignEngineDataSet.class);
 +
 +  /**
 +   * constructor of EngineDataSet.
 +   *
 +   * @param paths paths in List structure
 +   * @param dataTypes time series data type
 +   * @param readers readers in List(IPointReader) structure
 +   */
 +  public NonAlignEngineDataSet(List<Path> paths, List<TSDataType> dataTypes,
 +                               List<ManagedSeriesReader> readers) {
 +    super(paths, dataTypes);
 +    this.seriesReaderWithoutValueFilterList = readers;
 +    blockingQueueArray = new BlockingQueue[readers.size()];
 +    noMoreDataInQueueArray = new boolean[readers.size()];
 +    for (int i = 0; i < seriesReaderWithoutValueFilterList.size(); i++) {
 +      blockingQueueArray[i] = new LinkedBlockingQueue<>(BLOCKING_QUEUE_CAPACITY);
 +    }
 +  }
 +
 +  private void initLimit(int offset, int limit, int size) {
-     offsetArray = new int[size];
-     Arrays.fill(offsetArray, offset);
++    int[] offsetArrayTemp = new int[size];
++    Arrays.fill(offsetArrayTemp, offset);
++    offsetArray = new AtomicIntegerArray(offsetArrayTemp);
 +    this.limit = limit;
-     alreadyReturnedRowNumArray = new int[size];
++    this.alreadyReturnedRowNumArray = new AtomicIntegerArray(size);
 +    cachedBatchData = new BatchData[size];
 +  }
 +
 +  private void init(WatermarkEncoder encoder, int fetchSize) {
 +    initLimit(super.rowOffset, super.rowLimit, seriesReaderWithoutValueFilterList.size());
 +    this.fetchSize = fetchSize;
 +    for (int i = 0; i < seriesReaderWithoutValueFilterList.size(); i++) {
 +      ManagedSeriesReader reader = seriesReaderWithoutValueFilterList.get(i);
 +      reader.setHasRemaining(true);
 +      reader.setManagedByQueryManager(true);
 +      pool.submit(new ReadTask(reader, blockingQueueArray[i], encoder, this, i));
 +    }
 +    this.initialized = true;
 +  }
 +
 +  /**
 +   * for RPC in RawData query between client and server
 +   * fill time buffers and value buffers
 +   */
 +  public TSQueryNonAlignDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) throws InterruptedException {
 +    if (!initialized) {
 +      init(encoder, fetchSize);
 +    }
 +    int seriesNum = seriesReaderWithoutValueFilterList.size();
 +    TSQueryNonAlignDataSet tsQueryNonAlignDataSet = new TSQueryNonAlignDataSet();
 +
 +    List<ByteBuffer> timeBufferList = new ArrayList<>(seriesNum);
 +    List<ByteBuffer> valueBufferList = new ArrayList<>(seriesNum);
 +
 +    for (int seriesIndex = 0; seriesIndex < seriesNum; seriesIndex++) {
 +      if (!noMoreDataInQueueArray[seriesIndex]) {
 +        Pair<ByteBuffer, ByteBuffer> timeValueByteBufferPair = blockingQueueArray[seriesIndex].take();
 +        if (timeValueByteBufferPair.left == null || timeValueByteBufferPair.right == null) {
 +          noMoreDataInQueueArray[seriesIndex] = true;
 +          timeValueByteBufferPair.left = ByteBuffer.allocate(0);
 +          timeValueByteBufferPair.right = ByteBuffer.allocate(0);
 +        }
 +        timeBufferList.add(timeValueByteBufferPair.left);
 +        valueBufferList.add(timeValueByteBufferPair.right);
 +      }
 +      else {
 +        timeBufferList.add(ByteBuffer.allocate(0));
 +        valueBufferList.add(ByteBuffer.allocate(0));
 +        continue;
 +      }
 +
 +      synchronized (seriesReaderWithoutValueFilterList.get(seriesIndex)) {
 +        if (blockingQueueArray[seriesIndex].remainingCapacity() > 0) {
 +          ManagedSeriesReader reader = seriesReaderWithoutValueFilterList.get(seriesIndex);
 +          // if the reader isn't being managed and still has more data,
 +          // that means this read task leave the pool before because the queue has no more space
 +          // now we should submit it again
 +          if (!reader.isManagedByQueryManager() && reader.hasRemaining()) {
 +            reader.setManagedByQueryManager(true);
 +            pool.submit(new ReadTask(reader, blockingQueueArray[seriesIndex],
 +                    encoder, this, seriesIndex));
 +          }
 +        }
 +      }
 +    }
 +
 +    // set time buffers, value buffers and bitmap buffers
 +    tsQueryNonAlignDataSet.setTimeList(timeBufferList);
 +    tsQueryNonAlignDataSet.setValueList(valueBufferList);
 +
 +    return tsQueryNonAlignDataSet;
 +  }
 +
 +
 +  @Override
 +  protected boolean hasNextWithoutConstraint() {
 +    return false;
 +  }
 +
 +  @Override
 +  protected RowRecord nextWithoutConstraint() {
 +    return null;
 +  }
 +
 +
 +}
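Note: compared with commit 01/04, this version replaces the plain int[] offset and row-count arrays with java.util.concurrent.atomic.AtomicIntegerArray, since the per-series slots are read and updated from ReadTask threads running in the query pool. A minimal sketch of the affected operations, with illustrative values only:

    import java.util.Arrays;
    import java.util.concurrent.atomic.AtomicIntegerArray;

    public class AtomicCountersSketch {
      public static void main(String[] args) {
        int seriesNum = 3;
        int rowOffset = 2;

        // seed every slot with the query's row offset, as initLimit does
        int[] seed = new int[seriesNum];
        Arrays.fill(seed, rowOffset);
        AtomicIntegerArray offsetArray = new AtomicIntegerArray(seed);
        AtomicIntegerArray alreadyReturnedRowNumArray = new AtomicIntegerArray(seriesNum);

        int index = 0;
        for (int row = 0; row < 5; row++) {
          if (offsetArray.get(index) == 0) {
            // past the offset: count the returned row atomically
            alreadyReturnedRowNumArray.incrementAndGet(index);
          } else {
            // still skipping rows: consume one unit of offset atomically
            offsetArray.decrementAndGet(index);
          }
        }
        System.out.println("skipped=" + (rowOffset - offsetArray.get(index))
            + ", returned=" + alreadyReturnedRowNumArray.get(index));
      }
    }
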
diff --cc server/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutor.java
index bc70fa8,f6582ff..0a10375
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutor.java
@@@ -89,32 -88,6 +89,28 @@@ public class EngineExecutor 
        throw new StorageEngineException(e.getMessage());
      }
    }
 +  
 +  public QueryDataSet executeNonAlign(QueryContext context)
 +      throws StorageEngineException, IOException {
 +
 +    Filter timeFilter = null;
 +    if (optimizedExpression != null) {
 +      timeFilter = ((GlobalTimeExpression) optimizedExpression).getFilter();
 +    }
 +
 +    List<ManagedSeriesReader> readersOfSelectedSeries = new ArrayList<>();
 +    for (int i = 0; i < deduplicatedPaths.size(); i++) {
 +      Path path = deduplicatedPaths.get(i);
 +      TSDataType dataType = deduplicatedDataTypes.get(i);
 +
 +      ManagedSeriesReader reader = new SeriesReaderWithoutValueFilter(path, dataType, timeFilter, context,
 +          true);
 +      readersOfSelectedSeries.add(reader);
 +    }
 +
-     try {
-       return new NonAlignEngineDataSet(deduplicatedPaths, deduplicatedDataTypes,
-           readersOfSelectedSeries);
-     } catch (InterruptedException e) {
-       throw new StorageEngineException(e.getMessage());
-     }
++    return new NonAlignEngineDataSet(deduplicatedPaths, deduplicatedDataTypes,
++        readersOfSelectedSeries);
 +  }
  
    /**
     * executeWithValueFilter query.
diff --cc server/src/main/java/org/apache/iotdb/db/rescon/MemTablePool.java
index 4b03cb1,3750746..87157bf
--- a/server/src/main/java/org/apache/iotdb/db/rescon/MemTablePool.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/MemTablePool.java
@@@ -18,8 -18,8 +18,6 @@@
   */
  package org.apache.iotdb.db.rescon;
  
--import java.util.ArrayDeque;
--import java.util.Deque;
  import org.apache.iotdb.db.conf.IoTDBConfig;
  import org.apache.iotdb.db.conf.IoTDBDescriptor;
  import org.apache.iotdb.db.engine.memtable.IMemTable;
@@@ -27,6 -27,6 +25,9 @@@ import org.apache.iotdb.db.engine.memta
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
++import java.util.ArrayDeque;
++import java.util.Deque;
++
  public class MemTablePool {
  
    private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
diff --cc session/pom.xml
index bc7b83b,8df9ae6..dab30aa
--- a/session/pom.xml
+++ b/session/pom.xml
@@@ -19,125 -19,134 +19,132 @@@
      under the License.
  
  -->
 -<project xmlns="http://maven.apache.org/POM/4.0.0"
 -  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 -  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 -  <parent>
 -    <artifactId>iotdb-parent</artifactId>
 -    <groupId>org.apache.iotdb</groupId>
 -    <version>0.10.0-SNAPSHOT</version>
 -  </parent>
 -  <modelVersion>4.0.0</modelVersion>
 -  <artifactId>iotdb-session</artifactId>
 -  <name>IoTDB Session</name>
 -  <properties>
 -    <session.test.skip>false</session.test.skip>
 -    <session.it.skip>${session.test.skip}</session.it.skip>
 -    <session.ut.skip>${session.test.skip}</session.ut.skip>
 -  </properties>
 -  <build>
 -    <plugins>
 -      <plugin>
 -        <artifactId>maven-assembly-plugin</artifactId>
 -        <version>3.1.0</version>
 -        <configuration>
 -          <descriptorRefs>
 -            <descriptorRef>jar-with-dependencies</descriptorRef>
 -          </descriptorRefs>
 -        </configuration>
 -        <executions>
 -          <execution>
 -            <id>make-assembly</id>
 -            <!-- this is used for inheritance merges -->
 -            <phase>package</phase>
 -            <!-- bind to the packaging phase -->
 -            <goals>
 -              <goal>single</goal>
 -            </goals>
 -          </execution>
 -        </executions>
 -      </plugin>
 -      <!--using `mvn test` to run UT, `mvn verify` to run ITs
 +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 +    <parent>
 +        <artifactId>iotdb-parent</artifactId>
 +        <groupId>org.apache.iotdb</groupId>
 +        <version>0.10.0-SNAPSHOT</version>
 +    </parent>
 +    <modelVersion>4.0.0</modelVersion>
 +    <artifactId>iotdb-session</artifactId>
 +    <name>IoTDB Session</name>
 +    <properties>
 +        <session.test.skip>false</session.test.skip>
 +        <session.it.skip>${session.test.skip}</session.it.skip>
 +        <session.ut.skip>${session.test.skip}</session.ut.skip>
 +    </properties>
 +    <build>
 +        <plugins>
 +            <plugin>
 +                <artifactId>maven-assembly-plugin</artifactId>
 +                <version>3.1.0</version>
 +                <configuration>
 +                    <descriptorRefs>
 +                        <descriptorRef>jar-with-dependencies</descriptorRef>
 +                    </descriptorRefs>
 +                </configuration>
 +                <executions>
 +                    <execution>
 +                        <id>make-assembly</id>
 +                        <!-- this is used for inheritance merges -->
 +                        <phase>package</phase>
 +                        <!-- bind to the packaging phase -->
 +                        <goals>
 +                            <goal>single</goal>
 +                        </goals>
 +                    </execution>
 +                </executions>
 +            </plugin>
 +            <!--using `mvn test` to run UT, `mvn verify` to run ITs
-             Reference: https://antoniogoncalves.org/2012/12/13/lets-turn-integration-tests-with-maven-to-a-first-class-citizen/-->
+       Reference: https://antoniogoncalves.org/2012/12/13/lets-turn-integration-tests-with-maven-to-a-first-class-citizen/-->
 -      <plugin>
 -        <groupId>org.apache.maven.plugins</groupId>
 -        <artifactId>maven-surefire-plugin</artifactId>
 -        <configuration>
 -          <skipTests>${session.ut.skip}</skipTests>
 -        </configuration>
 -      </plugin>
 -      <plugin>
 -        <groupId>org.apache.maven.plugins</groupId>
 -        <artifactId>maven-failsafe-plugin</artifactId>
 -        <executions>
 -          <execution>
 -            <id>run-integration-tests</id>
 -            <phase>integration-test</phase>
 -            <goals>
 -              <goal>integration-test</goal>
 -              <goal>verify</goal>
 -            </goals>
 -          </execution>
 -        </executions>
 -        <configuration>
 -          <skipTests>${session.test.skip}</skipTests>
 -          <skipITs>${session.it.skip}</skipITs>
 -        </configuration>
 -      </plugin>
 -    </plugins>
 -  </build>
 -  <dependencies>
 -    <dependency>
 -      <groupId>org.apache.iotdb</groupId>
 -      <artifactId>service-rpc</artifactId>
 -      <version>${project.version}</version>
 -    </dependency>
 -    <dependency>
 -      <groupId>org.apache.iotdb</groupId>
 -      <artifactId>tsfile</artifactId>
 -      <version>${project.version}</version>
 -    </dependency>
 -    <dependency>
 -      <groupId>org.apache.iotdb</groupId>
 -      <artifactId>iotdb-server</artifactId>
 -      <version>${project.version}</version>
 -      <type>test-jar</type>
 -      <scope>test</scope>
 -    </dependency>
 -    <dependency>
 -      <groupId>org.apache.iotdb</groupId>
 -      <artifactId>iotdb-server</artifactId>
 -      <version>${project.version}</version>
 -      <scope>test</scope>
 -    </dependency>
 -    <dependency>
 -      <groupId>org.apache.iotdb</groupId>
 -      <artifactId>iotdb-jdbc</artifactId>
 -      <version>${project.version}</version>
 -      <scope>test</scope>
 -    </dependency>
 -    <dependency>
 -      <groupId>commons-lang</groupId>
 -      <artifactId>commons-lang</artifactId>
 -    </dependency>
 -  </dependencies>
 -  <profiles>
 -    <profile>
 -      <id>skipSessionTests</id>
 -      <activation>
 -        <property>
 -          <name>skipTests</name>
 -          <value>true</value>
 -        </property>
 -      </activation>
 -      <properties>
 -        <session.test.skip>true</session.test.skip>
 -        <session.ut.skip>true</session.ut.skip>
 -        <session.it.skip>true</session.it.skip>
 -      </properties>
 -    </profile>
 -    <profile>
 -      <id>skipUT_SessionTests</id>
 -      <activation>
 -        <property>
 -          <name>skipUTs</name>
 -          <value>true</value>
 -        </property>
 -      </activation>
 -      <properties>
 -        <session.ut.skip>true</session.ut.skip>
 -      </properties>
 -    </profile>
 -  </profiles>
 +            <plugin>
 +                <groupId>org.apache.maven.plugins</groupId>
 +                <artifactId>maven-surefire-plugin</artifactId>
 +                <configuration>
 +                    <skipTests>${session.ut.skip}</skipTests>
 +                </configuration>
 +            </plugin>
 +            <plugin>
 +                <groupId>org.apache.maven.plugins</groupId>
 +                <artifactId>maven-failsafe-plugin</artifactId>
 +                <executions>
 +                    <execution>
 +                        <id>run-integration-tests</id>
 +                        <phase>integration-test</phase>
 +                        <goals>
 +                            <goal>integration-test</goal>
 +                            <goal>verify</goal>
 +                        </goals>
 +                    </execution>
 +                </executions>
 +                <configuration>
 +                    <skipTests>${session.test.skip}</skipTests>
 +                    <skipITs>${session.it.skip}</skipITs>
 +                </configuration>
 +            </plugin>
 +        </plugins>
 +    </build>
 +    <dependencies>
 +        <dependency>
 +            <groupId>org.apache.iotdb</groupId>
 +            <artifactId>service-rpc</artifactId>
 +            <version>${project.version}</version>
 +        </dependency>
 +        <dependency>
 +            <groupId>org.apache.iotdb</groupId>
 +            <artifactId>tsfile</artifactId>
 +            <version>${project.version}</version>
 +        </dependency>
 +        <dependency>
 +            <groupId>org.apache.iotdb</groupId>
 +            <artifactId>iotdb-server</artifactId>
 +            <version>${project.version}</version>
++            <type>test-jar</type>
++            <scope>test</scope>
++        </dependency>
++        <dependency>
++            <groupId>org.apache.iotdb</groupId>
++            <artifactId>iotdb-server</artifactId>
++            <version>${project.version}</version>
 +            <scope>test</scope>
 +        </dependency>
 +        <dependency>
 +            <groupId>org.apache.iotdb</groupId>
 +            <artifactId>iotdb-jdbc</artifactId>
 +            <version>${project.version}</version>
 +            <scope>test</scope>
 +        </dependency>
 +        <dependency>
 +            <groupId>commons-lang</groupId>
 +            <artifactId>commons-lang</artifactId>
 +        </dependency>
 +    </dependencies>
 +    <profiles>
 +        <profile>
 +            <id>skipSessionTests</id>
 +            <activation>
 +                <property>
 +                    <name>skipTests</name>
 +                    <value>true</value>
 +                </property>
 +            </activation>
 +            <properties>
 +                <session.test.skip>true</session.test.skip>
 +                <session.ut.skip>true</session.ut.skip>
 +                <session.it.skip>true</session.it.skip>
 +            </properties>
 +        </profile>
 +        <profile>
 +            <id>skipUT_SessionTests</id>
 +            <activation>
 +                <property>
 +                    <name>skipUTs</name>
 +                    <value>true</value>
 +                </property>
 +            </activation>
 +            <properties>
 +                <session.ut.skip>true</session.ut.skip>
 +            </properties>
 +        </profile>
 +    </profiles>
  </project>
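
(For reference, with the profiles above the invocations stay as the plugin comment suggests: `mvn test` runs only the surefire unit tests, `mvn verify` additionally runs the failsafe integration tests, and building with -DskipTests=true or -DskipUTs=true activates the skipSessionTests or skipUT_SessionTests profile respectively so the corresponding session tests are skipped.)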


[incubator-iotdb] 03/04: complete

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch DisableAlign
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 8f8d5701d0d8004ec41404cf91976912261591aa
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Tue Jan 14 15:57:39 2020 +0800

    complete
---
 .../iotdb/jdbc/IoTDBNonAlignQueryResultSet.java    | 27 ++++++++++------------
 1 file changed, 12 insertions(+), 15 deletions(-)

diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBNonAlignQueryResultSet.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBNonAlignQueryResultSet.java
index 6593887..28c89b3 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBNonAlignQueryResultSet.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBNonAlignQueryResultSet.java
@@ -54,7 +54,6 @@ public class IoTDBNonAlignQueryResultSet implements ResultSet {
   private List<String> columnTypeList; // no deduplication
   private Map<String, Integer> columnInfoMap; // used because the server returns deduplicated columns
   private List<String> columnTypeDeduplicatedList; // deduplicated from columnTypeList
-  private int rowsIndex = 0; // used to record the row index in current TSQueryDataSet
   private int fetchSize;
   private boolean emptyResultSet = false;
 
@@ -69,12 +68,12 @@ public class IoTDBNonAlignQueryResultSet implements ResultSet {
   public IoTDBNonAlignQueryResultSet() {
     // do nothing
   }
-  
+
   // for disable align clause
   public IoTDBNonAlignQueryResultSet(Statement statement, List<String> columnNameList,
-      List<String> columnTypeList, boolean ignoreTimeStamp, TSIService.Iface client,
-      String sql, long queryId, long sessionId, TSQueryNonAlignDataSet dataset) 
-      throws SQLException {
+                                     List<String> columnTypeList, boolean ignoreTimeStamp, TSIService.Iface client,
+                                     String sql, long queryId, long sessionId, TSQueryNonAlignDataSet dataset)
+          throws SQLException {
     this.statement = statement;
     this.fetchSize = statement.getFetchSize();
     this.columnTypeList = columnTypeList;
@@ -156,7 +155,7 @@ public class IoTDBNonAlignQueryResultSet implements ResultSet {
         throw new SQLException("Error occurs for close opeation in server side becasuse " + e);
       } catch (TException e) {
         throw new SQLException(
-            "Error occurs when connecting to server for close operation, becasue: " + e);
+                "Error occurs when connecting to server for close operation, becasue: " + e);
       }
     }
     client = null;
@@ -682,7 +681,6 @@ public class IoTDBNonAlignQueryResultSet implements ResultSet {
    * @return true means has results
    */
   private boolean fetchResults() throws SQLException {
-    rowsIndex = 0;
     TSFetchResultsReq req = new TSFetchResultsReq(sessionId, sql, fetchSize, queryId, false);
     try {
       TSFetchResultsResp resp = client.fetchResults(req);
@@ -703,14 +701,14 @@ public class IoTDBNonAlignQueryResultSet implements ResultSet {
       return resp.hasResultSet;
     } catch (TException e) {
       throw new SQLException(
-          "Cannot fetch result from server, because of network connection: {} ", e);
+              "Cannot fetch result from server, because of network connection: {} ", e);
     }
   }
 
   private boolean hasCachedResults() {
     return (tsQueryNonAlignDataSet != null && hasTimesRemaining());
   }
-  
+
   // check if has times remaining for disable align clause
   private boolean hasTimesRemaining() {
     for (ByteBuffer time : tsQueryNonAlignDataSet.timeList) {
@@ -720,7 +718,7 @@ public class IoTDBNonAlignQueryResultSet implements ResultSet {
     }
     return false;
   }
-  
+
   private void constructNonAlignOneRow() {
     for (int i = 0; i < tsQueryNonAlignDataSet.timeList.size(); i++) {
       times[i] = null;
@@ -769,7 +767,7 @@ public class IoTDBNonAlignQueryResultSet implements ResultSet {
             break;
           default:
             throw new UnSupportedDataTypeException(
-                String.format("Data type %s is not supported.", columnTypeDeduplicatedList.get(i)));
+                    String.format("Data type %s is not supported.", columnTypeDeduplicatedList.get(i)));
         }
       }
       else {
@@ -777,7 +775,6 @@ public class IoTDBNonAlignQueryResultSet implements ResultSet {
         values[i] = EMPTY_STR.getBytes();
       }
     }
-    rowsIndex++;
   }
 
   @Override
@@ -1230,7 +1227,7 @@ public class IoTDBNonAlignQueryResultSet implements ResultSet {
   public boolean wasNull() throws SQLException {
     throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
   }
-  
+
   private void checkRecord() throws SQLException {
     if (Objects.isNull(tsQueryNonAlignDataSet)) {
       throw new SQLException("No record remains");
@@ -1243,7 +1240,7 @@ public class IoTDBNonAlignQueryResultSet implements ResultSet {
     }
     if (columnIndex > columnInfoList.size()) {
       throw new SQLException(
-          String.format("column index %d out of range %d", columnIndex, columnInfoList.size()));
+              String.format("column index %d out of range %d", columnIndex, columnInfoList.size()));
     }
     return columnInfoList.get(columnIndex - 1);
   }
@@ -1288,5 +1285,5 @@ public class IoTDBNonAlignQueryResultSet implements ResultSet {
   public void setIgnoreTimeStamp(boolean ignoreTimeStamp) {
     this.ignoreTimeStamp = ignoreTimeStamp;
   }
-  
+
 }
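
(For orientation, a minimal, hypothetical JDBC sketch of how this non-align result set would be consumed. The "disable align" clause is inferred from the branch name and the "for disable align clause" comments above, the timeseries in the query are placeholders, and whether getMetaData()/getString(int) are fully wired up at this point in the branch is not verified here, so treat this as an illustration rather than part of the patch. Per-series time columns and empty cells for exhausted series follow from the times[]/values[] handling in the diff.)

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;

public class DisableAlignExample {

  public static void main(String[] args) throws Exception {
    Class.forName("org.apache.iotdb.jdbc.IoTDBDriver");
    try (Connection connection =
            DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
        Statement statement = connection.createStatement();
        // "disable align": each selected series keeps its own time column
        ResultSet resultSet =
            statement.executeQuery("select s1, s2 from root.sg1.d1 disable align")) {
      ResultSetMetaData metaData = resultSet.getMetaData();
      while (resultSet.next()) {
        StringBuilder row = new StringBuilder();
        for (int i = 1; i <= metaData.getColumnCount(); i++) {
          // a shorter series yields empty strings once it is exhausted
          // (see the EMPTY_STR handling in constructNonAlignOneRow above)
          row.append(resultSet.getString(i)).append('\t');
        }
        System.out.println(row);
      }
    }
  }
}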


[incubator-iotdb] 02/04: s

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch DisableAlign
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit a52c63d5ab7be97bf4bdd009aa66c783a7f2b5ed
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Tue Jan 14 15:56:58 2020 +0800

    s
---
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 210 +++++++++------------
 1 file changed, 86 insertions(+), 124 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 3f4c4ca..78c0082 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -18,27 +18,6 @@
  */
 package org.apache.iotdb.db.service;
 
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PRIVILEGE;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ROLE;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_STORAGE_GROUP;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TTL;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_USER;
-
-import java.io.IOException;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Vector;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
 import org.antlr.v4.runtime.misc.ParseCancellationException;
 import org.apache.iotdb.db.auth.AuthException;
 import org.apache.iotdb.db.auth.AuthorityChecker;
@@ -67,12 +46,7 @@ import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
-import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
-import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
+import org.apache.iotdb.db.qp.physical.sys.*;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.dataset.NewEngineDataSetWithoutValueFilter;
@@ -81,34 +55,7 @@ import org.apache.iotdb.db.tools.watermark.GroupedLSBWatermarkEncoder;
 import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
 import org.apache.iotdb.db.utils.QueryDataSetUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.ServerProperties;
-import org.apache.iotdb.service.rpc.thrift.TSBatchInsertionReq;
-import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
-import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
-import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
-import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
-import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteInsertRowInBatchResp;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
-import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
-import org.apache.iotdb.service.rpc.thrift.TSIService;
-import org.apache.iotdb.service.rpc.thrift.TSInsertInBatchReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertReq;
-import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
-import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
-import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
-import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
-import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet;
-import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
-import org.apache.iotdb.service.rpc.thrift.TSStatus;
-import org.apache.iotdb.service.rpc.thrift.TSStatusType;
+import org.apache.iotdb.service.rpc.thrift.*;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -121,6 +68,17 @@ import org.apache.thrift.server.ServerContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.ZoneId;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.iotdb.db.conf.IoTDBConstant.*;
+
 /**
  * Thrift RPC implementation at server side.
  */
@@ -196,7 +154,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
           return TSDataType.DOUBLE;
         default:
           throw new QueryProcessException(
-              "aggregate does not support " + aggrType + " function.");
+                  "aggregate does not support " + aggrType + " function.");
       }
     }
     return MManager.getInstance().getSeriesType(path);
@@ -205,7 +163,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   @Override
   public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException {
     logger.info("{}: receive open session request from username {}", IoTDBConstant.GLOBAL_DB_NAME,
-        req.getUsername());
+            req.getUsername());
 
     boolean status;
     IAuthorizer authorizer;
@@ -232,10 +190,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       tsStatus = getStatus(TSStatusCode.WRONG_LOGIN_PASSWORD_ERROR);
     }
     TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus,
-        TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V1);
+            TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V1);
     resp.setSessionId(sessionId);
     logger.info("{}: Login status: {}. User : {}", IoTDBConstant.GLOBAL_DB_NAME,
-        tsStatus.getStatusType().getMessage(), req.getUsername());
+            tsStatus.getStatusType().getMessage(), req.getUsername());
 
     return resp;
   }
@@ -271,9 +229,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     }
     if (!exceptions.isEmpty()) {
       return new TSStatus(
-          getStatus(TSStatusCode.CLOSE_OPERATION_ERROR,
-              String.format("%d errors in closeOperation, see server logs for detail",
-                  exceptions.size())));
+              getStatus(TSStatusCode.CLOSE_OPERATION_ERROR,
+                      String.format("%d errors in closeOperation, see server logs for detail",
+                              exceptions.size())));
     }
 
     return new TSStatus(tsStatus);
@@ -375,7 +333,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       }
     } catch (QueryProcessException | MetadataException | OutOfMemoryError e) {
       logger
-          .error(String.format("Failed to fetch timeseries %s's metadata", req.getColumnPath()), e);
+              .error(String.format("Failed to fetch timeseries %s's metadata", req.getColumnPath()), e);
       status = getStatus(TSStatusCode.METADATA_ERROR, e.getMessage());
       resp.setStatus(status);
       return resp;
@@ -417,7 +375,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     switch (statement) {
       case "merge":
         StorageEngine.getInstance()
-            .mergeAll(IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+                .mergeAll(IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
         return true;
       case "full merge":
         StorageEngine.getInstance().mergeAll(true);
@@ -463,16 +421,16 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       for (String statement : statements) {
         long t2 = System.currentTimeMillis();
         isAllSuccessful =
-            executeStatementInBatch(statement, batchErrorMessage, result,
-                req.getSessionId()) && isAllSuccessful;
+                executeStatementInBatch(statement, batchErrorMessage, result,
+                        req.getSessionId()) && isAllSuccessful;
         Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_ONE_SQL_IN_BATCH, t2);
       }
       if (isAllSuccessful) {
         return getTSBatchExecuteStatementResp(getStatus(TSStatusCode.SUCCESS_STATUS,
-            "Execute batch statements successfully"), result);
+                "Execute batch statements successfully"), result);
       } else {
         return getTSBatchExecuteStatementResp(getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
-            batchErrorMessage.toString()), result);
+                batchErrorMessage.toString()), result);
       }
     } finally {
       Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_JDBC_BATCH, t1);
@@ -482,16 +440,16 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   // execute one statement of a batch. Currently, query is not allowed in a batch statement and
   // on finding queries in a batch, such query will be ignored and an error will be generated
   private boolean executeStatementInBatch(String statement, StringBuilder batchErrorMessage,
-      List<Integer> result, long sessionId) {
+                                          List<Integer> result, long sessionId) {
     try {
       PhysicalPlan physicalPlan = processor
-          .parseSQLToPhysicalPlan(statement, sessionIdZoneIdMap.get(sessionId));
+              .parseSQLToPhysicalPlan(statement, sessionIdZoneIdMap.get(sessionId));
       if (physicalPlan.isQuery()) {
         throw new QueryInBatchStatementException(statement);
       }
       TSExecuteStatementResp resp = executeUpdateStatement(physicalPlan, sessionId);
       if (resp.getStatus().getStatusType().getCode() == TSStatusCode.SUCCESS_STATUS
-          .getStatusCode()) {
+              .getStatusCode()) {
         result.add(Statement.SUCCESS_NO_INFO);
       } else {
         result.add(Statement.EXECUTE_FAILED);
@@ -510,8 +468,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       return false;
     } catch (QueryProcessException e) {
       logger.info(
-          "Error occurred when executing {}, meet error while parsing SQL to physical plan: {}",
-          statement, e.getMessage());
+              "Error occurred when executing {}, meet error while parsing SQL to physical plan: {}",
+              statement, e.getMessage());
       result.add(Statement.EXECUTE_FAILED);
       batchErrorMessage.append(TSStatusCode.SQL_PARSE_ERROR.getStatusCode()).append("\n");
       return false;
@@ -539,13 +497,13 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
 
       if (execAdminCommand(statement, req.getSessionId())) {
         return getTSExecuteStatementResp(
-            getStatus(TSStatusCode.SUCCESS_STATUS, "ADMIN_COMMAND_SUCCESS"));
+                getStatus(TSStatusCode.SUCCESS_STATUS, "ADMIN_COMMAND_SUCCESS"));
       }
       PhysicalPlan physicalPlan = processor.parseSQLToPhysicalPlan(statement,
-          sessionIdZoneIdMap.get(req.getSessionId()));
+              sessionIdZoneIdMap.get(req.getSessionId()));
       if (physicalPlan.isQuery()) {
         resp = executeQueryStatement(req.statementId, physicalPlan, req.fetchSize,
-            sessionIdUsernameMap.get(req.getSessionId()));
+                sessionIdUsernameMap.get(req.getSessionId()));
         long endTime = System.currentTimeMillis();
         sqlArgument = new SqlArgument(resp, physicalPlan, statement, startTime, endTime);
         sqlArgumentsList.add(sqlArgument);
@@ -562,15 +520,15 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     } catch (SQLParserException e) {
       logger.error("check metadata error: ", e);
       return getTSExecuteStatementResp(getStatus(TSStatusCode.METADATA_ERROR,
-          "Check metadata error: " + e.getMessage()));
+              "Check metadata error: " + e.getMessage()));
     } catch (QueryProcessException e) {
       logger.info(ERROR_PARSING_SQL, e.getMessage());
       return getTSExecuteStatementResp(getStatus(TSStatusCode.SQL_PARSE_ERROR,
-          "Statement format is not right: " + e.getMessage()));
+              "Statement format is not right: " + e.getMessage()));
     } catch (StorageEngineException e) {
       logger.info(ERROR_PARSING_SQL, e.getMessage());
       return getTSExecuteStatementResp(getStatus(TSStatusCode.READ_ONLY_SYSTEM_ERROR,
-          e.getMessage()));
+              e.getMessage()));
     }
   }
 
@@ -579,7 +537,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
    * AuthorPlan
    */
   private TSExecuteStatementResp executeQueryStatement(long statementId, PhysicalPlan plan,
-      int fetchSize, String username) {
+                                                       int fetchSize, String username) {
     long t1 = System.currentTimeMillis();
     try {
       TSExecuteStatementResp resp; // column headers
@@ -627,7 +585,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     } catch (Exception e) {
       logger.error("{}: Internal server error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
       return getTSExecuteStatementResp(
-          getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()));
+              getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()));
     } finally {
       Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_QUERY, t1);
     }
@@ -644,7 +602,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     PhysicalPlan physicalPlan;
     try {
       physicalPlan = processor
-          .parseSQLToPhysicalPlan(statement, sessionIdZoneIdMap.get(req.getSessionId()));
+              .parseSQLToPhysicalPlan(statement, sessionIdZoneIdMap.get(req.getSessionId()));
     } catch (QueryProcessException | SQLParserException e) {
       logger.info(ERROR_PARSING_SQL, e.getMessage());
       return getTSExecuteStatementResp(getStatus(TSStatusCode.SQL_PARSE_ERROR, e.getMessage()));
@@ -652,14 +610,14 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
 
     if (!physicalPlan.isQuery()) {
       return getTSExecuteStatementResp(getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
-          "Statement is not a query statement."));
+              "Statement is not a query statement."));
     }
     return executeQueryStatement(req.statementId, physicalPlan, req.fetchSize,
-        sessionIdUsernameMap.get(req.getSessionId()));
+            sessionIdUsernameMap.get(req.getSessionId()));
   }
 
   private TSExecuteStatementResp getShowQueryColumnHeaders(ShowPlan showPlan)
-      throws QueryProcessException {
+          throws QueryProcessException {
     switch (showPlan.getShowContentType()) {
       case TTL:
         return StaticResps.TTL_RESP;
@@ -686,7 +644,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       default:
         logger.error("Unsupported show content type: {}", showPlan.getShowContentType());
         throw new QueryProcessException(
-            "Unsupported show content type:" + showPlan.getShowContentType());
+                "Unsupported show content type:" + showPlan.getShowContentType());
     }
   }
 
@@ -705,7 +663,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         return StaticResps.LIST_USER_PRIVILEGE_RESP;
       default:
         return getTSExecuteStatementResp(getStatus(TSStatusCode.SQL_PARSE_ERROR,
-            String.format("%s is not an auth query", authorPlan.getAuthorType())));
+                String.format("%s is not an auth query", authorPlan.getAuthorType())));
     }
   }
 
@@ -714,7 +672,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
    * get ResultSet schema
    */
   private TSExecuteStatementResp getQueryColumnHeaders(PhysicalPlan physicalPlan, String username)
-      throws AuthException, TException, QueryProcessException {
+          throws AuthException, TException, QueryProcessException {
 
     List<String> respColumns = new ArrayList<>();
     List<String> columnsTypes = new ArrayList<>();
@@ -722,7 +680,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     // check permissions
     if (!checkAuthorization(physicalPlan.getPaths(), physicalPlan, username)) {
       return getTSExecuteStatementResp(getStatus(TSStatusCode.NO_PERMISSION_ERROR,
-          "No permissions for this operation " + physicalPlan.getOperatorType()));
+              "No permissions for this operation " + physicalPlan.getOperatorType()));
     }
 
     TSExecuteStatementResp resp = getTSExecuteStatementResp(getStatus(TSStatusCode.SUCCESS_STATUS));
@@ -734,7 +692,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       // set dataTypeList in TSExecuteStatementResp. Note this is without deduplication.
       resp.setColumns(respColumns);
       resp.setDataTypeList(columnsTypes);
-    } 
+    }
     else {
       getWideQueryHeaders(plan, respColumns, columnsTypes);
       resp.setColumns(respColumns);
@@ -745,7 +703,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
 
   // wide means not group by device
   private void getWideQueryHeaders(QueryPlan plan, List<String> respColumns,
-      List<String> columnTypes) throws TException, QueryProcessException {
+                                   List<String> columnTypes) throws TException, QueryProcessException {
     // Restore column header of aggregate to func(column_name), only
     // support single aggregate function for now
     List<Path> paths = plan.getPaths();
@@ -778,7 +736,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   }
 
   private void getGroupByDeviceQueryHeaders(QueryPlan plan, List<String> respColumns,
-      List<String> columnTypes) {
+                                            List<String> columnTypes) {
     // set columns in TSExecuteStatementResp. Note this is without deduplication.
     List<String> measurementColumns = plan.getMeasurements();
     respColumns.add(SQLConstant.GROUPBY_DEVICE_COLUMN_NAME);
@@ -823,19 +781,19 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
 
       if (!queryId2DataSet.containsKey(req.queryId)) {
         return getTSFetchResultsResp(
-            getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, "Has not executed query"));
+                getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, "Has not executed query"));
       }
 
       QueryDataSet queryDataSet = queryId2DataSet.get(req.queryId);
       if (req.isAlign) {
         TSQueryDataSet result = fillRpcReturnData(req.fetchSize, queryDataSet,
-            sessionIdUsernameMap.get(req.sessionId));
+                sessionIdUsernameMap.get(req.sessionId));
         boolean hasResultSet = result.bufferForTime().limit() != 0;
         if (!hasResultSet) {
           queryId2DataSet.remove(req.queryId);
         }
         TSFetchResultsResp resp = getTSFetchResultsResp(getStatus(TSStatusCode.SUCCESS_STATUS,
-            "FetchResult successfully. Has more result: " + hasResultSet));
+                "FetchResult successfully. Has more result: " + hasResultSet));
         resp.setHasResultSet(hasResultSet);
         resp.setQueryDataSet(result);
         resp.setIsAlign(true);
@@ -843,13 +801,19 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       }
       else {
         TSQueryNonAlignDataSet nonAlignResult = fillRpcNonAlignReturnData(req.fetchSize, queryDataSet,
-            sessionIdUsernameMap.get(req.sessionId));
-        boolean hasResultSet = nonAlignResult.getTimeList().get(0).limit() != 0;
+                sessionIdUsernameMap.get(req.sessionId));
+        boolean hasResultSet = false;
+        for (ByteBuffer timeBuffer : nonAlignResult.getTimeList()) {
+          if (timeBuffer.limit() != 0) {
+            hasResultSet = true;
+            break;
+          }
+        }
         if (!hasResultSet) {
           queryId2DataSet.remove(req.queryId);
         }
         TSFetchResultsResp resp = getTSFetchResultsResp(getStatus(TSStatusCode.SUCCESS_STATUS,
-            "FetchResult successfully. Has more result: " + hasResultSet));
+                "FetchResult successfully. Has more result: " + hasResultSet));
         resp.setHasResultSet(hasResultSet);
         resp.setNonAlignQueryDataSet(nonAlignResult);
         resp.setIsAlign(false);
@@ -877,7 +841,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         encoder = new GroupedLSBWatermarkEncoder(config);
       } else {
         throw new UnSupportedDataTypeException(String.format(
-            "Watermark method is not supported yet: %s", config.getWatermarkMethodName()));
+                "Watermark method is not supported yet: %s", config.getWatermarkMethodName()));
       }
       if (queryDataSet instanceof NewEngineDataSetWithoutValueFilter) {
         // optimize for query without value filter
@@ -895,9 +859,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     }
     return result;
   }
-  
-  private TSQueryNonAlignDataSet fillRpcNonAlignReturnData(int fetchSize, QueryDataSet queryDataSet, 
-      String userName) throws TException, AuthException, IOException, InterruptedException {
+
+  private TSQueryNonAlignDataSet fillRpcNonAlignReturnData(int fetchSize, QueryDataSet queryDataSet,
+                                                           String userName) throws TException, AuthException, IOException, InterruptedException {
     IAuthorizer authorizer;
     try {
       authorizer = LocalFileAuthorizer.getInstance();
@@ -905,30 +869,30 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       throw new TException(e);
     }
     TSQueryNonAlignDataSet result;
-    
+
     if (config.isEnableWatermark() && authorizer.isUserUseWaterMark(userName)) {
       WatermarkEncoder encoder;
       if (config.getWatermarkMethodName().equals(IoTDBConfig.WATERMARK_GROUPED_LSB)) {
         encoder = new GroupedLSBWatermarkEncoder(config);
       } else {
         throw new UnSupportedDataTypeException(String.format(
-            "Watermark method is not supported yet: %s", config.getWatermarkMethodName()));
+                "Watermark method is not supported yet: %s", config.getWatermarkMethodName()));
       }
       result = ((NonAlignEngineDataSet) queryDataSet).fillBuffer(fetchSize, encoder);
     } else {
       result = ((NonAlignEngineDataSet) queryDataSet).fillBuffer(fetchSize, null);
     }
     return result;
-    
+
   }
-  
-  
+
+
 
   /**
    * create QueryDataSet and buffer it for fetchResults
    */
   private QueryDataSet createQueryDataSet(long queryId, PhysicalPlan physicalPlan) throws
-      QueryProcessException, QueryFilterOptimizationException, StorageEngineException, IOException, MetadataException, SQLException {
+          QueryProcessException, QueryFilterOptimizationException, StorageEngineException, IOException, MetadataException, SQLException {
 
     QueryContext context = new QueryContext(queryId);
     QueryDataSet queryDataSet = processor.getExecutor().processQuery(physicalPlan, context);
@@ -948,7 +912,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     } catch (Exception e) {
       logger.error("{}: server Internal Error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
       return getTSExecuteStatementResp(
-          getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()));
+              getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()));
     }
   }
 
@@ -968,7 +932,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   private boolean executeNonQuery(PhysicalPlan plan) throws QueryProcessException {
     if (IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
       throw new QueryProcessException(
-          "Current system mode is read-only, does not support non-query operation");
+              "Current system mode is read-only, does not support non-query operation");
     }
     return processor.getExecutor().processNonQuery(plan);
   }
@@ -985,7 +949,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
 
     if (physicalPlan.isQuery()) {
       return getTSExecuteStatementResp(getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
-          "Statement is a query statement."));
+              "Statement is a query statement."));
     }
 
     return executeUpdateStatement(physicalPlan, sessionId);
@@ -1001,7 +965,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   }
 
   private boolean checkAuthorization(List<Path> paths, PhysicalPlan plan, String username)
-      throws AuthException {
+          throws AuthException {
     String targetUser = null;
     if (plan instanceof AuthorPlan) {
       targetUser = ((AuthorPlan) plan).getUserName();
@@ -1017,7 +981,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   }
 
   private TSExecuteBatchStatementResp getTSBatchExecuteStatementResp(TSStatus status,
-      List<Integer> result) {
+                                                                     List<Integer> result) {
     TSExecuteBatchStatementResp resp = new TSExecuteBatchStatementResp();
     TSStatus tsStatus = new TSStatus(status);
     resp.setStatus(tsStatus);
@@ -1077,7 +1041,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     properties.getSupportedTimeAggregationOperations().add(IoTDBConstant.MAX_TIME);
     properties.getSupportedTimeAggregationOperations().add(IoTDBConstant.MIN_TIME);
     properties
-        .setTimestampPrecision(IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision());
+            .setTimestampPrecision(IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision());
     return properties;
   }
 
@@ -1186,10 +1150,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       BatchInsertPlan batchInsertPlan = new BatchInsertPlan(req.deviceId, req.measurements);
       batchInsertPlan.setTimes(QueryDataSetUtils.readTimesFromBuffer(req.timestamps, req.size));
       batchInsertPlan.setColumns(QueryDataSetUtils
-          .readValuesFromBuffer(req.values, req.types, req.measurements.size(), req.size));
+              .readValuesFromBuffer(req.values, req.types, req.measurements.size(), req.size));
       batchInsertPlan.setRowCount(req.size);
-      batchInsertPlan.setTimeBuffer(req.timestamps);
-      batchInsertPlan.setValueBuffer(req.values);
       batchInsertPlan.setDataTypes(req.types);
 
       boolean isAllSuccessful = true;
@@ -1209,16 +1171,16 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       if (isAllSuccessful) {
         logger.debug("Insert one RowBatch successfully");
         return getTSBatchExecuteStatementResp(getStatus(TSStatusCode.SUCCESS_STATUS),
-            Arrays.asList(results));
+                Arrays.asList(results));
       } else {
         logger.debug("Insert one RowBatch failed!");
         return getTSBatchExecuteStatementResp(getStatus(TSStatusCode.INTERNAL_SERVER_ERROR),
-            Arrays.asList(results));
+                Arrays.asList(results));
       }
     } catch (Exception e) {
       logger.info("{}: error occurs when executing statements", IoTDBConstant.GLOBAL_DB_NAME, e);
       return getTSBatchExecuteStatementResp(
-          getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage()), null);
+              getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage()), null);
     } finally {
       Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
     }
@@ -1264,8 +1226,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       return getStatus(TSStatusCode.NOT_LOGIN_ERROR);
     }
     CreateTimeSeriesPlan plan = new CreateTimeSeriesPlan(new Path(req.getPath()),
-        TSDataType.values()[req.getDataType()], TSEncoding.values()[req.getEncoding()],
-        CompressionType.values()[req.compressor], new HashMap<>());
+            TSDataType.values()[req.getDataType()], TSEncoding.values()[req.getEncoding()],
+            CompressionType.values()[req.compressor], new HashMap<>());
     TSStatus status = checkAuthority(plan, req.getSessionId());
     if (status != null) {
       return new TSStatus(status);
@@ -1303,7 +1265,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     try {
       if (!checkAuthorization(paths, plan, sessionIdUsernameMap.get(sessionId))) {
         return getStatus(TSStatusCode.NO_PERMISSION_ERROR,
-            "No permissions for this operation " + plan.getOperatorType().toString());
+                "No permissions for this operation " + plan.getOperatorType().toString());
       }
     } catch (AuthException e) {
       logger.error("meet error while checking authorization.", e);
@@ -1322,7 +1284,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     }
 
     return execRet ? getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully")
-        : getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
+            : getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
   }
 
   private long generateQueryId(boolean isDataQuery) {