You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2021/11/25 15:41:04 UTC
[iotdb] branch xianyi updated: finish generator
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch xianyi
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/xianyi by this push:
new c576976 finish generator
c576976 is described below
commit c5769764060cd1e04a17c31af517552f289626d2
Author: LebronAl <TX...@gmail.com>
AuthorDate: Thu Nov 25 23:38:11 2021 +0800
finish generator
---
.../java/org/apache/iotdb/cli/AbstractCli.java | 2 +-
generator/pom.xml | 74 +++++++++++-----------
.../apache/iotdb/generator/GeneratorEntrance.java | 32 +++++++---
.../org/apache/iotdb/session/ClusterSession.java | 67 ++++++++++++++------
4 files changed, 110 insertions(+), 65 deletions(-)
diff --git a/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java b/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
index 91798cf..240bad3 100644
--- a/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
+++ b/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
@@ -381,7 +381,7 @@ public abstract class AbstractCli {
if (specialCmd.startsWith(SELECT_COUNT) && nodeList.size() != 0) {
String[] paths = specialCmd.split(" ");
String deviceId = paths[paths.length - 1];
- EndPoint endpoint = nodeList.get(deviceId.hashCode() % nodeList.size());
+ EndPoint endpoint = nodeList.get(Math.abs(deviceId.hashCode()) % nodeList.size());
if (endpoint.port == Integer.parseInt(port)) {
executeQuery(connection, cmd);
} else {
diff --git a/generator/pom.xml b/generator/pom.xml
index 8916a19..6c16532 100644
--- a/generator/pom.xml
+++ b/generator/pom.xml
@@ -71,41 +71,43 @@
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
- <dependency>
- <groupId>org.apache.iotdb</groupId>
- <artifactId>iotdb-jdbc</artifactId>
- <version>${project.version}</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>commons-cli</groupId>
- <artifactId>commons-cli</artifactId>
- </dependency>
- <dependency>
- <groupId>org.awaitility</groupId>
- <artifactId>awaitility</artifactId>
- <version>4.0.2</version>
- <scope>test</scope>
- </dependency>
- <!-- for mocked test-->
- <dependency>
- <groupId>org.powermock</groupId>
- <artifactId>powermock-core</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.powermock</groupId>
- <artifactId>powermock-module-junit4</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.powermock</groupId>
- <artifactId>powermock-api-mockito2</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-pool2</artifactId>
- </dependency>
</dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>3.1.0</version>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ <archive>
+ <manifest>
+ <addClasspath>true</addClasspath>
+ <classpathPrefix>lib/</classpathPrefix>
+ <mainClass>org.apache.iotdb.generator.GeneratorEntrance</mainClass>
+ </manifest>
+ </archive>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.3</version>
+ <configuration>
+ <source>8</source>
+ <target>8</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</project>
diff --git a/generator/src/main/java/org/apache/iotdb/generator/GeneratorEntrance.java b/generator/src/main/java/org/apache/iotdb/generator/GeneratorEntrance.java
index b26006e..be471a2 100644
--- a/generator/src/main/java/org/apache/iotdb/generator/GeneratorEntrance.java
+++ b/generator/src/main/java/org/apache/iotdb/generator/GeneratorEntrance.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.iotdb.generator;
import org.apache.iotdb.rpc.IoTDBConnectionException;
@@ -14,15 +33,10 @@ public class GeneratorEntrance {
public static void main(String[] args)
throws IoTDBConnectionException, StatementExecutionException, InterruptedException {
- args = new String[6];
- args[1] = "127.0.0.1:6667";
- args[2] = "root.sg1.d1.s1";
- args[3] = "3";
- args[4] = "1000";
- String[] addressElements = args[1].split(":");
- String seriesPath = args[2];
- int timeInterval = Integer.parseInt(args[3]) * 1000;
- int batchNum = Integer.parseInt(args[4]);
+ String[] addressElements = args[0].split(":");
+ String seriesPath = args[1];
+ int timeInterval = Integer.parseInt(args[2]) * 1000;
+ int batchNum = Integer.parseInt(args[3]);
String[] pathElements = seriesPath.split("\\.");
String measurementId = pathElements[pathElements.length - 1];
String deviceId = seriesPath.substring(0, seriesPath.length() - measurementId.length() - 1);
diff --git a/session/src/main/java/org/apache/iotdb/session/ClusterSession.java b/session/src/main/java/org/apache/iotdb/session/ClusterSession.java
index a6e0fad..496d166 100644
--- a/session/src/main/java/org/apache/iotdb/session/ClusterSession.java
+++ b/session/src/main/java/org/apache/iotdb/session/ClusterSession.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.iotdb.session;
import org.apache.iotdb.rpc.IoTDBConnectionException;
@@ -5,12 +24,13 @@ import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.service.rpc.thrift.EndPoint;
import org.apache.iotdb.tsfile.write.record.Tablet;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
public class ClusterSession {
Session[] sessions;
- ArrayBlockingQueue<Tablet>[] queues;
+ ArrayBlockingQueue[] queues;
List<EndPoint> nodeList;
public ClusterSession(String host, int rpcPort) throws IoTDBConnectionException {
@@ -23,14 +43,14 @@ public class ClusterSession {
for (int i = 0; i < nodeList.size(); i++) {
sessions[i] = new Session(nodeList.get(i).ip, nodeList.get(i).port);
sessions[i].open();
- queues[i] = new ArrayBlockingQueue<Tablet>(1000);
+ queues[i] = new ArrayBlockingQueue<>(1000);
new Thread(new RunnableTask(i)).start();
}
}
public void insertTablet(Tablet tablet)
throws StatementExecutionException, IoTDBConnectionException {
- int hashVal = tablet.prefixPath.hashCode();
+ int hashVal = Math.abs(tablet.prefixPath.hashCode());
int index = hashVal % nodeList.size();
for (int i = 0; i < 2; i++) {
int j = (index + i) % nodeList.size();
@@ -53,7 +73,7 @@ public class ClusterSession {
}
public SessionDataSet queryTablet(String sql, String deviceId) {
- int hashVal = deviceId.hashCode();
+ int hashVal = Math.abs(deviceId.hashCode());
int index = hashVal % nodeList.size();
SessionDataSet sessionDataSet = null;
try {
@@ -69,10 +89,10 @@ public class ClusterSession {
return sessionDataSet;
}
- public Session reconnect(int index) throws IoTDBConnectionException {
- sessions[index] = new Session(nodeList.get(index).ip, nodeList.get(index).port);
- sessions[index].open();
- return sessions[index];
+ public Session newSession(int index) throws IoTDBConnectionException {
+ Session session = new Session(nodeList.get(index).ip, nodeList.get(index).port);
+ session.open();
+ return session;
}
class RunnableTask implements Runnable {
@@ -84,23 +104,32 @@ public class ClusterSession {
@Override
public void run() {
- Tablet tablet;
while (true) {
- Tablet t;
synchronized (queues[index]) {
- if (queues[index].isEmpty()) {
+ while (queues[index].isEmpty()) {
try {
queues[index].wait(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
- } else {
- try {
- Session session = reconnect(index);
- t = queues[index].poll();
- session.insertTablet(t);
- } catch (StatementExecutionException | IoTDBConnectionException e) {
- }
+ }
+ }
+
+ Tablet t = null;
+ try {
+ Session session = newSession(index);
+ while (!queues[index].isEmpty()) {
+ t = (Tablet) queues[index].poll();
+ session.insertTablet(t);
+ }
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ if (t != null) {
+ queues[index].add(t);
+ }
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException interruptedException) {
+ interruptedException.printStackTrace();
}
}
}