You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ge...@apache.org on 2020/11/02 07:54:07 UTC

[iotdb] 01/04: change jdbc to csv

This is an automated email from the ASF dual-hosted git repository.

geniuspig pushed a commit to branch change_jdbc_to_session_in_csv
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a6c562d05efb115559000c41ad6bed7bd15e6458
Author: Boris <96...@qq.com>
AuthorDate: Sun Nov 1 23:11:34 2020 +0800

    change jdbc to csv
---
 cli/pom.xml                                        |   2 +-
 .../org/apache/iotdb/tool/AbstractCsvTool.java     |  18 +-
 .../main/java/org/apache/iotdb/tool/ExportCsv.java | 157 ++++----
 .../main/java/org/apache/iotdb/tool/ImportCsv.java | 393 +++++----------------
 4 files changed, 164 insertions(+), 406 deletions(-)

diff --git a/cli/pom.xml b/cli/pom.xml
index 4779c65..656ac34 100644
--- a/cli/pom.xml
+++ b/cli/pom.xml
@@ -37,7 +37,7 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.iotdb</groupId>
-            <artifactId>iotdb-jdbc</artifactId>
+            <artifactId>iotdb-session</artifactId>
             <version>${project.version}</version>
         </dependency>
         <dependency>
diff --git a/cli/src/main/java/org/apache/iotdb/tool/AbstractCsvTool.java b/cli/src/main/java/org/apache/iotdb/tool/AbstractCsvTool.java
index d7e3074..0317ab7 100644
--- a/cli/src/main/java/org/apache/iotdb/tool/AbstractCsvTool.java
+++ b/cli/src/main/java/org/apache/iotdb/tool/AbstractCsvTool.java
@@ -23,9 +23,9 @@ import java.time.ZoneId;
 import jline.console.ConsoleReader;
 import org.apache.commons.cli.CommandLine;
 import org.apache.iotdb.exception.ArgsErrorException;
-import org.apache.iotdb.jdbc.IoTDBConnection;
-import org.apache.iotdb.jdbc.IoTDBSQLException;
-import org.apache.thrift.TException;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
 
 public abstract class AbstractCsvTool {
 
@@ -69,7 +69,7 @@ public abstract class AbstractCsvTool {
 
   protected static String timeZoneID;
   protected static String timeFormat;
-  protected static IoTDBConnection connection;
+  protected static Session session;
 
   AbstractCsvTool() {}
   
@@ -85,11 +85,11 @@ public abstract class AbstractCsvTool {
     return str;
   }
 
-  protected static void setTimeZone() throws IoTDBSQLException, TException {
+  protected static void setTimeZone() throws IoTDBConnectionException, StatementExecutionException {
     if (timeZoneID != null) {
-      connection.setTimeZone(timeZoneID);
+      session.setTimeZone(timeZoneID);
     }
-    zoneId = ZoneId.of(connection.getTimeZone());
+    zoneId = ZoneId.of(session.getTimeZone());
   }
 
   protected static void parseBasicParams(CommandLine commandLine, ConsoleReader reader)
@@ -110,8 +110,8 @@ public abstract class AbstractCsvTool {
         return true;
       }
     }
-    System.out.println(String.format("Input time format %s is not supported, "
-        + "please input like yyyy-MM-dd\\ HH:mm:ss.SSS or yyyy-MM-dd'T'HH:mm:ss.SSS", timeFormat));
+    System.out.printf("Input time format %s is not supported, "
+        + "please input like yyyy-MM-dd\\ HH:mm:ss.SSS or yyyy-MM-dd'T'HH:mm:ss.SSS%n", timeFormat);
     return false;
   }
 }
diff --git a/cli/src/main/java/org/apache/iotdb/tool/ExportCsv.java b/cli/src/main/java/org/apache/iotdb/tool/ExportCsv.java
index 7a33eb4..f30ddbc 100644
--- a/cli/src/main/java/org/apache/iotdb/tool/ExportCsv.java
+++ b/cli/src/main/java/org/apache/iotdb/tool/ExportCsv.java
@@ -25,16 +25,9 @@ import java.io.File;
 import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.IOException;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.sql.Types;
 import java.time.Instant;
 import java.time.ZonedDateTime;
 import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
 import java.util.List;
 import jline.console.ConsoleReader;
 import org.apache.commons.cli.CommandLine;
@@ -46,9 +39,12 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.iotdb.cli.AbstractCli;
 import org.apache.iotdb.exception.ArgsErrorException;
-import org.apache.iotdb.jdbc.Config;
-import org.apache.iotdb.jdbc.IoTDBConnection;
-import org.apache.thrift.TException;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
 
 /**
  * Export CSV file.
@@ -75,14 +71,10 @@ public class ExportCsv extends AbstractCsvTool {
 
   private static final int EXPORT_PER_LINE_COUNT = 10000;
 
-  private static String TIMESTAMP_PRECISION = "ms";
-
-  private static List<Integer> typeList = new ArrayList<>();
-
   /**
    * main function of export csv tool.
    */
-  public static void main(String[] args) throws IOException, SQLException {
+  public static void main(String[] args) throws IOException {
     Options options = createOptions();
     HelpFormatter hf = new HelpFormatter();
     hf.setOptionComparator(null); // avoid reordering
@@ -116,14 +108,15 @@ public class ExportCsv extends AbstractCsvTool {
       if (!checkTimeFormat()) {
         return;
       }
-      Class.forName(Config.JDBC_DRIVER_NAME);
 
       String sqlFile = commandLine.getOptionValue(SQL_FILE_ARGS);
       String sql;
 
-      connection = (IoTDBConnection) DriverManager
-          .getConnection(Config.IOTDB_URL_PREFIX + host + ":" + port + "/", username, password);
-      setTimeZone();
+      try {
+        setTimeZone();
+      } catch (IoTDBConnectionException | StatementExecutionException e) {
+        e.printStackTrace();
+      }
 
       if (sqlFile == null) {
         sql = reader.readLine(TSFILEDB_CLI_PREFIX + "> please input query: ");
@@ -134,21 +127,18 @@ public class ExportCsv extends AbstractCsvTool {
       } else {
         dumpFromSqlFile(sqlFile);
       }
-    } catch (ClassNotFoundException e) {
-      System.out.println("Failed to export data because cannot find IoTDB JDBC Driver, "
-          + "please check whether you have imported driver or not: " + e.getMessage());
-    } catch (TException e) {
-      System.out.println("Encounter an error when connecting to server, because " + e.getMessage());
-    } catch (SQLException e) {
-      System.out.println("Encounter an error when exporting data, error is: " + e.getMessage());
     } catch (IOException e) {
       System.out.println("Failed to operate on file, because " + e.getMessage());
     } catch (ArgsErrorException e) {
       System.out.println("Invalid args: " + e.getMessage());
     } finally {
       reader.close();
-      if (connection != null) {
-        connection.close();
+      if (session != null) {
+        try {
+          session.close();
+        } catch (IoTDBConnectionException e) {
+          System.out.println("Encounter an error when closing session, error is: " + e.getMessage());
+        }
       }
     }
   }
@@ -234,12 +224,7 @@ public class ExportCsv extends AbstractCsvTool {
       String sql;
       int index = 0;
       while ((sql = reader.readLine()) != null) {
-        try {
-          dumpResult(sql, index);
-        } catch (SQLException e) {
-          System.out
-              .println("Cannot dump data for statement " + sql + ", because : " + e.getMessage());
-        }
+        dumpResult(sql, index);
         index++;
       }
     }
@@ -250,10 +235,8 @@ public class ExportCsv extends AbstractCsvTool {
    *
    * @param sql export the result of executing the sql
    * @param index use to create dump file name
-   * @throws SQLException if SQL is not valid
    */
-  private static void dumpResult(String sql, int index)
-      throws SQLException {
+  private static void dumpResult(String sql, int index) {
 
     final String path = targetDirectory + targetFile + index + ".csv";
     File tf = new File(path);
@@ -267,78 +250,66 @@ public class ExportCsv extends AbstractCsvTool {
       return;
     }
     System.out.println("Start to export data from sql statement: " + sql);
-    try (Statement statement = connection.createStatement();
-        ResultSet rs = statement.executeQuery(sql);
-        BufferedWriter bw = new BufferedWriter(new FileWriter(tf))) {
-      ResultSetMetaData metadata = rs.getMetaData();
+    try (BufferedWriter bw = new BufferedWriter(new FileWriter(tf))) {
+      SessionDataSet sessionDataSet = session.executeQueryStatement(sql);
       long startTime = System.currentTimeMillis();
-
-      int count = metadata.getColumnCount();
       // write data in csv file
-      writeMetadata(bw, count, metadata);
+      writeMetadata(bw, sessionDataSet.getColumnNames());
+
 
-      int line = writeResultSet(rs, bw, count);
+      int line = writeResultSet(sessionDataSet, bw);
       System.out
-          .println(String.format("Statement [%s] has dumped to file %s successfully! It costs "
-                  + "%dms to export %d lines.", sql, path, System.currentTimeMillis() - startTime,
-              line));
-    } catch (IOException e) {
+          .printf("Statement [%s] has dumped to file %s successfully! It costs "
+                  + "%dms to export %d lines.%n", sql, path, System.currentTimeMillis() - startTime,
+              line);
+    } catch (IOException | StatementExecutionException | IoTDBConnectionException e) {
       System.out.println("Cannot dump result because: " + e.getMessage());
     }
   }
 
-  private static void writeMetadata(BufferedWriter bw, int count, ResultSetMetaData metadata)
-      throws SQLException, IOException {
-    for (int i = 1; i <= count; i++) {
-      if (i < count) {
-        bw.write(metadata.getColumnLabel(i) + ",");
-      } else {
-        bw.write(metadata.getColumnLabel(i) + "\n");
-      }
-      typeList.add(metadata.getColumnType(i));
+  private static void writeMetadata(BufferedWriter bw, List<String> columnNames) throws IOException {
+    for (int i = 0; i < columnNames.size() - 1; i++) {
+      bw.write(columnNames.get(i) + ",");
     }
+    bw.write(columnNames.get(columnNames.size() - 1) + "\n");
   }
 
-  private static int writeResultSet(ResultSet rs, BufferedWriter bw, int count)
-      throws SQLException, IOException {
+  private static int writeResultSet(SessionDataSet rs, BufferedWriter bw)
+      throws IOException, StatementExecutionException, IoTDBConnectionException {
     int line = 0;
     long timestamp = System.currentTimeMillis();
-    while (rs.next()) {
-      if (rs.getString(1) == null ||
-          "null".equalsIgnoreCase(rs.getString(1))) {
-        bw.write(",");
-      } else {
-        writeTime(rs, bw);
-        writeValue(rs, count, bw);
-      }
+    while (rs.hasNext()) {
+      RowRecord rowRecord = rs.next();
+      List<Field> fields = rowRecord.getFields();
+      writeTime(rowRecord.getTimestamp(), bw);
+      writeValue(fields, bw);
       line++;
       if (line % EXPORT_PER_LINE_COUNT == 0) {
         long tmp = System.currentTimeMillis();
-        System.out.println(
-            String.format("%d lines have been exported, it takes %dms", line, (tmp - timestamp)));
+        System.out.printf("%d lines have been exported, it takes %dms%n", line, (tmp - timestamp));
         timestamp = tmp;
       }
     }
     return line;
   }
 
-  private static void writeTime(ResultSet rs, BufferedWriter bw) throws SQLException, IOException {
+  private static void writeTime(Long time, BufferedWriter bw) throws IOException {
     ZonedDateTime dateTime;
+    String TIMESTAMP_PRECISION = "ms";
     switch (timeFormat) {
       case "default":
-        long timestamp = rs.getLong(1);
         String str = AbstractCli
-            .parseLongToDateWithPrecision(DateTimeFormatter.ISO_OFFSET_DATE_TIME, timestamp, zoneId,
+            .parseLongToDateWithPrecision(DateTimeFormatter.ISO_OFFSET_DATE_TIME, time, zoneId,
                 TIMESTAMP_PRECISION);
         bw.write(str + ",");
         break;
       case "timestamp":
       case "long":
       case "nubmer":
-        bw.write(rs.getLong(1) + ",");
+        bw.write(time + ",");
         break;
       default:
-        dateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(rs.getLong(1)),
+        dateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(time),
             zoneId);
         bw.write(dateTime.format(DateTimeFormatter.ofPattern(timeFormat)) + ",");
         break;
@@ -346,30 +317,26 @@ public class ExportCsv extends AbstractCsvTool {
   }
 
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
-  private static void writeValue(ResultSet rs, int count, BufferedWriter bw)
-      throws SQLException, IOException {
-    for (int j = 2; j <= count; j++) {
-      if (j < count) {
-        if ("null".equals(rs.getString(j))) {
-          bw.write(",");
-        } else {
-          if(typeList.get(j-1) == Types.VARCHAR) {
-            bw.write("\'" + rs.getString(j) + "\'"+ ",");
-          } else {
-            bw.write(rs.getString(j) + ",");
-          }
-        }
+  private static void writeValue(List<Field> fields, BufferedWriter bw) throws IOException {
+    for (int j = 0; j < fields.size() - 1; j++) {
+      if ("null".equals(fields.get(j).getStringValue())) {
+        bw.write(",");
       } else {
-        if ("null".equals(rs.getString(j))) {
-          bw.write("\n");
+        if(fields.get(j).getDataType() == TSDataType.TEXT) {
+          bw.write("'" + fields.get(j).getBinaryV().toString() + "'" + ",");
         } else {
-          if(typeList.get(j-1) == Types.VARCHAR) {
-            bw.write("\'" + rs.getString(j) + "\'"+ "\n");
-          } else {
-            bw.write(rs.getString(j) + "\n");
-          }
+          bw.write(fields.get(j).getStringValue() + ",");
         }
       }
     }
+    if ("null".equals(fields.get(fields.size() - 1).getStringValue())) {
+      bw.write("\n");
+    } else {
+      if(fields.get(fields.size() - 1).getDataType() == TSDataType.TEXT) {
+        bw.write("'" + fields.get(fields.size() - 1).getBinaryV().toString() + "'" + "\n");
+      } else {
+        bw.write(fields.get(fields.size() - 1).getStringValue() + "\n");
+      }
+    }
   }
 }
diff --git a/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java b/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java
index ee83572..e6be89a 100644
--- a/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java
+++ b/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java
@@ -19,20 +19,14 @@
 package org.apache.iotdb.tool;
 
 import java.io.BufferedReader;
-import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileReader;
-import java.io.FileWriter;
 import java.io.IOException;
 import java.io.LineNumberReader;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import jline.console.ConsoleReader;
@@ -44,12 +38,11 @@ import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
-import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.exception.ArgsErrorException;
-import org.apache.iotdb.jdbc.Config;
-import org.apache.iotdb.jdbc.IoTDBConnection;
-import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.thrift.TException;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 
 /**
  * read a CSV formatted data File and insert all the data into IoTDB.
@@ -60,18 +53,9 @@ public class ImportCsv extends AbstractCsvTool {
   private static final String FILE_SUFFIX = "csv";
 
   private static final String TSFILEDB_CLI_PREFIX = "ImportCsv";
-  private static final String ERROR_INFO_STR = "csvInsertError.error";
 
-  private static final String STRING_DATA_TYPE = "TEXT";
-  private static final int BATCH_EXECUTE_COUNT = 100;
-
-  private static String errorInsertInfo = "";
-  private static boolean errorFlag;
-
-  private static String IOTDB_CLI_HOME = "IOTDB_CLI_HOME";
-
-  private static int count;
-  private static Statement statement;
+  private static Session session;
+  private static final String ILLEGAL_PATH_ARGUMENT = "Path parameter is null";
 
   /**
    * create the commandline options.
@@ -119,8 +103,8 @@ public class ImportCsv extends AbstractCsvTool {
   /**
    * Data from csv To tsfile.
    */
-  private static void loadDataFromCSV(File file, int index) {
-    statement = null;
+  private static void loadDataFromCSV(File file) {
+    session = null;
     int fileLine;
     try {
       fileLine = getFileLineCount(file);
@@ -128,287 +112,67 @@ public class ImportCsv extends AbstractCsvTool {
       System.out.println("Failed to import file: " + file.getName());
       return;
     }
-    File errorFile = new File(errorInsertInfo + index);
-    if (!errorFile.exists()) {
-      try {
-        errorFile.createNewFile();
-      } catch (IOException e) {
-        System.out.println("Cannot create a errorFile because: " + e.getMessage());
-        return;
-      }
-    }
     System.out.println("Start to import data from: " + file.getName());
-    errorFlag = true;
     try(BufferedReader br = new BufferedReader(new FileReader(file));
-        BufferedWriter bw = new BufferedWriter(new FileWriter(errorFile));
         ProgressBar pb = new ProgressBar("Import from: " + file.getName(), fileLine)) {
       pb.setExtraMessage("Importing...");
       String header = br.readLine();
-
-      bw.write("From " + file.getAbsolutePath());
-      bw.newLine();
-      bw.newLine();
-      bw.write(header);
-      bw.newLine();
-      bw.newLine();
-
-      // storage csv table head info
-      Map<String, ArrayList<Integer>> deviceToColumn = new HashMap<>();
-      // storage csv table head info
-      List<String> colInfo = new ArrayList<>();
-      // storage csv device sensor info, corresponding csv table head
-      List<String> headInfo = new ArrayList<>();
-
       String[] strHeadInfo = header.split(",");
       if (strHeadInfo.length <= 1) {
         System.out.println("The CSV file "+ file.getName() +" illegal, please check first line");
         return;
       }
 
-      long startTime = System.currentTimeMillis();
-      Map<String, String> timeseriesDataType = new HashMap<>();
+      List<String> devices = new ArrayList<>();
+      List<Long> times = new ArrayList<>();
+      List<List<String>> measurements = new ArrayList<>();
+      List<List<String>> valuesList = new ArrayList<>();
+      Map<String, List<String>> deviceToMeasurements = new HashMap<>();
 
-      boolean success = queryDatabaseMeta(strHeadInfo, file, bw, timeseriesDataType, headInfo,
-          deviceToColumn, colInfo);
-      if (!success) {
-        errorFlag = false;
-        return;
+      String[] cols = header.split(",");
+
+      for(int i = 1; i < cols.length; i++) {
+        splitColToDeviceAndMeasurement(cols[i], deviceToMeasurements);
       }
 
-      statement = connection.createStatement();
+      String line;
+      while((line = br.readLine()) != null) {
+        cols = line.split(",");
+        times.add(Long.parseLong(cols[0]));
+        List<String> values = new ArrayList<>(Arrays.asList(cols).subList(1, cols.length));
+        valuesList.add(values);
+      }
 
+      for(String device: deviceToMeasurements.keySet()) {
+        devices.add(device);
+        measurements.add(deviceToMeasurements.get(device));
+      }
 
-      List<String> tmp = new ArrayList<>();
-      success = readAndGenSqls(br, timeseriesDataType, deviceToColumn, colInfo, headInfo,
-          bw, tmp, pb);
-      if (!success) {
+      try {
+        session.insertRecords(devices, times, measurements, valuesList);
+      } catch (IoTDBConnectionException | StatementExecutionException e) {
+        System.out.println("Meet error when insert csv because " + e.getMessage());
         return;
       }
-
-      executeSqls(bw, tmp, startTime, file);
+      System.out.println("Insert csv successfully!");
       pb.stepTo(fileLine);
     } catch (FileNotFoundException e) {
       System.out.println("Cannot find " + file.getName() + " because: "+e.getMessage());
     } catch (IOException e) {
       System.out.println("CSV file read exception because: " + e.getMessage());
-    } catch (SQLException e) {
-      System.out.println("Database connection exception because: " + e.getMessage());
     } finally {
       try {
-        if (statement != null) {
-          statement.close();
+        if (session != null) {
+          session.close();
         }
-        if (errorFlag) {
-          FileUtils.forceDelete(errorFile);
-        } else {
-          System.out.println("Format of some lines in "+ file.getAbsolutePath() + " error, please "
-              + "check "+errorFile.getAbsolutePath()+" for more information");
-        }
-      } catch (SQLException e) {
+      } catch (IoTDBConnectionException e) {
         System.out.println("Sql statement can not be closed because: " + e.getMessage());
-      } catch (IOException e) {
-        System.out.println("Close file error because: " + e.getMessage());
-      }
-    }
-  }
-
-  private static void executeSqls(BufferedWriter bw, List<String> tmp, long startTime, File file)
-      throws IOException {
-    try {
-      int[] result = statement.executeBatch();
-      for (int i = 0; i < result.length; i++) {
-        if (result[i] != TSStatusCode.SUCCESS_STATUS.getStatusCode() && i < tmp.size()) {
-          bw.write(tmp.get(i));
-          bw.newLine();
-          errorFlag = false;
-        }
-      }
-      statement.clearBatch();
-      tmp.clear();
-    } catch (SQLException e) {
-      bw.write(e.getMessage());
-      bw.newLine();
-      errorFlag = false;
-      System.out.println("Cannot execute sql because: " + e.getMessage());
-    }
-  }
-
-  private static boolean readAndGenSqls(BufferedReader br, Map<String, String> timeseriesDataType,
-      Map<String, ArrayList<Integer>> deviceToColumn, List<String> colInfo,
-      List<String> headInfo, BufferedWriter bw, List<String> tmp, ProgressBar pb) throws IOException {
-    String line;
-    count = 0;
-    while ((line = br.readLine()) != null) {
-      List<String> sqls;
-      try {
-        sqls = createInsertSQL(line, timeseriesDataType, deviceToColumn, colInfo, headInfo);
-      } catch (Exception e) {
-        bw.write(String.format("error input line, maybe it is not complete: %s", line));
-        bw.newLine();
-        System.out.println("Cannot create sql for " + line + " because: " + e.getMessage());
-        errorFlag = false;
-        return false;
-      }
-      boolean success = addSqlsToBatch(sqls, tmp, bw);
-      pb.step();
-      if (!success) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  private static boolean addSqlsToBatch(List<String> sqls, List<String> tmp, BufferedWriter bw)
-      throws IOException {
-    for (String str : sqls) {
-      try {
-        count++;
-        statement.addBatch(str);
-        tmp.add(str);
-        checkBatchSize(bw, tmp);
-
-      } catch (SQLException e) {
-        bw.write(e.getMessage());
-        bw.newLine();
-        errorFlag = false;
-        System.out.println("Cannot execute sql because: " + e.getMessage());
-        return false;
-      }
-    }
-    return true;
-  }
-
-
-  private static void checkBatchSize(BufferedWriter bw, List<String> tmp)
-      throws SQLException, IOException {
-    if (count == BATCH_EXECUTE_COUNT) {
-      int[] result = statement.executeBatch();
-      for (int i = 0; i < result.length; i++) {
-        if (result[i] != Statement.SUCCESS_NO_INFO && i < tmp.size()) {
-          bw.write(tmp.get(i));
-          bw.newLine();
-          errorFlag = false;
-        }
-      }
-      statement.clearBatch();
-      count = 0;
-      tmp.clear();
-    }
-  }
-
-  private static boolean queryDatabaseMeta(String[] strHeadInfo, File file, BufferedWriter bw,
-      Map<String, String> timeseriesDataType, List<String> headInfo,
-      Map<String, ArrayList<Integer>> deviceToColumn,
-      List<String> colInfo)
-      throws SQLException, IOException {
-    try (Statement statement = connection.createStatement()) {
-
-      for (int i = 1; i < strHeadInfo.length; i++) {
-        statement.execute("show timeseries " + strHeadInfo[i]);
-        ResultSet resultSet = statement.getResultSet();
-        try {
-          if (resultSet.next()) {
-            /*
-             * now the resultSet is like following, so the index of dataType is 4
-             * +--------------+-----+-------------+--------+--------+-----------+
-               |    timeseries|alias|storage group|dataType|encoding|compression|
-               +--------------+-----+-------------+--------+--------+-----------+
-               |root.fit.d1.s1| null|  root.fit.d1|   INT32|     RLE|     SNAPPY|
-               |root.fit.d1.s2| null|  root.fit.d1|    TEXT|   PLAIN|     SNAPPY|
-               +--------------+-----+-------------+--------+--------+-----------+
-             */
-            timeseriesDataType.put(strHeadInfo[i], resultSet.getString(4));
-          } else {
-            String errorInfo = String.format("Database cannot find %s in %s, stop import!",
-                    strHeadInfo[i], file.getAbsolutePath());
-            System.out.println("Database cannot find " + strHeadInfo[i] + " in " + file.getAbsolutePath() + ", "
-                    + "stop import!");
-            bw.write(errorInfo);
-            return false;
-          }
-        } finally {
-          resultSet.close();
-        }
-        headInfo.add(strHeadInfo[i]);
-        String deviceInfo = strHeadInfo[i].substring(0, strHeadInfo[i].lastIndexOf('.'));
-
-        if (!deviceToColumn.containsKey(deviceInfo)) {
-          deviceToColumn.put(deviceInfo, new ArrayList<>());
-        }
-        // storage every device's sensor index info
-        deviceToColumn.get(deviceInfo).add(i - 1);
-        colInfo.add(strHeadInfo[i].substring(strHeadInfo[i].lastIndexOf('.') + 1));
       }
     }
-    return true;
   }
 
-  private static List<String> createInsertSQL(String line, Map<String, String> timeseriesToType,
-      Map<String, ArrayList<Integer>> deviceToColumn,
-      List<String> colInfo, List<String> headInfo) {
-    String[] data = line.split(",", headInfo.size() + 1);
-    List<String> sqls = new ArrayList<>();
-    Iterator<Map.Entry<String, ArrayList<Integer>>> it = deviceToColumn.entrySet().iterator();
-    while (it.hasNext()) {
-      Map.Entry<String, ArrayList<Integer>> entry = it.next();
-      String sql = createOneSql(entry, data, colInfo, timeseriesToType, headInfo);
-      if (sql != null) {
-        sqls.add(sql);
-      }
-    }
-    return sqls;
-  }
-
-  private static String createOneSql(Map.Entry<String, ArrayList<Integer>> entry, String[] data,
-      List<String> colInfo, Map<String, String> timeseriesToType, List<String> headInfo) {
-    StringBuilder sbd = new StringBuilder();
-    ArrayList<Integer> colIndex = entry.getValue();
-    sbd.append("insert into ").append(entry.getKey()).append("(timestamp");
-    int skipCount = 0;
-    for (int j = 0; j < colIndex.size(); ++j) {
-      if ("".equals(data[entry.getValue().get(j) + 1])) {
-        skipCount++;
-        continue;
-      }
-      sbd.append(", ").append(colInfo.get(colIndex.get(j)));
-    }
-    // define every device null value' number, if the number equal the
-    // sensor number, the insert operation stop
-    if (skipCount == entry.getValue().size()) {
-      return null;
-    }
-
-    // TODO when timestampsStr is empty
-    String timestampsStr = data[0];
-    sbd.append(") values(").append(timestampsStr.trim().isEmpty()
-        ? "NO TIMESTAMP" : timestampsStr);
-
-    for (int j = 0; j < colIndex.size(); ++j) {
-      if ("".equals(data[entry.getValue().get(j) + 1])) {
-        continue;
-      }
-      if (timeseriesToType.get(headInfo.get(colIndex.get(j))).equals(STRING_DATA_TYPE)) {
-        /**
-         * like csv line 1,100,'hello',200,300,400, we will read the third field as 'hello',
-         * so, if we add '', the field will be ''hello'', and IoTDB will be failed
-         * to insert the field.
-         * Now, if we meet the string which is enclosed in quotation marks, we should not add ''
-         */
-        if ((data[colIndex.get(j) + 1].startsWith("'") && data[colIndex.get(j) + 1].endsWith("'")) ||
-                (data[colIndex.get(j) + 1].startsWith("\"") && data[colIndex.get(j) + 1].endsWith("\""))) {
-          sbd.append(",").append(data[colIndex.get(j) + 1]);
-        } else {
-          sbd.append(", \'").append(data[colIndex.get(j) + 1]).append("\'");
-        }
-      } else {
-        sbd.append(",").append(data[colIndex.get(j) + 1]);
-      }
-    }
-    sbd.append(")");
-    return sbd.toString();
-  }
 
-  public static void main(String[] args) throws IOException, SQLException {
+  public static void main(String[] args) throws IOException {
     Options options = createOptions();
     HelpFormatter hf = new HelpFormatter();
     hf.setOptionComparator(null);
@@ -459,18 +223,10 @@ public class ImportCsv extends AbstractCsvTool {
 
   public static void importCsvFromFile(String ip, String port, String username,
       String password, String filename,
-      String timeZone) throws SQLException {
-    String property = System.getProperty(IOTDB_CLI_HOME);
-    if (property == null) {
-      errorInsertInfo = ERROR_INFO_STR;
-    } else {
-      errorInsertInfo = property + File.separatorChar + ERROR_INFO_STR;
-    }
+      String timeZone){
     try {
-      Class.forName(Config.JDBC_DRIVER_NAME);
-      connection = (IoTDBConnection) DriverManager.getConnection(Config.IOTDB_URL_PREFIX
-              + ip + ":" + port + "/",
-          username, password);
+      session = new Session(ip, Integer.parseInt(port), username, password);
+      session.open(true);
       timeZoneID = timeZone;
       setTimeZone();
 
@@ -480,33 +236,30 @@ public class ImportCsv extends AbstractCsvTool {
       } else if (file.isDirectory()) {
         importFromDirectory(file);
       }
-
-    } catch (ClassNotFoundException e) {
-      System.out.println("Failed to import data because cannot find IoTDB JDBC Driver, "
-          + "please check whether you have imported driver or not: " + e.getMessage());
-    } catch (TException e) {
+    } catch (IoTDBConnectionException e) {
       System.out.println("Encounter an error when connecting to server, because " + e.getMessage());
-    } catch (SQLException e){
-      System.out.println("Encounter an error when importing data, error is: " + e.getMessage());
-    } catch (Exception e) {
-      System.out.println("Encounter an error, because: " + e.getMessage());
+    } catch (StatementExecutionException e) {
+      System.out.println("Encounter an error when executing the statement, because " + e.getMessage());
     } finally {
-      if (connection != null) {
-        connection.close();
+      if (session != null) {
+        try {
+          session.close();
+        } catch (IoTDBConnectionException e) {
+          System.out.println("Encounter an error when closing the connection, because " + e.getMessage());
+        }
       }
     }
   }
 
   private static void importFromSingleFile(File file) {
     if (file.getName().endsWith(FILE_SUFFIX)) {
-      loadDataFromCSV(file, 1);
+      loadDataFromCSV(file);
     } else {
       System.out.println("File "+ file.getName() +"  should ends with '.csv' if you want to import");
     }
   }
 
   private static void importFromDirectory(File file) {
-    int i = 1;
     File[] files = file.listFiles();
     if (files == null) {
       return;
@@ -515,8 +268,7 @@ public class ImportCsv extends AbstractCsvTool {
     for (File subFile : files) {
       if (subFile.isFile()) {
         if (subFile.getName().endsWith(FILE_SUFFIX)) {
-          loadDataFromCSV(subFile, i);
-          i++;
+          loadDataFromCSV(subFile);
         } else {
           System.out.println("File " + file.getName() + " should ends with '.csv' if you want to import");
         }
@@ -525,7 +277,7 @@ public class ImportCsv extends AbstractCsvTool {
   }
 
   private static int getFileLineCount(File file) throws IOException {
-    int line = 0;
+    int line;
     try (LineNumberReader count = new LineNumberReader(new FileReader(file))) {
       while (count.skip(Long.MAX_VALUE) > 0) {
         // Loop just in case the file is > Long.MAX_VALUE or skip() decides to not read the entire file
@@ -535,4 +287,43 @@ public class ImportCsv extends AbstractCsvTool {
     }
     return line;
   }
+
+  private static void splitColToDeviceAndMeasurement(String col, Map<String, List<String>> deviceToMeasurements) {
+    if (col.length() > 0) {
+      if (col.charAt(col.length() - 1) == TsFileConstant.DOUBLE_QUOTE) {
+        int endIndex = col.lastIndexOf('"', col.length() - 2);
+        // if a double quotes with escape character
+        while (endIndex != -1 && col.charAt(endIndex - 1) == '\\') {
+          endIndex = col.lastIndexOf('"', endIndex - 2);
+        }
+        if (endIndex != -1 && (endIndex == 0 || col.charAt(endIndex - 1) == '.')) {
+          putDeviceAndMeasurement(col.substring(0, endIndex - 1), col.substring(endIndex), deviceToMeasurements);
+        } else {
+          throw new IllegalArgumentException(ILLEGAL_PATH_ARGUMENT);
+        }
+      } else if (col.charAt(col.length() - 1) != TsFileConstant.DOUBLE_QUOTE
+          && col.charAt(col.length() - 1) != TsFileConstant.PATH_SEPARATOR_CHAR) {
+        int endIndex = col.lastIndexOf(TsFileConstant.PATH_SEPARATOR_CHAR);
+        if (endIndex < 0) {
+          putDeviceAndMeasurement("", col, deviceToMeasurements);
+        } else {
+          putDeviceAndMeasurement(col.substring(0, endIndex), col.substring(endIndex + 1), deviceToMeasurements);
+        }
+      } else {
+        throw new IllegalArgumentException(ILLEGAL_PATH_ARGUMENT);
+      }
+    } else {
+      putDeviceAndMeasurement("", col, deviceToMeasurements);
+    }
+  }
+
+  private static void putDeviceAndMeasurement(String device, String measurement, Map<String, List<String>> deviceToMeasurements) {
+    if(deviceToMeasurements.get(device) == null) {
+      List<String> measurements = new ArrayList<>();
+      measurements.add(measurement);
+      deviceToMeasurements.put(device, measurements);
+    } else {
+      deviceToMeasurements.get(device).add(measurement);
+    }
+  }
 }