You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2020/01/14 08:41:23 UTC

[incubator-iotdb] 01/04: s

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch DisableAlign
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit be4b8af0d06f2cd58c162b3f147e9f9f0237309a
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Tue Jan 14 15:56:03 2020 +0800

    s
---
 .../main/java/org/apache/iotdb/JDBCExample.java    |  47 +----
 .../db/query/dataset/NonAlignEngineDataSet.java    | 226 ++++++++++++---------
 2 files changed, 137 insertions(+), 136 deletions(-)

diff --git a/example/jdbc/src/main/java/org/apache/iotdb/JDBCExample.java b/example/jdbc/src/main/java/org/apache/iotdb/JDBCExample.java
index 99a6193..cffdd32 100644
--- a/example/jdbc/src/main/java/org/apache/iotdb/JDBCExample.java
+++ b/example/jdbc/src/main/java/org/apache/iotdb/JDBCExample.java
@@ -1,21 +1,3 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
 package org.apache.iotdb;
 
 import java.sql.Connection;
@@ -31,23 +13,11 @@ public class JDBCExample {
     Class.forName("org.apache.iotdb.jdbc.IoTDBDriver");
     try (Connection connection = DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
          Statement statement = connection.createStatement()) {
-      statement.execute("SET STORAGE GROUP TO root.sg1");
-      statement.execute("CREATE TIMESERIES root.sg1.d1.s1 WITH DATATYPE=INT64, ENCODING=RLE");
-      statement.execute("CREATE TIMESERIES root.sg1.d1.s2 WITH DATATYPE=INT64, ENCODING=RLE");
-      statement.execute("CREATE TIMESERIES root.sg1.d1.s3 WITH DATATYPE=INT64, ENCODING=RLE");
-
-      for (int i = 0; i <= 100; i++) {
-        statement.addBatch("insert into root.sg1.d1(timestamp, s1, s2, s3) values("+ i + "," + 1 + "," + 1 + "," + 1 + ")");
-      }
-      statement.executeBatch();
-      statement.clearBatch();
-
-      ResultSet resultSet = statement.executeQuery("select * from root where time <= 10");
-      outputResult(resultSet);
-      resultSet = statement.executeQuery("select count(*) from root");
-      outputResult(resultSet);
-      resultSet = statement.executeQuery("select count(*) from root where time >= 1 and time <= 100 group by ([0, 100], 20ms, 20ms)");
+      long startTime = System.currentTimeMillis();
+      ResultSet resultSet = statement.executeQuery("select * from root where time < 30000000");
       outputResult(resultSet);
+      long endTime = System.currentTimeMillis();
+      System.out.println("Cost Time: " + (endTime - startTime));
     }
   }
 
@@ -61,15 +31,6 @@ public class JDBCExample {
       }
       System.out.println();
       while (resultSet.next()) {
-        for (int i = 1; ; i++) {
-          System.out.print(resultSet.getString(i));
-          if (i < columnCount) {
-            System.out.print(", ");
-          } else {
-            System.out.println();
-            break;
-          }
-        }
       }
       System.out.println("--------------------------\n");
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
index 1474bba..c2e72e5 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
@@ -19,13 +19,6 @@
 
 package org.apache.iotdb.db.query.dataset;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
 import org.apache.iotdb.db.query.pool.QueryTaskPoolManager;
 import org.apache.iotdb.db.query.reader.ManagedSeriesReader;
 import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
@@ -42,28 +35,33 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
 public class NonAlignEngineDataSet extends QueryDataSet {
-  
+
   private static class ReadTask implements Runnable {
-    
+
     private final ManagedSeriesReader reader;
-    private BlockingQueue<Pair<PublicBAOS, PublicBAOS>> blockingQueue;
+    private BlockingQueue<Pair<ByteBuffer, ByteBuffer>> blockingQueue;
     private WatermarkEncoder encoder;
-    private int rowOffset;
-    private int fetchSize;
-    private int rowLimit;
-    private int alreadyReturnedRowNum = 0;
-    
-    public ReadTask(ManagedSeriesReader reader, 
-        BlockingQueue<Pair<PublicBAOS, PublicBAOS>> blockingQueue,
-        WatermarkEncoder encoder,
-        int rowOffset, int rowLimit, int fetchSize) {
+    NonAlignEngineDataSet dataSet;
+    private int index;
+
+
+    public ReadTask(ManagedSeriesReader reader,
+                    BlockingQueue<Pair<ByteBuffer, ByteBuffer>> blockingQueue,
+                    WatermarkEncoder encoder, NonAlignEngineDataSet dataSet, int index) {
       this.reader = reader;
       this.blockingQueue = blockingQueue;
       this.encoder = encoder;
-      this.rowOffset = rowOffset;
-      this.rowLimit = rowLimit;
-      this.fetchSize = fetchSize;
+      this.dataSet = dataSet;
+      this.index = index;
     }
 
     @Override
@@ -75,18 +73,23 @@ public class NonAlignEngineDataSet extends QueryDataSet {
           // if the task is submitted, there must be free space in the queue
           // so here we don't need to check whether the queue has free space
           // the reader has next batch
-          if (reader.hasNextBatch()) {
-            BatchData batchData = reader.nextBatch();
-            
+          if ((dataSet.cachedBatchData[index] != null && dataSet.cachedBatchData[index].hasCurrent())
+                  || reader.hasNextBatch()) {
+            BatchData batchData;
+            if (dataSet.cachedBatchData[index] != null && dataSet.cachedBatchData[index].hasCurrent())
+              batchData = dataSet.cachedBatchData[index];
+            else
+              batchData = reader.nextBatch();
+
             int rowCount = 0;
-            while (rowCount < fetchSize) {
-              
-              if ((rowLimit > 0 && alreadyReturnedRowNum >= rowLimit)) {
+            while (rowCount < dataSet.fetchSize) {
+
+              if ((dataSet.limit > 0 && dataSet.alreadyReturnedRowNumArray[index] >= dataSet.limit)) {
                 break;
               }
-              
+
               if (batchData != null && batchData.hasCurrent()) {
-                if (rowOffset == 0) {
+                if (dataSet.offsetArray[index] == 0) {
                   long time = batchData.currentTime();
                   ReadWriteIOUtils.write(time, timeBAOS);
                   TSDataType type = batchData.getDataType();
@@ -136,20 +139,42 @@ public class NonAlignEngineDataSet extends QueryDataSet {
                 batchData.next();
               }
               else {
-                break;
+                if (reader.hasNextBatch()) {
+                  batchData = reader.nextBatch();
+                  dataSet.cachedBatchData[index] = batchData;
+                  continue;
+                }
+                else
+                  break;
               }
-              if (rowOffset == 0) {
+              if (dataSet.offsetArray[index] == 0) {
                 rowCount++;
-                if (rowLimit > 0) {
-                  alreadyReturnedRowNum++;
+                if (dataSet.limit > 0) {
+                  dataSet.alreadyReturnedRowNumArray[index]++;
                 }
               } else {
-                rowOffset--;
+                dataSet.offsetArray[index]--;
               }
             }
-            
-            Pair<PublicBAOS, PublicBAOS> timeValueBAOSPair = new Pair(timeBAOS, valueBAOS);
-            
+            if (rowCount == 0) {
+              blockingQueue.put(new Pair(null, null));
+              // set the hasRemaining field in reader to false
+              // tell the Consumer not to submit another task for this reader any more
+              reader.setHasRemaining(false);
+              // remove itself from the QueryTaskPoolManager
+              reader.setManagedByQueryManager(false);
+              return;
+            }
+
+            ByteBuffer timeBuffer = ByteBuffer.allocate(timeBAOS.size());
+            timeBuffer.put(timeBAOS.getBuf(), 0, timeBAOS.size());
+            timeBuffer.flip();
+            ByteBuffer valueBuffer = ByteBuffer.allocate(valueBAOS.size());
+            valueBuffer.put(valueBAOS.getBuf(), 0, valueBAOS.size());
+            valueBuffer.flip();
+
+            Pair<ByteBuffer, ByteBuffer> timeValueBAOSPair = new Pair(timeBuffer, valueBuffer);
+
             blockingQueue.put(timeValueBAOSPair);
             // if the queue also has free space, just submit another itself
             if (blockingQueue.remainingCapacity() > 0) {
@@ -162,7 +187,7 @@ public class NonAlignEngineDataSet extends QueryDataSet {
             }
             return;
           }
-          blockingQueue.put(new Pair(timeBAOS, valueBAOS));
+          blockingQueue.put(new Pair(null, null));
           // set the hasRemaining field in reader to false
           // tell the Consumer not to submit another task for this reader any more
           reader.setHasRemaining(false);
@@ -176,19 +201,37 @@ public class NonAlignEngineDataSet extends QueryDataSet {
       } catch (Exception e) {
         LOGGER.error("Something gets wrong: ", e);
       }
-      
+
     }
-    
+
   }
 
-  
+
   private List<ManagedSeriesReader> seriesReaderWithoutValueFilterList;
-  
+
   // Blocking queue list for each time value buffer pair
-  private BlockingQueue<Pair<PublicBAOS, PublicBAOS>>[] blockingQueueArray;
-  
+  private BlockingQueue<Pair<ByteBuffer, ByteBuffer>>[] blockingQueueArray;
+
   private boolean initialized = false;
-  
+
+  private int[] offsetArray;
+
+  private int limit;
+
+  private int[] alreadyReturnedRowNumArray;
+
+  private BatchData[] cachedBatchData;
+
+  // indicate that there is no more batch data in the corresponding queue
+  // in case that the consumer thread is blocked on the queue and won't get runnable any more
+  // this field is not same as the `hasRemaining` in SeriesReaderWithoutValueFilter
+  // even though the `hasRemaining` in SeriesReaderWithoutValueFilter is false
+  // noMoreDataInQueue can still be true
+  // its usage is to tell the consumer thread not to call the take() method.
+  private boolean[] noMoreDataInQueueArray;
+
+  private int fetchSize;
+
   // indicate that there is no more batch data in the corresponding queue
   // in case that the consumer thread is blocked on the queue and won't get runnable any more
   // this field is not same as the `hasRemaining` in SeriesReaderWithoutValueFilter
@@ -202,7 +245,7 @@ public class NonAlignEngineDataSet extends QueryDataSet {
   private static final QueryTaskPoolManager pool = QueryTaskPoolManager.getInstance();
 
   private static final Logger LOGGER = LoggerFactory.getLogger(NonAlignEngineDataSet.class);
-  
+
   /**
    * constructor of EngineDataSet.
    *
@@ -211,102 +254,99 @@ public class NonAlignEngineDataSet extends QueryDataSet {
    * @param readers readers in List(IPointReader) structure
    */
   public NonAlignEngineDataSet(List<Path> paths, List<TSDataType> dataTypes,
-      List<ManagedSeriesReader> readers) throws InterruptedException {
+                               List<ManagedSeriesReader> readers) {
     super(paths, dataTypes);
     this.seriesReaderWithoutValueFilterList = readers;
     blockingQueueArray = new BlockingQueue[readers.size()];
+    noMoreDataInQueueArray = new boolean[readers.size()];
     for (int i = 0; i < seriesReaderWithoutValueFilterList.size(); i++) {
       blockingQueueArray[i] = new LinkedBlockingQueue<>(BLOCKING_QUEUE_CAPACITY);
     }
   }
-  
-  private void init(WatermarkEncoder encoder, int fetchSize) 
-      throws InterruptedException {
+
+  private void initLimit(int offset, int limit, int size) {
+    offsetArray = new int[size];
+    Arrays.fill(offsetArray, offset);
+    this.limit = limit;
+    alreadyReturnedRowNumArray = new int[size];
+    cachedBatchData = new BatchData[size];
+  }
+
+  private void init(WatermarkEncoder encoder, int fetchSize) {
+    initLimit(super.rowOffset, super.rowLimit, seriesReaderWithoutValueFilterList.size());
+    this.fetchSize = fetchSize;
     for (int i = 0; i < seriesReaderWithoutValueFilterList.size(); i++) {
       ManagedSeriesReader reader = seriesReaderWithoutValueFilterList.get(i);
       reader.setHasRemaining(true);
       reader.setManagedByQueryManager(true);
-      pool.submit(new ReadTask(reader, blockingQueueArray[i], encoder, rowOffset, rowLimit, fetchSize));
+      pool.submit(new ReadTask(reader, blockingQueueArray[i], encoder, this, i));
     }
     this.initialized = true;
   }
-  
+
   /**
    * for RPC in RawData query between client and server
    * fill time buffers and value buffers
    */
-  public TSQueryNonAlignDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) throws IOException, InterruptedException {
+  public TSQueryNonAlignDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) throws InterruptedException {
     if (!initialized) {
       init(encoder, fetchSize);
     }
     int seriesNum = seriesReaderWithoutValueFilterList.size();
     TSQueryNonAlignDataSet tsQueryNonAlignDataSet = new TSQueryNonAlignDataSet();
 
-    PublicBAOS[] timeBAOSList = new PublicBAOS[seriesNum];
-    PublicBAOS[] valueBAOSList = new PublicBAOS[seriesNum];
-    
+    List<ByteBuffer> timeBufferList = new ArrayList<>(seriesNum);
+    List<ByteBuffer> valueBufferList = new ArrayList<>(seriesNum);
+
     for (int seriesIndex = 0; seriesIndex < seriesNum; seriesIndex++) {
+      if (!noMoreDataInQueueArray[seriesIndex]) {
+        Pair<ByteBuffer, ByteBuffer> timeValueByteBufferPair = blockingQueueArray[seriesIndex].take();
+        if (timeValueByteBufferPair.left == null || timeValueByteBufferPair.right == null) {
+          noMoreDataInQueueArray[seriesIndex] = true;
+          timeValueByteBufferPair.left = ByteBuffer.allocate(0);
+          timeValueByteBufferPair.right = ByteBuffer.allocate(0);
+        }
+        timeBufferList.add(timeValueByteBufferPair.left);
+        valueBufferList.add(timeValueByteBufferPair.right);
+      }
+      else {
+        timeBufferList.add(ByteBuffer.allocate(0));
+        valueBufferList.add(ByteBuffer.allocate(0));
+        continue;
+      }
+
       synchronized (seriesReaderWithoutValueFilterList.get(seriesIndex)) {
         if (blockingQueueArray[seriesIndex].remainingCapacity() > 0) {
           ManagedSeriesReader reader = seriesReaderWithoutValueFilterList.get(seriesIndex);
           // if the reader isn't being managed and still has more data,
           // that means this read task leave the pool before because the queue has no more space
           // now we should submit it again
-          if (reader.isManagedByQueryManager() && reader.hasRemaining()) {
+          if (!reader.isManagedByQueryManager() && reader.hasRemaining()) {
             reader.setManagedByQueryManager(true);
-            pool.submit(new ReadTask(reader, blockingQueueArray[seriesIndex], 
-                encoder, rowOffset, rowLimit, fetchSize));
+            pool.submit(new ReadTask(reader, blockingQueueArray[seriesIndex],
+                    encoder, this, seriesIndex));
           }
         }
-        Pair<PublicBAOS, PublicBAOS> timevalueBAOSPair = blockingQueueArray[seriesIndex].poll();
-        if (timevalueBAOSPair != null) {
-          timeBAOSList[seriesIndex] = timevalueBAOSPair.left;
-          valueBAOSList[seriesIndex] = timevalueBAOSPair.right;
-        }
-        else {
-          timeBAOSList[seriesIndex] = new PublicBAOS();
-          valueBAOSList[seriesIndex] = new PublicBAOS();
-        }
       }
     }
 
-    List<ByteBuffer> timeBufferList = new ArrayList<>();
-    List<ByteBuffer> valueBufferList = new ArrayList<>();
-
-    for (int seriesIndex = 0; seriesIndex < seriesNum; seriesIndex++) {
-      // add time buffer of current series
-      putPBOSToBuffer(timeBAOSList, timeBufferList, seriesIndex);
-
-      // add value buffer of current series
-      putPBOSToBuffer(valueBAOSList, valueBufferList, seriesIndex);
-    }
-
     // set time buffers, value buffers and bitmap buffers
     tsQueryNonAlignDataSet.setTimeList(timeBufferList);
     tsQueryNonAlignDataSet.setValueList(valueBufferList);
 
     return tsQueryNonAlignDataSet;
   }
-  
-  private void putPBOSToBuffer(PublicBAOS[] aBAOSList, List<ByteBuffer> aBufferList,
-      int tsIndex) {
-    ByteBuffer aBuffer = ByteBuffer.allocate(aBAOSList[tsIndex].size());
-    aBuffer.put(aBAOSList[tsIndex].getBuf(), 0, aBAOSList[tsIndex].size());
-    aBuffer.flip();
-    aBufferList.add(aBuffer);
-  }
+
 
   @Override
-  protected boolean hasNextWithoutConstraint() throws IOException {
-    // TODO Auto-generated method stub
+  protected boolean hasNextWithoutConstraint() {
     return false;
   }
 
   @Override
-  protected RowRecord nextWithoutConstraint() throws IOException {
-    // TODO Auto-generated method stub
+  protected RowRecord nextWithoutConstraint() {
     return null;
   }
-  
+
 
 }