You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2020/01/03 03:04:59 UTC

[incubator-iotdb] branch test_insertion_delay created (now e23834e)

This is an automated email from the ASF dual-hosted git repository.

lta pushed a change to branch test_insertion_delay
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at e23834e  add statistics

This branch includes the following new commits:

     new e23834e  add statistics

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: add statistics

Posted by lt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch test_insertion_delay
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit e23834e236ffc08704bd3bc70e440cfadb2c24a2
Author: lta <li...@163.com>
AuthorDate: Fri Jan 3 11:04:38 2020 +0800

    add statistics
---
 .../org/apache/iotdb/session/SessionExample.java   | 215 +++++++++++++++++++++
 1 file changed, 215 insertions(+)

diff --git a/session/src/main/java/org/apache/iotdb/session/SessionExample.java b/session/src/main/java/org/apache/iotdb/session/SessionExample.java
new file mode 100644
index 0000000..1c3fd87
--- /dev/null
+++ b/session/src/main/java/org/apache/iotdb/session/SessionExample.java
@@ -0,0 +1,215 @@
+package org.apache.iotdb.session;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.iotdb.rpc.IoTDBRPCException;
+import org.apache.iotdb.session.IoTDBSessionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.record.RowBatch;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.Schema;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SessionExample {
+
+  private static Session session;
+  private static final Logger LOG = LoggerFactory.getLogger(SessionExample.class);
+  private static int sgNum = 1;
+
+  public static void main(String[] args)
+      throws IoTDBSessionException {
+    if (args.length == 0) {
+      LOG.info("Please input IP");
+      System.exit(1);
+    }
+    session = new Session(args[0], 6667, "root", "root");
+    session.open();
+
+    //nohup java -Xms500m -Xmx500m -Xmn250m -Xss256k -server -XX:+HeapDumpOnOutOfMemoryError -jar client-example.jar &
+
+    int operFlag = 1;
+    try {
+      operFlag = Integer.parseInt(args[0]);
+    } catch (Exception e) {
+
+    }
+
+     /*final int beginTag=1;
+      final long beginTime=1514451315000L;//2017.12.28
+      final long endTime=System.currentTimeMillis();//1530720000000L;*/
+
+    ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(sgNum);
+    if (operFlag == 1) {
+
+      for (int i = 1; i <= sgNum; i++) {
+        final int finalI = i;
+        exec.scheduleAtFixedRate(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              insert(String.format("root.group%d", finalI), 0);
+            } catch (Exception e) {
+              e.printStackTrace();
+              LOG.error("-----------------{}----------------", e.getMessage(), e);
+            }
+          }
+        }, 0, 5000, TimeUnit.MILLISECONDS);
+      }
+
+    }
+
+    //query();
+
+
+
+    /*insert();
+    insertInBatch();
+    insertRowBatch();
+    nonQuery();
+    query();
+    deleteData();
+    deleteTimeseries();*/
+
+//    session.close();
+  }
+
+
+  private static void insert(String deviceId, int beginTag) throws IoTDBSessionException {
+    List<String> measurements = new ArrayList<>();
+    List<String> values = new ArrayList<>();
+    for (int i = beginTag * 100000 + 1; i < (beginTag + 1) * 100000 + 1; i++) {
+      measurements.add(String.format("s%d", i));
+      values.add(Math.random() + "");
+    }
+    long begin = System.nanoTime();
+    session.insert(deviceId, System.currentTimeMillis(), measurements, values);
+    LOG.info("Device(SG){}写数据花费:{}ms", deviceId, (System.nanoTime() - begin) / 1_000_000);
+  }
+
+  private static void insert() throws IoTDBSessionException {
+    String deviceId = "root.group.d1";
+    List<String> measurements = new ArrayList<>();
+
+    for (int i = 1; i < 50001; i++) {
+      measurements.add(String.format("s%d", i));
+    }
+    for (int i = 1; i < 10; i++) {//10row
+      List<String> values = new ArrayList<>();
+      for (int j = 1; j < 50001; j++) {//5w points
+        values.add(Math.random() + "");
+      }
+      long begin = System.nanoTime();
+      session.insert(deviceId, System.currentTimeMillis(), measurements, values);
+      System.out.println((System.nanoTime() - begin) / 1000000000 + "s");
+    }
+  }
+
+  private static void insertInBatch() throws IoTDBSessionException {
+    String deviceId = "root.sg1.d1";
+    List<String> measurements = new ArrayList<>();
+    measurements.add("s1");
+    measurements.add("s2");
+    measurements.add("s3");
+    List<String> deviceIds = new ArrayList<>();
+    List<List<String>> measurementsList = new ArrayList<>();
+    List<List<String>> valuesList = new ArrayList<>();
+    List<Long> timestamps = new ArrayList<>();
+
+    for (long time = 0; time < 500; time++) {
+      List<String> values = new ArrayList<>();
+      values.add("1");
+      values.add("2");
+      values.add("3");
+
+      deviceIds.add(deviceId);
+      measurementsList.add(measurements);
+      valuesList.add(values);
+      timestamps.add(time);
+      if (time != 0 && time % 100 == 0) {
+        session.insertInBatch(deviceIds, timestamps, measurementsList, valuesList);
+        deviceIds.clear();
+        measurementsList.clear();
+        valuesList.clear();
+        timestamps.clear();
+      }
+    }
+
+    session.insertInBatch(deviceIds, timestamps, measurementsList, valuesList);
+  }
+
+  private static void insertRowBatch() throws IoTDBSessionException {
+    Schema schema = new Schema();
+    schema.registerMeasurement(new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.RLE));
+    schema.registerMeasurement(new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE));
+    schema.registerMeasurement(new MeasurementSchema("s3", TSDataType.INT64, TSEncoding.RLE));
+
+    RowBatch rowBatch = schema.createRowBatch("root.sg1.d1", 100);
+
+    long[] timestamps = rowBatch.timestamps;
+    Object[] values = rowBatch.values;
+
+    for (long time = 0; time < 100; time++) {
+      int row = rowBatch.batchSize++;
+      timestamps[row] = time;
+      for (int i = 0; i < 3; i++) {
+        long[] sensor = (long[]) values[i];
+        sensor[row] = i;
+      }
+      if (rowBatch.batchSize == rowBatch.getMaxBatchSize()) {
+        session.insertBatch(rowBatch);
+        rowBatch.reset();
+      }
+    }
+
+    if (rowBatch.batchSize != 0) {
+      session.insertBatch(rowBatch);
+      rowBatch.reset();
+    }
+  }
+
+  private static void deleteData() throws IoTDBSessionException {
+    String path = "root.sg1.d1.s1";
+    long deleteTime = 99;
+    session.deleteData(path, deleteTime);
+  }
+
+  private static void deleteTimeseries() throws IoTDBSessionException {
+    List<String> paths = new ArrayList<>();
+    paths.add("root.sg1.d1.s1");
+    paths.add("root.sg1.d1.s2");
+    paths.add("root.sg1.d1.s3");
+    session.deleteTimeseries(paths);
+  }
+
+  private static void query() throws TException, IoTDBRPCException, SQLException {
+    //SessionDataSet dataSet = session.executeQueryStatement("select * from root.dtxy.d1");
+    StringBuffer sb = new StringBuffer();
+    sb.append("select ");
+    for (int i = 1; i < 101; i++) {
+      sb.append("last_value(s").append(i).append("),max_time(s").append(i).append("),");
+    }
+    long begin = System.nanoTime();
+    SessionDataSet dataSet = session
+        .executeQueryStatement(sb.substring(0, sb.length() - 1) + " from root.dtxy.group1");
+    System.out.println((System.nanoTime() - begin) / 1000000000 + "s");
+    //SessionDataSet dataSet = session.executeQueryStatement("select last_value(s1),max_time(s1) from root.dtxy.d1");
+    dataSet.setBatchSize(1024); // default is 512
+    while (dataSet.hasNext()) {
+      System.out.println(dataSet.next());
+    }
+
+    dataSet.closeOperationHandle();
+  }
+
+  private static void nonQuery() throws TException, IoTDBRPCException, SQLException {
+    session.executeNonQueryStatement("insert into root.sg1.d1(timestamp,s1) values(200, 1);");
+  }
+}
\ No newline at end of file