You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/11/25 14:12:50 UTC
[iotdb] 06/07: fix conflicts
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch xianyi
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7b816c0b5903812c6debbb21f2e48b150a6a290b
Merge: 4fd32d6 9ccbfeb
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Thu Nov 25 22:11:09 2021 +0800
fix conflicts
.../java/org/apache/iotdb/cli/AbstractCli.java | 69 +++++++++++++
cli/src/main/java/org/apache/iotdb/cli/Cli.java | 1 +
generator/pom.xml | 111 +++++++++++++++++++++
.../java/org/apache/iotdb/generator/Generator.java | 47 +++++++++
.../apache/iotdb/generator/GeneratorEntrance.java | 52 ++++++++++
.../org/apache/iotdb/jdbc/IoTDBConnection.java | 11 ++
pom.xml | 1 +
.../resources/conf/iotdb-engine.properties | 3 +
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 ++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 2 +
.../org/apache/iotdb/db/service/TSServiceImpl.java | 13 ++-
.../org/apache/iotdb/session/ClusterSession.java | 109 ++++++++++++++++++++
.../java/org/apache/iotdb/session/Session.java | 11 +-
.../apache/iotdb/session/SessionConnection.java | 2 +-
thrift/src/main/thrift/rpc.thrift | 2 +
15 files changed, 441 insertions(+), 3 deletions(-)
diff --cc generator/src/main/java/org/apache/iotdb/generator/GeneratorEntrance.java
index 0000000,d97ad7f..b26006e
mode 000000,100644..100644
--- a/generator/src/main/java/org/apache/iotdb/generator/GeneratorEntrance.java
+++ b/generator/src/main/java/org/apache/iotdb/generator/GeneratorEntrance.java
@@@ -1,0 -1,53 +1,52 @@@
+ package org.apache.iotdb.generator;
+
+ import org.apache.iotdb.rpc.IoTDBConnectionException;
+ import org.apache.iotdb.rpc.StatementExecutionException;
+ import org.apache.iotdb.session.ClusterSession;
+ import org.apache.iotdb.session.SessionDataSet;
+ import org.apache.iotdb.tsfile.write.record.Tablet;
+
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ public class GeneratorEntrance {
+ private static final Logger logger = LoggerFactory.getLogger(GeneratorEntrance.class);
+
+ public static void main(String[] args)
+ throws IoTDBConnectionException, StatementExecutionException, InterruptedException {
+ args = new String[6];
+ args[1] = "127.0.0.1:6667";
+ args[2] = "root.sg1.d1.s1";
+ args[3] = "3";
+ args[4] = "1000";
+ String[] addressElements = args[1].split(":");
+ String seriesPath = args[2];
+ int timeInterval = Integer.parseInt(args[3]) * 1000;
+ int batchNum = Integer.parseInt(args[4]);
+ String[] pathElements = seriesPath.split("\\.");
+ String measurementId = pathElements[pathElements.length - 1];
+ String deviceId = seriesPath.substring(0, seriesPath.length() - measurementId.length() - 1);
+
+ ClusterSession clusterSession =
+ new ClusterSession(addressElements[0], Integer.parseInt(addressElements[1]));
+
+ long timestampForInsert = 0;
+ while (true) {
+ long startTime = System.currentTimeMillis();
+ Tablet tablet =
+ Generator.generateTablet(
+ deviceId, pathElements[pathElements.length - 1], timestampForInsert, batchNum);
+ clusterSession.insertTablet(tablet);
+ logger.info("Insert {} data points to {}", batchNum, seriesPath);
- String query = String.format("select count(%s) from %s", measurementId, deviceId);
- SessionDataSet sessionDataSet =
- clusterSession.queryTablet(
- query, deviceId);
- logger.info("Execute query {} with result : {}",query,sessionDataSet.next().getFields().get(0));
++ String query = String.format("select count(%s) from %s", measurementId, deviceId);
++ SessionDataSet sessionDataSet = clusterSession.queryTablet(query, deviceId);
++ logger.info(
++ "Execute query {} with result : {}", query, sessionDataSet.next().getFields().get(0));
+ timestampForInsert += batchNum;
+ long endTime = System.currentTimeMillis();
- if (timeInterval - (endTime - startTime)>0) {
++ if (timeInterval - (endTime - startTime) > 0) {
+ Thread.sleep(timeInterval - (endTime - startTime));
+ }
+ }
+ }
+ }
diff --cc session/src/main/java/org/apache/iotdb/session/ClusterSession.java
index 0000000,d7617ca..a6e0fad
mode 000000,100644..100644
--- a/session/src/main/java/org/apache/iotdb/session/ClusterSession.java
+++ b/session/src/main/java/org/apache/iotdb/session/ClusterSession.java
@@@ -1,0 -1,112 +1,109 @@@
+ package org.apache.iotdb.session;
+
+ import org.apache.iotdb.rpc.IoTDBConnectionException;
+ import org.apache.iotdb.rpc.StatementExecutionException;
+ import org.apache.iotdb.service.rpc.thrift.EndPoint;
+ import org.apache.iotdb.tsfile.write.record.Tablet;
+
+ import java.util.*;
+ import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.TimeUnit;
+
+ public class ClusterSession {
+ Session[] sessions;
+ ArrayBlockingQueue<Tablet>[] queues;
+ List<EndPoint> nodeList;
+
+ public ClusterSession(String host, int rpcPort) throws IoTDBConnectionException {
+ Session session = new Session(host, rpcPort);
+ session.open();
+ nodeList = new ArrayList<>();
+ nodeList.addAll(session.getNodeList());
+ sessions = new Session[nodeList.size()];
+ queues = new ArrayBlockingQueue[nodeList.size()];
+ for (int i = 0; i < nodeList.size(); i++) {
+ sessions[i] = new Session(nodeList.get(i).ip, nodeList.get(i).port);
+ sessions[i].open();
+ queues[i] = new ArrayBlockingQueue<Tablet>(1000);
+ new Thread(new RunnableTask(i)).start();
+ }
+ }
+
+ public void insertTablet(Tablet tablet)
+ throws StatementExecutionException, IoTDBConnectionException {
+ int hashVal = tablet.prefixPath.hashCode();
+ int index = hashVal % nodeList.size();
+ for (int i = 0; i < 2; i++) {
+ int j = (index + i) % nodeList.size();
+ synchronized (queues[j]) {
+ if (!queues[j].isEmpty()) {
+ queues[j].add(tablet);
+ queues[j].notifyAll();
+ continue;
+ }
+ }
+ try {
+ sessions[j].insertTablet(tablet);
+ } catch (Exception e) {
+ synchronized (queues[j]) {
+ queues[j].add(tablet);
+ queues[j].notifyAll();
+ }
+ }
+ }
+ }
+
+ public SessionDataSet queryTablet(String sql, String deviceId) {
+ int hashVal = deviceId.hashCode();
+ int index = hashVal % nodeList.size();
+ SessionDataSet sessionDataSet = null;
+ try {
+ sessionDataSet = sessions[index].executeQueryStatement(sql);
+ } catch (Exception e) {
+ try {
+ sessionDataSet = sessions[(index + 1) % nodeList.size()].executeQueryStatement(sql);
+ } catch (Exception ex) {
+ // never happen, once the node restart, it won't be killed anymore.
+ e.printStackTrace();
+ }
+ }
+ return sessionDataSet;
+ }
+
+ public Session reconnect(int index) throws IoTDBConnectionException {
+ sessions[index] = new Session(nodeList.get(index).ip, nodeList.get(index).port);
+ sessions[index].open();
+ return sessions[index];
+ }
+
+ class RunnableTask implements Runnable {
+ int index;
+
+ public RunnableTask(int index) {
+ this.index = index;
+ }
+
+ @Override
+ public void run() {
- Tablet tablet;
- while (true) {
- Tablet t;
- synchronized (queues[index]) {
- if (queues[index].isEmpty()) {
- try {
- queues[index].wait(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- } else {
- try {
- Session session = reconnect(index);
- t = queues[index].poll();
- session.insertTablet(t);
- } catch (StatementExecutionException | IoTDBConnectionException e) {
- }
++ Tablet tablet;
++ while (true) {
++ Tablet t;
++ synchronized (queues[index]) {
++ if (queues[index].isEmpty()) {
++ try {
++ queues[index].wait(1000);
++ } catch (InterruptedException e) {
++ e.printStackTrace();
++ }
++ } else {
++ try {
++ Session session = reconnect(index);
++ t = queues[index].poll();
++ session.insertTablet(t);
++ } catch (StatementExecutionException | IoTDBConnectionException e) {
+ }
+ }
-
+ }
-
++ }
+ }
+ }
+ }