You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2021/11/25 15:41:04 UTC

[iotdb] branch xianyi updated: finish generator

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch xianyi
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/xianyi by this push:
     new c576976  finish generator
c576976 is described below

commit c5769764060cd1e04a17c31af517552f289626d2
Author: LebronAl <TX...@gmail.com>
AuthorDate: Thu Nov 25 23:38:11 2021 +0800

    finish generator
---
 .../java/org/apache/iotdb/cli/AbstractCli.java     |  2 +-
 generator/pom.xml                                  | 74 +++++++++++-----------
 .../apache/iotdb/generator/GeneratorEntrance.java  | 32 +++++++---
 .../org/apache/iotdb/session/ClusterSession.java   | 67 ++++++++++++++------
 4 files changed, 110 insertions(+), 65 deletions(-)

diff --git a/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java b/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
index 91798cf..240bad3 100644
--- a/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
+++ b/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
@@ -381,7 +381,7 @@ public abstract class AbstractCli {
     if (specialCmd.startsWith(SELECT_COUNT) && nodeList.size() != 0) {
       String[] paths = specialCmd.split(" ");
       String deviceId = paths[paths.length - 1];
-      EndPoint endpoint = nodeList.get(deviceId.hashCode() % nodeList.size());
+      EndPoint endpoint = nodeList.get(Math.abs(deviceId.hashCode()) % nodeList.size());
       if (endpoint.port == Integer.parseInt(port)) {
         executeQuery(connection, cmd);
       } else {
diff --git a/generator/pom.xml b/generator/pom.xml
index 8916a19..6c16532 100644
--- a/generator/pom.xml
+++ b/generator/pom.xml
@@ -71,41 +71,43 @@
             <version>${project.version}</version>
             <scope>compile</scope>
         </dependency>
-        <dependency>
-            <groupId>org.apache.iotdb</groupId>
-            <artifactId>iotdb-jdbc</artifactId>
-            <version>${project.version}</version>
-            <scope>compile</scope>
-        </dependency>
-        <dependency>
-            <groupId>commons-cli</groupId>
-            <artifactId>commons-cli</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.awaitility</groupId>
-            <artifactId>awaitility</artifactId>
-            <version>4.0.2</version>
-            <scope>test</scope>
-        </dependency>
-        <!-- for mocked test-->
-        <dependency>
-            <groupId>org.powermock</groupId>
-            <artifactId>powermock-core</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.powermock</groupId>
-            <artifactId>powermock-module-junit4</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.powermock</groupId>
-            <artifactId>powermock-api-mockito2</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-pool2</artifactId>
-        </dependency>
     </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.1.0</version>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                    <archive>
+                        <manifest>
+                            <addClasspath>true</addClasspath>
+                            <classpathPrefix>lib/</classpathPrefix>
+                            <mainClass>org.apache.iotdb.generator.GeneratorEntrance</mainClass>
+                        </manifest>
+                    </archive>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.3</version>
+                <configuration>
+                    <source>8</source>
+                    <target>8</target>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
 </project>
diff --git a/generator/src/main/java/org/apache/iotdb/generator/GeneratorEntrance.java b/generator/src/main/java/org/apache/iotdb/generator/GeneratorEntrance.java
index b26006e..be471a2 100644
--- a/generator/src/main/java/org/apache/iotdb/generator/GeneratorEntrance.java
+++ b/generator/src/main/java/org/apache/iotdb/generator/GeneratorEntrance.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.iotdb.generator;
 
 import org.apache.iotdb.rpc.IoTDBConnectionException;
@@ -14,15 +33,10 @@ public class GeneratorEntrance {
 
   public static void main(String[] args)
       throws IoTDBConnectionException, StatementExecutionException, InterruptedException {
-    args = new String[6];
-    args[1] = "127.0.0.1:6667";
-    args[2] = "root.sg1.d1.s1";
-    args[3] = "3";
-    args[4] = "1000";
-    String[] addressElements = args[1].split(":");
-    String seriesPath = args[2];
-    int timeInterval = Integer.parseInt(args[3]) * 1000;
-    int batchNum = Integer.parseInt(args[4]);
+    String[] addressElements = args[0].split(":");
+    String seriesPath = args[1];
+    int timeInterval = Integer.parseInt(args[2]) * 1000;
+    int batchNum = Integer.parseInt(args[3]);
     String[] pathElements = seriesPath.split("\\.");
     String measurementId = pathElements[pathElements.length - 1];
     String deviceId = seriesPath.substring(0, seriesPath.length() - measurementId.length() - 1);
diff --git a/session/src/main/java/org/apache/iotdb/session/ClusterSession.java b/session/src/main/java/org/apache/iotdb/session/ClusterSession.java
index a6e0fad..496d166 100644
--- a/session/src/main/java/org/apache/iotdb/session/ClusterSession.java
+++ b/session/src/main/java/org/apache/iotdb/session/ClusterSession.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.iotdb.session;
 
 import org.apache.iotdb.rpc.IoTDBConnectionException;
@@ -5,12 +24,13 @@ import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.service.rpc.thrift.EndPoint;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 
 public class ClusterSession {
   Session[] sessions;
-  ArrayBlockingQueue<Tablet>[] queues;
+  ArrayBlockingQueue[] queues;
   List<EndPoint> nodeList;
 
   public ClusterSession(String host, int rpcPort) throws IoTDBConnectionException {
@@ -23,14 +43,14 @@ public class ClusterSession {
     for (int i = 0; i < nodeList.size(); i++) {
       sessions[i] = new Session(nodeList.get(i).ip, nodeList.get(i).port);
       sessions[i].open();
-      queues[i] = new ArrayBlockingQueue<Tablet>(1000);
+      queues[i] = new ArrayBlockingQueue<>(1000);
       new Thread(new RunnableTask(i)).start();
     }
   }
 
   public void insertTablet(Tablet tablet)
       throws StatementExecutionException, IoTDBConnectionException {
-    int hashVal = tablet.prefixPath.hashCode();
+    int hashVal = Math.abs(tablet.prefixPath.hashCode());
     int index = hashVal % nodeList.size();
     for (int i = 0; i < 2; i++) {
       int j = (index + i) % nodeList.size();
@@ -53,7 +73,7 @@ public class ClusterSession {
   }
 
   public SessionDataSet queryTablet(String sql, String deviceId) {
-    int hashVal = deviceId.hashCode();
+    int hashVal = Math.abs(deviceId.hashCode());
     int index = hashVal % nodeList.size();
     SessionDataSet sessionDataSet = null;
     try {
@@ -69,10 +89,10 @@ public class ClusterSession {
     return sessionDataSet;
   }
 
-  public Session reconnect(int index) throws IoTDBConnectionException {
-    sessions[index] = new Session(nodeList.get(index).ip, nodeList.get(index).port);
-    sessions[index].open();
-    return sessions[index];
+  public Session newSession(int index) throws IoTDBConnectionException {
+    Session session = new Session(nodeList.get(index).ip, nodeList.get(index).port);
+    session.open();
+    return session;
   }
 
   class RunnableTask implements Runnable {
@@ -84,23 +104,32 @@ public class ClusterSession {
 
     @Override
     public void run() {
-      Tablet tablet;
       while (true) {
-        Tablet t;
         synchronized (queues[index]) {
-          if (queues[index].isEmpty()) {
+          while (queues[index].isEmpty()) {
             try {
               queues[index].wait(1000);
             } catch (InterruptedException e) {
               e.printStackTrace();
             }
-          } else {
-            try {
-              Session session = reconnect(index);
-              t = queues[index].poll();
-              session.insertTablet(t);
-            } catch (StatementExecutionException | IoTDBConnectionException e) {
-            }
+          }
+        }
+
+        Tablet t = null;
+        try {
+          Session session = newSession(index);
+          while (!queues[index].isEmpty()) {
+            t = (Tablet) queues[index].poll();
+            session.insertTablet(t);
+          }
+        } catch (IoTDBConnectionException | StatementExecutionException e) {
+          if (t != null) {
+            queues[index].add(t);
+          }
+          try {
+            Thread.sleep(3000);
+          } catch (InterruptedException interruptedException) {
+            interruptedException.printStackTrace();
           }
         }
       }