You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/11/25 14:12:50 UTC

[iotdb] 06/07: fix conflicts

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch xianyi
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 7b816c0b5903812c6debbb21f2e48b150a6a290b
Merge: 4fd32d6 9ccbfeb
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Thu Nov 25 22:11:09 2021 +0800

    fix conflicts

 .../java/org/apache/iotdb/cli/AbstractCli.java     |  69 +++++++++++++
 cli/src/main/java/org/apache/iotdb/cli/Cli.java    |   1 +
 generator/pom.xml                                  | 111 +++++++++++++++++++++
 .../java/org/apache/iotdb/generator/Generator.java |  47 +++++++++
 .../apache/iotdb/generator/GeneratorEntrance.java  |  52 ++++++++++
 .../org/apache/iotdb/jdbc/IoTDBConnection.java     |  11 ++
 pom.xml                                            |   1 +
 .../resources/conf/iotdb-engine.properties         |   3 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  10 ++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   2 +
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  13 ++-
 .../org/apache/iotdb/session/ClusterSession.java   | 109 ++++++++++++++++++++
 .../java/org/apache/iotdb/session/Session.java     |  11 +-
 .../apache/iotdb/session/SessionConnection.java    |   2 +-
 thrift/src/main/thrift/rpc.thrift                  |   2 +
 15 files changed, 441 insertions(+), 3 deletions(-)

diff --cc generator/src/main/java/org/apache/iotdb/generator/GeneratorEntrance.java
index 0000000,d97ad7f..b26006e
mode 000000,100644..100644
--- a/generator/src/main/java/org/apache/iotdb/generator/GeneratorEntrance.java
+++ b/generator/src/main/java/org/apache/iotdb/generator/GeneratorEntrance.java
@@@ -1,0 -1,53 +1,52 @@@
+ package org.apache.iotdb.generator;
+ 
+ import org.apache.iotdb.rpc.IoTDBConnectionException;
+ import org.apache.iotdb.rpc.StatementExecutionException;
+ import org.apache.iotdb.session.ClusterSession;
+ import org.apache.iotdb.session.SessionDataSet;
+ import org.apache.iotdb.tsfile.write.record.Tablet;
+ 
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ /**
+  * Load generator that repeatedly writes one generated {@link Tablet} through a {@link
+  * ClusterSession} and then reads back {@code count(measurement)} for the same device.
+  *
+  * <p>The write/query loop has no exit condition.
+  */
+ public class GeneratorEntrance {
+   private static final Logger logger = LoggerFactory.getLogger(GeneratorEntrance.class);
+ 
+   /**
+    * Runs the generator with the settings assigned at the top of the method: node address
+    * {@code 127.0.0.1:6667}, series {@code root.sg1.d1.s1}, a 3 second interval and 1000 points
+    * per batch.
+    *
+    * @param args currently overwritten before being read (see the note in the body)
+    * @throws IoTDBConnectionException if a session cannot be opened or a request fails
+    * @throws StatementExecutionException if the server rejects a statement
+    * @throws InterruptedException if the thread is interrupted while sleeping between rounds
+    */
+   public static void main(String[] args)
+       throws IoTDBConnectionException, StatementExecutionException, InterruptedException {
+     // NOTE(review): the caller's arguments are discarded and replaced by fixed values; slots 0
+     // and 5 are never read. Looks like test scaffolding -- confirm before honouring real args.
+     args = new String[6];
+     args[1] = "127.0.0.1:6667";
+     args[2] = "root.sg1.d1.s1";
+     args[3] = "3";
+     args[4] = "1000";
+     String[] addressElements = args[1].split(":");
+     String seriesPath = args[2];
+     int timeInterval = Integer.parseInt(args[3]) * 1000; // seconds -> milliseconds
+     int batchNum = Integer.parseInt(args[4]);
+     String[] pathElements = seriesPath.split("\\.");
+     // The last path segment is the measurement; everything before the final dot is the device.
+     String measurementId = pathElements[pathElements.length - 1];
+     String deviceId = seriesPath.substring(0, seriesPath.length() - measurementId.length() - 1);
+ 
+     ClusterSession clusterSession =
+         new ClusterSession(addressElements[0], Integer.parseInt(addressElements[1]));
+ 
+     long timestampForInsert = 0;
+     while (true) {
+       long startTime = System.currentTimeMillis();
+       Tablet tablet =
+           Generator.generateTablet(deviceId, measurementId, timestampForInsert, batchNum);
+       clusterSession.insertTablet(tablet);
+       logger.info("Insert {} data points to {}", batchNum, seriesPath);
 -      String query =  String.format("select count(%s) from %s", measurementId, deviceId);
 -      SessionDataSet sessionDataSet =
 -          clusterSession.queryTablet(
 -                  query, deviceId);
 -      logger.info("Execute query {} with result : {}",query,sessionDataSet.next().getFields().get(0));
++      String query = String.format("select count(%s) from %s", measurementId, deviceId);
++      SessionDataSet sessionDataSet = clusterSession.queryTablet(query, deviceId);
++      if (sessionDataSet == null) {
++        // queryTablet returns null when both candidate nodes fail; keep generating load.
++        logger.warn("Execute query {} failed, no result available", query);
++      } else {
++        try {
++          if (sessionDataSet.hasNext()) {
++            logger.info(
++                "Execute query {} with result : {}",
++                query,
++                sessionDataSet.next().getFields().get(0));
++          }
++        } finally {
++          // Release the query handle each round; the loop never ends, so leaks would pile up.
++          sessionDataSet.closeOperationHandle();
++        }
++      }
+       timestampForInsert += batchNum;
+       long endTime = System.currentTimeMillis();
 -      if (timeInterval - (endTime - startTime)>0) {
++      if (timeInterval - (endTime - startTime) > 0) {
+         // Sleep only for whatever is left of the interval after this round's work.
+         Thread.sleep(timeInterval - (endTime - startTime));
+       }
+     }
+   }
+ }
diff --cc session/src/main/java/org/apache/iotdb/session/ClusterSession.java
index 0000000,d7617ca..a6e0fad
mode 000000,100644..100644
--- a/session/src/main/java/org/apache/iotdb/session/ClusterSession.java
+++ b/session/src/main/java/org/apache/iotdb/session/ClusterSession.java
@@@ -1,0 -1,112 +1,109 @@@
+ package org.apache.iotdb.session;
+ 
+ import org.apache.iotdb.rpc.IoTDBConnectionException;
+ import org.apache.iotdb.rpc.StatementExecutionException;
+ import org.apache.iotdb.service.rpc.thrift.EndPoint;
+ import org.apache.iotdb.tsfile.write.record.Tablet;
+ 
+ import java.util.*;
+ import java.util.concurrent.ArrayBlockingQueue;
 -import java.util.concurrent.TimeUnit;
+ 
+ /**
+  * Fans writes and queries out over one {@link Session} per node reported by the cluster.
+  *
+  * <p>Each tablet is sent to two consecutive nodes, starting at the node selected by hashing the
+  * tablet's {@code prefixPath}. A write that cannot be delivered directly is parked in that
+  * node's bounded queue and replayed by a per-node background thread.
+  */
+ public class ClusterSession {
+   /** One session per entry of {@link #nodeList}, at the same index. */
+   Session[] sessions;
+ 
+   /** Per-node backlog of tablets awaiting retry; each queue is also the monitor guarding it. */
+   ArrayBlockingQueue<Tablet>[] queues;
+ 
+   /** Node endpoints obtained from the seed node at construction time. */
+   List<EndPoint> nodeList;
+ 
+   /**
+    * Connects to a seed node, reads the node list from it, then opens one session and starts one
+    * retry thread per listed node.
+    *
+    * @param host address of the seed node
+    * @param rpcPort RPC port of the seed node
+    * @throws IoTDBConnectionException if the seed node or any listed node cannot be opened
+    */
+   public ClusterSession(String host, int rpcPort) throws IoTDBConnectionException {
+     Session seed = new Session(host, rpcPort);
+     seed.open();
+     nodeList = new ArrayList<>();
+     try {
+       nodeList.addAll(seed.getNodeList());
+     } finally {
+       // The seed session is only needed for discovery; do not leave its connection open.
+       closeQuietly(seed);
+     }
+     sessions = new Session[nodeList.size()];
+     queues = new ArrayBlockingQueue[nodeList.size()];
+     for (int i = 0; i < nodeList.size(); i++) {
+       sessions[i] = new Session(nodeList.get(i).ip, nodeList.get(i).port);
+       sessions[i].open();
+       queues[i] = new ArrayBlockingQueue<Tablet>(1000);
+       new Thread(new RunnableTask(i)).start();
+     }
+   }
+ 
+   /**
+    * Writes the tablet to the two nodes responsible for its device.
+    *
+    * <p>If a node already has a backlog the tablet is appended behind it; otherwise it is sent
+    * directly and queued only if that send fails. {@code ArrayBlockingQueue.add} throws {@link
+    * IllegalStateException} when the 1000-entry backlog is full.
+    *
+    * @param tablet data to write; its {@code prefixPath} selects the target nodes
+    * @throws StatementExecutionException declared for callers; direct-send failures are queued
+    * @throws IoTDBConnectionException declared for callers; direct-send failures are queued
+    */
+   public void insertTablet(Tablet tablet)
+       throws StatementExecutionException, IoTDBConnectionException {
+     int index = nodeIndex(tablet.prefixPath);
+     for (int i = 0; i < 2; i++) {
+       int j = (index + i) % nodeList.size();
+       synchronized (queues[j]) {
+         if (!queues[j].isEmpty()) {
+           queues[j].add(tablet);
+           queues[j].notifyAll();
+           continue;
+         }
+       }
+       try {
+         sessions[j].insertTablet(tablet);
+       } catch (Exception e) {
+         // Deliberately not rethrown: the retry thread for node j replays the tablet later.
+         synchronized (queues[j]) {
+           queues[j].add(tablet);
+           queues[j].notifyAll();
+         }
+       }
+     }
+   }
+ 
+   /**
+    * Runs a query on the node selected by the device hash, falling back to the next node.
+    *
+    * @param sql statement to execute
+    * @param deviceId device path used to pick the node
+    * @return the result set, or {@code null} if both nodes failed
+    */
+   public SessionDataSet queryTablet(String sql, String deviceId) {
+     int index = nodeIndex(deviceId);
+     try {
+       return sessions[index].executeQueryStatement(sql);
+     } catch (Exception e) {
+       try {
+         return sessions[(index + 1) % nodeList.size()].executeQueryStatement(sql);
+       } catch (Exception ex) {
+         // Not expected: both candidate nodes failed. Report the fallback failure together
+         // with the first one instead of printing only the first.
+         ex.addSuppressed(e);
+         ex.printStackTrace();
+         return null;
+       }
+     }
+   }
+ 
+   /**
+    * Replaces the session for one node with a freshly opened one.
+    *
+    * <p>The new session is stored only after it opened successfully, and the session it replaces
+    * is closed. A request still running on the replaced session may fail as a result; {@link
+    * #insertTablet} queues such a failure and {@link #queryTablet} falls back to the next node.
+    *
+    * @param index position of the node in {@link #nodeList}
+    * @return the newly opened session
+    * @throws IoTDBConnectionException if the node cannot be reached
+    */
+   public Session reconnect(int index) throws IoTDBConnectionException {
+     Session fresh = new Session(nodeList.get(index).ip, nodeList.get(index).port);
+     fresh.open();
+     Session stale = sessions[index];
+     sessions[index] = fresh;
+     closeQuietly(stale);
+     return fresh;
+   }
+ 
+   /** Maps a key to a node index; floorMod keeps the result non-negative for negative hashes. */
+   private int nodeIndex(String key) {
+     return Math.floorMod(key.hashCode(), nodeList.size());
+   }
+ 
+   /** Closes a session that is no longer needed, reporting but not propagating any failure. */
+   private static void closeQuietly(Session session) {
+     if (session == null) {
+       return;
+     }
+     try {
+       session.close();
+     } catch (Exception e) {
+       // Best effort: a failure to close a discarded session must not fail the caller.
+       e.printStackTrace();
+     }
+   }
+ 
+   /** Background worker that replays the backlog of a single node. */
+   class RunnableTask implements Runnable {
+     int index;
+ 
+     public RunnableTask(int index) {
+       this.index = index;
+     }
+ 
+     /**
+      * Replays queued tablets one at a time until the thread is interrupted.
+      *
+      * <p>The queue monitor is held while a tablet is sent, so {@link #insertTablet} cannot
+      * overtake the backlog of this node.
+      */
+     @Override
+     public void run() {
 -        Tablet tablet;
 -        while (true) {
 -          Tablet t;
 -          synchronized (queues[index]) {
 -            if (queues[index].isEmpty()) {
 -              try {
 -                queues[index].wait(1000);
 -              } catch (InterruptedException e) {
 -                e.printStackTrace();
 -              }
 -            } else {
 -              try {
 -                Session session = reconnect(index);
 -                t = queues[index].poll();
 -                session.insertTablet(t);
 -              } catch (StatementExecutionException | IoTDBConnectionException e) {
 -              }
++      while (!Thread.currentThread().isInterrupted()) {
++        synchronized (queues[index]) {
++          try {
++            // Peek rather than poll so a failed send leaves the tablet at the head.
++            Tablet head = queues[index].peek();
++            if (head == null) {
++              queues[index].wait(1000);
++              continue;
++            }
++            try {
++              reconnect(index).insertTablet(head);
++              queues[index].poll();
++            } catch (IoTDBConnectionException ignored) {
++              // Node still unreachable: keep the tablet and back off instead of spinning.
++              queues[index].wait(1000);
++            } catch (StatementExecutionException e) {
++              // The node rejected the tablet itself; retrying cannot help, so drop it visibly.
++              e.printStackTrace();
++              queues[index].poll();
++            }
++          } catch (InterruptedException e) {
++            // Restore the flag so the loop condition ends the worker.
++            Thread.currentThread().interrupt();
++          }
++        }
++      }
 -
 -
+     }
+   }
+ }