You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2020/01/03 03:05:00 UTC
[incubator-iotdb] 01/01: add statistics
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch test_insertion_delay
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit e23834e236ffc08704bd3bc70e440cfadb2c24a2
Author: lta <li...@163.com>
AuthorDate: Fri Jan 3 11:04:38 2020 +0800
add statistics
---
.../org/apache/iotdb/session/SessionExample.java | 215 +++++++++++++++++++++
1 file changed, 215 insertions(+)
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionExample.java b/session/src/main/java/org/apache/iotdb/session/SessionExample.java
new file mode 100644
index 0000000..1c3fd87
--- /dev/null
+++ b/session/src/main/java/org/apache/iotdb/session/SessionExample.java
@@ -0,0 +1,215 @@
+package org.apache.iotdb.session;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.iotdb.rpc.IoTDBRPCException;
+import org.apache.iotdb.session.IoTDBSessionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.record.RowBatch;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.Schema;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SessionExample {
+
+ private static Session session;
+ private static final Logger LOG = LoggerFactory.getLogger(SessionExample.class);
+ private static int sgNum = 1;
+
+ public static void main(String[] args)
+ throws IoTDBSessionException {
+ if (args.length == 0) {
+ LOG.info("Please input IP");
+ System.exit(1);
+ }
+ session = new Session(args[0], 6667, "root", "root");
+ session.open();
+
+ //nohup java -Xms500m -Xmx500m -Xmn250m -Xss256k -server -XX:+HeapDumpOnOutOfMemoryError -jar client-example.jar &
+
+ int operFlag = 1;
+ try {
+ operFlag = Integer.parseInt(args[0]);
+ } catch (Exception e) {
+
+ }
+
+ /*final int beginTag=1;
+ final long beginTime=1514451315000L;//2017.12.28
+ final long endTime=System.currentTimeMillis();//1530720000000L;*/
+
+ ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(sgNum);
+ if (operFlag == 1) {
+
+ for (int i = 1; i <= sgNum; i++) {
+ final int finalI = i;
+ exec.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ insert(String.format("root.group%d", finalI), 0);
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOG.error("-----------------{}----------------", e.getMessage(), e);
+ }
+ }
+ }, 0, 5000, TimeUnit.MILLISECONDS);
+ }
+
+ }
+
+ //query();
+
+
+
+ /*insert();
+ insertInBatch();
+ insertRowBatch();
+ nonQuery();
+ query();
+ deleteData();
+ deleteTimeseries();*/
+
+// session.close();
+ }
+
+
+ private static void insert(String deviceId, int beginTag) throws IoTDBSessionException {
+ List<String> measurements = new ArrayList<>();
+ List<String> values = new ArrayList<>();
+ for (int i = beginTag * 100000 + 1; i < (beginTag + 1) * 100000 + 1; i++) {
+ measurements.add(String.format("s%d", i));
+ values.add(Math.random() + "");
+ }
+ long begin = System.nanoTime();
+ session.insert(deviceId, System.currentTimeMillis(), measurements, values);
+ LOG.info("Device(SG){}写数据花费:{}ms", deviceId, (System.nanoTime() - begin) / 1_000_000);
+ }
+
+ private static void insert() throws IoTDBSessionException {
+ String deviceId = "root.group.d1";
+ List<String> measurements = new ArrayList<>();
+
+ for (int i = 1; i < 50001; i++) {
+ measurements.add(String.format("s%d", i));
+ }
+ for (int i = 1; i < 10; i++) {//10row
+ List<String> values = new ArrayList<>();
+ for (int j = 1; j < 50001; j++) {//5w points
+ values.add(Math.random() + "");
+ }
+ long begin = System.nanoTime();
+ session.insert(deviceId, System.currentTimeMillis(), measurements, values);
+ System.out.println((System.nanoTime() - begin) / 1000000000 + "s");
+ }
+ }
+
+ private static void insertInBatch() throws IoTDBSessionException {
+ String deviceId = "root.sg1.d1";
+ List<String> measurements = new ArrayList<>();
+ measurements.add("s1");
+ measurements.add("s2");
+ measurements.add("s3");
+ List<String> deviceIds = new ArrayList<>();
+ List<List<String>> measurementsList = new ArrayList<>();
+ List<List<String>> valuesList = new ArrayList<>();
+ List<Long> timestamps = new ArrayList<>();
+
+ for (long time = 0; time < 500; time++) {
+ List<String> values = new ArrayList<>();
+ values.add("1");
+ values.add("2");
+ values.add("3");
+
+ deviceIds.add(deviceId);
+ measurementsList.add(measurements);
+ valuesList.add(values);
+ timestamps.add(time);
+ if (time != 0 && time % 100 == 0) {
+ session.insertInBatch(deviceIds, timestamps, measurementsList, valuesList);
+ deviceIds.clear();
+ measurementsList.clear();
+ valuesList.clear();
+ timestamps.clear();
+ }
+ }
+
+ session.insertInBatch(deviceIds, timestamps, measurementsList, valuesList);
+ }
+
+  /**
+   * Writes 100 rows (timestamps 0 to 99) to device {@code root.sg1.d1} using a {@code RowBatch}.
+   *
+   * <p>Three INT64/RLE measurements s1, s2, s3 are registered. In every row, column {@code i}
+   * (0-based) holds the value {@code i}. The batch is sent and reset whenever it reaches its
+   * maximum size, and once more after the loop if it still contains rows.
+   *
+   * @throws IoTDBSessionException propagated from {@code session.insertBatch}
+   */
+  private static void insertRowBatch() throws IoTDBSessionException {
+    Schema schema = new Schema();
+    for (String name : new String[] {"s1", "s2", "s3"}) {
+      schema.registerMeasurement(new MeasurementSchema(name, TSDataType.INT64, TSEncoding.RLE));
+    }
+
+    RowBatch batch = schema.createRowBatch("root.sg1.d1", 100);
+    long[] times = batch.timestamps;
+    Object[] columns = batch.values;
+
+    for (long t = 0; t < 100; t++) {
+      // Claim the next free slot in the batch.
+      int slot = batch.batchSize;
+      batch.batchSize = slot + 1;
+
+      times[slot] = t;
+      for (int col = 0; col < 3; col++) {
+        ((long[]) columns[col])[slot] = col;
+      }
+
+      if (batch.batchSize == batch.getMaxBatchSize()) {
+        session.insertBatch(batch);
+        batch.reset();
+      }
+    }
+
+    // Flush a partially filled batch, if any.
+    if (batch.batchSize != 0) {
+      session.insertBatch(batch);
+      batch.reset();
+    }
+  }
+
+  /**
+   * Calls {@code session.deleteData} for series {@code root.sg1.d1.s1} with a delete time of 99.
+   *
+   * <p>NOTE(review): presumably this removes the points with timestamps up to 99; confirm
+   * against the {@code Session#deleteData} contract.
+   *
+   * @throws IoTDBSessionException propagated from {@code session.deleteData}
+   */
+  private static void deleteData() throws IoTDBSessionException {
+    session.deleteData("root.sg1.d1.s1", 99L);
+  }
+
+  /**
+   * Deletes the three timeseries {@code root.sg1.d1.s1}, {@code root.sg1.d1.s2} and
+   * {@code root.sg1.d1.s3} in a single {@code session.deleteTimeseries} call.
+   *
+   * @throws IoTDBSessionException propagated from {@code session.deleteTimeseries}
+   */
+  private static void deleteTimeseries() throws IoTDBSessionException {
+    List<String> seriesPaths = new ArrayList<>();
+    for (int sensor = 1; sensor <= 3; sensor++) {
+      seriesPaths.add("root.sg1.d1.s" + sensor);
+    }
+    session.deleteTimeseries(seriesPaths);
+  }
+
+  /**
+   * Runs one aggregation query over {@code root.dtxy.group1} and prints every returned row.
+   *
+   * <p>The statement selects {@code last_value(sN),max_time(sN)} for N = 1..100. The time taken
+   * by {@code executeQueryStatement} is printed in milliseconds before the rows are fetched.
+   * The operation handle is closed even when fetching rows fails.
+   *
+   * @throws TException propagated from the session call
+   * @throws IoTDBRPCException propagated from the session or data set calls
+   * @throws SQLException propagated from the session or data set calls
+   */
+  private static void query() throws TException, IoTDBRPCException, SQLException {
+    // StringBuilder is enough here: the buffer never leaves this method.
+    StringBuilder sql = new StringBuilder("select ");
+    for (int i = 1; i < 101; i++) {
+      if (i > 1) {
+        sql.append(',');
+      }
+      sql.append("last_value(s").append(i).append("),max_time(s").append(i).append(')');
+    }
+    sql.append(" from root.dtxy.group1");
+
+    long begin = System.nanoTime();
+    SessionDataSet dataSet = session.executeQueryStatement(sql.toString());
+    try {
+      // Report milliseconds: whole seconds truncate every sub-second query to "0s".
+      System.out.println((System.nanoTime() - begin) / 1_000_000 + "ms");
+      dataSet.setBatchSize(1024); // original author's note: default is 512
+      while (dataSet.hasNext()) {
+        System.out.println(dataSet.next());
+      }
+    } finally {
+      // Release the handle even if hasNext()/next() throws.
+      dataSet.closeOperationHandle();
+    }
+  }
+
+ private static void nonQuery() throws TException, IoTDBRPCException, SQLException {
+ session.executeNonQueryStatement("insert into root.sg1.d1(timestamp,s1) values(200, 1);");
+ }
+}
\ No newline at end of file