You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2020/04/29 08:20:47 UTC

[incubator-iotdb] branch SessionNext created (now 81a8166)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch SessionNext
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at 81a8166  Add a jdbc-like way to fetch data in session

This branch includes the following new commits:

     new 81a8166  Add a jdbc-like way to fetch data in session

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: Add a jdbc-like way to fetch data in session

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch SessionNext
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 81a8166e81899eaaf8781cb9bef248cf5bf12bd8
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Wed Apr 29 16:20:24 2020 +0800

    Add a jdbc-like way to fetch data in session
---
 .../main/java/org/apache/iotdb/SessionExample.java |  50 ++--
 .../java/org/apache/iotdb/session/Session.java     |   2 +-
 .../org/apache/iotdb/session/SessionDataSet.java   | 326 +++++++++++++++++++--
 .../iotdb/session/IoTDBSessionIteratorIT.java      | 122 ++++++++
 4 files changed, 458 insertions(+), 42 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 6aa0e25..03310d0 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -18,21 +18,21 @@
  */
 package org.apache.iotdb;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.iotdb.rpc.BatchExecutionException;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.session.Session;
 import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.session.SessionDataSet.DataIterator;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.apache.iotdb.tsfile.write.schema.Schema;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 
 public class SessionExample {
 
@@ -49,7 +49,6 @@ public class SessionExample {
       if (!e.getMessage().contains("StorageGroupAlreadySetException")) {
         throw e;
       }
-//       ignore duplicated set
     }
 
     createTimeseries();
@@ -60,6 +59,7 @@ public class SessionExample {
     insertRecords();
     nonQuery();
     query();
+    queryByIterator();
     deleteData();
     deleteTimeseries();
     session.close();
@@ -95,7 +95,8 @@ public class SessionExample {
   private static void createMultiTimeseries()
       throws IoTDBConnectionException, BatchExecutionException {
 
-    if (!session.checkTimeseriesExists("root.sg1.d2.s1") && !session.checkTimeseriesExists("root.sg1.d2.s2")) {
+    if (!session.checkTimeseriesExists("root.sg1.d2.s1") && !session
+        .checkTimeseriesExists("root.sg1.d2.s2")) {
       List<String> paths = new ArrayList<>();
       paths.add("root.sg1.d2.s1");
       paths.add("root.sg1.d2.s2");
@@ -126,8 +127,9 @@ public class SessionExample {
       alias.add("weight1");
       alias.add("weight2");
 
-      session.createMultiTimeseries(paths, tsDataTypes, tsEncodings, compressionTypes, null, tagsList,
-          attributesList, alias);
+      session
+          .createMultiTimeseries(paths, tsDataTypes, tsEncodings, compressionTypes, null, tagsList,
+              attributesList, alias);
     }
   }
 
@@ -193,15 +195,11 @@ public class SessionExample {
 
   /**
    * insert the data of a device. For each timestamp, the number of measurements is the same.
-   *
+   * <p>
    * a Tablet example:
-   *
-   *      device1
-   * time s1, s2, s3
-   * 1,   1,  1,  1
-   * 2,   2,  2,  2
-   * 3,   3,  3,  3
-   *
+   * <p>
+   * device1 time s1, s2, s3 1,   1,  1,  1 2,   2,  2,  2 3,   3,  3,  3
+   * <p>
    * Users need to control the count of Tablet and write a batch when it reaches the maxBatchSize
    */
   private static void insertTablet() throws IoTDBConnectionException, BatchExecutionException {
@@ -245,7 +243,7 @@ public class SessionExample {
     Tablet tablet1 = new Tablet("root.sg1.d1", schemaList, 100);
     Tablet tablet2 = new Tablet("root.sg1.d2", schemaList, 100);
     Tablet tablet3 = new Tablet("root.sg1.d3", schemaList, 100);
-    
+
     Map<String, Tablet> tabletMap = new HashMap<>();
     tabletMap.put("root.sg1.d1", tablet1);
     tabletMap.put("root.sg1.d2", tablet2);
@@ -317,6 +315,22 @@ public class SessionExample {
     dataSet.closeOperationHandle();
   }
 
+  private static void queryByIterator()
+      throws IoTDBConnectionException, StatementExecutionException {
+    SessionDataSet dataSet;
+    dataSet = session.executeQueryStatement("select * from root.sg1.d1");
+    DataIterator iterator = dataSet.iterator();
+    System.out.println(dataSet.getColumnNames());
+    dataSet.setBatchSize(1024); // default is 512
+    while (iterator.next()) {
+      System.out.println(String.format("%s,%s,%s,%s,%s", iterator.getLong(1), iterator.getLong(2),
+          iterator.getLong("root.sg1.d1.s2"), iterator.getLong(4),
+          iterator.getObject("root.sg1.d1.s4")));
+    }
+
+    dataSet.closeOperationHandle();
+  }
+
   private static void nonQuery() throws IoTDBConnectionException, StatementExecutionException {
     session.executeNonQueryStatement("insert into root.sg1.d1(timestamp,s1) values(200, 1);");
   }
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 0407093..15635e5 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -651,7 +651,7 @@ public class Session {
     }
 
     RpcUtils.verifySuccess(execResp.getStatus());
-    return new SessionDataSet(sql, execResp.getColumns(), execResp.getDataTypeList(),
+    return new SessionDataSet(sql, execResp.getColumns(), execResp.getDataTypeList(), execResp.columnNameIndexMap,
         execResp.getQueryId(), client, sessionId, execResp.queryDataSet);
   }
 
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
index 763922d..fdfaf1a 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
@@ -19,11 +19,12 @@
 package org.apache.iotdb.session;
 
 import java.nio.ByteBuffer;
-import java.sql.SQLException;
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.StatementExecutionException;
@@ -39,10 +40,15 @@ import org.apache.iotdb.tsfile.read.common.Field;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.thrift.TException;
 
 public class SessionDataSet {
 
+  private static final String TIMESTAMP_STR = "Time";
+  private static final int START_INDEX = 2;
+  private static final String VALUE_IS_NULL = "The value got by %s (column name) is NULL.";
+
   private boolean hasCachedRecord = false;
   private String sql;
   private long queryId;
@@ -50,11 +56,9 @@ public class SessionDataSet {
   private TSIService.Iface client;
   private int batchSize = 1024;
   private List<String> columnNameList;
-  private List<String> columnTypeDeduplicatedList;
-  // duplicated column index -> origin index
-  Map<Integer, Integer> duplicateLocation;
+  protected List<TSDataType> columnTypeDeduplicatedList; // deduplicated from columnTypeList
   // column name -> column location
-  Map<String, Integer> columnMap;
+  Map<String, Integer> columnOrdinalMap;
   // column size
   int columnSize = 0;
 
@@ -65,8 +69,11 @@ public class SessionDataSet {
   private byte[] currentBitmap; // used to cache the current bitmap for every column
   private static final int flag = 0x80; // used to do `or` operation with bitmap to judge whether the value is null
 
+  private byte[] time; // used to cache the current time value
+  private byte[][] values; // used to cache the current row record value
+
 
-  public SessionDataSet(String sql, List<String> columnNameList, List<String> columnTypeList,
+  public SessionDataSet(String sql, List<String> columnNameList, List<String> columnTypeList, Map<String, Integer> columnNameIndex,
       long queryId, TSIService.Iface client, long sessionId, TSQueryDataSet queryDataSet) {
     this.sessionId = sessionId;
     this.sql = sql;
@@ -76,22 +83,42 @@ public class SessionDataSet {
     currentBitmap = new byte[columnNameList.size()];
     columnSize = columnNameList.size();
 
-    // deduplicate columnTypeList according to columnNameList
-    this.columnTypeDeduplicatedList = new ArrayList<>();
-    // duplicated column index -> origin index
-    duplicateLocation = new HashMap<>();
-    // column name -> column location
-    columnMap = new HashMap<>();
-    for (int i = 0; i < columnNameList.size(); i++) {
-      String name = columnNameList.get(i);
-      if (columnMap.containsKey(name)) {
-        duplicateLocation.put(i, columnMap.get(name));
-      } else {
-        columnMap.put(name, i);
-        columnTypeDeduplicatedList.add(columnTypeList.get(i));
+    this.columnNameList = new ArrayList<>();
+    this.columnNameList.add(TIMESTAMP_STR);
+    // deduplicate and map
+    this.columnOrdinalMap = new HashMap<>();
+    this.columnOrdinalMap.put(TIMESTAMP_STR, 1);
+
+    // deduplicate and map
+    if (columnNameIndex != null) {
+      this.columnTypeDeduplicatedList = new ArrayList<>(columnNameIndex.size());
+      for (int i = 0; i < columnNameIndex.size(); i++) {
+        columnTypeDeduplicatedList.add(null);
+      }
+      for (int i = 0; i < columnNameList.size(); i++) {
+        String name = columnNameList.get(i);
+        this.columnNameList.add(name);
+        if (!columnOrdinalMap.containsKey(name)) {
+          int index = columnNameIndex.get(name);
+          columnOrdinalMap.put(name, index+START_INDEX);
+          columnTypeDeduplicatedList.set(index, TSDataType.valueOf(columnTypeList.get(i)));
+        }
+      }
+    } else {
+      this.columnTypeDeduplicatedList = new ArrayList<>();
+      int index = START_INDEX;
+      for (int i = 0; i < columnNameList.size(); i++) {
+        String name = columnNameList.get(i);
+        this.columnNameList.add(name);
+        if (!columnOrdinalMap.containsKey(name)) {
+          columnOrdinalMap.put(name, index++);
+          columnTypeDeduplicatedList.add(TSDataType.valueOf(columnTypeList.get(i)));
+        }
       }
     }
 
+    time = new byte[Long.BYTES];
+    values = new byte[columnNameList.size()][];
     this.tsQueryDataSet = queryDataSet;
   }
 
@@ -142,9 +169,9 @@ public class SessionDataSet {
     int loc = 0;
     for (int i = 0; i < columnSize; i++) {
       Field field;
-
-      if (duplicateLocation.containsKey(i)) {
-        field = Field.copy(outFields.get(duplicateLocation.get(i)));
+      int deduplicatedIndex = columnOrdinalMap.get(columnNameList.get(i+1)) - START_INDEX;
+      if (deduplicatedIndex < i) {
+        field = Field.copy(outFields.get(deduplicatedIndex));
       } else {
         ByteBuffer bitmapBuffer = tsQueryDataSet.bitmapList.get(loc);
         // another new 8 row, should move the bitmap buffer position to next byte
@@ -154,7 +181,7 @@ public class SessionDataSet {
 
         if (!isNull(loc, rowsIndex)) {
           ByteBuffer valueBuffer = tsQueryDataSet.valueList.get(loc);
-          TSDataType dataType = TSDataType.valueOf(columnTypeDeduplicatedList.get(loc));
+          TSDataType dataType = columnTypeDeduplicatedList.get(loc);
           field = new Field(dataType);
           switch (dataType) {
             case BOOLEAN:
@@ -232,4 +259,257 @@ public class SessionDataSet {
           "Error occurs when connecting to server for close operation, because: " + e, e);
     }
   }
+
+  public DataIterator iterator() {
+    return new DataIterator();
+  }
+
+  public class DataIterator {
+
+    private boolean emptyResultSet = false;
+
+    public boolean next() throws StatementExecutionException {
+      if (hasCachedResults()) {
+        constructOneRow();
+        return true;
+      }
+      if (emptyResultSet) {
+        return false;
+      }
+      if (fetchResults()) {
+        constructOneRow();
+        return true;
+      }
+      return false;
+    }
+
+    private boolean fetchResults() throws StatementExecutionException {
+      rowsIndex = 0;
+      TSFetchResultsReq req = new TSFetchResultsReq(sessionId, sql, batchSize, queryId, true);
+      try {
+        TSFetchResultsResp resp = client.fetchResults(req);
+
+        RpcUtils.verifySuccess(resp.getStatus());
+        if (!resp.hasResultSet) {
+          emptyResultSet = true;
+        } else {
+          tsQueryDataSet = resp.getQueryDataSet();
+        }
+        return resp.hasResultSet;
+      } catch (TException e) {
+        throw new StatementExecutionException(
+            "Cannot fetch result from server, because of network connection: {} ", e);
+      }
+    }
+
+    private void constructOneRow() {
+      tsQueryDataSet.time.get(time);
+      for (int i = 0; i < tsQueryDataSet.bitmapList.size(); i++) {
+        ByteBuffer bitmapBuffer = tsQueryDataSet.bitmapList.get(i);
+        // another new 8 row, should move the bitmap buffer position to next byte
+        if (rowsIndex % 8 == 0) {
+          currentBitmap[i] = bitmapBuffer.get();
+        }
+        values[i] = null;
+        if (!isNull(i, rowsIndex)) {
+          ByteBuffer valueBuffer = tsQueryDataSet.valueList.get(i);
+          TSDataType dataType = columnTypeDeduplicatedList.get(i);
+          switch (dataType) {
+            case BOOLEAN:
+              if (values[i] == null) {
+                values[i] = new byte[1];
+              }
+              valueBuffer.get(values[i]);
+              break;
+            case INT32:
+              if (values[i] == null) {
+                values[i] = new byte[Integer.BYTES];
+              }
+              valueBuffer.get(values[i]);
+              break;
+            case INT64:
+              if (values[i] == null) {
+                values[i] = new byte[Long.BYTES];
+              }
+              valueBuffer.get(values[i]);
+              break;
+            case FLOAT:
+              if (values[i] == null) {
+                values[i] = new byte[Float.BYTES];
+              }
+              valueBuffer.get(values[i]);
+              break;
+            case DOUBLE:
+              if (values[i] == null) {
+                values[i] = new byte[Double.BYTES];
+              }
+              valueBuffer.get(values[i]);
+              break;
+            case TEXT:
+              int length = valueBuffer.getInt();
+              values[i] = ReadWriteIOUtils.readBytes(valueBuffer, length);
+              break;
+            default:
+              throw new UnSupportedDataTypeException(
+                  String.format("Data type %s is not supported.", columnTypeDeduplicatedList.get(i)));
+          }
+        }
+      }
+      rowsIndex++;
+    }
+
+    private boolean hasCachedResults() {
+      return (tsQueryDataSet != null && tsQueryDataSet.time.hasRemaining());
+    }
+
+    public boolean getBoolean(int columnIndex) throws StatementExecutionException {
+      return getBoolean(findColumnNameByIndex(columnIndex));
+    }
+
+    public boolean getBoolean(String columnName) throws StatementExecutionException {
+      checkRecord();
+      int index = columnOrdinalMap.get(columnName) - START_INDEX;
+      if (values[index] != null) {
+        return BytesUtils.bytesToBool(values[index]);
+      }
+      else {
+        throw new StatementExecutionException(String.format(VALUE_IS_NULL, columnName));
+      }
+    }
+
+    public double getDouble(int columnIndex) throws StatementExecutionException {
+      return getDouble(findColumnNameByIndex(columnIndex));
+    }
+
+    public double getDouble(String columnName) throws StatementExecutionException {
+      checkRecord();
+      int index = columnOrdinalMap.get(columnName) - START_INDEX;
+      if (values[index] != null) {
+        return BytesUtils.bytesToDouble(values[index]);
+      } else {
+        throw new StatementExecutionException(String.format(VALUE_IS_NULL, columnName));
+      }
+    }
+
+    public float getFloat(int columnIndex) throws StatementExecutionException {
+      return getFloat(findColumnNameByIndex(columnIndex));
+    }
+
+    public float getFloat(String columnName) throws StatementExecutionException {
+      checkRecord();
+      int index = columnOrdinalMap.get(columnName) - START_INDEX;
+      if (values[index] != null) {
+        return BytesUtils.bytesToFloat(values[index]);
+      } else {
+        throw new StatementExecutionException(String.format(VALUE_IS_NULL, columnName));
+      }
+    }
+
+    public int getInt(int columnIndex) throws StatementExecutionException {
+      return getInt(findColumnNameByIndex(columnIndex));
+    }
+
+    public int getInt(String columnName) throws StatementExecutionException {
+      checkRecord();
+      int index = columnOrdinalMap.get(columnName) - START_INDEX;
+      if (values[index] != null) {
+        return BytesUtils.bytesToInt(values[index]);
+      } else {
+        throw new StatementExecutionException(String.format(VALUE_IS_NULL, columnName));
+      }
+    }
+
+    public long getLong(int columnIndex) throws StatementExecutionException {
+      return getLong(findColumnNameByIndex(columnIndex));
+    }
+
+    public long getLong(String columnName) throws StatementExecutionException {
+      checkRecord();
+      if (columnName.equals(TIMESTAMP_STR)) {
+        return BytesUtils.bytesToLong(time);
+      }
+      int index = columnOrdinalMap.get(columnName) - START_INDEX;
+      if (values[index] != null) {
+        return BytesUtils.bytesToLong(values[index]);
+      } else {
+        throw new StatementExecutionException(String.format(VALUE_IS_NULL, columnName));
+      }
+    }
+
+    public Object getObject(int columnIndex) throws StatementExecutionException {
+      return getObject(findColumnNameByIndex(columnIndex));
+    }
+
+    public Object getObject(String columnName) throws StatementExecutionException {
+      return getValueByName(columnName);
+    }
+
+    public String getString(int columnIndex) throws StatementExecutionException {
+      return getString(findColumnNameByIndex(columnIndex));
+    }
+
+    public String getString(String columnName) throws StatementExecutionException {
+      return getValueByName(columnName);
+    }
+
+    public Timestamp getTimestamp(int columnIndex) throws StatementExecutionException {
+      return new Timestamp(getLong(columnIndex));
+    }
+
+    public Timestamp getTimestamp(String columnName) throws StatementExecutionException {
+      return getTimestamp(findColumn(columnName));
+    }
+
+    public int findColumn(String columnName) {
+      return columnOrdinalMap.get(columnName);
+    }
+
+    private String getValueByName(String columnName) throws StatementExecutionException {
+      checkRecord();
+      if (columnName.equals(TIMESTAMP_STR)) {
+        return String.valueOf(BytesUtils.bytesToLong(time));
+      }
+      int index = columnOrdinalMap.get(columnName) - START_INDEX;
+      if (index < 0 || index >= values.length || values[index] == null) {
+        return null;
+      }
+      return getString(index, columnTypeDeduplicatedList.get(index), values);
+    }
+
+    protected String getString(int index, TSDataType tsDataType, byte[][] values) {
+      switch (tsDataType) {
+        case BOOLEAN:
+          return String.valueOf(BytesUtils.bytesToBool(values[index]));
+        case INT32:
+          return String.valueOf(BytesUtils.bytesToInt(values[index]));
+        case INT64:
+          return String.valueOf(BytesUtils.bytesToLong(values[index]));
+        case FLOAT:
+          return String.valueOf(BytesUtils.bytesToFloat(values[index]));
+        case DOUBLE:
+          return String.valueOf(BytesUtils.bytesToDouble(values[index]));
+        case TEXT:
+          return new String(values[index]);
+        default:
+          return null;
+      }
+    }
+
+    private void checkRecord() throws StatementExecutionException {
+      if (Objects.isNull(tsQueryDataSet)) {
+        throw new StatementExecutionException("No record remains");
+      }
+    }
+  }
+
+  private String findColumnNameByIndex(int columnIndex) throws StatementExecutionException {
+    if (columnIndex <= 0) {
+      throw new StatementExecutionException("column index should start from 1");
+    }
+    if (columnIndex > columnNameList.size()) {
+      throw new StatementExecutionException(
+          String.format("column index %d out of range %d", columnIndex, columnNameList.size()));
+    }
+    return columnNameList.get(columnIndex - 1);
+  }
 }
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIteratorIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIteratorIT.java
new file mode 100644
index 0000000..8d6ec3e
--- /dev/null
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIteratorIT.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.session;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.SessionDataSet.DataIterator;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class IoTDBSessionIteratorIT {
+
+  private Session session;
+
+  @Before
+  public void setUp() throws Exception {
+    System.setProperty(IoTDBConstant.IOTDB_CONF, "src/test/resources/");
+    EnvironmentUtils.closeStatMonitor();
+    EnvironmentUtils.envSetUp();
+    prepareData();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    session.close();
+    EnvironmentUtils.cleanEnv();
+  }
+
+  @Test
+  public void test() {
+    String[] retArray = new String[]{
+        "0,1,2.0,null",
+        "1,1,2.0,null",
+        "2,1,2.0,null",
+        "3,1,2.0,null",
+        "4,1,2.0,null",
+        "5,1,2.0,4.0",
+        "6,1,2.0,4.0",
+        "7,1,2.0,4.0",
+        "8,1,2.0,4.0",
+        "9,1,2.0,4.0",
+    };
+
+    try {
+      SessionDataSet sessionDataSet = session.executeQueryStatement("select * from root.sg1");
+      sessionDataSet.setBatchSize(1024);
+      DataIterator iterator = sessionDataSet.iterator();
+      int count = 0;
+      while (iterator.next()) {
+        String ans = String.format("%s,%s,%s,%s", iterator.getLong(1), iterator.getInt("root.sg1.d1.s1"),
+            iterator.getFloat(3), iterator.getString("root.sg1.d2.s1"));
+        assertEquals(retArray[count], ans);
+        count++;
+      }
+      assertEquals(retArray.length, count);
+      sessionDataSet.closeOperationHandle();
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail();
+    }
+  }
+
+  private void prepareData() throws IoTDBConnectionException, StatementExecutionException {
+    session = new Session("127.0.0.1", 6667, "root", "root");
+    session.open();
+
+    session.setStorageGroup("root.sg1");
+    session.createTimeseries("root.sg1.d1.s1", TSDataType.INT32, TSEncoding.RLE,
+        CompressionType.SNAPPY);
+    session.createTimeseries("root.sg1.d1.s2", TSDataType.FLOAT, TSEncoding.RLE,
+        CompressionType.SNAPPY);
+    session.createTimeseries("root.sg1.d2.s1", TSDataType.DOUBLE, TSEncoding.RLE,
+        CompressionType.SNAPPY);
+    String deviceId = "root.sg1.d1";
+    List<String> measurements = new ArrayList<>();
+    measurements.add("s1");
+    measurements.add("s2");
+    for (long time = 0; time < 10; time++) {
+      List<String> values = new ArrayList<>();
+      values.add("1");
+      values.add("2");
+      session.insertRecord(deviceId, time, measurements, values);
+    }
+
+    deviceId = "root.sg1.d2";
+    measurements = new ArrayList<>();
+    measurements.add("s1");
+    for (long time = 5; time < 10; time++) {
+      List<String> values = new ArrayList<>();
+      values.add("4");
+      session.insertRecord(deviceId, time, measurements, values);
+    }
+  }
+
+}