You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2020/01/03 08:52:36 UTC

[GitHub] [incubator-iotdb] HTHou opened a new pull request #705: [IOTDB-396] New query clause: disable align

HTHou opened a new pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705
 
 
   Disable Align Clause: DISABLE ALIGN
   
   Rules:  
   1. Both uppercase and lowercase are ok.  
   Correct example: select * from root.sg1 disable align  
   Correct example: select * from root.sg1 DISABLE ALIGN  
   
   2. Disable Align Clause can only be used at the end of a query statement.  
   Correct example: select * from root.sg1 where time > 10 disable align 
   Wrong example: select * from root.sg1 disable align where time > 10 
   
   3. Disable Align Clause cannot be used with Aggregation, Fill Statements, Group By or Group By Device Statements, but can with Limit Statements.
   Correct example: select * from root.sg1 limit 3 offset 2 disable align
   Correct example: select * from root.sg1 slimit 3 soffset 2 disable align
   Wrong example: select count(s0),count(s1) from root.sg1.d1 disable align
   Wrong example: select * from root.vehicle where root.vehicle.d0.s0>0 disable align
   Wrong example: select * from root.vehicle group by device disable align
   
   You could expect a table like:
   ![Screen Shot 2020-01-03 at 4 16 25 PM](https://user-images.githubusercontent.com/25913899/71714565-5b736180-2e49-11ea-97ea-e6cc94f4e956.png)
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
jt2594838 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r363659057
 
 

 ##########
 File path: service-rpc/src/main/thrift/rpc.thrift
 ##########
 @@ -146,6 +148,7 @@ struct TSFetchResultsResp{
 	1: required TSStatus status
 	2: required bool hasResultSet
 	3: optional TSQueryDataSet queryDataSet
+	4: optional TSQueryNonAlignDataSet nonAlignQueryDataSet
 
 Review comment:
   Add a comment like "one and only one of queryDataSet or nonAlignQueryDataSet is not null" or use an additional field to indicate which one is returned.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] HTHou commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
HTHou commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r365075533
 
 

 ##########
 File path: jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBQueryResultSet.java
 ##########
 @@ -1226,7 +1380,7 @@ public boolean wasNull() throws SQLException {
   }
 
   private void checkRecord() throws SQLException {
-    if (Objects.isNull(tsQueryDataSet)) {
+    if (Objects.isNull(tsQueryDataSet) && Objects.isNull(tsQueryNonAlignDataSet)) {
 
 Review comment:
   Fixed

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
jt2594838 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r365630478
 
 

 ##########
 File path: server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
 ##########
 @@ -826,19 +827,34 @@ public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {
       }
 
       QueryDataSet queryDataSet = queryId2DataSet.get(req.queryId);
-      TSQueryDataSet result = fillRpcReturnData(req.fetchSize, queryDataSet,
-          sessionIdUsernameMap.get(req.sessionId));
-
-      boolean hasResultSet = result.bufferForTime().limit() != 0;
-      if (!hasResultSet) {
-        queryId2DataSet.remove(req.queryId);
+      if (req.isAlign) {
+        TSQueryDataSet result = fillRpcReturnData(req.fetchSize, queryDataSet,
+            sessionIdUsernameMap.get(req.sessionId));
+        boolean hasResultSet = result.bufferForTime().limit() != 0;
+        if (!hasResultSet) {
+          queryId2DataSet.remove(req.queryId);
+        }
+        TSFetchResultsResp resp = getTSFetchResultsResp(getStatus(TSStatusCode.SUCCESS_STATUS,
+            "FetchResult successfully. Has more result: " + hasResultSet));
+        resp.setHasResultSet(hasResultSet);
+        resp.setQueryDataSet(result);
+        resp.setIsAlign(true);
+        return resp;
+      }
+      else {
+        TSQueryNonAlignDataSet nonAlignResult = fillRpcNonAlignReturnData(req.fetchSize, queryDataSet,
+            sessionIdUsernameMap.get(req.sessionId));
+        boolean hasResultSet = nonAlignResult.getTimeList().get(0).limit() != 0;
 
 Review comment:
   I just want to confirm: Even if there is no result, will there still be an empty list in the TimeList? Otherwise, an IndexOutOfRangeException will be thrown.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] HTHou commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
HTHou commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r365074683
 
 

 ##########
 File path: server/src/main/java/org/apache/iotdb/db/query/dataset/NewEngineDataSetWithoutValueFilter.java
 ##########
 @@ -327,6 +328,167 @@ public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) throws
 
     return tsQueryDataSet;
   }
+  
+  /**
+   * for RPC in RawData query between client and server
+   * fill time buffer, value buffers and bitmap buffers
+   */
+  public TSQueryNonAlignDataSet fillNonAlignBuffer(int fetchSize, WatermarkEncoder encoder) throws IOException, InterruptedException {
+    int seriesNum = seriesReaderWithoutValueFilterList.size();
+    TSQueryNonAlignDataSet tsQueryNonAlignDataSet = new TSQueryNonAlignDataSet();
+
+    PublicBAOS[] timeBAOSList = new PublicBAOS[seriesNum];
+    PublicBAOS[] valueBAOSList = new PublicBAOS[seriesNum];
+    PublicBAOS[] bitmapBAOSList = new PublicBAOS[seriesNum];
 
 Review comment:
   Fixed

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
jt2594838 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r365629447
 
 

 ##########
 File path: server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
 ##########
 @@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.dataset;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.iotdb.db.query.pool.QueryTaskPoolManager;
+import org.apache.iotdb.db.query.reader.ManagedSeriesReader;
+import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
+import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NonAlignEngineDataSet extends QueryDataSet {
+  
+  private static class ReadTask implements Runnable {
+    
+    private final ManagedSeriesReader reader;
+    private BlockingQueue<Pair<PublicBAOS, PublicBAOS>> blockingQueue;
+    private WatermarkEncoder encoder;
+    private int rowOffset;
+    private int fetchSize;
+    private int rowLimit;
+    private int alreadyReturnedRowNum = 0;
+    
+    public ReadTask(ManagedSeriesReader reader, 
+        BlockingQueue<Pair<PublicBAOS, PublicBAOS>> blockingQueue,
+        WatermarkEncoder encoder,
+        int rowOffset, int rowLimit, int fetchSize) {
+      this.reader = reader;
+      this.blockingQueue = blockingQueue;
+      this.encoder = encoder;
+      this.rowOffset = rowOffset;
+      this.rowLimit = rowLimit;
+      this.fetchSize = fetchSize;
+    }
+
+    @Override
+    public void run() {
+      PublicBAOS timeBAOS = new PublicBAOS();
+      PublicBAOS valueBAOS = new PublicBAOS();
+      try {
+        synchronized (reader) {
+          // if the task is submitted, there must be free space in the queue
+          // so here we don't need to check whether the queue has free space
+          // the reader has next batch
+          if (reader.hasNextBatch()) {
+            BatchData batchData = reader.nextBatch();
+            
+            int rowCount = 0;
+            while (rowCount < fetchSize) {
+              
+              if ((rowLimit > 0 && alreadyReturnedRowNum >= rowLimit)) {
+                break;
+              }
+              
+              if (batchData != null && batchData.hasCurrent()) {
+                if (rowOffset == 0) {
+                  long time = batchData.currentTime();
+                  ReadWriteIOUtils.write(time, timeBAOS);
+                  TSDataType type = batchData.getDataType();
+                  switch (type) {
+                    case INT32:
+                      int intValue = batchData.getInt();
+                      if (encoder != null && encoder.needEncode(time)) {
+                        intValue = encoder.encodeInt(intValue, time);
+                      }
+                      ReadWriteIOUtils.write(intValue, valueBAOS);
+                      break;
+                    case INT64:
+                      long longValue = batchData.getLong();
+                      if (encoder != null && encoder.needEncode(time)) {
+                        longValue = encoder.encodeLong(longValue, time);
+                      }
+                      ReadWriteIOUtils.write(longValue, valueBAOS);
+                      break;
+                    case FLOAT:
+                      float floatValue = batchData.getFloat();
+                      if (encoder != null && encoder.needEncode(time)) {
+                        floatValue = encoder.encodeFloat(floatValue, time);
+                      }
+                      ReadWriteIOUtils.write(floatValue, valueBAOS);
+                      break;
+                    case DOUBLE:
+                      double doubleValue = batchData.getDouble();
+                      if (encoder != null && encoder.needEncode(time)) {
+                        doubleValue = encoder.encodeDouble(doubleValue, time);
+                      }
+                      ReadWriteIOUtils.write(doubleValue, valueBAOS);
+                      break;
+                    case BOOLEAN:
+                      ReadWriteIOUtils.write(batchData.getBoolean(),
+                              valueBAOS);
+                      break;
+                    case TEXT:
+                      ReadWriteIOUtils
+                              .write(batchData.getBinary(),
+                                      valueBAOS);
+                      break;
+                    default:
+                      throw new UnSupportedDataTypeException(
+                              String.format("Data type %s is not supported.", type));
+                  }
+                }
+                batchData.next();
+              }
+              else {
+                break;
+              }
+              if (rowOffset == 0) {
+                rowCount++;
+                if (rowLimit > 0) {
+                  alreadyReturnedRowNum++;
+                }
+              } else {
+                rowOffset--;
+              }
+            }
+            
+            Pair<PublicBAOS, PublicBAOS> timeValueBAOSPair = new Pair(timeBAOS, valueBAOS);
+            
+            blockingQueue.put(timeValueBAOSPair);
+            // if the queue also has free space, just submit another itself
+            if (blockingQueue.remainingCapacity() > 0) {
+              pool.submit(this);
+            }
+            // the queue has no more space
+            // remove itself from the QueryTaskPoolManager
+            else {
+              reader.setManagedByQueryManager(false);
+            }
+            return;
+          }
+          blockingQueue.put(new Pair(timeBAOS, valueBAOS));
+          // set the hasRemaining field in reader to false
+          // tell the Consumer not to submit another task for this reader any more
+          reader.setHasRemaining(false);
+          // remove itself from the QueryTaskPoolManager
+          reader.setManagedByQueryManager(false);
+        }
+      } catch (InterruptedException e) {
+        LOGGER.error("Interrupted while putting into the blocking queue: ", e);
+      } catch (IOException e) {
+        LOGGER.error("Something gets wrong while reading from the series reader: ", e);
+      } catch (Exception e) {
+        LOGGER.error("Something gets wrong: ", e);
+      }
+      
+    }
+    
+  }
+
+  
+  private List<ManagedSeriesReader> seriesReaderWithoutValueFilterList;
+  
+  // Blocking queue list for each time value buffer pair
+  private BlockingQueue<Pair<PublicBAOS, PublicBAOS>>[] blockingQueueArray;
+  
+  private boolean initialized = false;
+  
+  // indicate that there is no more batch data in the corresponding queue
+  // in case that the consumer thread is blocked on the queue and won't get runnable any more
+  // this field is not same as the `hasRemaining` in SeriesReaderWithoutValueFilter
+  // even though the `hasRemaining` in SeriesReaderWithoutValueFilter is false
+  // noMoreDataInQueue can still be true
+  // its usage is to tell the consumer thread not to call the take() method.
+
 
 Review comment:
   What is this comment talking about?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] JackieTien97 closed pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
JackieTien97 closed pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705
 
 
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
jt2594838 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r365628834
 
 

 ##########
 File path: jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBNonAlignQueryResultSet.java
 ##########
 @@ -0,0 +1,1292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.jdbc;
+
+import org.apache.iotdb.rpc.IoTDBRPCException;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.service.rpc.thrift.*;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.thrift.TException;
+
+import java.io.InputStream;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.*;
+import java.util.*;
+
+public class IoTDBNonAlignQueryResultSet implements ResultSet {
+
+  private static final String TIMESTAMP_STR = "Time";
+  private static final int TIMESTAMP_STR_LENGTH = 4;
+  private static final int START_INDEX = 2;
+  private static final String VALUE_IS_NULL = "The value got by %s (column name) is NULL.";
+  private static final String EMPTY_STR = "";
+  private Statement statement = null;
+  private String sql;
+  private SQLWarning warningChain = null;
+  private boolean isClosed = false;
+  private TSIService.Iface client = null;
+  private List<String> columnInfoList; // no deduplication
+  private List<String> columnTypeList; // no deduplication
+  private Map<String, Integer> columnInfoMap; // used because the server returns deduplicated columns
+  private List<String> columnTypeDeduplicatedList; // deduplicated from columnTypeList
+  private int rowsIndex = 0; // used to record the row index in current TSQueryDataSet
+  private int fetchSize;
+  private boolean emptyResultSet = false;
+
+  private TSQueryNonAlignDataSet tsQueryNonAlignDataSet = null;
+  private byte[][] times; // used for disable align
+  private byte[][] values; // used to cache the current row record value
+
+  private long sessionId;
+  private long queryId;
+  private boolean ignoreTimeStamp = false;
+
+  public IoTDBNonAlignQueryResultSet() {
+    // do nothing
+  }
+  
+  // for disable align clause
+  public IoTDBNonAlignQueryResultSet(Statement statement, List<String> columnNameList,
+      List<String> columnTypeList, boolean ignoreTimeStamp, TSIService.Iface client,
+      String sql, long queryId, long sessionId, TSQueryNonAlignDataSet dataset) 
+      throws SQLException {
+    this.statement = statement;
+    this.fetchSize = statement.getFetchSize();
+    this.columnTypeList = columnTypeList;
+
+    times = new byte[columnNameList.size()][Long.BYTES];
+    values = new byte[columnNameList.size()][];
+
+    this.columnInfoList = new ArrayList<>();
+    // deduplicate and map
+    this.columnInfoMap = new HashMap<>();
+    this.columnInfoMap.put(TIMESTAMP_STR, 1);
+    this.columnTypeDeduplicatedList = new ArrayList<>();
+    int index = START_INDEX;
+    for (int i = 0; i < columnNameList.size(); i++) {
+      String name = columnNameList.get(i);
+      columnInfoList.add(TIMESTAMP_STR + name);
+      columnInfoList.add(name);
+      if (!columnInfoMap.containsKey(name)) {
+        columnInfoMap.put(name, index++);
+        columnTypeDeduplicatedList.add(columnTypeList.get(i));
+      }
+    }
+
+    this.ignoreTimeStamp = ignoreTimeStamp;
+    this.client = client;
+    this.sql = sql;
+    this.queryId = queryId;
+    this.tsQueryNonAlignDataSet = dataset;
+    this.sessionId = sessionId;
+  }
+
+  @Override
+  public boolean isWrapperFor(Class<?> iface) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public <T> T unwrap(Class<T> iface) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public boolean absolute(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void afterLast() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void beforeFirst() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void cancelRowUpdates() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void clearWarnings() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void close() throws SQLException {
+    if (isClosed) {
+      return;
+    }
+    if (client != null) {
+      try {
+        TSCloseOperationReq closeReq = new TSCloseOperationReq(sessionId);
+        closeReq.setQueryId(queryId);
+        TSStatus closeResp = client.closeOperation(closeReq);
+        RpcUtils.verifySuccess(closeResp);
+      } catch (IoTDBRPCException e) {
+        throw new SQLException("Error occurs for close opeation in server side becasuse " + e);
+      } catch (TException e) {
+        throw new SQLException(
+            "Error occurs when connecting to server for close operation, becasue: " + e);
+      }
+    }
+    client = null;
+    isClosed = true;
+  }
+
+
+  @Override
+  public void deleteRow() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public int findColumn(String columnName) {
+    return columnInfoMap.get(columnName);
+  }
+
+  @Override
+  public boolean first() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Array getArray(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Array getArray(String arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public InputStream getAsciiStream(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public InputStream getAsciiStream(String arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public BigDecimal getBigDecimal(int columnIndex) throws SQLException {
+    return getBigDecimal(findColumnNameByIndex(columnIndex));
+  }
+
+  @Override
+  public BigDecimal getBigDecimal(String columnName) throws SQLException {
+    return new BigDecimal(Objects.requireNonNull(getValueByName(columnName)));
+  }
+
+  @Override
+  public BigDecimal getBigDecimal(int columnIndex, int scale) throws SQLException {
+    MathContext mc = new MathContext(scale);
+    return getBigDecimal(columnIndex).round(mc);
+  }
+
+  @Override
+  public BigDecimal getBigDecimal(String columnName, int scale) throws SQLException {
+    return getBigDecimal(findColumn(columnName), scale);
+  }
+
+  @Override
+  public InputStream getBinaryStream(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public InputStream getBinaryStream(String arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Blob getBlob(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Blob getBlob(String arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public boolean getBoolean(int columnIndex) throws SQLException {
+    return getBoolean(findColumnNameByIndex(columnIndex));
+  }
+
+  @Override
+  public boolean getBoolean(String columnName) throws SQLException {
+    checkRecord();
+    int index = columnInfoMap.get(columnName) - START_INDEX;
+    if (values[index] != null) {
+      return BytesUtils.bytesToBool(values[index]);
+    }
+    else {
+      throw new SQLException(String.format(VALUE_IS_NULL, columnName));
+    }
+  }
+
+  @Override
+  public byte getByte(int columnIndex) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public byte getByte(String columnName) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public byte[] getBytes(int columnIndex) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public byte[] getBytes(String columnName) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Reader getCharacterStream(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Reader getCharacterStream(String arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Clob getClob(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Clob getClob(String arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public int getConcurrency() {
+    return ResultSet.CONCUR_READ_ONLY;
+  }
+
+  @Override
+  public String getCursorName() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Date getDate(int columnIndex) throws SQLException {
+    return new Date(getLong(columnIndex));
+  }
+
+  @Override
+  public Date getDate(String columnName) throws SQLException {
+    return getDate(findColumn(columnName));
+  }
+
+  @Override
+  public Date getDate(int arg0, Calendar arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Date getDate(String arg0, Calendar arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public double getDouble(int columnIndex) throws SQLException {
+    return getDouble(findColumnNameByIndex(columnIndex));
+  }
+
+  @Override
+  public double getDouble(String columnName) throws SQLException {
+    checkRecord();
+    int index = columnInfoMap.get(columnName) - START_INDEX;
+    if (values[index] != null) {
+      return BytesUtils.bytesToDouble(values[index]);
+    } else {
+      throw new SQLException(String.format(VALUE_IS_NULL, columnName));
+    }
+  }
+
+  @Override
+  public int getFetchDirection() throws SQLException {
+    return ResultSet.FETCH_FORWARD;
+  }
+
+  @Override
+  public void setFetchDirection(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public int getFetchSize() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void setFetchSize(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public float getFloat(int columnIndex) throws SQLException {
+    return getFloat(findColumnNameByIndex(columnIndex));
+  }
+
+  @Override
+  public float getFloat(String columnName) throws SQLException {
+    checkRecord();
+    int index = columnInfoMap.get(columnName) - START_INDEX;
+    if (values[index] != null) {
+      return BytesUtils.bytesToFloat(values[index]);
+    } else {
+      throw new SQLException(String.format(VALUE_IS_NULL, columnName));
+    }
+  }
+
+  @Override
+  public int getHoldability() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public int getInt(int columnIndex) throws SQLException {
+    return getInt(findColumnNameByIndex(columnIndex));
+  }
+
+  @Override
+  public int getInt(String columnName) throws SQLException {
+    checkRecord();
+    int index = columnInfoMap.get(columnName) - START_INDEX;
+    if (values[index] != null) {
+      return BytesUtils.bytesToInt(values[index]);
+    } else {
+      throw new SQLException(String.format(VALUE_IS_NULL, columnName));
+    }
+  }
+
+  @Override
+  public long getLong(int columnIndex) throws SQLException {
+    return getLong(findColumnNameByIndex(columnIndex));
+  }
+
+  @Override
+  public long getLong(String columnName) throws SQLException {
+    checkRecord();
+    if (columnName.startsWith(TIMESTAMP_STR)) {
+      String column = columnName.substring(TIMESTAMP_STR_LENGTH);
+      int index = columnInfoMap.get(column) - START_INDEX;
+      return BytesUtils.bytesToLong(times[index]);
+    }
+    int index = columnInfoMap.get(columnName) - START_INDEX;
+    if (values[index] != null) {
+      return BytesUtils.bytesToLong(values[index]);
+    } else {
+      throw new SQLException(String.format(VALUE_IS_NULL, columnName));
+    }
+  }
+
+  @Override
+  public ResultSetMetaData getMetaData() {
+    return new IoTDBResultMetadata(columnInfoList, columnTypeList, false);
+  }
+
+  @Override
+  public Reader getNCharacterStream(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Reader getNCharacterStream(String arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public NClob getNClob(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public NClob getNClob(String arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public String getNString(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public String getNString(String arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Object getObject(int columnIndex) throws SQLException {
+    return getObject(findColumnNameByIndex(columnIndex));
+  }
+
+  @Override
+  public Object getObject(String columnName) throws SQLException {
+    return getValueByName(columnName);
+  }
+
+  @Override
+  public Object getObject(int arg0, Map<String, Class<?>> arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Object getObject(String arg0, Map<String, Class<?>> arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public <T> T getObject(int arg0, Class<T> arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public <T> T getObject(String arg0, Class<T> arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Ref getRef(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Ref getRef(String arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public int getRow() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public RowId getRowId(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public RowId getRowId(String arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public SQLXML getSQLXML(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public SQLXML getSQLXML(String arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public short getShort(int columnIndex) throws SQLException {
+    return getShort(findColumnNameByIndex(columnIndex));
+  }
+
+  @Override
+  public short getShort(String columnName) throws SQLException {
+    return Short.parseShort(Objects.requireNonNull(getValueByName(columnName)));
+  }
+
+  @Override
+  public Statement getStatement() {
+    return this.statement;
+  }
+
+  @Override
+  public String getString(int columnIndex) throws SQLException {
+    return getString(findColumnNameByIndex(columnIndex));
+  }
+
+  @Override
+  public String getString(String columnName) throws SQLException {
+    return getValueByName(columnName);
+  }
+
+  @Override
+  public Time getTime(int columnIndex) throws SQLException {
+    return new Time(getLong(columnIndex));
+  }
+
+  @Override
+  public Time getTime(String columnName) throws SQLException {
+    return getTime(findColumn(columnName));
+  }
+
+  @Override
+  public Time getTime(int arg0, Calendar arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Time getTime(String arg0, Calendar arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Timestamp getTimestamp(int columnIndex) throws SQLException {
+    return new Timestamp(getLong(columnIndex));
+  }
+
+  @Override
+  public Timestamp getTimestamp(String columnName) throws SQLException {
+    return getTimestamp(findColumn(columnName));
+  }
+
+  @Override
+  public Timestamp getTimestamp(int arg0, Calendar arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Timestamp getTimestamp(String arg0, Calendar arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public int getType() {
+    return ResultSet.TYPE_FORWARD_ONLY;
+  }
+
+  @Override
+  public URL getURL(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public URL getURL(String arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public InputStream getUnicodeStream(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public InputStream getUnicodeStream(String arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public SQLWarning getWarnings() {
+    return warningChain;
+  }
+
+  @Override
+  public void insertRow() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public boolean isAfterLast() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public boolean isBeforeFirst() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public boolean isClosed() {
+    return isClosed;
+  }
+
+  @Override
+  public boolean isFirst() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public boolean isLast() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public boolean last() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void moveToCurrentRow() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void moveToInsertRow() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
 
 Review comment:
   These methods are identical to those in the aligned resultset. To avoid duplication, I would suggest you put them in an abstract class and make the two resultsets extend it.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] JackieTien97 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
JackieTien97 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r364031640
 
 

 ##########
 File path: jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBQueryResultSet.java
 ##########
 @@ -703,7 +782,18 @@ private boolean fetchResults() throws SQLException {
   }
 
   private boolean hasCachedResults() {
-    return tsQueryDataSet != null && tsQueryDataSet.time.hasRemaining();
+    return tsQueryDataSet != null && tsQueryDataSet.time.hasRemaining()
+        || tsQueryNonAlignDataSet != null && hasTimesRemaining();
 
 Review comment:
   It's better to add () between two &&, although the && has higher priority than || in java

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] JackieTien97 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
JackieTien97 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r364037121
 
 

 ##########
 File path: server/src/main/java/org/apache/iotdb/db/query/dataset/NewEngineDataSetWithoutValueFilter.java
 ##########
 @@ -327,6 +328,167 @@ public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) throws
 
     return tsQueryDataSet;
   }
+  
+  /**
+   * for RPC in RawData query between client and server
+   * fill time buffer, value buffers and bitmap buffers
+   */
+  public TSQueryNonAlignDataSet fillNonAlignBuffer(int fetchSize, WatermarkEncoder encoder) throws IOException, InterruptedException {
+    int seriesNum = seriesReaderWithoutValueFilterList.size();
+    TSQueryNonAlignDataSet tsQueryNonAlignDataSet = new TSQueryNonAlignDataSet();
+
+    PublicBAOS[] timeBAOSList = new PublicBAOS[seriesNum];
+    PublicBAOS[] valueBAOSList = new PublicBAOS[seriesNum];
+    PublicBAOS[] bitmapBAOSList = new PublicBAOS[seriesNum];
+
+    for (int seriesIndex = 0; seriesIndex < seriesNum; seriesIndex++) {
+      timeBAOSList[seriesIndex] = new PublicBAOS();
+      valueBAOSList[seriesIndex] = new PublicBAOS();
+      bitmapBAOSList[seriesIndex] = new PublicBAOS();
+    }
+
+    // used to record a bitmap for every 8 row record
+    int[] currentBitmapList = new int[seriesNum];
+    int rowCount = 0;
+    while (rowCount < fetchSize) {
+
+      if ((rowLimit > 0 && alreadyReturnedRowNum >= rowLimit) || timeHeap.isEmpty()) {
+        break;
+      }
+
+      for (int seriesIndex = 0; seriesIndex < seriesNum; seriesIndex++) {
+
+        if (cachedBatchDataArray[seriesIndex] == null
+                || !cachedBatchDataArray[seriesIndex].hasCurrent()) {
+          // current batch is empty
+          if (rowOffset == 0) {
+            currentBitmapList[seriesIndex] = (currentBitmapList[seriesIndex] << 1);
+          }
+        } else {
+          // current batch has value at minTime, consume current value
+          if (rowOffset == 0) {
+            long time = cachedBatchDataArray[seriesIndex].currentTime();
+            ReadWriteIOUtils.write(time, timeBAOSList[seriesIndex]);
+            currentBitmapList[seriesIndex] = (currentBitmapList[seriesIndex] << 1) | FLAG;
+            TSDataType type = cachedBatchDataArray[seriesIndex].getDataType();
+            switch (type) {
+              case INT32:
+                int intValue = cachedBatchDataArray[seriesIndex].getInt();
+                if (encoder != null && encoder.needEncode(time)) {
+                  intValue = encoder.encodeInt(intValue, time);
+                }
+                ReadWriteIOUtils.write(intValue, valueBAOSList[seriesIndex]);
+                break;
+              case INT64:
+                long longValue = cachedBatchDataArray[seriesIndex].getLong();
+                if (encoder != null && encoder.needEncode(time)) {
+                  longValue = encoder.encodeLong(longValue, time);
+                }
+                ReadWriteIOUtils.write(longValue, valueBAOSList[seriesIndex]);
+                break;
+              case FLOAT:
+                float floatValue = cachedBatchDataArray[seriesIndex].getFloat();
+                if (encoder != null && encoder.needEncode(time)) {
+                  floatValue = encoder.encodeFloat(floatValue, time);
+                }
+                ReadWriteIOUtils.write(floatValue, valueBAOSList[seriesIndex]);
+                break;
+              case DOUBLE:
+                double doubleValue = cachedBatchDataArray[seriesIndex].getDouble();
+                if (encoder != null && encoder.needEncode(time)) {
+                  doubleValue = encoder.encodeDouble(doubleValue, time);
+                }
+                ReadWriteIOUtils.write(doubleValue, valueBAOSList[seriesIndex]);
+                break;
+              case BOOLEAN:
+                ReadWriteIOUtils.write(cachedBatchDataArray[seriesIndex].getBoolean(),
+                        valueBAOSList[seriesIndex]);
+                break;
+              case TEXT:
+                ReadWriteIOUtils
+                        .write(cachedBatchDataArray[seriesIndex].getBinary(),
+                                valueBAOSList[seriesIndex]);
+                break;
+              default:
+                throw new UnSupportedDataTypeException(
+                        String.format("Data type %s is not supported.", type));
+            }
+          }
+
+          // move next
+          cachedBatchDataArray[seriesIndex].next();
+
+          // get next batch if current batch is empty
+          if (!cachedBatchDataArray[seriesIndex].hasCurrent()) {
+            // still have remaining batch data in queue
+            if (!noMoreDataInQueueArray[seriesIndex]) {
+              fillCache(seriesIndex);
+            }
+          }
+
+          // try to put the next timestamp into the heap
+          if (cachedBatchDataArray[seriesIndex].hasCurrent()) {
+            long time = cachedBatchDataArray[seriesIndex].currentTime();
+            timeHeap.add(time);
 
 Review comment:
   Actually, i think it's a bug to use timeheap here

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] HTHou commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
HTHou commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r365075900
 
 

 ##########
 File path: jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBQueryResultSet.java
 ##########
 @@ -761,6 +851,70 @@ private void constructOneRow() {
     }
     rowsIndex++;
   }
+  
+  private void constructNonAlignOneRow() {
+    for (int i = 0; i < tsQueryNonAlignDataSet.bitmapList.size(); i++) {
+      ByteBuffer bitmapBuffer = tsQueryNonAlignDataSet.bitmapList.get(i);
+      // another new 8 row, should move the bitmap buffer position to next byte
+      if (rowsIndex % 8 == 0) {
+        currentBitmap[i] = bitmapBuffer.get();
+      }
+      times[i] = null;
+      values[i] = null;
+      if (!isNull(i, rowsIndex)) {
+        if (times[i] == null) {
+          times[i] = new byte[Long.BYTES];
+        }
+        tsQueryNonAlignDataSet.timeList.get(i).get(times[i]);
+        ByteBuffer valueBuffer = tsQueryNonAlignDataSet.valueList.get(i);
+        TSDataType dataType = TSDataType.valueOf(columnTypeDeduplicatedList.get(i));
+        switch (dataType) {
+          case BOOLEAN:
+            if (values[i] == null) {
+              values[i] = new byte[1];
 
 Review comment:
   I have removed the bitmap in non-align resultset.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] HTHou edited a comment on issue #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
HTHou edited a comment on issue #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#issuecomment-573274294
 
 
   > > And I hope you can do a performance testing between the align version and non-align version. As expected, the no-align version should improve as least 3 times.
   > 
   > Hi, do we have result now?
   
   I tried to run the test on my own computer, but the speed of writing test data was so slow. It may take up to 12 hours to write all data. I have no any idea about the reason. 
   I also tried to run the performance test on server. I can connect VPN, but I cannot log in to the server.  
   Maybe I should reduce the amount of test data and try on my computer again.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
jt2594838 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r363655004
 
 

 ##########
 File path: jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBQueryResultSet.java
 ##########
 @@ -761,6 +851,70 @@ private void constructOneRow() {
     }
     rowsIndex++;
   }
+  
+  private void constructNonAlignOneRow() {
+    for (int i = 0; i < tsQueryNonAlignDataSet.bitmapList.size(); i++) {
+      ByteBuffer bitmapBuffer = tsQueryNonAlignDataSet.bitmapList.get(i);
+      // another new 8 row, should move the bitmap buffer position to next byte
+      if (rowsIndex % 8 == 0) {
+        currentBitmap[i] = bitmapBuffer.get();
+      }
+      times[i] = null;
+      values[i] = null;
+      if (!isNull(i, rowsIndex)) {
+        if (times[i] == null) {
+          times[i] = new byte[Long.BYTES];
+        }
+        tsQueryNonAlignDataSet.timeList.get(i).get(times[i]);
+        ByteBuffer valueBuffer = tsQueryNonAlignDataSet.valueList.get(i);
+        TSDataType dataType = TSDataType.valueOf(columnTypeDeduplicatedList.get(i));
 
 Review comment:
   I would suggest you cast the types to the TSDataType class in the constructor since the `valueOf()` method could be costly.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] JackieTien97 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
JackieTien97 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r365633714
 
 

 ##########
 File path: server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
 ##########
 @@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.dataset;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.iotdb.db.query.pool.QueryTaskPoolManager;
+import org.apache.iotdb.db.query.reader.ManagedSeriesReader;
+import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
+import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NonAlignEngineDataSet extends QueryDataSet {
+  
+  private static class ReadTask implements Runnable {
+    
+    private final ManagedSeriesReader reader;
+    private BlockingQueue<Pair<PublicBAOS, PublicBAOS>> blockingQueue;
+    private WatermarkEncoder encoder;
+    private int rowOffset;
+    private int fetchSize;
+    private int rowLimit;
+    private int alreadyReturnedRowNum = 0;
+    
+    public ReadTask(ManagedSeriesReader reader, 
+        BlockingQueue<Pair<PublicBAOS, PublicBAOS>> blockingQueue,
+        WatermarkEncoder encoder,
+        int rowOffset, int rowLimit, int fetchSize) {
+      this.reader = reader;
+      this.blockingQueue = blockingQueue;
+      this.encoder = encoder;
+      this.rowOffset = rowOffset;
+      this.rowLimit = rowLimit;
+      this.fetchSize = fetchSize;
+    }
+
+    @Override
+    public void run() {
+      PublicBAOS timeBAOS = new PublicBAOS();
+      PublicBAOS valueBAOS = new PublicBAOS();
+      try {
+        synchronized (reader) {
+          // if the task is submitted, there must be free space in the queue
+          // so here we don't need to check whether the queue has free space
+          // the reader has next batch
+          if (reader.hasNextBatch()) {
+            BatchData batchData = reader.nextBatch();
+            
 
 Review comment:
   And add add an if statement to judge whether the current batch data is empty.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
jt2594838 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r365630004
 
 

 ##########
 File path: server/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutor.java
 ##########
 @@ -88,6 +89,32 @@ public QueryDataSet executeWithoutValueFilter(QueryContext context)
       throw new StorageEngineException(e.getMessage());
     }
   }
+  
+  public QueryDataSet executeNonAlign(QueryContext context)
+      throws StorageEngineException, IOException {
+
+    Filter timeFilter = null;
+    if (optimizedExpression != null) {
+      timeFilter = ((GlobalTimeExpression) optimizedExpression).getFilter();
+    }
+
+    List<ManagedSeriesReader> readersOfSelectedSeries = new ArrayList<>();
+    for (int i = 0; i < deduplicatedPaths.size(); i++) {
+      Path path = deduplicatedPaths.get(i);
+      TSDataType dataType = deduplicatedDataTypes.get(i);
+
+      ManagedSeriesReader reader = new SeriesReaderWithoutValueFilter(path, dataType, timeFilter, context,
+          true);
+      readersOfSelectedSeries.add(reader);
+    }
+
+    try {
+      return new NonAlignEngineDataSet(deduplicatedPaths, deduplicatedDataTypes,
+          readersOfSelectedSeries);
+    } catch (InterruptedException e) {
+      throw new StorageEngineException(e.getMessage());
 
 Review comment:
   Use the exception to construct another exception or the stack traces will be lost and some problems will be untrackable.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] JackieTien97 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
JackieTien97 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r365634652
 
 

 ##########
 File path: server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
 ##########
 @@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.dataset;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.iotdb.db.query.pool.QueryTaskPoolManager;
+import org.apache.iotdb.db.query.reader.ManagedSeriesReader;
+import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
+import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NonAlignEngineDataSet extends QueryDataSet {
+  
+  private static class ReadTask implements Runnable {
+    
+    private final ManagedSeriesReader reader;
+    private BlockingQueue<Pair<PublicBAOS, PublicBAOS>> blockingQueue;
+    private WatermarkEncoder encoder;
+    private int rowOffset;
+    private int fetchSize;
+    private int rowLimit;
+    private int alreadyReturnedRowNum = 0;
+    
+    public ReadTask(ManagedSeriesReader reader, 
+        BlockingQueue<Pair<PublicBAOS, PublicBAOS>> blockingQueue,
+        WatermarkEncoder encoder,
+        int rowOffset, int rowLimit, int fetchSize) {
+      this.reader = reader;
+      this.blockingQueue = blockingQueue;
+      this.encoder = encoder;
+      this.rowOffset = rowOffset;
+      this.rowLimit = rowLimit;
+      this.fetchSize = fetchSize;
+    }
+
+    @Override
+    public void run() {
+      PublicBAOS timeBAOS = new PublicBAOS();
+      PublicBAOS valueBAOS = new PublicBAOS();
+      try {
+        synchronized (reader) {
+          // if the task is submitted, there must be free space in the queue
+          // so here we don't need to check whether the queue has free space
+          // the reader has next batch
+          if (reader.hasNextBatch()) {
+            BatchData batchData = reader.nextBatch();
+            
+            int rowCount = 0;
+            while (rowCount < fetchSize) {
+              
+              if ((rowLimit > 0 && alreadyReturnedRowNum >= rowLimit)) {
+                break;
+              }
+              
+              if (batchData != null && batchData.hasCurrent()) {
+                if (rowOffset == 0) {
+                  long time = batchData.currentTime();
+                  ReadWriteIOUtils.write(time, timeBAOS);
+                  TSDataType type = batchData.getDataType();
+                  switch (type) {
+                    case INT32:
+                      int intValue = batchData.getInt();
+                      if (encoder != null && encoder.needEncode(time)) {
+                        intValue = encoder.encodeInt(intValue, time);
+                      }
+                      ReadWriteIOUtils.write(intValue, valueBAOS);
+                      break;
+                    case INT64:
+                      long longValue = batchData.getLong();
+                      if (encoder != null && encoder.needEncode(time)) {
+                        longValue = encoder.encodeLong(longValue, time);
+                      }
+                      ReadWriteIOUtils.write(longValue, valueBAOS);
+                      break;
+                    case FLOAT:
+                      float floatValue = batchData.getFloat();
+                      if (encoder != null && encoder.needEncode(time)) {
+                        floatValue = encoder.encodeFloat(floatValue, time);
+                      }
+                      ReadWriteIOUtils.write(floatValue, valueBAOS);
+                      break;
+                    case DOUBLE:
+                      double doubleValue = batchData.getDouble();
+                      if (encoder != null && encoder.needEncode(time)) {
+                        doubleValue = encoder.encodeDouble(doubleValue, time);
+                      }
+                      ReadWriteIOUtils.write(doubleValue, valueBAOS);
+                      break;
+                    case BOOLEAN:
+                      ReadWriteIOUtils.write(batchData.getBoolean(),
+                              valueBAOS);
+                      break;
+                    case TEXT:
+                      ReadWriteIOUtils
+                              .write(batchData.getBinary(),
+                                      valueBAOS);
+                      break;
+                    default:
+                      throw new UnSupportedDataTypeException(
+                              String.format("Data type %s is not supported.", type));
+                  }
+                }
+                batchData.next();
+              }
+              else {
+                break;
+              }
 
 Review comment:
   If the current batch data has no more data, we shouldn't just break the while loop. Otherwise, The timeValueBAOSPair won't get to fetchSize.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
jt2594838 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r363713626
 
 

 ##########
 File path: server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
 ##########
 @@ -174,6 +180,126 @@ public static TSQueryDataSet convertQueryDataSetByFetchSize(QueryDataSet queryDa
     tsQueryDataSet.setValueList(valueList);
     return tsQueryDataSet;
   }
+  
+
+  public static TSQueryNonAlignDataSet convertQueryNonAlignDataSetByFetchSize(QueryDataSet queryDataSet,
+      int fetchSize, WatermarkEncoder watermarkEncoder) throws IOException {
+    List<TSDataType> dataTypes = queryDataSet.getDataTypes();
+    int columnNum = dataTypes.size();
+    TSQueryNonAlignDataSet tsQueryNonAlignDataSet = new TSQueryNonAlignDataSet();
+    // one time column and each value column has a actual value buffer and a bitmap value to indicate whether it is a null
+    int columnNumWithTime = columnNum * 3;
+    DataOutputStream[] dataOutputStreams = new DataOutputStream[columnNumWithTime];
+    ByteArrayOutputStream[] byteArrayOutputStreams = new ByteArrayOutputStream[columnNumWithTime];
+    for (int i = 0; i < columnNumWithTime; i++) {
+      byteArrayOutputStreams[i] = new ByteArrayOutputStream();
+      dataOutputStreams[i] = new DataOutputStream(byteArrayOutputStreams[i]);
+    }
+
+    int rowCount = 0;
+    int[] valueOccupation = new int[columnNum];
+    // used to record a bitmap for every 8 row record
+    int[] bitmap = new int[columnNum];
+    for (int i = 0; i < fetchSize; i++) {
+      if (queryDataSet.hasNext()) {
+        RowRecord rowRecord = queryDataSet.next();
+        if (watermarkEncoder != null) {
+          rowRecord = watermarkEncoder.encodeRecord(rowRecord);
+        }
+        // use columnOutput to write byte array
+        List<Field> fields = rowRecord.getFields();
+        for (int k = 0; k < fields.size(); k++) {
+          dataOutputStreams[k].writeLong(rowRecord.getTimestamp());
+          Field field = fields.get(k);
+          DataOutputStream dataOutputStream = dataOutputStreams[3*k + 1]; // DO NOT FORGET +1
+          if (field.getDataType() == null) {
+            bitmap[k] =  (bitmap[k] << 1);
+          } else {
+            bitmap[k] =  (bitmap[k] << 1) | flag;
+            TSDataType type = field.getDataType();
+            switch (type) {
+              case INT32:
+                dataOutputStream.writeInt(field.getIntV());
+                valueOccupation[k] += 4;
+                break;
+              case INT64:
+                dataOutputStream.writeLong(field.getLongV());
+                valueOccupation[k] += 8;
+                break;
+              case FLOAT:
+                dataOutputStream.writeFloat(field.getFloatV());
+                valueOccupation[k] += 4;
+                break;
+              case DOUBLE:
+                dataOutputStream.writeDouble(field.getDoubleV());
+                valueOccupation[k] += 8;
+                break;
+              case BOOLEAN:
+                dataOutputStream.writeBoolean(field.getBoolV());
+                valueOccupation[k] += 1;
+                break;
+              case TEXT:
+                dataOutputStream.writeInt(field.getBinaryV().getLength());
+                dataOutputStream.write(field.getBinaryV().getValues());
+                valueOccupation[k] = valueOccupation[k] + 4 + field.getBinaryV().getLength();
+                break;
+              default:
+                throw new UnSupportedDataTypeException(
+                    String.format("Data type %s is not supported.", type));
+            }
+          }
+        }
+        rowCount++;
+        if (rowCount % 8 == 0) {
+          for (int j = 0; j < bitmap.length; j++) {
+            DataOutputStream dataBitmapOutputStream = dataOutputStreams[3*j + 2];
+            dataBitmapOutputStream.writeByte(bitmap[j]);
+            // we should clear the bitmap every 8 row record
+            bitmap[j] = 0;
+          }
+        }
+      } else {
+        break;
+      }
+    }
+
+    // feed the remaining bitmap
+    int remaining = rowCount % 8;
+    if (remaining != 0) {
+      for (int j = 0; j < bitmap.length; j++) {
+        DataOutputStream dataBitmapOutputStream = dataOutputStreams[3*j + 2];
+        dataBitmapOutputStream.writeByte(bitmap[j] << (8-remaining));
+      }
+    }
+
+    // calculate the time buffer size
+    int timeOccupation = rowCount * 8;
+
+    // calculate the bitmap buffer size
+    int bitmapOccupation = rowCount / 8 + (rowCount % 8 == 0 ? 0 : 1);
+    List<ByteBuffer> timeList = new LinkedList<>();
+    List<ByteBuffer> bitmapList = new LinkedList<>();
+    List<ByteBuffer> valueList = new LinkedList<>();
+    for (int i = 0; i < byteArrayOutputStreams.length; i += 3) {
+      ByteBuffer timeBuffer = ByteBuffer.allocate(timeOccupation);
+      timeBuffer.put(byteArrayOutputStreams[i].toByteArray());
+      timeBuffer.flip();
+      timeList.add(timeBuffer);
+      
+      ByteBuffer valueBuffer = ByteBuffer.allocate(valueOccupation[i/3]);
+      valueBuffer.put(byteArrayOutputStreams[i+1].toByteArray());
+      valueBuffer.flip();
+      valueList.add(valueBuffer);
+
+      ByteBuffer bitmapBuffer = ByteBuffer.allocate(bitmapOccupation);
+      bitmapBuffer.put(byteArrayOutputStreams[i+2].toByteArray());
+      bitmapBuffer.flip();
+      bitmapList.add(bitmapBuffer);
+    }
+    tsQueryNonAlignDataSet.setBitmapList(bitmapList);
+    tsQueryNonAlignDataSet.setValueList(valueList);
+    return tsQueryNonAlignDataSet;
 
 Review comment:
   [Beyond this PR]
   There will be 3 ByteBuffers for each timeseries (or column). When the number of columns is large while the fetch size is relatively small, this can introduce structural overhead that may not be negligible.
   One way to reduce the overhead may be putting them all together in a ByteBuffer and record the offsets and lengths of each sub-buffer.
   This is not a job for now but it seems worth trying.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] HTHou commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
HTHou commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r365075550
 
 

 ##########
 File path: jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBQueryResultSet.java
 ##########
 @@ -408,14 +451,32 @@ public long getLong(int columnIndex) throws SQLException {
   @Override
   public long getLong(String columnName) throws SQLException {
     checkRecord();
-    if (columnName.equals(TIMESTAMP_STR)) {
-      return BytesUtils.bytesToLong(time);
+    if (align) {
+      if (columnName.equals(TIMESTAMP_STR)) {
+        return BytesUtils.bytesToLong(time);
+      }
+      int index = columnInfoMap.get(columnName) - START_INDEX;
+      if (values[index] != null) {
+        return BytesUtils.bytesToLong(values[index]);
+      } else {
+        throw new SQLException(String.format(VALUE_IS_NULL, columnName));
+      }
     }
-    int index = columnInfoMap.get(columnName) - START_INDEX;
-    if (values[index] != null) {
-      return BytesUtils.bytesToLong(values[index]);
-    } else {
-      throw new SQLException(String.format(VALUE_IS_NULL, columnName));
+    else {
+      if (columnName.startsWith(TIMESTAMP_STR)) {
+        String column = columnName.substring(TIMESTAMP_STR_LENGTH);
+        int index = columnInfoMap.get(column) - START_INDEX;
+        if (times[index].length != 8) {
+          return -1;
+        }
 
 Review comment:
   Fixed

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] JackieTien97 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
JackieTien97 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r364038957
 
 

 ##########
 File path: server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
 ##########
 @@ -174,6 +180,126 @@ public static TSQueryDataSet convertQueryDataSetByFetchSize(QueryDataSet queryDa
     tsQueryDataSet.setValueList(valueList);
     return tsQueryDataSet;
   }
+  
+
+  public static TSQueryNonAlignDataSet convertQueryNonAlignDataSetByFetchSize(QueryDataSet queryDataSet,
+      int fetchSize, WatermarkEncoder watermarkEncoder) throws IOException {
+    List<TSDataType> dataTypes = queryDataSet.getDataTypes();
+    int columnNum = dataTypes.size();
+    TSQueryNonAlignDataSet tsQueryNonAlignDataSet = new TSQueryNonAlignDataSet();
+    // one time column and each value column has a actual value buffer and a bitmap value to indicate whether it is a null
+    int columnNumWithTime = columnNum * 3;
+    DataOutputStream[] dataOutputStreams = new DataOutputStream[columnNumWithTime];
+    ByteArrayOutputStream[] byteArrayOutputStreams = new ByteArrayOutputStream[columnNumWithTime];
+    for (int i = 0; i < columnNumWithTime; i++) {
+      byteArrayOutputStreams[i] = new ByteArrayOutputStream();
+      dataOutputStreams[i] = new DataOutputStream(byteArrayOutputStreams[i]);
+    }
+
+    int rowCount = 0;
+    int[] valueOccupation = new int[columnNum];
+    // used to record a bitmap for every 8 row record
+    int[] bitmap = new int[columnNum];
+    for (int i = 0; i < fetchSize; i++) {
+      if (queryDataSet.hasNext()) {
+        RowRecord rowRecord = queryDataSet.next();
+        if (watermarkEncoder != null) {
+          rowRecord = watermarkEncoder.encodeRecord(rowRecord);
+        }
+        // use columnOutput to write byte array
+        List<Field> fields = rowRecord.getFields();
+        for (int k = 0; k < fields.size(); k++) {
+          dataOutputStreams[k].writeLong(rowRecord.getTimestamp());
+          Field field = fields.get(k);
+          DataOutputStream dataOutputStream = dataOutputStreams[3*k + 1]; // DO NOT FORGET +1
+          if (field.getDataType() == null) {
+            bitmap[k] =  (bitmap[k] << 1);
+          } else {
+            bitmap[k] =  (bitmap[k] << 1) | flag;
+            TSDataType type = field.getDataType();
+            switch (type) {
+              case INT32:
+                dataOutputStream.writeInt(field.getIntV());
+                valueOccupation[k] += 4;
+                break;
+              case INT64:
+                dataOutputStream.writeLong(field.getLongV());
+                valueOccupation[k] += 8;
+                break;
+              case FLOAT:
+                dataOutputStream.writeFloat(field.getFloatV());
+                valueOccupation[k] += 4;
+                break;
+              case DOUBLE:
+                dataOutputStream.writeDouble(field.getDoubleV());
+                valueOccupation[k] += 8;
+                break;
+              case BOOLEAN:
+                dataOutputStream.writeBoolean(field.getBoolV());
+                valueOccupation[k] += 1;
+                break;
+              case TEXT:
+                dataOutputStream.writeInt(field.getBinaryV().getLength());
+                dataOutputStream.write(field.getBinaryV().getValues());
+                valueOccupation[k] = valueOccupation[k] + 4 + field.getBinaryV().getLength();
+                break;
+              default:
+                throw new UnSupportedDataTypeException(
+                    String.format("Data type %s is not supported.", type));
+            }
+          }
+        }
+        rowCount++;
+        if (rowCount % 8 == 0) {
+          for (int j = 0; j < bitmap.length; j++) {
+            DataOutputStream dataBitmapOutputStream = dataOutputStreams[3*j + 2];
+            dataBitmapOutputStream.writeByte(bitmap[j]);
+            // we should clear the bitmap every 8 row record
+            bitmap[j] = 0;
+          }
+        }
+      } else {
+        break;
+      }
+    }
+
+    // feed the remaining bitmap
+    int remaining = rowCount % 8;
+    if (remaining != 0) {
+      for (int j = 0; j < bitmap.length; j++) {
+        DataOutputStream dataBitmapOutputStream = dataOutputStreams[3*j + 2];
+        dataBitmapOutputStream.writeByte(bitmap[j] << (8-remaining));
+      }
+    }
+
+    // calculate the time buffer size
+    int timeOccupation = rowCount * 8;
+
+    // calculate the bitmap buffer size
+    int bitmapOccupation = rowCount / 8 + (rowCount % 8 == 0 ? 0 : 1);
+    List<ByteBuffer> timeList = new LinkedList<>();
+    List<ByteBuffer> bitmapList = new LinkedList<>();
+    List<ByteBuffer> valueList = new LinkedList<>();
+    for (int i = 0; i < byteArrayOutputStreams.length; i += 3) {
+      ByteBuffer timeBuffer = ByteBuffer.allocate(timeOccupation);
+      timeBuffer.put(byteArrayOutputStreams[i].toByteArray());
+      timeBuffer.flip();
+      timeList.add(timeBuffer);
+      
+      ByteBuffer valueBuffer = ByteBuffer.allocate(valueOccupation[i/3]);
+      valueBuffer.put(byteArrayOutputStreams[i+1].toByteArray());
+      valueBuffer.flip();
+      valueList.add(valueBuffer);
+
+      ByteBuffer bitmapBuffer = ByteBuffer.allocate(bitmapOccupation);
+      bitmapBuffer.put(byteArrayOutputStreams[i+2].toByteArray());
+      bitmapBuffer.flip();
+      bitmapList.add(bitmapBuffer);
+    }
+    tsQueryNonAlignDataSet.setBitmapList(bitmapList);
+    tsQueryNonAlignDataSet.setValueList(valueList);
+    return tsQueryNonAlignDataSet;
 
 Review comment:
   If you put them into one ByteBuffer, the multithread will lose its advantages. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
jt2594838 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r363655699
 
 

 ##########
 File path: jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBQueryResultSet.java
 ##########
 @@ -761,6 +851,70 @@ private void constructOneRow() {
     }
     rowsIndex++;
   }
+  
+  private void constructNonAlignOneRow() {
+    for (int i = 0; i < tsQueryNonAlignDataSet.bitmapList.size(); i++) {
+      ByteBuffer bitmapBuffer = tsQueryNonAlignDataSet.bitmapList.get(i);
+      // another new 8 row, should move the bitmap buffer position to next byte
+      if (rowsIndex % 8 == 0) {
+        currentBitmap[i] = bitmapBuffer.get();
+      }
+      times[i] = null;
+      values[i] = null;
+      if (!isNull(i, rowsIndex)) {
+        if (times[i] == null) {
+          times[i] = new byte[Long.BYTES];
+        }
+        tsQueryNonAlignDataSet.timeList.get(i).get(times[i]);
+        ByteBuffer valueBuffer = tsQueryNonAlignDataSet.valueList.get(i);
+        TSDataType dataType = TSDataType.valueOf(columnTypeDeduplicatedList.get(i));
+        switch (dataType) {
+          case BOOLEAN:
+            if (values[i] == null) {
+              values[i] = new byte[1];
 
 Review comment:
   I wonder if the `new`s are necessary because the datatype of a column is static and the byte arrays seem reusable (exclude the text type).
   Consider null values, you may use a bitmap to indicate which rows are null.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
jt2594838 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r365629069
 
 

 ##########
 File path: jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBNonAlignQueryResultSet.java
 ##########
 @@ -0,0 +1,1292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.jdbc;
+
+import org.apache.iotdb.rpc.IoTDBRPCException;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.service.rpc.thrift.*;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.thrift.TException;
+
+import java.io.InputStream;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.*;
+import java.util.*;
+
+public class IoTDBNonAlignQueryResultSet implements ResultSet {
+
+  private static final String TIMESTAMP_STR = "Time";
+  private static final int TIMESTAMP_STR_LENGTH = 4;
+  private static final int START_INDEX = 2;
+  private static final String VALUE_IS_NULL = "The value got by %s (column name) is NULL.";
+  private static final String EMPTY_STR = "";
+  private Statement statement = null;
+  private String sql;
+  private SQLWarning warningChain = null;
+  private boolean isClosed = false;
+  private TSIService.Iface client = null;
+  private List<String> columnInfoList; // no deduplication
+  private List<String> columnTypeList; // no deduplication
+  private Map<String, Integer> columnInfoMap; // used because the server returns deduplicated columns
+  private List<String> columnTypeDeduplicatedList; // deduplicated from columnTypeList
+  private int rowsIndex = 0; // used to record the row index in current TSQueryDataSet
+  private int fetchSize;
+  private boolean emptyResultSet = false;
+
+  private TSQueryNonAlignDataSet tsQueryNonAlignDataSet = null;
+  private byte[][] times; // used for disable align
+  private byte[][] values; // used to cache the current row record value
+
+  private long sessionId;
+  private long queryId;
+  private boolean ignoreTimeStamp = false;
+
+  public IoTDBNonAlignQueryResultSet() {
+    // do nothing
+  }
+  
+  // for disable align clause
+  public IoTDBNonAlignQueryResultSet(Statement statement, List<String> columnNameList,
+      List<String> columnTypeList, boolean ignoreTimeStamp, TSIService.Iface client,
+      String sql, long queryId, long sessionId, TSQueryNonAlignDataSet dataset) 
+      throws SQLException {
+    this.statement = statement;
+    this.fetchSize = statement.getFetchSize();
+    this.columnTypeList = columnTypeList;
+
+    times = new byte[columnNameList.size()][Long.BYTES];
+    values = new byte[columnNameList.size()][];
+
+    this.columnInfoList = new ArrayList<>();
+    // deduplicate and map
+    this.columnInfoMap = new HashMap<>();
+    this.columnInfoMap.put(TIMESTAMP_STR, 1);
+    this.columnTypeDeduplicatedList = new ArrayList<>();
+    int index = START_INDEX;
+    for (int i = 0; i < columnNameList.size(); i++) {
+      String name = columnNameList.get(i);
+      columnInfoList.add(TIMESTAMP_STR + name);
+      columnInfoList.add(name);
+      if (!columnInfoMap.containsKey(name)) {
+        columnInfoMap.put(name, index++);
+        columnTypeDeduplicatedList.add(columnTypeList.get(i));
+      }
+    }
+
+    this.ignoreTimeStamp = ignoreTimeStamp;
+    this.client = client;
+    this.sql = sql;
+    this.queryId = queryId;
+    this.tsQueryNonAlignDataSet = dataset;
+    this.sessionId = sessionId;
+  }
+
+  @Override
+  public boolean isWrapperFor(Class<?> iface) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public <T> T unwrap(Class<T> iface) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public boolean absolute(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void afterLast() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void beforeFirst() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void cancelRowUpdates() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void clearWarnings() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void close() throws SQLException {
+    if (isClosed) {
+      return;
+    }
+    if (client != null) {
+      try {
+        TSCloseOperationReq closeReq = new TSCloseOperationReq(sessionId);
+        closeReq.setQueryId(queryId);
+        TSStatus closeResp = client.closeOperation(closeReq);
+        RpcUtils.verifySuccess(closeResp);
+      } catch (IoTDBRPCException e) {
+        throw new SQLException("Error occurs for close opeation in server side becasuse " + e);
+      } catch (TException e) {
+        throw new SQLException(
+            "Error occurs when connecting to server for close operation, becasue: " + e);
+      }
+    }
+    client = null;
+    isClosed = true;
+  }
+
+
+  @Override
+  public void deleteRow() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public int findColumn(String columnName) {
+    return columnInfoMap.get(columnName);
+  }
+
+  @Override
+  public boolean first() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Array getArray(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Array getArray(String arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public InputStream getAsciiStream(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public InputStream getAsciiStream(String arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public BigDecimal getBigDecimal(int columnIndex) throws SQLException {
+    return getBigDecimal(findColumnNameByIndex(columnIndex));
+  }
+
+  @Override
+  public BigDecimal getBigDecimal(String columnName) throws SQLException {
+    return new BigDecimal(Objects.requireNonNull(getValueByName(columnName)));
+  }
+
+  @Override
+  public BigDecimal getBigDecimal(int columnIndex, int scale) throws SQLException {
+    MathContext mc = new MathContext(scale);
+    return getBigDecimal(columnIndex).round(mc);
+  }
+
+  @Override
+  public BigDecimal getBigDecimal(String columnName, int scale) throws SQLException {
+    return getBigDecimal(findColumn(columnName), scale);
+  }
+
+  @Override
+  public InputStream getBinaryStream(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public InputStream getBinaryStream(String arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Blob getBlob(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Blob getBlob(String arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public boolean getBoolean(int columnIndex) throws SQLException {
+    return getBoolean(findColumnNameByIndex(columnIndex));
+  }
+
+  @Override
+  public boolean getBoolean(String columnName) throws SQLException {
+    checkRecord();
+    int index = columnInfoMap.get(columnName) - START_INDEX;
+    if (values[index] != null) {
+      return BytesUtils.bytesToBool(values[index]);
+    }
+    else {
+      throw new SQLException(String.format(VALUE_IS_NULL, columnName));
+    }
+  }
+
+  @Override
+  public byte getByte(int columnIndex) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public byte getByte(String columnName) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public byte[] getBytes(int columnIndex) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public byte[] getBytes(String columnName) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Reader getCharacterStream(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Reader getCharacterStream(String arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Clob getClob(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Clob getClob(String arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public int getConcurrency() {
+    return ResultSet.CONCUR_READ_ONLY;
+  }
+
+  @Override
+  public String getCursorName() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Date getDate(int columnIndex) throws SQLException {
+    return new Date(getLong(columnIndex));
+  }
+
+  @Override
+  public Date getDate(String columnName) throws SQLException {
+    return getDate(findColumn(columnName));
+  }
+
+  @Override
+  public Date getDate(int arg0, Calendar arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Date getDate(String arg0, Calendar arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public double getDouble(int columnIndex) throws SQLException {
+    return getDouble(findColumnNameByIndex(columnIndex));
+  }
+
+  @Override
+  public double getDouble(String columnName) throws SQLException {
+    checkRecord();
+    int index = columnInfoMap.get(columnName) - START_INDEX;
+    if (values[index] != null) {
+      return BytesUtils.bytesToDouble(values[index]);
+    } else {
+      throw new SQLException(String.format(VALUE_IS_NULL, columnName));
+    }
+  }
+
+  @Override
+  public int getFetchDirection() throws SQLException {
+    return ResultSet.FETCH_FORWARD;
+  }
+
+  @Override
+  public void setFetchDirection(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public int getFetchSize() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void setFetchSize(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public float getFloat(int columnIndex) throws SQLException {
+    return getFloat(findColumnNameByIndex(columnIndex));
+  }
+
+  @Override
+  public float getFloat(String columnName) throws SQLException {
+    checkRecord();
+    int index = columnInfoMap.get(columnName) - START_INDEX;
+    if (values[index] != null) {
+      return BytesUtils.bytesToFloat(values[index]);
+    } else {
+      throw new SQLException(String.format(VALUE_IS_NULL, columnName));
+    }
+  }
+
+  @Override
+  public int getHoldability() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public int getInt(int columnIndex) throws SQLException {
+    return getInt(findColumnNameByIndex(columnIndex));
+  }
+
+  @Override
+  public int getInt(String columnName) throws SQLException {
+    checkRecord();
+    int index = columnInfoMap.get(columnName) - START_INDEX;
+    if (values[index] != null) {
+      return BytesUtils.bytesToInt(values[index]);
+    } else {
+      throw new SQLException(String.format(VALUE_IS_NULL, columnName));
+    }
+  }
+
+  @Override
+  public long getLong(int columnIndex) throws SQLException {
+    return getLong(findColumnNameByIndex(columnIndex));
+  }
+
+  @Override
+  public long getLong(String columnName) throws SQLException {
+    checkRecord();
+    if (columnName.startsWith(TIMESTAMP_STR)) {
+      String column = columnName.substring(TIMESTAMP_STR_LENGTH);
+      int index = columnInfoMap.get(column) - START_INDEX;
+      return BytesUtils.bytesToLong(times[index]);
+    }
+    int index = columnInfoMap.get(columnName) - START_INDEX;
+    if (values[index] != null) {
+      return BytesUtils.bytesToLong(values[index]);
+    } else {
+      throw new SQLException(String.format(VALUE_IS_NULL, columnName));
+    }
+  }
+
+  @Override
+  public ResultSetMetaData getMetaData() {
+    return new IoTDBResultMetadata(columnInfoList, columnTypeList, false);
+  }
+
+  @Override
+  public Reader getNCharacterStream(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Reader getNCharacterStream(String arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public NClob getNClob(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public NClob getNClob(String arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public String getNString(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public String getNString(String arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Object getObject(int columnIndex) throws SQLException {
+    return getObject(findColumnNameByIndex(columnIndex));
+  }
+
+  @Override
+  public Object getObject(String columnName) throws SQLException {
+    return getValueByName(columnName);
+  }
+
+  @Override
+  public Object getObject(int arg0, Map<String, Class<?>> arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Object getObject(String arg0, Map<String, Class<?>> arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public <T> T getObject(int arg0, Class<T> arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public <T> T getObject(String arg0, Class<T> arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Ref getRef(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Ref getRef(String arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public int getRow() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public RowId getRowId(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public RowId getRowId(String arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public SQLXML getSQLXML(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public SQLXML getSQLXML(String arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public short getShort(int columnIndex) throws SQLException {
+    return getShort(findColumnNameByIndex(columnIndex));
+  }
+
+  @Override
+  public short getShort(String columnName) throws SQLException {
+    return Short.parseShort(Objects.requireNonNull(getValueByName(columnName)));
+  }
+
+  @Override
+  public Statement getStatement() {
+    return this.statement;
+  }
+
+  @Override
+  public String getString(int columnIndex) throws SQLException {
+    return getString(findColumnNameByIndex(columnIndex));
+  }
+
+  @Override
+  public String getString(String columnName) throws SQLException {
+    return getValueByName(columnName);
+  }
+
+  @Override
+  public Time getTime(int columnIndex) throws SQLException {
+    return new Time(getLong(columnIndex));
+  }
+
+  @Override
+  public Time getTime(String columnName) throws SQLException {
+    return getTime(findColumn(columnName));
+  }
+
+  @Override
+  public Time getTime(int arg0, Calendar arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Time getTime(String arg0, Calendar arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Timestamp getTimestamp(int columnIndex) throws SQLException {
+    return new Timestamp(getLong(columnIndex));
+  }
+
+  @Override
+  public Timestamp getTimestamp(String columnName) throws SQLException {
+    return getTimestamp(findColumn(columnName));
+  }
+
+  @Override
+  public Timestamp getTimestamp(int arg0, Calendar arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Timestamp getTimestamp(String arg0, Calendar arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public int getType() {
+    return ResultSet.TYPE_FORWARD_ONLY;
+  }
+
+  @Override
+  public URL getURL(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public URL getURL(String arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public InputStream getUnicodeStream(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public InputStream getUnicodeStream(String arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public SQLWarning getWarnings() {
+    return warningChain;
+  }
+
+  @Override
+  public void insertRow() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public boolean isAfterLast() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public boolean isBeforeFirst() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public boolean isClosed() {
+    return isClosed;
+  }
+
+  @Override
+  public boolean isFirst() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public boolean isLast() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public boolean last() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void moveToCurrentRow() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void moveToInsertRow() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public boolean next() throws SQLException {
+    if (hasCachedResults()) {
+      constructNonAlignOneRow();
+      return true;
+    }
+    if (emptyResultSet) {
+      return false;
+    }
+    if (fetchResults()) {
+      constructNonAlignOneRow();
+      return true;
+    }
+    return false;
+  }
+
+
+  /**
+   * @return true means has results
+   */
+  private boolean fetchResults() throws SQLException {
+    rowsIndex = 0;
+    TSFetchResultsReq req = new TSFetchResultsReq(sessionId, sql, fetchSize, queryId, false);
+    try {
+      TSFetchResultsResp resp = client.fetchResults(req);
+
+      try {
+        RpcUtils.verifySuccess(resp.getStatus());
+      } catch (IoTDBRPCException e) {
+        throw new IoTDBSQLException(e.getMessage(), resp.getStatus());
+      }
+      if (!resp.hasResultSet) {
+        emptyResultSet = true;
+      } else {
+        tsQueryNonAlignDataSet = resp.getNonAlignQueryDataSet();
+        if (tsQueryNonAlignDataSet == null) {
+          return false;
+        }
+      }
+      return resp.hasResultSet;
+    } catch (TException e) {
+      throw new SQLException(
+          "Cannot fetch result from server, because of network connection: {} ", e);
+    }
+  }
+
+  private boolean hasCachedResults() {
+    return (tsQueryNonAlignDataSet != null && hasTimesRemaining());
+  }
+  
+  // check if has times remaining for disable align clause
+  private boolean hasTimesRemaining() {
+    for (ByteBuffer time : tsQueryNonAlignDataSet.timeList) {
+      if (time.hasRemaining()) {
+        return true;
+      }
+    }
+    return false;
+  }
+  
+  private void constructNonAlignOneRow() {
+    for (int i = 0; i < tsQueryNonAlignDataSet.timeList.size(); i++) {
+      times[i] = null;
+      values[i] = null;
+      if (tsQueryNonAlignDataSet.timeList.get(i).remaining() >= Long.BYTES) {
+        if (times[i] == null) {
+          times[i] = new byte[Long.BYTES];
+        }
+        tsQueryNonAlignDataSet.timeList.get(i).get(times[i]);
+        ByteBuffer valueBuffer = tsQueryNonAlignDataSet.valueList.get(i);
+        TSDataType dataType = TSDataType.valueOf(columnTypeDeduplicatedList.get(i));
+        switch (dataType) {
+          case BOOLEAN:
+            if (values[i] == null) {
+              values[i] = new byte[1];
+            }
+            valueBuffer.get(values[i]);
+            break;
+          case INT32:
+            if (values[i] == null) {
+              values[i] = new byte[Integer.BYTES];
+            }
+            valueBuffer.get(values[i]);
+            break;
+          case INT64:
+            if (values[i] == null) {
+              values[i] = new byte[Long.BYTES];
+            }
+            valueBuffer.get(values[i]);
+            break;
+          case FLOAT:
+            if (values[i] == null) {
+              values[i] = new byte[Float.BYTES];
+            }
+            valueBuffer.get(values[i]);
+            break;
+          case DOUBLE:
+            if (values[i] == null) {
+              values[i] = new byte[Double.BYTES];
+            }
+            valueBuffer.get(values[i]);
+            break;
+          case TEXT:
+            int length = valueBuffer.getInt();
+            values[i] = ReadWriteIOUtils.readBytes(valueBuffer, length);
+            break;
+          default:
+            throw new UnSupportedDataTypeException(
+                String.format("Data type %s is not supported.", columnTypeDeduplicatedList.get(i)));
+        }
+      }
+      else {
+        times[i] = new byte[Long.BYTES];
+        values[i] = EMPTY_STR.getBytes();
+      }
+    }
+    rowsIndex++;
+  }
+
+  @Override
+  public boolean previous() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void refreshRow() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public boolean relative(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public boolean rowDeleted() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public boolean rowInserted() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public boolean rowUpdated() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateArray(int arg0, Array arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateArray(String arg0, Array arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateAsciiStream(int arg0, InputStream arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateAsciiStream(String arg0, InputStream arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateAsciiStream(int arg0, InputStream arg1, int arg2) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateAsciiStream(String arg0, InputStream arg1, int arg2) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateAsciiStream(int arg0, InputStream arg1, long arg2) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateAsciiStream(String arg0, InputStream arg1, long arg2) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateBigDecimal(int arg0, BigDecimal arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateBigDecimal(String arg0, BigDecimal arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateBinaryStream(int arg0, InputStream arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateBinaryStream(String arg0, InputStream arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateBinaryStream(int arg0, InputStream arg1, int arg2) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateBinaryStream(String arg0, InputStream arg1, int arg2) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateBinaryStream(int arg0, InputStream arg1, long arg2) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateBinaryStream(String arg0, InputStream arg1, long arg2) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateBlob(int arg0, Blob arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateBlob(String arg0, Blob arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateBlob(int arg0, InputStream arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateBlob(String arg0, InputStream arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateBlob(int arg0, InputStream arg1, long arg2) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateBlob(String arg0, InputStream arg1, long arg2) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateBoolean(int arg0, boolean arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateBoolean(String arg0, boolean arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateByte(int arg0, byte arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateByte(String arg0, byte arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateBytes(int arg0, byte[] arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateBytes(String arg0, byte[] arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateCharacterStream(int arg0, Reader arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateCharacterStream(String arg0, Reader arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateCharacterStream(int arg0, Reader arg1, int arg2) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateCharacterStream(String arg0, Reader arg1, int arg2) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateCharacterStream(int arg0, Reader arg1, long arg2) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateCharacterStream(String arg0, Reader arg1, long arg2) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateClob(int arg0, Clob arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateClob(String arg0, Clob arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateClob(int arg0, Reader arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateClob(String arg0, Reader arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateClob(int arg0, Reader arg1, long arg2) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+
+  }
+
+  @Override
+  public void updateClob(String arg0, Reader arg1, long arg2) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateDate(int arg0, Date arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateDate(String arg0, Date arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateDouble(int arg0, double arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateDouble(String arg0, double arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateFloat(int arg0, float arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateFloat(String arg0, float arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateInt(int arg0, int arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateInt(String arg0, int arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateLong(int arg0, long arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateLong(String arg0, long arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateNCharacterStream(int arg0, Reader arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateNCharacterStream(String arg0, Reader arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateNCharacterStream(int arg0, Reader arg1, long arg2) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateNCharacterStream(String arg0, Reader arg1, long arg2) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateNClob(int arg0, NClob arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateNClob(String arg0, NClob arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateNClob(int arg0, Reader arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateNClob(String arg0, Reader arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateNClob(int arg0, Reader arg1, long arg2) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateNClob(String arg0, Reader arg1, long arg2) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateNString(int arg0, String arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateNString(String arg0, String arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateNull(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateNull(String arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateObject(int arg0, Object arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateObject(String arg0, Object arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateObject(int arg0, Object arg1, int arg2) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateObject(String arg0, Object arg1, int arg2) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateRef(int arg0, Ref arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateRef(String arg0, Ref arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateRow() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateRowId(int arg0, RowId arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateRowId(String arg0, RowId arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateSQLXML(int arg0, SQLXML arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateSQLXML(String arg0, SQLXML arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateShort(int arg0, short arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateShort(String arg0, short arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateString(int arg0, String arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateString(String arg0, String arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateTime(int arg0, Time arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateTime(String arg0, Time arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateTimestamp(int arg0, Timestamp arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void updateTimestamp(String arg0, Timestamp arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public boolean wasNull() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+  
+  private void checkRecord() throws SQLException {
+    if (Objects.isNull(tsQueryNonAlignDataSet)) {
+      throw new SQLException("No record remains");
+    }
+  }
+
+  private String findColumnNameByIndex(int columnIndex) throws SQLException {
+    if (columnIndex <= 0) {
+      throw new SQLException("column index should start from 1");
+    }
+    if (columnIndex > columnInfoList.size()) {
+      throw new SQLException(
+          String.format("column index %d out of range %d", columnIndex, columnInfoList.size()));
+    }
+    return columnInfoList.get(columnIndex - 1);
+  }
+
+  private String getValueByName(String columnName) throws SQLException {
+    checkRecord();
+    if (columnName.startsWith(TIMESTAMP_STR)) {
+      String column = columnName.substring(TIMESTAMP_STR_LENGTH);
+      int index = columnInfoMap.get(column) - START_INDEX;
+      if (times[index].length == 0) {
+        return null;
+      }
+      return String.valueOf(BytesUtils.bytesToLong(times[index]));
+    }
+    int index = columnInfoMap.get(columnName) - START_INDEX;
+    if (index < 0 || index >= values.length || values[index] == null || values[index].length < 1) {
+      return null;
+    }
+    TSDataType dataType = TSDataType.valueOf(columnTypeDeduplicatedList.get(index));
 
 Review comment:
   It would be better if you just make columnTypeDeduplicatedList a List of TsDataType.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
jt2594838 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r363659799
 
 

 ##########
 File path: service-rpc/src/main/thrift/rpc.thrift
 ##########
 @@ -219,12 +222,22 @@ struct ServerProperties {
 }
 
 struct TSQueryDataSet{
-   // ByteBuffer for time column
-   1: required binary time
-   // ByteBuffer for each column values
-   2: required list<binary> valueList
-   // Bitmap for each column to indicate whether it is a null value
-   3: required list<binary> bitmapList
+    // ByteBuffer for time column
+    1: required binary time
+    // ByteBuffer for each column values
+    2: required list<binary> valueList
+    // Bitmap for each column to indicate whether it is a null value
+    3: required list<binary> bitmapList
+}
+
+struct TSQueryNonAlignDataSet{
+    // ByteBuffer for each time column
+	1: required list<binary> timeList
+	// ByteBuffer for each column values
 
 Review comment:
   Reformat this.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] JackieTien97 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
JackieTien97 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r365633668
 
 

 ##########
 File path: server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
 ##########
 @@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.dataset;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.iotdb.db.query.pool.QueryTaskPoolManager;
+import org.apache.iotdb.db.query.reader.ManagedSeriesReader;
+import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
+import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NonAlignEngineDataSet extends QueryDataSet {
+  
+  private static class ReadTask implements Runnable {
+    
+    private final ManagedSeriesReader reader;
+    private BlockingQueue<Pair<PublicBAOS, PublicBAOS>> blockingQueue;
+    private WatermarkEncoder encoder;
+    private int rowOffset;
+    private int fetchSize;
+    private int rowLimit;
+    private int alreadyReturnedRowNum = 0;
+    
+    public ReadTask(ManagedSeriesReader reader, 
+        BlockingQueue<Pair<PublicBAOS, PublicBAOS>> blockingQueue,
+        WatermarkEncoder encoder,
+        int rowOffset, int rowLimit, int fetchSize) {
+      this.reader = reader;
+      this.blockingQueue = blockingQueue;
+      this.encoder = encoder;
+      this.rowOffset = rowOffset;
+      this.rowLimit = rowLimit;
+      this.fetchSize = fetchSize;
+    }
+
+    @Override
+    public void run() {
+      PublicBAOS timeBAOS = new PublicBAOS();
+      PublicBAOS valueBAOS = new PublicBAOS();
+      try {
+        synchronized (reader) {
+          // if the task is submitted, there must be free space in the queue
+          // so here we don't need to check whether the queue has free space
+          // the reader has next batch
+          if (reader.hasNextBatch()) {
+            BatchData batchData = reader.nextBatch();
+            
 
 Review comment:
   Maybe you will get an batch data that is empty here, you should can the if to while in order to get the first not empty batch data.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] JackieTien97 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
JackieTien97 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r365630643
 
 

 ##########
 File path: jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBNonAlignQueryResultSet.java
 ##########
 @@ -0,0 +1,1292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.jdbc;
+
+import org.apache.iotdb.rpc.IoTDBRPCException;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.service.rpc.thrift.*;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.thrift.TException;
+
+import java.io.InputStream;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.*;
+import java.util.*;
+
+public class IoTDBNonAlignQueryResultSet implements ResultSet {
+
+  private static final String TIMESTAMP_STR = "Time";
+  private static final int TIMESTAMP_STR_LENGTH = 4;
+  private static final int START_INDEX = 2;
+  private static final String VALUE_IS_NULL = "The value got by %s (column name) is NULL.";
+  private static final String EMPTY_STR = "";
+  private Statement statement = null;
+  private String sql;
+  private SQLWarning warningChain = null;
+  private boolean isClosed = false;
+  private TSIService.Iface client = null;
+  private List<String> columnInfoList; // no deduplication
+  private List<String> columnTypeList; // no deduplication
+  private Map<String, Integer> columnInfoMap; // used because the server returns deduplicated columns
+  private List<String> columnTypeDeduplicatedList; // deduplicated from columnTypeList
+  private int rowsIndex = 0; // used to record the row index in current TSQueryDataSet
 
 Review comment:
   This field is not used. It's used to record the row index in current TSQueryDataSet but is not needed in this TSQueryNonAlignDataSet.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
jt2594838 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r365626693
 
 

 ##########
 File path: jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBQueryResultSet.java
 ##########
 @@ -761,6 +851,70 @@ private void constructOneRow() {
     }
     rowsIndex++;
   }
+  
+  private void constructNonAlignOneRow() {
+    for (int i = 0; i < tsQueryNonAlignDataSet.bitmapList.size(); i++) {
+      ByteBuffer bitmapBuffer = tsQueryNonAlignDataSet.bitmapList.get(i);
+      // another new 8 row, should move the bitmap buffer position to next byte
+      if (rowsIndex % 8 == 0) {
+        currentBitmap[i] = bitmapBuffer.get();
+      }
+      times[i] = null;
+      values[i] = null;
+      if (!isNull(i, rowsIndex)) {
+        if (times[i] == null) {
+          times[i] = new byte[Long.BYTES];
+        }
+        tsQueryNonAlignDataSet.timeList.get(i).get(times[i]);
+        ByteBuffer valueBuffer = tsQueryNonAlignDataSet.valueList.get(i);
+        TSDataType dataType = TSDataType.valueOf(columnTypeDeduplicatedList.get(i));
 
 Review comment:
   You may make columnTypeDeduplicatedList a List of TsDataType and modify the constructor.
   ![image](https://user-images.githubusercontent.com/23610645/72228777-98c3b600-35e4-11ea-8cdc-8d5df4ad451b.png)
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
jt2594838 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r363657450
 
 

 ##########
 File path: jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBQueryResultSet.java
 ##########
 @@ -1226,7 +1380,7 @@ public boolean wasNull() throws SQLException {
   }
 
   private void checkRecord() throws SQLException {
-    if (Objects.isNull(tsQueryDataSet)) {
+    if (Objects.isNull(tsQueryDataSet) && Objects.isNull(tsQueryNonAlignDataSet)) {
 
 Review comment:
   I think maybe it is time to break the aligned dataset and the non-aligned dataset into two classes because these predicates are a little inelegant. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
jt2594838 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r365628587
 
 

 ##########
 File path: jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBNonAlignQueryResultSet.java
 ##########
 @@ -0,0 +1,1292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.jdbc;
+
+import org.apache.iotdb.rpc.IoTDBRPCException;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.service.rpc.thrift.*;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.thrift.TException;
+
+import java.io.InputStream;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.*;
+import java.util.*;
+
+public class IoTDBNonAlignQueryResultSet implements ResultSet {
+
+  private static final String TIMESTAMP_STR = "Time";
+  private static final int TIMESTAMP_STR_LENGTH = 4;
+  private static final int START_INDEX = 2;
+  private static final String VALUE_IS_NULL = "The value got by %s (column name) is NULL.";
+  private static final String EMPTY_STR = "";
+  private Statement statement = null;
+  private String sql;
+  private SQLWarning warningChain = null;
+  private boolean isClosed = false;
+  private TSIService.Iface client = null;
+  private List<String> columnInfoList; // no deduplication
+  private List<String> columnTypeList; // no deduplication
+  private Map<String, Integer> columnInfoMap; // used because the server returns deduplicated columns
+  private List<String> columnTypeDeduplicatedList; // deduplicated from columnTypeList
+  private int rowsIndex = 0; // used to record the row index in current TSQueryDataSet
+  private int fetchSize;
+  private boolean emptyResultSet = false;
+
+  private TSQueryNonAlignDataSet tsQueryNonAlignDataSet = null;
+  private byte[][] times; // used for disable align
+  private byte[][] values; // used to cache the current row record value
+
+  private long sessionId;
+  private long queryId;
+  private boolean ignoreTimeStamp = false;
+
+  public IoTDBNonAlignQueryResultSet() {
+    // do nothing
+  }
+  
+  // for disable align clause
+  public IoTDBNonAlignQueryResultSet(Statement statement, List<String> columnNameList,
+      List<String> columnTypeList, boolean ignoreTimeStamp, TSIService.Iface client,
+      String sql, long queryId, long sessionId, TSQueryNonAlignDataSet dataset) 
+      throws SQLException {
+    this.statement = statement;
+    this.fetchSize = statement.getFetchSize();
+    this.columnTypeList = columnTypeList;
+
+    times = new byte[columnNameList.size()][Long.BYTES];
+    values = new byte[columnNameList.size()][];
+
+    this.columnInfoList = new ArrayList<>();
+    // deduplicate and map
+    this.columnInfoMap = new HashMap<>();
+    this.columnInfoMap.put(TIMESTAMP_STR, 1);
+    this.columnTypeDeduplicatedList = new ArrayList<>();
+    int index = START_INDEX;
+    for (int i = 0; i < columnNameList.size(); i++) {
+      String name = columnNameList.get(i);
+      columnInfoList.add(TIMESTAMP_STR + name);
+      columnInfoList.add(name);
+      if (!columnInfoMap.containsKey(name)) {
+        columnInfoMap.put(name, index++);
+        columnTypeDeduplicatedList.add(columnTypeList.get(i));
+      }
+    }
+
+    this.ignoreTimeStamp = ignoreTimeStamp;
+    this.client = client;
+    this.sql = sql;
+    this.queryId = queryId;
+    this.tsQueryNonAlignDataSet = dataset;
+    this.sessionId = sessionId;
+  }
+
+  @Override
+  public boolean isWrapperFor(Class<?> iface) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public <T> T unwrap(Class<T> iface) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public boolean absolute(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void afterLast() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void beforeFirst() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void cancelRowUpdates() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void clearWarnings() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void close() throws SQLException {
+    if (isClosed) {
+      return;
+    }
+    if (client != null) {
+      try {
+        TSCloseOperationReq closeReq = new TSCloseOperationReq(sessionId);
+        closeReq.setQueryId(queryId);
+        TSStatus closeResp = client.closeOperation(closeReq);
+        RpcUtils.verifySuccess(closeResp);
+      } catch (IoTDBRPCException e) {
+        throw new SQLException("Error occurs for close opeation in server side becasuse " + e);
+      } catch (TException e) {
+        throw new SQLException(
+            "Error occurs when connecting to server for close operation, becasue: " + e);
+      }
+    }
+    client = null;
+    isClosed = true;
+  }
+
+
+  @Override
+  public void deleteRow() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public int findColumn(String columnName) {
+    return columnInfoMap.get(columnName);
+  }
+
+  @Override
+  public boolean first() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Array getArray(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Array getArray(String arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public InputStream getAsciiStream(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public InputStream getAsciiStream(String arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public BigDecimal getBigDecimal(int columnIndex) throws SQLException {
+    return getBigDecimal(findColumnNameByIndex(columnIndex));
+  }
+
+  @Override
+  public BigDecimal getBigDecimal(String columnName) throws SQLException {
+    return new BigDecimal(Objects.requireNonNull(getValueByName(columnName)));
+  }
+
+  @Override
+  public BigDecimal getBigDecimal(int columnIndex, int scale) throws SQLException {
+    MathContext mc = new MathContext(scale);
+    return getBigDecimal(columnIndex).round(mc);
+  }
+
+  @Override
+  public BigDecimal getBigDecimal(String columnName, int scale) throws SQLException {
+    return getBigDecimal(findColumn(columnName), scale);
+  }
+
+  @Override
+  public InputStream getBinaryStream(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public InputStream getBinaryStream(String arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Blob getBlob(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Blob getBlob(String arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public boolean getBoolean(int columnIndex) throws SQLException {
+    return getBoolean(findColumnNameByIndex(columnIndex));
+  }
+
+  @Override
+  public boolean getBoolean(String columnName) throws SQLException {
+    checkRecord();
+    int index = columnInfoMap.get(columnName) - START_INDEX;
+    if (values[index] != null) {
+      return BytesUtils.bytesToBool(values[index]);
+    }
+    else {
+      throw new SQLException(String.format(VALUE_IS_NULL, columnName));
+    }
+  }
+
+  @Override
+  public byte getByte(int columnIndex) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public byte getByte(String columnName) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public byte[] getBytes(int columnIndex) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public byte[] getBytes(String columnName) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Reader getCharacterStream(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Reader getCharacterStream(String arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Clob getClob(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Clob getClob(String arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public int getConcurrency() {
+    return ResultSet.CONCUR_READ_ONLY;
+  }
+
+  @Override
+  public String getCursorName() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Date getDate(int columnIndex) throws SQLException {
+    return new Date(getLong(columnIndex));
+  }
+
+  @Override
+  public Date getDate(String columnName) throws SQLException {
+    return getDate(findColumn(columnName));
+  }
+
+  @Override
+  public Date getDate(int arg0, Calendar arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Date getDate(String arg0, Calendar arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public double getDouble(int columnIndex) throws SQLException {
+    return getDouble(findColumnNameByIndex(columnIndex));
+  }
+
+  @Override
+  public double getDouble(String columnName) throws SQLException {
+    checkRecord();
+    int index = columnInfoMap.get(columnName) - START_INDEX;
+    if (values[index] != null) {
+      return BytesUtils.bytesToDouble(values[index]);
+    } else {
+      throw new SQLException(String.format(VALUE_IS_NULL, columnName));
+    }
+  }
+
+  @Override
+  public int getFetchDirection() throws SQLException {
+    return ResultSet.FETCH_FORWARD;
+  }
+
+  @Override
+  public void setFetchDirection(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public int getFetchSize() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void setFetchSize(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public float getFloat(int columnIndex) throws SQLException {
+    return getFloat(findColumnNameByIndex(columnIndex));
+  }
+
+  @Override
+  public float getFloat(String columnName) throws SQLException {
+    checkRecord();
+    int index = columnInfoMap.get(columnName) - START_INDEX;
+    if (values[index] != null) {
+      return BytesUtils.bytesToFloat(values[index]);
+    } else {
+      throw new SQLException(String.format(VALUE_IS_NULL, columnName));
+    }
+  }
+
+  @Override
+  public int getHoldability() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public int getInt(int columnIndex) throws SQLException {
+    return getInt(findColumnNameByIndex(columnIndex));
+  }
+
+  @Override
+  public int getInt(String columnName) throws SQLException {
+    checkRecord();
+    int index = columnInfoMap.get(columnName) - START_INDEX;
+    if (values[index] != null) {
+      return BytesUtils.bytesToInt(values[index]);
+    } else {
+      throw new SQLException(String.format(VALUE_IS_NULL, columnName));
+    }
+  }
+
+  @Override
+  public long getLong(int columnIndex) throws SQLException {
+    return getLong(findColumnNameByIndex(columnIndex));
+  }
+
+  @Override
+  public long getLong(String columnName) throws SQLException {
+    checkRecord();
+    if (columnName.startsWith(TIMESTAMP_STR)) {
+      String column = columnName.substring(TIMESTAMP_STR_LENGTH);
+      int index = columnInfoMap.get(column) - START_INDEX;
+      return BytesUtils.bytesToLong(times[index]);
+    }
+    int index = columnInfoMap.get(columnName) - START_INDEX;
+    if (values[index] != null) {
+      return BytesUtils.bytesToLong(values[index]);
+    } else {
+      throw new SQLException(String.format(VALUE_IS_NULL, columnName));
+    }
+  }
+
+  @Override
+  public ResultSetMetaData getMetaData() {
+    return new IoTDBResultMetadata(columnInfoList, columnTypeList, false);
+  }
+
+  @Override
+  public Reader getNCharacterStream(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Reader getNCharacterStream(String arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public NClob getNClob(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public NClob getNClob(String arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public String getNString(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public String getNString(String arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Object getObject(int columnIndex) throws SQLException {
+    return getObject(findColumnNameByIndex(columnIndex));
+  }
+
+  @Override
+  public Object getObject(String columnName) throws SQLException {
+    return getValueByName(columnName);
+  }
+
+  @Override
+  public Object getObject(int arg0, Map<String, Class<?>> arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Object getObject(String arg0, Map<String, Class<?>> arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public <T> T getObject(int arg0, Class<T> arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public <T> T getObject(String arg0, Class<T> arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Ref getRef(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Ref getRef(String arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public int getRow() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public RowId getRowId(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public RowId getRowId(String arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public SQLXML getSQLXML(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public SQLXML getSQLXML(String arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public short getShort(int columnIndex) throws SQLException {
+    return getShort(findColumnNameByIndex(columnIndex));
+  }
+
+  @Override
+  public short getShort(String columnName) throws SQLException {
+    return Short.parseShort(Objects.requireNonNull(getValueByName(columnName)));
+  }
+
+  @Override
+  public Statement getStatement() {
+    return this.statement;
+  }
+
+  @Override
+  public String getString(int columnIndex) throws SQLException {
+    return getString(findColumnNameByIndex(columnIndex));
+  }
+
+  @Override
+  public String getString(String columnName) throws SQLException {
+    return getValueByName(columnName);
+  }
+
+  @Override
+  public Time getTime(int columnIndex) throws SQLException {
+    return new Time(getLong(columnIndex));
+  }
+
+  @Override
+  public Time getTime(String columnName) throws SQLException {
+    return getTime(findColumn(columnName));
+  }
+
+  @Override
+  public Time getTime(int arg0, Calendar arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Time getTime(String arg0, Calendar arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Timestamp getTimestamp(int columnIndex) throws SQLException {
+    return new Timestamp(getLong(columnIndex));
+  }
+
+  @Override
+  public Timestamp getTimestamp(String columnName) throws SQLException {
+    return getTimestamp(findColumn(columnName));
+  }
+
+  @Override
+  public Timestamp getTimestamp(int arg0, Calendar arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public Timestamp getTimestamp(String arg0, Calendar arg1) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public int getType() {
+    return ResultSet.TYPE_FORWARD_ONLY;
+  }
+
+  @Override
+  public URL getURL(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public URL getURL(String arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public InputStream getUnicodeStream(int arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public InputStream getUnicodeStream(String arg0) throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public SQLWarning getWarnings() {
+    return warningChain;
+  }
+
+  @Override
+  public void insertRow() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public boolean isAfterLast() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public boolean isBeforeFirst() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public boolean isClosed() {
+    return isClosed;
+  }
+
+  @Override
+  public boolean isFirst() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public boolean isLast() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public boolean last() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void moveToCurrentRow() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public void moveToInsertRow() throws SQLException {
+    throw new SQLException(Constant.METHOD_NOT_SUPPORTED);
+  }
+
+  @Override
+  public boolean next() throws SQLException {
+    if (hasCachedResults()) {
+      constructNonAlignOneRow();
+      return true;
+    }
+    if (emptyResultSet) {
+      return false;
+    }
+    if (fetchResults()) {
+      constructNonAlignOneRow();
+      return true;
+    }
+    return false;
+  }
+
+
+  /**
+   * @return true means has results
+   */
+  private boolean fetchResults() throws SQLException {
+    rowsIndex = 0;
+    TSFetchResultsReq req = new TSFetchResultsReq(sessionId, sql, fetchSize, queryId, false);
+    try {
+      TSFetchResultsResp resp = client.fetchResults(req);
+
+      try {
+        RpcUtils.verifySuccess(resp.getStatus());
+      } catch (IoTDBRPCException e) {
+        throw new IoTDBSQLException(e.getMessage(), resp.getStatus());
+      }
+      if (!resp.hasResultSet) {
+        emptyResultSet = true;
+      } else {
+        tsQueryNonAlignDataSet = resp.getNonAlignQueryDataSet();
+        if (tsQueryNonAlignDataSet == null) {
+          return false;
+        }
+      }
+      return resp.hasResultSet;
+    } catch (TException e) {
+      throw new SQLException(
+          "Cannot fetch result from server, because of network connection: {} ", e);
+    }
+  }
+
+  private boolean hasCachedResults() {
+    return (tsQueryNonAlignDataSet != null && hasTimesRemaining());
+  }
+  
+  // check if has times remaining for disable align clause
+  private boolean hasTimesRemaining() {
+    for (ByteBuffer time : tsQueryNonAlignDataSet.timeList) {
+      if (time.hasRemaining()) {
+        return true;
+      }
+    }
+    return false;
+  }
+  
+  private void constructNonAlignOneRow() {
+    for (int i = 0; i < tsQueryNonAlignDataSet.timeList.size(); i++) {
+      times[i] = null;
+      values[i] = null;
+      if (tsQueryNonAlignDataSet.timeList.get(i).remaining() >= Long.BYTES) {
+        if (times[i] == null) {
+          times[i] = new byte[Long.BYTES];
+        }
 
 Review comment:
   You set `times[i]` and `values[i]` to null every time, so I don't think the if-clause is meaningful.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
jt2594838 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r365626693
 
 

 ##########
 File path: jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBQueryResultSet.java
 ##########
 @@ -761,6 +851,70 @@ private void constructOneRow() {
     }
     rowsIndex++;
   }
+  
+  private void constructNonAlignOneRow() {
+    for (int i = 0; i < tsQueryNonAlignDataSet.bitmapList.size(); i++) {
+      ByteBuffer bitmapBuffer = tsQueryNonAlignDataSet.bitmapList.get(i);
+      // another new 8 row, should move the bitmap buffer position to next byte
+      if (rowsIndex % 8 == 0) {
+        currentBitmap[i] = bitmapBuffer.get();
+      }
+      times[i] = null;
+      values[i] = null;
+      if (!isNull(i, rowsIndex)) {
+        if (times[i] == null) {
+          times[i] = new byte[Long.BYTES];
+        }
+        tsQueryNonAlignDataSet.timeList.get(i).get(times[i]);
+        ByteBuffer valueBuffer = tsQueryNonAlignDataSet.valueList.get(i);
+        TSDataType dataType = TSDataType.valueOf(columnTypeDeduplicatedList.get(i));
 
 Review comment:
   You may make columnTypeDeduplicatedList a List<TsDataType> and modify the constructor.
   ![image](https://user-images.githubusercontent.com/23610645/72228777-98c3b600-35e4-11ea-8cdc-8d5df4ad451b.png)
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] JackieTien97 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
JackieTien97 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r364034385
 
 

 ##########
 File path: jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBQueryResultSet.java
 ##########
 @@ -1226,7 +1380,7 @@ public boolean wasNull() throws SQLException {
   }
 
   private void checkRecord() throws SQLException {
-    if (Objects.isNull(tsQueryDataSet)) {
+    if (Objects.isNull(tsQueryDataSet) && Objects.isNull(tsQueryNonAlignDataSet)) {
 
 Review comment:
   I agree, we should extract an abstract super class to reserve some common functions and make the two different dataset extend to it.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] JackieTien97 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
JackieTien97 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r365635400
 
 

 ##########
 File path: server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
 ##########
 @@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.dataset;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.iotdb.db.query.pool.QueryTaskPoolManager;
+import org.apache.iotdb.db.query.reader.ManagedSeriesReader;
+import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
+import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NonAlignEngineDataSet extends QueryDataSet {
+  
+  private static class ReadTask implements Runnable {
+    
+    private final ManagedSeriesReader reader;
+    private BlockingQueue<Pair<PublicBAOS, PublicBAOS>> blockingQueue;
+    private WatermarkEncoder encoder;
+    private int rowOffset;
+    private int fetchSize;
+    private int rowLimit;
+    private int alreadyReturnedRowNum = 0;
+    
+    public ReadTask(ManagedSeriesReader reader, 
+        BlockingQueue<Pair<PublicBAOS, PublicBAOS>> blockingQueue,
+        WatermarkEncoder encoder,
+        int rowOffset, int rowLimit, int fetchSize) {
+      this.reader = reader;
+      this.blockingQueue = blockingQueue;
+      this.encoder = encoder;
+      this.rowOffset = rowOffset;
+      this.rowLimit = rowLimit;
+      this.fetchSize = fetchSize;
+    }
+
+    @Override
+    public void run() {
+      PublicBAOS timeBAOS = new PublicBAOS();
+      PublicBAOS valueBAOS = new PublicBAOS();
+      try {
+        synchronized (reader) {
+          // if the task is submitted, there must be free space in the queue
+          // so here we don't need to check whether the queue has free space
+          // the reader has next batch
+          if (reader.hasNextBatch()) {
+            BatchData batchData = reader.nextBatch();
+            
+            int rowCount = 0;
+            while (rowCount < fetchSize) {
+              
+              if ((rowLimit > 0 && alreadyReturnedRowNum >= rowLimit)) {
+                break;
+              }
+              
+              if (batchData != null && batchData.hasCurrent()) {
+                if (rowOffset == 0) {
+                  long time = batchData.currentTime();
+                  ReadWriteIOUtils.write(time, timeBAOS);
+                  TSDataType type = batchData.getDataType();
+                  switch (type) {
+                    case INT32:
+                      int intValue = batchData.getInt();
+                      if (encoder != null && encoder.needEncode(time)) {
+                        intValue = encoder.encodeInt(intValue, time);
+                      }
+                      ReadWriteIOUtils.write(intValue, valueBAOS);
+                      break;
+                    case INT64:
+                      long longValue = batchData.getLong();
+                      if (encoder != null && encoder.needEncode(time)) {
+                        longValue = encoder.encodeLong(longValue, time);
+                      }
+                      ReadWriteIOUtils.write(longValue, valueBAOS);
+                      break;
+                    case FLOAT:
+                      float floatValue = batchData.getFloat();
+                      if (encoder != null && encoder.needEncode(time)) {
+                        floatValue = encoder.encodeFloat(floatValue, time);
+                      }
+                      ReadWriteIOUtils.write(floatValue, valueBAOS);
+                      break;
+                    case DOUBLE:
+                      double doubleValue = batchData.getDouble();
+                      if (encoder != null && encoder.needEncode(time)) {
+                        doubleValue = encoder.encodeDouble(doubleValue, time);
+                      }
+                      ReadWriteIOUtils.write(doubleValue, valueBAOS);
+                      break;
+                    case BOOLEAN:
+                      ReadWriteIOUtils.write(batchData.getBoolean(),
+                              valueBAOS);
+                      break;
+                    case TEXT:
+                      ReadWriteIOUtils
+                              .write(batchData.getBinary(),
+                                      valueBAOS);
+                      break;
+                    default:
+                      throw new UnSupportedDataTypeException(
+                              String.format("Data type %s is not supported.", type));
+                  }
+                }
+                batchData.next();
+              }
+              else {
+                break;
+              }
+              if (rowOffset == 0) {
+                rowCount++;
+                if (rowLimit > 0) {
+                  alreadyReturnedRowNum++;
+                }
+              } else {
+                rowOffset--;
+              }
+            }
+            
+            Pair<PublicBAOS, PublicBAOS> timeValueBAOSPair = new Pair(timeBAOS, valueBAOS);
+            
+            blockingQueue.put(timeValueBAOSPair);
+            // if the queue also has free space, just submit another itself
+            if (blockingQueue.remainingCapacity() > 0) {
+              pool.submit(this);
+            }
+            // the queue has no more space
+            // remove itself from the QueryTaskPoolManager
+            else {
+              reader.setManagedByQueryManager(false);
+            }
+            return;
+          }
+          blockingQueue.put(new Pair(timeBAOS, valueBAOS));
+          // set the hasRemaining field in reader to false
+          // tell the Consumer not to submit another task for this reader any more
+          reader.setHasRemaining(false);
+          // remove itself from the QueryTaskPoolManager
+          reader.setManagedByQueryManager(false);
+        }
+      } catch (InterruptedException e) {
+        LOGGER.error("Interrupted while putting into the blocking queue: ", e);
+      } catch (IOException e) {
+        LOGGER.error("Something gets wrong while reading from the series reader: ", e);
+      } catch (Exception e) {
+        LOGGER.error("Something gets wrong: ", e);
+      }
+      
+    }
+    
+  }
+
+  
+  private List<ManagedSeriesReader> seriesReaderWithoutValueFilterList;
+  
+  // Blocking queue list for each time value buffer pair
+  private BlockingQueue<Pair<PublicBAOS, PublicBAOS>>[] blockingQueueArray;
+  
+  private boolean initialized = false;
+  
+  // indicate that there is no more batch data in the corresponding queue
+  // in case that the consumer thread is blocked on the queue and won't get runnable any more
+  // this field is not same as the `hasRemaining` in SeriesReaderWithoutValueFilter
+  // even though the `hasRemaining` in SeriesReaderWithoutValueFilter is false
+  // noMoreDataInQueue can still be true
+  // its usage is to tell the consumer thread not to call the take() method.
+
+  // capacity for blocking queue
+  private static final int BLOCKING_QUEUE_CAPACITY = 5;
+
+  private static final QueryTaskPoolManager pool = QueryTaskPoolManager.getInstance();
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(NonAlignEngineDataSet.class);
+  
+  /**
+   * constructor of EngineDataSet.
+   *
+   * @param paths paths in List structure
+   * @param dataTypes time series data type
+   * @param readers readers in List(IPointReader) structure
+   */
+  public NonAlignEngineDataSet(List<Path> paths, List<TSDataType> dataTypes,
+      List<ManagedSeriesReader> readers) throws InterruptedException {
+    super(paths, dataTypes);
+    this.seriesReaderWithoutValueFilterList = readers;
+    blockingQueueArray = new BlockingQueue[readers.size()];
+    for (int i = 0; i < seriesReaderWithoutValueFilterList.size(); i++) {
+      blockingQueueArray[i] = new LinkedBlockingQueue<>(BLOCKING_QUEUE_CAPACITY);
+    }
+  }
+  
+  private void init(WatermarkEncoder encoder, int fetchSize) 
+      throws InterruptedException {
+    for (int i = 0; i < seriesReaderWithoutValueFilterList.size(); i++) {
+      ManagedSeriesReader reader = seriesReaderWithoutValueFilterList.get(i);
+      reader.setHasRemaining(true);
+      reader.setManagedByQueryManager(true);
+      pool.submit(new ReadTask(reader, blockingQueueArray[i], encoder, rowOffset, rowLimit, fetchSize));
+    }
+    this.initialized = true;
+  }
+  
+  /**
+   * for RPC in RawData query between client and server
+   * fill time buffers and value buffers
+   */
+  public TSQueryNonAlignDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) throws IOException, InterruptedException {
+    if (!initialized) {
+      init(encoder, fetchSize);
+    }
+    int seriesNum = seriesReaderWithoutValueFilterList.size();
+    TSQueryNonAlignDataSet tsQueryNonAlignDataSet = new TSQueryNonAlignDataSet();
+
+    PublicBAOS[] timeBAOSList = new PublicBAOS[seriesNum];
+    PublicBAOS[] valueBAOSList = new PublicBAOS[seriesNum];
+    
+    for (int seriesIndex = 0; seriesIndex < seriesNum; seriesIndex++) {
+      synchronized (seriesReaderWithoutValueFilterList.get(seriesIndex)) {
+        if (blockingQueueArray[seriesIndex].remainingCapacity() > 0) {
+          ManagedSeriesReader reader = seriesReaderWithoutValueFilterList.get(seriesIndex);
+          // if the reader isn't being managed and still has more data,
+          // that means this read task leave the pool before because the queue has no more space
+          // now we should submit it again
+          if (reader.isManagedByQueryManager() && reader.hasRemaining()) {
+            reader.setManagedByQueryManager(true);
+            pool.submit(new ReadTask(reader, blockingQueueArray[seriesIndex], 
+                encoder, rowOffset, rowLimit, fetchSize));
+          }
+        }
+        Pair<PublicBAOS, PublicBAOS> timevalueBAOSPair = blockingQueueArray[seriesIndex].poll();
+        if (timevalueBAOSPair != null) {
+          timeBAOSList[seriesIndex] = timevalueBAOSPair.left;
+          valueBAOSList[seriesIndex] = timevalueBAOSPair.right;
+        }
+        else {
+          timeBAOSList[seriesIndex] = new PublicBAOS();
+          valueBAOSList[seriesIndex] = new PublicBAOS();
+        }
+      }
+    }
+
+    List<ByteBuffer> timeBufferList = new ArrayList<>();
+    List<ByteBuffer> valueBufferList = new ArrayList<>();
+
+    for (int seriesIndex = 0; seriesIndex < seriesNum; seriesIndex++) {
+      // add time buffer of current series
+      putPBOSToBuffer(timeBAOSList, timeBufferList, seriesIndex);
+
+      // add value buffer of current series
+      putPBOSToBuffer(valueBAOSList, valueBufferList, seriesIndex);
+    }
+
+    // set time buffers, value buffers and bitmap buffers
+    tsQueryNonAlignDataSet.setTimeList(timeBufferList);
+    tsQueryNonAlignDataSet.setValueList(valueBufferList);
+
+    return tsQueryNonAlignDataSet;
+  }
+  
+  private void putPBOSToBuffer(PublicBAOS[] aBAOSList, List<ByteBuffer> aBufferList,
+      int tsIndex) {
+    ByteBuffer aBuffer = ByteBuffer.allocate(aBAOSList[tsIndex].size());
+    aBuffer.put(aBAOSList[tsIndex].getBuf(), 0, aBAOSList[tsIndex].size());
+    aBuffer.flip();
+    aBufferList.add(aBuffer);
+  }
+
+  @Override
+  protected boolean hasNextWithoutConstraint() throws IOException {
+    // TODO Auto-generated method stub
+    return false;
+  }
+
+  @Override
+  protected RowRecord nextWithoutConstraint() throws IOException {
+    // TODO Auto-generated method stub
+    return null;
 
 Review comment:
   These two method won't be used?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] HTHou commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
HTHou commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r365075177
 
 

 ##########
 File path: jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBQueryResultSet.java
 ##########
 @@ -703,7 +782,18 @@ private boolean fetchResults() throws SQLException {
   }
 
   private boolean hasCachedResults() {
-    return tsQueryDataSet != null && tsQueryDataSet.time.hasRemaining();
+    return tsQueryDataSet != null && tsQueryDataSet.time.hasRemaining()
+        || tsQueryNonAlignDataSet != null && hasTimesRemaining();
 
 Review comment:
   Fixed. I have separated the non-align resultset from the original resultset.  

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] JackieTien97 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
JackieTien97 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r364038348
 
 

 ##########
 File path: server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
 ##########
 @@ -174,6 +180,126 @@ public static TSQueryDataSet convertQueryDataSetByFetchSize(QueryDataSet queryDa
     tsQueryDataSet.setValueList(valueList);
     return tsQueryDataSet;
   }
+  
+
+  public static TSQueryNonAlignDataSet convertQueryNonAlignDataSetByFetchSize(QueryDataSet queryDataSet,
+      int fetchSize, WatermarkEncoder watermarkEncoder) throws IOException {
+    List<TSDataType> dataTypes = queryDataSet.getDataTypes();
+    int columnNum = dataTypes.size();
+    TSQueryNonAlignDataSet tsQueryNonAlignDataSet = new TSQueryNonAlignDataSet();
+    // one time column and each value column has a actual value buffer and a bitmap value to indicate whether it is a null
+    int columnNumWithTime = columnNum * 3;
+    DataOutputStream[] dataOutputStreams = new DataOutputStream[columnNumWithTime];
+    ByteArrayOutputStream[] byteArrayOutputStreams = new ByteArrayOutputStream[columnNumWithTime];
+    for (int i = 0; i < columnNumWithTime; i++) {
+      byteArrayOutputStreams[i] = new ByteArrayOutputStream();
+      dataOutputStreams[i] = new DataOutputStream(byteArrayOutputStreams[i]);
+    }
+
+    int rowCount = 0;
+    int[] valueOccupation = new int[columnNum];
+    // used to record a bitmap for every 8 row record
+    int[] bitmap = new int[columnNum];
 
 Review comment:
   Same as above, delete the field

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] JackieTien97 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
JackieTien97 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r365634781
 
 

 ##########
 File path: server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
 ##########
 @@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.dataset;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.iotdb.db.query.pool.QueryTaskPoolManager;
+import org.apache.iotdb.db.query.reader.ManagedSeriesReader;
+import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
+import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NonAlignEngineDataSet extends QueryDataSet {
+  
+  private static class ReadTask implements Runnable {
+    
+    private final ManagedSeriesReader reader;
+    private BlockingQueue<Pair<PublicBAOS, PublicBAOS>> blockingQueue;
+    private WatermarkEncoder encoder;
+    private int rowOffset;
+    private int fetchSize;
+    private int rowLimit;
+    private int alreadyReturnedRowNum = 0;
+    
+    public ReadTask(ManagedSeriesReader reader, 
+        BlockingQueue<Pair<PublicBAOS, PublicBAOS>> blockingQueue,
+        WatermarkEncoder encoder,
+        int rowOffset, int rowLimit, int fetchSize) {
+      this.reader = reader;
+      this.blockingQueue = blockingQueue;
+      this.encoder = encoder;
+      this.rowOffset = rowOffset;
+      this.rowLimit = rowLimit;
+      this.fetchSize = fetchSize;
+    }
+
+    @Override
+    public void run() {
+      PublicBAOS timeBAOS = new PublicBAOS();
+      PublicBAOS valueBAOS = new PublicBAOS();
+      try {
+        synchronized (reader) {
+          // if the task is submitted, there must be free space in the queue
+          // so here we don't need to check whether the queue has free space
+          // the reader has next batch
+          if (reader.hasNextBatch()) {
+            BatchData batchData = reader.nextBatch();
+            
+            int rowCount = 0;
+            while (rowCount < fetchSize) {
+              
+              if ((rowLimit > 0 && alreadyReturnedRowNum >= rowLimit)) {
+                break;
+              }
+              
+              if (batchData != null && batchData.hasCurrent()) {
+                if (rowOffset == 0) {
+                  long time = batchData.currentTime();
+                  ReadWriteIOUtils.write(time, timeBAOS);
+                  TSDataType type = batchData.getDataType();
+                  switch (type) {
+                    case INT32:
+                      int intValue = batchData.getInt();
+                      if (encoder != null && encoder.needEncode(time)) {
+                        intValue = encoder.encodeInt(intValue, time);
+                      }
+                      ReadWriteIOUtils.write(intValue, valueBAOS);
+                      break;
+                    case INT64:
+                      long longValue = batchData.getLong();
+                      if (encoder != null && encoder.needEncode(time)) {
+                        longValue = encoder.encodeLong(longValue, time);
+                      }
+                      ReadWriteIOUtils.write(longValue, valueBAOS);
+                      break;
+                    case FLOAT:
+                      float floatValue = batchData.getFloat();
+                      if (encoder != null && encoder.needEncode(time)) {
+                        floatValue = encoder.encodeFloat(floatValue, time);
+                      }
+                      ReadWriteIOUtils.write(floatValue, valueBAOS);
+                      break;
+                    case DOUBLE:
+                      double doubleValue = batchData.getDouble();
+                      if (encoder != null && encoder.needEncode(time)) {
+                        doubleValue = encoder.encodeDouble(doubleValue, time);
+                      }
+                      ReadWriteIOUtils.write(doubleValue, valueBAOS);
+                      break;
+                    case BOOLEAN:
+                      ReadWriteIOUtils.write(batchData.getBoolean(),
+                              valueBAOS);
+                      break;
+                    case TEXT:
+                      ReadWriteIOUtils
+                              .write(batchData.getBinary(),
+                                      valueBAOS);
+                      break;
+                    default:
+                      throw new UnSupportedDataTypeException(
+                              String.format("Data type %s is not supported.", type));
+                  }
+                }
+                batchData.next();
+              }
+              else {
+                break;
+              }
+              if (rowOffset == 0) {
+                rowCount++;
+                if (rowLimit > 0) {
+                  alreadyReturnedRowNum++;
+                }
+              } else {
+                rowOffset--;
+              }
+            }
+            
+            Pair<PublicBAOS, PublicBAOS> timeValueBAOSPair = new Pair(timeBAOS, valueBAOS);
+            
+            blockingQueue.put(timeValueBAOSPair);
+            // if the queue also has free space, just submit another itself
+            if (blockingQueue.remainingCapacity() > 0) {
+              pool.submit(this);
+            }
+            // the queue has no more space
+            // remove itself from the QueryTaskPoolManager
+            else {
+              reader.setManagedByQueryManager(false);
+            }
+            return;
+          }
+          blockingQueue.put(new Pair(timeBAOS, valueBAOS));
+          // set the hasRemaining field in reader to false
+          // tell the Consumer not to submit another task for this reader any more
+          reader.setHasRemaining(false);
+          // remove itself from the QueryTaskPoolManager
+          reader.setManagedByQueryManager(false);
+        }
+      } catch (InterruptedException e) {
+        LOGGER.error("Interrupted while putting into the blocking queue: ", e);
+      } catch (IOException e) {
+        LOGGER.error("Something gets wrong while reading from the series reader: ", e);
+      } catch (Exception e) {
+        LOGGER.error("Something gets wrong: ", e);
+      }
+      
+    }
+    
+  }
+
+  
+  private List<ManagedSeriesReader> seriesReaderWithoutValueFilterList;
+  
+  // Blocking queue list for each time value buffer pair
+  private BlockingQueue<Pair<PublicBAOS, PublicBAOS>>[] blockingQueueArray;
+  
+  private boolean initialized = false;
+  
+  // indicate that there is no more batch data in the corresponding queue
+  // in case that the consumer thread is blocked on the queue and won't get runnable any more
+  // this field is not same as the `hasRemaining` in SeriesReaderWithoutValueFilter
+  // even though the `hasRemaining` in SeriesReaderWithoutValueFilter is false
+  // noMoreDataInQueue can still be true
+  // its usage is to tell the consumer thread not to call the take() method.
+
+  // capacity for blocking queue
+  private static final int BLOCKING_QUEUE_CAPACITY = 5;
+
+  private static final QueryTaskPoolManager pool = QueryTaskPoolManager.getInstance();
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(NonAlignEngineDataSet.class);
+  
+  /**
+   * constructor of EngineDataSet.
+   *
+   * @param paths paths in List structure
+   * @param dataTypes time series data type
+   * @param readers readers in List(IPointReader) structure
+   */
+  public NonAlignEngineDataSet(List<Path> paths, List<TSDataType> dataTypes,
+      List<ManagedSeriesReader> readers) throws InterruptedException {
+    super(paths, dataTypes);
+    this.seriesReaderWithoutValueFilterList = readers;
+    blockingQueueArray = new BlockingQueue[readers.size()];
+    for (int i = 0; i < seriesReaderWithoutValueFilterList.size(); i++) {
+      blockingQueueArray[i] = new LinkedBlockingQueue<>(BLOCKING_QUEUE_CAPACITY);
+    }
+  }
+  
+  private void init(WatermarkEncoder encoder, int fetchSize) 
+      throws InterruptedException {
+    for (int i = 0; i < seriesReaderWithoutValueFilterList.size(); i++) {
+      ManagedSeriesReader reader = seriesReaderWithoutValueFilterList.get(i);
+      reader.setHasRemaining(true);
+      reader.setManagedByQueryManager(true);
+      pool.submit(new ReadTask(reader, blockingQueueArray[i], encoder, rowOffset, rowLimit, fetchSize));
+    }
+    this.initialized = true;
+  }
+  
+  /**
+   * for RPC in RawData query between client and server
+   * fill time buffers and value buffers
+   */
+  public TSQueryNonAlignDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) throws IOException, InterruptedException {
+    if (!initialized) {
+      init(encoder, fetchSize);
+    }
+    int seriesNum = seriesReaderWithoutValueFilterList.size();
+    TSQueryNonAlignDataSet tsQueryNonAlignDataSet = new TSQueryNonAlignDataSet();
+
+    PublicBAOS[] timeBAOSList = new PublicBAOS[seriesNum];
+    PublicBAOS[] valueBAOSList = new PublicBAOS[seriesNum];
+    
+    for (int seriesIndex = 0; seriesIndex < seriesNum; seriesIndex++) {
+      synchronized (seriesReaderWithoutValueFilterList.get(seriesIndex)) {
+        if (blockingQueueArray[seriesIndex].remainingCapacity() > 0) {
+          ManagedSeriesReader reader = seriesReaderWithoutValueFilterList.get(seriesIndex);
+          // if the reader isn't being managed and still has more data,
+          // that means this read task leave the pool before because the queue has no more space
+          // now we should submit it again
+          if (reader.isManagedByQueryManager() && reader.hasRemaining()) {
+            reader.setManagedByQueryManager(true);
+            pool.submit(new ReadTask(reader, blockingQueueArray[seriesIndex], 
+                encoder, rowOffset, rowLimit, fetchSize));
+          }
+        }
+        Pair<PublicBAOS, PublicBAOS> timevalueBAOSPair = blockingQueueArray[seriesIndex].poll();
 
 Review comment:
   The poll() function is not an blocking function. We should use take() instead of poll().

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
jt2594838 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r363708848
 
 

 ##########
 File path: server/src/main/java/org/apache/iotdb/db/query/dataset/NewEngineDataSetWithoutValueFilter.java
 ##########
 @@ -327,6 +328,167 @@ public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) throws
 
     return tsQueryDataSet;
   }
+  
+  /**
+   * for RPC in RawData query between client and server
+   * fill time buffer, value buffers and bitmap buffers
+   */
+  public TSQueryNonAlignDataSet fillNonAlignBuffer(int fetchSize, WatermarkEncoder encoder) throws IOException, InterruptedException {
+    int seriesNum = seriesReaderWithoutValueFilterList.size();
+    TSQueryNonAlignDataSet tsQueryNonAlignDataSet = new TSQueryNonAlignDataSet();
+
+    PublicBAOS[] timeBAOSList = new PublicBAOS[seriesNum];
+    PublicBAOS[] valueBAOSList = new PublicBAOS[seriesNum];
+    PublicBAOS[] bitmapBAOSList = new PublicBAOS[seriesNum];
+
+    for (int seriesIndex = 0; seriesIndex < seriesNum; seriesIndex++) {
+      timeBAOSList[seriesIndex] = new PublicBAOS();
+      valueBAOSList[seriesIndex] = new PublicBAOS();
+      bitmapBAOSList[seriesIndex] = new PublicBAOS();
+    }
+
+    // used to record a bitmap for every 8 row record
+    int[] currentBitmapList = new int[seriesNum];
+    int rowCount = 0;
+    while (rowCount < fetchSize) {
+
+      if ((rowLimit > 0 && alreadyReturnedRowNum >= rowLimit) || timeHeap.isEmpty()) {
+        break;
+      }
+
+      for (int seriesIndex = 0; seriesIndex < seriesNum; seriesIndex++) {
+
+        if (cachedBatchDataArray[seriesIndex] == null
+                || !cachedBatchDataArray[seriesIndex].hasCurrent()) {
+          // current batch is empty
+          if (rowOffset == 0) {
+            currentBitmapList[seriesIndex] = (currentBitmapList[seriesIndex] << 1);
+          }
+        } else {
+          // current batch has value at minTime, consume current value
+          if (rowOffset == 0) {
+            long time = cachedBatchDataArray[seriesIndex].currentTime();
+            ReadWriteIOUtils.write(time, timeBAOSList[seriesIndex]);
+            currentBitmapList[seriesIndex] = (currentBitmapList[seriesIndex] << 1) | FLAG;
+            TSDataType type = cachedBatchDataArray[seriesIndex].getDataType();
+            switch (type) {
+              case INT32:
+                int intValue = cachedBatchDataArray[seriesIndex].getInt();
+                if (encoder != null && encoder.needEncode(time)) {
+                  intValue = encoder.encodeInt(intValue, time);
+                }
+                ReadWriteIOUtils.write(intValue, valueBAOSList[seriesIndex]);
+                break;
+              case INT64:
+                long longValue = cachedBatchDataArray[seriesIndex].getLong();
+                if (encoder != null && encoder.needEncode(time)) {
+                  longValue = encoder.encodeLong(longValue, time);
+                }
+                ReadWriteIOUtils.write(longValue, valueBAOSList[seriesIndex]);
+                break;
+              case FLOAT:
+                float floatValue = cachedBatchDataArray[seriesIndex].getFloat();
+                if (encoder != null && encoder.needEncode(time)) {
+                  floatValue = encoder.encodeFloat(floatValue, time);
+                }
+                ReadWriteIOUtils.write(floatValue, valueBAOSList[seriesIndex]);
+                break;
+              case DOUBLE:
+                double doubleValue = cachedBatchDataArray[seriesIndex].getDouble();
+                if (encoder != null && encoder.needEncode(time)) {
+                  doubleValue = encoder.encodeDouble(doubleValue, time);
+                }
+                ReadWriteIOUtils.write(doubleValue, valueBAOSList[seriesIndex]);
+                break;
+              case BOOLEAN:
+                ReadWriteIOUtils.write(cachedBatchDataArray[seriesIndex].getBoolean(),
+                        valueBAOSList[seriesIndex]);
+                break;
+              case TEXT:
+                ReadWriteIOUtils
+                        .write(cachedBatchDataArray[seriesIndex].getBinary(),
+                                valueBAOSList[seriesIndex]);
+                break;
+              default:
+                throw new UnSupportedDataTypeException(
+                        String.format("Data type %s is not supported.", type));
+            }
+          }
+
+          // move next
+          cachedBatchDataArray[seriesIndex].next();
+
+          // get next batch if current batch is empty
+          if (!cachedBatchDataArray[seriesIndex].hasCurrent()) {
+            // still have remaining batch data in queue
+            if (!noMoreDataInQueueArray[seriesIndex]) {
+              fillCache(seriesIndex);
+            }
+          }
+
+          // try to put the next timestamp into the heap
+          if (cachedBatchDataArray[seriesIndex].hasCurrent()) {
+            long time = cachedBatchDataArray[seriesIndex].currentTime();
+            timeHeap.add(time);
 
 Review comment:
   I wonder if it is necessary to use a heap for the non-align query.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] HTHou commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
HTHou commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r365073980
 
 

 ##########
 File path: server/src/main/java/org/apache/iotdb/db/query/dataset/NewEngineDataSetWithoutValueFilter.java
 ##########
 @@ -327,6 +328,167 @@ public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) throws
 
     return tsQueryDataSet;
   }
+  
+  /**
+   * for RPC in RawData query between client and server
+   * fill time buffer, value buffers and bitmap buffers
+   */
+  public TSQueryNonAlignDataSet fillNonAlignBuffer(int fetchSize, WatermarkEncoder encoder) throws IOException, InterruptedException {
+    int seriesNum = seriesReaderWithoutValueFilterList.size();
+    TSQueryNonAlignDataSet tsQueryNonAlignDataSet = new TSQueryNonAlignDataSet();
+
+    PublicBAOS[] timeBAOSList = new PublicBAOS[seriesNum];
+    PublicBAOS[] valueBAOSList = new PublicBAOS[seriesNum];
+    PublicBAOS[] bitmapBAOSList = new PublicBAOS[seriesNum];
+
+    for (int seriesIndex = 0; seriesIndex < seriesNum; seriesIndex++) {
+      timeBAOSList[seriesIndex] = new PublicBAOS();
+      valueBAOSList[seriesIndex] = new PublicBAOS();
+      bitmapBAOSList[seriesIndex] = new PublicBAOS();
+    }
+
+    // used to record a bitmap for every 8 row record
+    int[] currentBitmapList = new int[seriesNum];
 
 Review comment:
   I have removed the bitmap in non align dataset, since it's useless.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] JackieTien97 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
JackieTien97 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r364033716
 
 

 ##########
 File path: jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBQueryResultSet.java
 ##########
 @@ -761,6 +851,70 @@ private void constructOneRow() {
     }
     rowsIndex++;
   }
+  
+  private void constructNonAlignOneRow() {
+    for (int i = 0; i < tsQueryNonAlignDataSet.bitmapList.size(); i++) {
+      ByteBuffer bitmapBuffer = tsQueryNonAlignDataSet.bitmapList.get(i);
+      // another new 8 row, should move the bitmap buffer position to next byte
+      if (rowsIndex % 8 == 0) {
+        currentBitmap[i] = bitmapBuffer.get();
 
 Review comment:
   In the non align case, the bitmap field is unnecessary.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
jt2594838 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r365630334
 
 

 ##########
 File path: server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
 ##########
 @@ -826,19 +827,34 @@ public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {
       }
 
       QueryDataSet queryDataSet = queryId2DataSet.get(req.queryId);
-      TSQueryDataSet result = fillRpcReturnData(req.fetchSize, queryDataSet,
-          sessionIdUsernameMap.get(req.sessionId));
-
-      boolean hasResultSet = result.bufferForTime().limit() != 0;
-      if (!hasResultSet) {
-        queryId2DataSet.remove(req.queryId);
+      if (req.isAlign) {
+        TSQueryDataSet result = fillRpcReturnData(req.fetchSize, queryDataSet,
+            sessionIdUsernameMap.get(req.sessionId));
+        boolean hasResultSet = result.bufferForTime().limit() != 0;
+        if (!hasResultSet) {
+          queryId2DataSet.remove(req.queryId);
 
 Review comment:
   Maybe you can call `QueryResourceManager.endQuery` here to release query resources ASAP.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
jt2594838 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r365629804
 
 

 ##########
 File path: server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
 ##########
 @@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.dataset;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.iotdb.db.query.pool.QueryTaskPoolManager;
+import org.apache.iotdb.db.query.reader.ManagedSeriesReader;
+import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
+import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NonAlignEngineDataSet extends QueryDataSet {
+  
+  private static class ReadTask implements Runnable {
+    
+    private final ManagedSeriesReader reader;
+    private BlockingQueue<Pair<PublicBAOS, PublicBAOS>> blockingQueue;
+    private WatermarkEncoder encoder;
+    private int rowOffset;
+    private int fetchSize;
+    private int rowLimit;
+    private int alreadyReturnedRowNum = 0;
+    
+    public ReadTask(ManagedSeriesReader reader, 
+        BlockingQueue<Pair<PublicBAOS, PublicBAOS>> blockingQueue,
+        WatermarkEncoder encoder,
+        int rowOffset, int rowLimit, int fetchSize) {
+      this.reader = reader;
+      this.blockingQueue = blockingQueue;
+      this.encoder = encoder;
+      this.rowOffset = rowOffset;
+      this.rowLimit = rowLimit;
+      this.fetchSize = fetchSize;
+    }
+
+    @Override
+    public void run() {
+      PublicBAOS timeBAOS = new PublicBAOS();
+      PublicBAOS valueBAOS = new PublicBAOS();
+      try {
+        synchronized (reader) {
+          // if the task is submitted, there must be free space in the queue
+          // so here we don't need to check whether the queue has free space
+          // the reader has next batch
+          if (reader.hasNextBatch()) {
+            BatchData batchData = reader.nextBatch();
+            
+            int rowCount = 0;
+            while (rowCount < fetchSize) {
+              
+              if ((rowLimit > 0 && alreadyReturnedRowNum >= rowLimit)) {
+                break;
+              }
+              
+              if (batchData != null && batchData.hasCurrent()) {
+                if (rowOffset == 0) {
+                  long time = batchData.currentTime();
+                  ReadWriteIOUtils.write(time, timeBAOS);
+                  TSDataType type = batchData.getDataType();
+                  switch (type) {
+                    case INT32:
+                      int intValue = batchData.getInt();
+                      if (encoder != null && encoder.needEncode(time)) {
+                        intValue = encoder.encodeInt(intValue, time);
+                      }
+                      ReadWriteIOUtils.write(intValue, valueBAOS);
+                      break;
+                    case INT64:
+                      long longValue = batchData.getLong();
+                      if (encoder != null && encoder.needEncode(time)) {
+                        longValue = encoder.encodeLong(longValue, time);
+                      }
+                      ReadWriteIOUtils.write(longValue, valueBAOS);
+                      break;
+                    case FLOAT:
+                      float floatValue = batchData.getFloat();
+                      if (encoder != null && encoder.needEncode(time)) {
+                        floatValue = encoder.encodeFloat(floatValue, time);
+                      }
+                      ReadWriteIOUtils.write(floatValue, valueBAOS);
+                      break;
+                    case DOUBLE:
+                      double doubleValue = batchData.getDouble();
+                      if (encoder != null && encoder.needEncode(time)) {
+                        doubleValue = encoder.encodeDouble(doubleValue, time);
+                      }
+                      ReadWriteIOUtils.write(doubleValue, valueBAOS);
+                      break;
+                    case BOOLEAN:
+                      ReadWriteIOUtils.write(batchData.getBoolean(),
+                              valueBAOS);
+                      break;
+                    case TEXT:
+                      ReadWriteIOUtils
+                              .write(batchData.getBinary(),
+                                      valueBAOS);
+                      break;
+                    default:
+                      throw new UnSupportedDataTypeException(
+                              String.format("Data type %s is not supported.", type));
+                  }
+                }
+                batchData.next();
+              }
+              else {
+                break;
+              }
+              if (rowOffset == 0) {
+                rowCount++;
+                if (rowLimit > 0) {
+                  alreadyReturnedRowNum++;
+                }
+              } else {
+                rowOffset--;
+              }
+            }
+            
+            Pair<PublicBAOS, PublicBAOS> timeValueBAOSPair = new Pair(timeBAOS, valueBAOS);
+            
+            blockingQueue.put(timeValueBAOSPair);
+            // if the queue also has free space, just submit another itself
+            if (blockingQueue.remainingCapacity() > 0) {
+              pool.submit(this);
+            }
+            // the queue has no more space
+            // remove itself from the QueryTaskPoolManager
+            else {
+              reader.setManagedByQueryManager(false);
+            }
+            return;
+          }
+          blockingQueue.put(new Pair(timeBAOS, valueBAOS));
+          // set the hasRemaining field in reader to false
+          // tell the Consumer not to submit another task for this reader any more
+          reader.setHasRemaining(false);
+          // remove itself from the QueryTaskPoolManager
+          reader.setManagedByQueryManager(false);
+        }
+      } catch (InterruptedException e) {
+        LOGGER.error("Interrupted while putting into the blocking queue: ", e);
+      } catch (IOException e) {
+        LOGGER.error("Something gets wrong while reading from the series reader: ", e);
+      } catch (Exception e) {
+        LOGGER.error("Something gets wrong: ", e);
+      }
+      
+    }
+    
+  }
+
+  
+  private List<ManagedSeriesReader> seriesReaderWithoutValueFilterList;
+  
+  // Blocking queue list for each time value buffer pair
+  private BlockingQueue<Pair<PublicBAOS, PublicBAOS>>[] blockingQueueArray;
+  
+  private boolean initialized = false;
+  
+  // indicate that there is no more batch data in the corresponding queue
+  // in case that the consumer thread is blocked on the queue and won't get runnable any more
+  // this field is not same as the `hasRemaining` in SeriesReaderWithoutValueFilter
+  // even though the `hasRemaining` in SeriesReaderWithoutValueFilter is false
+  // noMoreDataInQueue can still be true
+  // its usage is to tell the consumer thread not to call the take() method.
+
+  // capacity for blocking queue
+  private static final int BLOCKING_QUEUE_CAPACITY = 5;
+
+  private static final QueryTaskPoolManager pool = QueryTaskPoolManager.getInstance();
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(NonAlignEngineDataSet.class);
+  
+  /**
+   * constructor of EngineDataSet.
+   *
+   * @param paths paths in List structure
+   * @param dataTypes time series data type
+   * @param readers readers in List(IPointReader) structure
+   */
+  public NonAlignEngineDataSet(List<Path> paths, List<TSDataType> dataTypes,
+      List<ManagedSeriesReader> readers) throws InterruptedException {
+    super(paths, dataTypes);
+    this.seriesReaderWithoutValueFilterList = readers;
+    blockingQueueArray = new BlockingQueue[readers.size()];
+    for (int i = 0; i < seriesReaderWithoutValueFilterList.size(); i++) {
+      blockingQueueArray[i] = new LinkedBlockingQueue<>(BLOCKING_QUEUE_CAPACITY);
+    }
+  }
+  
+  private void init(WatermarkEncoder encoder, int fetchSize) 
+      throws InterruptedException {
+    for (int i = 0; i < seriesReaderWithoutValueFilterList.size(); i++) {
+      ManagedSeriesReader reader = seriesReaderWithoutValueFilterList.get(i);
+      reader.setHasRemaining(true);
+      reader.setManagedByQueryManager(true);
+      pool.submit(new ReadTask(reader, blockingQueueArray[i], encoder, rowOffset, rowLimit, fetchSize));
+    }
+    this.initialized = true;
+  }
+  
+  /**
+   * for RPC in RawData query between client and server
+   * fill time buffers and value buffers
+   */
+  public TSQueryNonAlignDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) throws IOException, InterruptedException {
+    if (!initialized) {
+      init(encoder, fetchSize);
+    }
+    int seriesNum = seriesReaderWithoutValueFilterList.size();
+    TSQueryNonAlignDataSet tsQueryNonAlignDataSet = new TSQueryNonAlignDataSet();
+
+    PublicBAOS[] timeBAOSList = new PublicBAOS[seriesNum];
+    PublicBAOS[] valueBAOSList = new PublicBAOS[seriesNum];
+    
+    for (int seriesIndex = 0; seriesIndex < seriesNum; seriesIndex++) {
+      synchronized (seriesReaderWithoutValueFilterList.get(seriesIndex)) {
+        if (blockingQueueArray[seriesIndex].remainingCapacity() > 0) {
+          ManagedSeriesReader reader = seriesReaderWithoutValueFilterList.get(seriesIndex);
+          // if the reader isn't being managed and still has more data,
+          // that means this read task leave the pool before because the queue has no more space
+          // now we should submit it again
+          if (reader.isManagedByQueryManager() && reader.hasRemaining()) {
+            reader.setManagedByQueryManager(true);
+            pool.submit(new ReadTask(reader, blockingQueueArray[seriesIndex], 
+                encoder, rowOffset, rowLimit, fetchSize));
+          }
+        }
+        Pair<PublicBAOS, PublicBAOS> timevalueBAOSPair = blockingQueueArray[seriesIndex].poll();
+        if (timevalueBAOSPair != null) {
+          timeBAOSList[seriesIndex] = timevalueBAOSPair.left;
+          valueBAOSList[seriesIndex] = timevalueBAOSPair.right;
+        }
+        else {
+          timeBAOSList[seriesIndex] = new PublicBAOS();
+          valueBAOSList[seriesIndex] = new PublicBAOS();
+        }
+      }
+    }
+
+    List<ByteBuffer> timeBufferList = new ArrayList<>();
+    List<ByteBuffer> valueBufferList = new ArrayList<>();
+
+    for (int seriesIndex = 0; seriesIndex < seriesNum; seriesIndex++) {
+      // add time buffer of current series
+      putPBOSToBuffer(timeBAOSList, timeBufferList, seriesIndex);
+
+      // add value buffer of current series
+      putPBOSToBuffer(valueBAOSList, valueBufferList, seriesIndex);
+    }
+
+    // set time buffers, value buffers and bitmap buffers
+    tsQueryNonAlignDataSet.setTimeList(timeBufferList);
+    tsQueryNonAlignDataSet.setValueList(valueBufferList);
+
+    return tsQueryNonAlignDataSet;
+  }
+  
+  private void putPBOSToBuffer(PublicBAOS[] aBAOSList, List<ByteBuffer> aBufferList,
+      int tsIndex) {
+    ByteBuffer aBuffer = ByteBuffer.allocate(aBAOSList[tsIndex].size());
+    aBuffer.put(aBAOSList[tsIndex].getBuf(), 0, aBAOSList[tsIndex].size());
+    aBuffer.flip();
+    aBufferList.add(aBuffer);
 
 Review comment:
   Maybe you can just use `ByteBuffer.wrap`? It does not seem you reuse the baos.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] HTHou commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
HTHou commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r365073819
 
 

 ##########
 File path: service-rpc/src/main/thrift/rpc.thrift
 ##########
 @@ -146,6 +148,7 @@ struct TSFetchResultsResp{
 	1: required TSStatus status
 	2: required bool hasResultSet
 	3: optional TSQueryDataSet queryDataSet
+	4: optional TSQueryNonAlignDataSet nonAlignQueryDataSet
 
 Review comment:
   Fixed

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] HTHou commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
HTHou commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r365074167
 
 

 ##########
 File path: server/src/main/java/org/apache/iotdb/db/query/dataset/NewEngineDataSetWithoutValueFilter.java
 ##########
 @@ -327,6 +328,167 @@ public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) throws
 
     return tsQueryDataSet;
   }
+  
+  /**
+   * for RPC in RawData query between client and server
+   * fill time buffer, value buffers and bitmap buffers
+   */
+  public TSQueryNonAlignDataSet fillNonAlignBuffer(int fetchSize, WatermarkEncoder encoder) throws IOException, InterruptedException {
+    int seriesNum = seriesReaderWithoutValueFilterList.size();
+    TSQueryNonAlignDataSet tsQueryNonAlignDataSet = new TSQueryNonAlignDataSet();
+
+    PublicBAOS[] timeBAOSList = new PublicBAOS[seriesNum];
+    PublicBAOS[] valueBAOSList = new PublicBAOS[seriesNum];
+    PublicBAOS[] bitmapBAOSList = new PublicBAOS[seriesNum];
+
+    for (int seriesIndex = 0; seriesIndex < seriesNum; seriesIndex++) {
+      timeBAOSList[seriesIndex] = new PublicBAOS();
+      valueBAOSList[seriesIndex] = new PublicBAOS();
+      bitmapBAOSList[seriesIndex] = new PublicBAOS();
+    }
+
+    // used to record a bitmap for every 8 row record
+    int[] currentBitmapList = new int[seriesNum];
+    int rowCount = 0;
+    while (rowCount < fetchSize) {
+
+      if ((rowLimit > 0 && alreadyReturnedRowNum >= rowLimit) || timeHeap.isEmpty()) {
+        break;
+      }
+
+      for (int seriesIndex = 0; seriesIndex < seriesNum; seriesIndex++) {
+
+        if (cachedBatchDataArray[seriesIndex] == null
+                || !cachedBatchDataArray[seriesIndex].hasCurrent()) {
+          // current batch is empty
+          if (rowOffset == 0) {
+            currentBitmapList[seriesIndex] = (currentBitmapList[seriesIndex] << 1);
+          }
+        } else {
+          // current batch has value at minTime, consume current value
+          if (rowOffset == 0) {
+            long time = cachedBatchDataArray[seriesIndex].currentTime();
+            ReadWriteIOUtils.write(time, timeBAOSList[seriesIndex]);
+            currentBitmapList[seriesIndex] = (currentBitmapList[seriesIndex] << 1) | FLAG;
+            TSDataType type = cachedBatchDataArray[seriesIndex].getDataType();
+            switch (type) {
+              case INT32:
+                int intValue = cachedBatchDataArray[seriesIndex].getInt();
+                if (encoder != null && encoder.needEncode(time)) {
+                  intValue = encoder.encodeInt(intValue, time);
+                }
+                ReadWriteIOUtils.write(intValue, valueBAOSList[seriesIndex]);
+                break;
+              case INT64:
+                long longValue = cachedBatchDataArray[seriesIndex].getLong();
+                if (encoder != null && encoder.needEncode(time)) {
+                  longValue = encoder.encodeLong(longValue, time);
+                }
+                ReadWriteIOUtils.write(longValue, valueBAOSList[seriesIndex]);
+                break;
+              case FLOAT:
+                float floatValue = cachedBatchDataArray[seriesIndex].getFloat();
+                if (encoder != null && encoder.needEncode(time)) {
+                  floatValue = encoder.encodeFloat(floatValue, time);
+                }
+                ReadWriteIOUtils.write(floatValue, valueBAOSList[seriesIndex]);
+                break;
+              case DOUBLE:
+                double doubleValue = cachedBatchDataArray[seriesIndex].getDouble();
+                if (encoder != null && encoder.needEncode(time)) {
+                  doubleValue = encoder.encodeDouble(doubleValue, time);
+                }
+                ReadWriteIOUtils.write(doubleValue, valueBAOSList[seriesIndex]);
+                break;
+              case BOOLEAN:
+                ReadWriteIOUtils.write(cachedBatchDataArray[seriesIndex].getBoolean(),
+                        valueBAOSList[seriesIndex]);
+                break;
+              case TEXT:
+                ReadWriteIOUtils
+                        .write(cachedBatchDataArray[seriesIndex].getBinary(),
+                                valueBAOSList[seriesIndex]);
+                break;
+              default:
+                throw new UnSupportedDataTypeException(
+                        String.format("Data type %s is not supported.", type));
+            }
+          }
+
+          // move next
+          cachedBatchDataArray[seriesIndex].next();
+
+          // get next batch if current batch is empty
+          if (!cachedBatchDataArray[seriesIndex].hasCurrent()) {
+            // still have remaining batch data in queue
+            if (!noMoreDataInQueueArray[seriesIndex]) {
+              fillCache(seriesIndex);
+            }
+          }
+
+          // try to put the next timestamp into the heap
+          if (cachedBatchDataArray[seriesIndex].hasCurrent()) {
+            long time = cachedBatchDataArray[seriesIndex].currentTime();
+            timeHeap.add(time);
 
 Review comment:
   Yes, the timeheap is not necessary. Removed.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] JackieTien97 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
JackieTien97 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r365635906
 
 

 ##########
 File path: server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
 ##########
 @@ -806,19 +827,34 @@ public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {
       }
 
       QueryDataSet queryDataSet = queryId2DataSet.get(req.queryId);
-      TSQueryDataSet result = fillRpcReturnData(req.fetchSize, queryDataSet,
-          sessionIdUsernameMap.get(req.sessionId));
-
-      boolean hasResultSet = result.bufferForTime().limit() != 0;
-      if (!hasResultSet) {
-        queryId2DataSet.remove(req.queryId);
+      if (req.isAlign) {
+        TSQueryDataSet result = fillRpcReturnData(req.fetchSize, queryDataSet,
+            sessionIdUsernameMap.get(req.sessionId));
+        boolean hasResultSet = result.bufferForTime().limit() != 0;
+        if (!hasResultSet) {
+          queryId2DataSet.remove(req.queryId);
+        }
+        TSFetchResultsResp resp = getTSFetchResultsResp(getStatus(TSStatusCode.SUCCESS_STATUS,
+            "FetchResult successfully. Has more result: " + hasResultSet));
+        resp.setHasResultSet(hasResultSet);
+        resp.setQueryDataSet(result);
+        resp.setIsAlign(true);
+        return resp;
+      }
+      else {
+        TSQueryNonAlignDataSet nonAlignResult = fillRpcNonAlignReturnData(req.fetchSize, queryDataSet,
+            sessionIdUsernameMap.get(req.sessionId));
+        boolean hasResultSet = nonAlignResult.getTimeList().get(0).limit() != 0;
 
 Review comment:
   The first timeseries's  time buffer is empty does not mean the others don't have

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] JackieTien97 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
JackieTien97 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r364030693
 
 

 ##########
 File path: jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBQueryResultSet.java
 ##########
 @@ -408,14 +451,32 @@ public long getLong(int columnIndex) throws SQLException {
   @Override
   public long getLong(String columnName) throws SQLException {
     checkRecord();
-    if (columnName.equals(TIMESTAMP_STR)) {
-      return BytesUtils.bytesToLong(time);
+    if (align) {
+      if (columnName.equals(TIMESTAMP_STR)) {
+        return BytesUtils.bytesToLong(time);
+      }
+      int index = columnInfoMap.get(columnName) - START_INDEX;
+      if (values[index] != null) {
+        return BytesUtils.bytesToLong(values[index]);
+      } else {
+        throw new SQLException(String.format(VALUE_IS_NULL, columnName));
+      }
     }
-    int index = columnInfoMap.get(columnName) - START_INDEX;
-    if (values[index] != null) {
-      return BytesUtils.bytesToLong(values[index]);
-    } else {
-      throw new SQLException(String.format(VALUE_IS_NULL, columnName));
+    else {
+      if (columnName.startsWith(TIMESTAMP_STR)) {
+        String column = columnName.substring(TIMESTAMP_STR_LENGTH);
+        int index = columnInfoMap.get(column) - START_INDEX;
+        if (times[index].length != 8) {
+          return -1;
+        }
 
 Review comment:
   This if statement is unnecessary, the second dimension of times must be 8 as you initialize it to be equal to Long.BYTES.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] HTHou commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
HTHou commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r365074611
 
 

 ##########
 File path: server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
 ##########
 @@ -174,6 +180,126 @@ public static TSQueryDataSet convertQueryDataSetByFetchSize(QueryDataSet queryDa
     tsQueryDataSet.setValueList(valueList);
     return tsQueryDataSet;
   }
+  
+
+  public static TSQueryNonAlignDataSet convertQueryNonAlignDataSetByFetchSize(QueryDataSet queryDataSet,
+      int fetchSize, WatermarkEncoder watermarkEncoder) throws IOException {
+    List<TSDataType> dataTypes = queryDataSet.getDataTypes();
+    int columnNum = dataTypes.size();
+    TSQueryNonAlignDataSet tsQueryNonAlignDataSet = new TSQueryNonAlignDataSet();
+    // one time column and each value column has a actual value buffer and a bitmap value to indicate whether it is a null
+    int columnNumWithTime = columnNum * 3;
+    DataOutputStream[] dataOutputStreams = new DataOutputStream[columnNumWithTime];
+    ByteArrayOutputStream[] byteArrayOutputStreams = new ByteArrayOutputStream[columnNumWithTime];
+    for (int i = 0; i < columnNumWithTime; i++) {
+      byteArrayOutputStreams[i] = new ByteArrayOutputStream();
+      dataOutputStreams[i] = new DataOutputStream(byteArrayOutputStreams[i]);
+    }
+
+    int rowCount = 0;
+    int[] valueOccupation = new int[columnNum];
+    // used to record a bitmap for every 8 row record
+    int[] bitmap = new int[columnNum];
 
 Review comment:
   Fixed.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] JackieTien97 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
JackieTien97 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r364037551
 
 

 ##########
 File path: server/src/main/java/org/apache/iotdb/db/query/dataset/NewEngineDataSetWithoutValueFilter.java
 ##########
 @@ -327,6 +328,167 @@ public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) throws
 
     return tsQueryDataSet;
   }
+  
+  /**
+   * for RPC in RawData query between client and server
+   * fill time buffer, value buffers and bitmap buffers
+   */
+  public TSQueryNonAlignDataSet fillNonAlignBuffer(int fetchSize, WatermarkEncoder encoder) throws IOException, InterruptedException {
 
 Review comment:
   The fillNonAlignBuffer logic should be put into each single read task. The main thread just takes from the blocking queue and set it into QueryDataSet.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] jixuan1989 commented on issue #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
jixuan1989 commented on issue #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#issuecomment-573273130
 
 
   > And I hope you can do a performance testing between the align version and non-align version. As expected, the no-align version should improve as least 3 times.
   
   Hi, do we have result now?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] HTHou commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
HTHou commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r365074551
 
 

 ##########
 File path: server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
 ##########
 @@ -174,6 +180,126 @@ public static TSQueryDataSet convertQueryDataSetByFetchSize(QueryDataSet queryDa
     tsQueryDataSet.setValueList(valueList);
     return tsQueryDataSet;
   }
+  
+
+  public static TSQueryNonAlignDataSet convertQueryNonAlignDataSetByFetchSize(QueryDataSet queryDataSet,
+      int fetchSize, WatermarkEncoder watermarkEncoder) throws IOException {
+    List<TSDataType> dataTypes = queryDataSet.getDataTypes();
+    int columnNum = dataTypes.size();
+    TSQueryNonAlignDataSet tsQueryNonAlignDataSet = new TSQueryNonAlignDataSet();
+    // one time column and each value column has a actual value buffer and a bitmap value to indicate whether it is a null
+    int columnNumWithTime = columnNum * 3;
+    DataOutputStream[] dataOutputStreams = new DataOutputStream[columnNumWithTime];
+    ByteArrayOutputStream[] byteArrayOutputStreams = new ByteArrayOutputStream[columnNumWithTime];
+    for (int i = 0; i < columnNumWithTime; i++) {
+      byteArrayOutputStreams[i] = new ByteArrayOutputStream();
+      dataOutputStreams[i] = new DataOutputStream(byteArrayOutputStreams[i]);
+    }
+
+    int rowCount = 0;
+    int[] valueOccupation = new int[columnNum];
+    // used to record a bitmap for every 8 row record
+    int[] bitmap = new int[columnNum];
+    for (int i = 0; i < fetchSize; i++) {
+      if (queryDataSet.hasNext()) {
+        RowRecord rowRecord = queryDataSet.next();
+        if (watermarkEncoder != null) {
+          rowRecord = watermarkEncoder.encodeRecord(rowRecord);
+        }
+        // use columnOutput to write byte array
+        List<Field> fields = rowRecord.getFields();
+        for (int k = 0; k < fields.size(); k++) {
+          dataOutputStreams[k].writeLong(rowRecord.getTimestamp());
+          Field field = fields.get(k);
+          DataOutputStream dataOutputStream = dataOutputStreams[3*k + 1]; // DO NOT FORGET +1
+          if (field.getDataType() == null) {
+            bitmap[k] =  (bitmap[k] << 1);
+          } else {
+            bitmap[k] =  (bitmap[k] << 1) | flag;
+            TSDataType type = field.getDataType();
+            switch (type) {
+              case INT32:
+                dataOutputStream.writeInt(field.getIntV());
+                valueOccupation[k] += 4;
+                break;
+              case INT64:
+                dataOutputStream.writeLong(field.getLongV());
+                valueOccupation[k] += 8;
+                break;
+              case FLOAT:
+                dataOutputStream.writeFloat(field.getFloatV());
+                valueOccupation[k] += 4;
+                break;
+              case DOUBLE:
+                dataOutputStream.writeDouble(field.getDoubleV());
+                valueOccupation[k] += 8;
+                break;
+              case BOOLEAN:
+                dataOutputStream.writeBoolean(field.getBoolV());
+                valueOccupation[k] += 1;
+                break;
+              case TEXT:
+                dataOutputStream.writeInt(field.getBinaryV().getLength());
+                dataOutputStream.write(field.getBinaryV().getValues());
+                valueOccupation[k] = valueOccupation[k] + 4 + field.getBinaryV().getLength();
+                break;
+              default:
+                throw new UnSupportedDataTypeException(
+                    String.format("Data type %s is not supported.", type));
+            }
+          }
+        }
+        rowCount++;
+        if (rowCount % 8 == 0) {
+          for (int j = 0; j < bitmap.length; j++) {
+            DataOutputStream dataBitmapOutputStream = dataOutputStreams[3*j + 2];
+            dataBitmapOutputStream.writeByte(bitmap[j]);
+            // we should clear the bitmap every 8 row record
+            bitmap[j] = 0;
+          }
+        }
+      } else {
+        break;
+      }
+    }
+
+    // feed the remaining bitmap
+    int remaining = rowCount % 8;
+    if (remaining != 0) {
+      for (int j = 0; j < bitmap.length; j++) {
+        DataOutputStream dataBitmapOutputStream = dataOutputStreams[3*j + 2];
+        dataBitmapOutputStream.writeByte(bitmap[j] << (8-remaining));
+      }
+    }
+
+    // calculate the time buffer size
+    int timeOccupation = rowCount * 8;
+
+    // calculate the bitmap buffer size
+    int bitmapOccupation = rowCount / 8 + (rowCount % 8 == 0 ? 0 : 1);
+    List<ByteBuffer> timeList = new LinkedList<>();
+    List<ByteBuffer> bitmapList = new LinkedList<>();
+    List<ByteBuffer> valueList = new LinkedList<>();
+    for (int i = 0; i < byteArrayOutputStreams.length; i += 3) {
+      ByteBuffer timeBuffer = ByteBuffer.allocate(timeOccupation);
+      timeBuffer.put(byteArrayOutputStreams[i].toByteArray());
+      timeBuffer.flip();
+      timeList.add(timeBuffer);
+      
+      ByteBuffer valueBuffer = ByteBuffer.allocate(valueOccupation[i/3]);
+      valueBuffer.put(byteArrayOutputStreams[i+1].toByteArray());
+      valueBuffer.flip();
+      valueList.add(valueBuffer);
+
+      ByteBuffer bitmapBuffer = ByteBuffer.allocate(bitmapOccupation);
+      bitmapBuffer.put(byteArrayOutputStreams[i+2].toByteArray());
+      bitmapBuffer.flip();
+      bitmapList.add(bitmapBuffer);
+    }
+    tsQueryNonAlignDataSet.setBitmapList(bitmapList);
+    tsQueryNonAlignDataSet.setValueList(valueList);
+    return tsQueryNonAlignDataSet;
 
 Review comment:
   I find out this part of code has lots of bugs and it is useless.  Removed.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] JackieTien97 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
JackieTien97 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r364036713
 
 

 ##########
 File path: server/src/main/java/org/apache/iotdb/db/query/dataset/NewEngineDataSetWithoutValueFilter.java
 ##########
 @@ -327,6 +328,167 @@ public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) throws
 
     return tsQueryDataSet;
   }
+  
+  /**
+   * for RPC in RawData query between client and server
+   * fill time buffer, value buffers and bitmap buffers
+   */
+  public TSQueryNonAlignDataSet fillNonAlignBuffer(int fetchSize, WatermarkEncoder encoder) throws IOException, InterruptedException {
+    int seriesNum = seriesReaderWithoutValueFilterList.size();
+    TSQueryNonAlignDataSet tsQueryNonAlignDataSet = new TSQueryNonAlignDataSet();
+
+    PublicBAOS[] timeBAOSList = new PublicBAOS[seriesNum];
+    PublicBAOS[] valueBAOSList = new PublicBAOS[seriesNum];
+    PublicBAOS[] bitmapBAOSList = new PublicBAOS[seriesNum];
 
 Review comment:
   Same as above the bitmap is useless.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] HTHou commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
HTHou commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r365074635
 
 

 ##########
 File path: server/src/main/java/org/apache/iotdb/db/query/dataset/NewEngineDataSetWithoutValueFilter.java
 ##########
 @@ -327,6 +328,167 @@ public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) throws
 
     return tsQueryDataSet;
   }
+  
+  /**
+   * for RPC in RawData query between client and server
+   * fill time buffer, value buffers and bitmap buffers
+   */
+  public TSQueryNonAlignDataSet fillNonAlignBuffer(int fetchSize, WatermarkEncoder encoder) throws IOException, InterruptedException {
 
 Review comment:
   Fixed

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
jt2594838 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r363705417
 
 

 ##########
 File path: server/src/main/java/org/apache/iotdb/db/query/dataset/NewEngineDataSetWithoutValueFilter.java
 ##########
 @@ -327,6 +328,167 @@ public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) throws
 
     return tsQueryDataSet;
   }
+  
+  /**
+   * for RPC in RawData query between client and server
+   * fill time buffer, value buffers and bitmap buffers
+   */
+  public TSQueryNonAlignDataSet fillNonAlignBuffer(int fetchSize, WatermarkEncoder encoder) throws IOException, InterruptedException {
+    int seriesNum = seriesReaderWithoutValueFilterList.size();
+    TSQueryNonAlignDataSet tsQueryNonAlignDataSet = new TSQueryNonAlignDataSet();
+
+    PublicBAOS[] timeBAOSList = new PublicBAOS[seriesNum];
+    PublicBAOS[] valueBAOSList = new PublicBAOS[seriesNum];
+    PublicBAOS[] bitmapBAOSList = new PublicBAOS[seriesNum];
+
+    for (int seriesIndex = 0; seriesIndex < seriesNum; seriesIndex++) {
+      timeBAOSList[seriesIndex] = new PublicBAOS();
+      valueBAOSList[seriesIndex] = new PublicBAOS();
+      bitmapBAOSList[seriesIndex] = new PublicBAOS();
+    }
+
+    // used to record a bitmap for every 8 row record
+    int[] currentBitmapList = new int[seriesNum];
 
 Review comment:
   record -> records
   optional: BitMap (and other occurrences)-> nullValueBitMap, so it would be more clear what this bit map is intended for.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] HTHou commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
HTHou commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r365076147
 
 

 ##########
 File path: jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBQueryResultSet.java
 ##########
 @@ -761,6 +851,70 @@ private void constructOneRow() {
     }
     rowsIndex++;
   }
+  
+  private void constructNonAlignOneRow() {
+    for (int i = 0; i < tsQueryNonAlignDataSet.bitmapList.size(); i++) {
+      ByteBuffer bitmapBuffer = tsQueryNonAlignDataSet.bitmapList.get(i);
+      // another new 8 row, should move the bitmap buffer position to next byte
+      if (rowsIndex % 8 == 0) {
+        currentBitmap[i] = bitmapBuffer.get();
+      }
+      times[i] = null;
+      values[i] = null;
+      if (!isNull(i, rowsIndex)) {
+        if (times[i] == null) {
+          times[i] = new byte[Long.BYTES];
+        }
+        tsQueryNonAlignDataSet.timeList.get(i).get(times[i]);
+        ByteBuffer valueBuffer = tsQueryNonAlignDataSet.valueList.get(i);
+        TSDataType dataType = TSDataType.valueOf(columnTypeDeduplicatedList.get(i));
 
 Review comment:
   Can you please tell me more details about how to cast the enum types? I didn't get your point. Thank you.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] HTHou commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
HTHou commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r364206135
 
 

 ##########
 File path: server/src/main/java/org/apache/iotdb/db/query/dataset/NewEngineDataSetWithoutValueFilter.java
 ##########
 @@ -327,6 +328,167 @@ public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) throws
 
     return tsQueryDataSet;
   }
+  
+  /**
+   * for RPC in RawData query between client and server
+   * fill time buffer, value buffers and bitmap buffers
+   */
+  public TSQueryNonAlignDataSet fillNonAlignBuffer(int fetchSize, WatermarkEncoder encoder) throws IOException, InterruptedException {
 
 Review comment:
   May I ask what object will be in the blocking queue in this scenario? In each single read task, we get batch data from reader and transfer to bytebuffer. And then?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
jt2594838 commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r364044830
 
 

 ##########
 File path: server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
 ##########
 @@ -174,6 +180,126 @@ public static TSQueryDataSet convertQueryDataSetByFetchSize(QueryDataSet queryDa
     tsQueryDataSet.setValueList(valueList);
     return tsQueryDataSet;
   }
+  
+
+  public static TSQueryNonAlignDataSet convertQueryNonAlignDataSetByFetchSize(QueryDataSet queryDataSet,
+      int fetchSize, WatermarkEncoder watermarkEncoder) throws IOException {
+    List<TSDataType> dataTypes = queryDataSet.getDataTypes();
+    int columnNum = dataTypes.size();
+    TSQueryNonAlignDataSet tsQueryNonAlignDataSet = new TSQueryNonAlignDataSet();
+    // one time column and each value column has a actual value buffer and a bitmap value to indicate whether it is a null
+    int columnNumWithTime = columnNum * 3;
+    DataOutputStream[] dataOutputStreams = new DataOutputStream[columnNumWithTime];
+    ByteArrayOutputStream[] byteArrayOutputStreams = new ByteArrayOutputStream[columnNumWithTime];
+    for (int i = 0; i < columnNumWithTime; i++) {
+      byteArrayOutputStreams[i] = new ByteArrayOutputStream();
+      dataOutputStreams[i] = new DataOutputStream(byteArrayOutputStreams[i]);
+    }
+
+    int rowCount = 0;
+    int[] valueOccupation = new int[columnNum];
+    // used to record a bitmap for every 8 row record
+    int[] bitmap = new int[columnNum];
+    for (int i = 0; i < fetchSize; i++) {
+      if (queryDataSet.hasNext()) {
+        RowRecord rowRecord = queryDataSet.next();
+        if (watermarkEncoder != null) {
+          rowRecord = watermarkEncoder.encodeRecord(rowRecord);
+        }
+        // use columnOutput to write byte array
+        List<Field> fields = rowRecord.getFields();
+        for (int k = 0; k < fields.size(); k++) {
+          dataOutputStreams[k].writeLong(rowRecord.getTimestamp());
+          Field field = fields.get(k);
+          DataOutputStream dataOutputStream = dataOutputStreams[3*k + 1]; // DO NOT FORGET +1
+          if (field.getDataType() == null) {
+            bitmap[k] =  (bitmap[k] << 1);
+          } else {
+            bitmap[k] =  (bitmap[k] << 1) | flag;
+            TSDataType type = field.getDataType();
+            switch (type) {
+              case INT32:
+                dataOutputStream.writeInt(field.getIntV());
+                valueOccupation[k] += 4;
+                break;
+              case INT64:
+                dataOutputStream.writeLong(field.getLongV());
+                valueOccupation[k] += 8;
+                break;
+              case FLOAT:
+                dataOutputStream.writeFloat(field.getFloatV());
+                valueOccupation[k] += 4;
+                break;
+              case DOUBLE:
+                dataOutputStream.writeDouble(field.getDoubleV());
+                valueOccupation[k] += 8;
+                break;
+              case BOOLEAN:
+                dataOutputStream.writeBoolean(field.getBoolV());
+                valueOccupation[k] += 1;
+                break;
+              case TEXT:
+                dataOutputStream.writeInt(field.getBinaryV().getLength());
+                dataOutputStream.write(field.getBinaryV().getValues());
+                valueOccupation[k] = valueOccupation[k] + 4 + field.getBinaryV().getLength();
+                break;
+              default:
+                throw new UnSupportedDataTypeException(
+                    String.format("Data type %s is not supported.", type));
+            }
+          }
+        }
+        rowCount++;
+        if (rowCount % 8 == 0) {
+          for (int j = 0; j < bitmap.length; j++) {
+            DataOutputStream dataBitmapOutputStream = dataOutputStreams[3*j + 2];
+            dataBitmapOutputStream.writeByte(bitmap[j]);
+            // we should clear the bitmap every 8 row record
+            bitmap[j] = 0;
+          }
+        }
+      } else {
+        break;
+      }
+    }
+
+    // feed the remaining bitmap
+    int remaining = rowCount % 8;
+    if (remaining != 0) {
+      for (int j = 0; j < bitmap.length; j++) {
+        DataOutputStream dataBitmapOutputStream = dataOutputStreams[3*j + 2];
+        dataBitmapOutputStream.writeByte(bitmap[j] << (8-remaining));
+      }
+    }
+
+    // calculate the time buffer size
+    int timeOccupation = rowCount * 8;
+
+    // calculate the bitmap buffer size
+    int bitmapOccupation = rowCount / 8 + (rowCount % 8 == 0 ? 0 : 1);
+    List<ByteBuffer> timeList = new LinkedList<>();
+    List<ByteBuffer> bitmapList = new LinkedList<>();
+    List<ByteBuffer> valueList = new LinkedList<>();
+    for (int i = 0; i < byteArrayOutputStreams.length; i += 3) {
+      ByteBuffer timeBuffer = ByteBuffer.allocate(timeOccupation);
+      timeBuffer.put(byteArrayOutputStreams[i].toByteArray());
+      timeBuffer.flip();
+      timeList.add(timeBuffer);
+      
+      ByteBuffer valueBuffer = ByteBuffer.allocate(valueOccupation[i/3]);
+      valueBuffer.put(byteArrayOutputStreams[i+1].toByteArray());
+      valueBuffer.flip();
+      valueList.add(valueBuffer);
+
+      ByteBuffer bitmapBuffer = ByteBuffer.allocate(bitmapOccupation);
+      bitmapBuffer.put(byteArrayOutputStreams[i+2].toByteArray());
+      bitmapBuffer.flip();
+      bitmapList.add(bitmapBuffer);
+    }
+    tsQueryNonAlignDataSet.setBitmapList(bitmapList);
+    tsQueryNonAlignDataSet.setValueList(valueList);
+    return tsQueryNonAlignDataSet;
 
 Review comment:
   I mean to do it in the return phase (just before sending the result to the client), where I think there is no multithread.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] HTHou commented on issue #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
HTHou commented on issue #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#issuecomment-573274294
 
 
   > > And I hope you can do a performance testing between the align version and non-align version. As expected, the no-align version should improve as least 3 times.
   > 
   > Hi, do we have result now?
   
   I tried to run the test on my own computer, but the speed of writing test data was so slow. It may take up to 12 hours to write all data. I have no any idea about the reason. 
   I also tried to run the performance test on server. I can connect VPN, but I cannot log in to the server. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] HTHou commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
HTHou commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r365074701
 
 

 ##########
 File path: jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBQueryResultSet.java
 ##########
 @@ -761,6 +851,70 @@ private void constructOneRow() {
     }
     rowsIndex++;
   }
+  
+  private void constructNonAlignOneRow() {
+    for (int i = 0; i < tsQueryNonAlignDataSet.bitmapList.size(); i++) {
+      ByteBuffer bitmapBuffer = tsQueryNonAlignDataSet.bitmapList.get(i);
+      // another new 8 row, should move the bitmap buffer position to next byte
+      if (rowsIndex % 8 == 0) {
+        currentBitmap[i] = bitmapBuffer.get();
 
 Review comment:
   Fixed

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] HTHou commented on a change in pull request #705: [IOTDB-396] New query clause: disable align

Posted by GitBox <gi...@apache.org>.
HTHou commented on a change in pull request #705: [IOTDB-396] New query clause: disable align
URL: https://github.com/apache/incubator-iotdb/pull/705#discussion_r365073836
 
 

 ##########
 File path: service-rpc/src/main/thrift/rpc.thrift
 ##########
 @@ -219,12 +222,22 @@ struct ServerProperties {
 }
 
 struct TSQueryDataSet{
-   // ByteBuffer for time column
-   1: required binary time
-   // ByteBuffer for each column values
-   2: required list<binary> valueList
-   // Bitmap for each column to indicate whether it is a null value
-   3: required list<binary> bitmapList
+    // ByteBuffer for time column
+    1: required binary time
+    // ByteBuffer for each column values
+    2: required list<binary> valueList
+    // Bitmap for each column to indicate whether it is a null value
+    3: required list<binary> bitmapList
+}
+
+struct TSQueryNonAlignDataSet{
+    // ByteBuffer for each time column
+	1: required list<binary> timeList
+	// ByteBuffer for each column values
 
 Review comment:
   Fixed

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services