You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/04/26 12:42:17 UTC

[incubator-iotdb] branch cluster_read updated: add abstract timer class

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster_read
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/cluster_read by this push:
     new a01dbc8  add abstract timer class
a01dbc8 is described below

commit a01dbc88b729750de65afa3f2ed4cee396609ddf
Author: lta <li...@163.com>
AuthorDate: Fri Apr 26 20:41:56 2019 +0800

    add abstract timer class
---
 cluster/pom.xml                                    |   2 +-
 .../cluster/entity/raft/DataStateMachine.java      |   2 +-
 .../querynode/ClusterLocalQueryManager.java        |   4 +-
 .../querynode/ClusterLocalSingleQueryManager.java  |   3 +-
 .../rpc/raft/impl/RaftNodeAsClientManager.java     |   2 +-
 .../iotdb/cluster/utils/timer/RepeatedTimer.java   | 190 +++++++++++++++++++++
 6 files changed, 197 insertions(+), 6 deletions(-)

diff --git a/cluster/pom.xml b/cluster/pom.xml
index fc94d1d..25d13ea 100644
--- a/cluster/pom.xml
+++ b/cluster/pom.xml
@@ -142,7 +142,7 @@
                     <outputDirectory>${project.basedir}/../iotdb/iotdb/lib_cluster</outputDirectory>
                 </configuration>
             </plugin>
-            <!--using `mvn test` to run UT, `mvn verify` to run ITs
+            <!--using `mvn test` to triggerAction UT, `mvn verify` to triggerAction ITs
                         Reference: https://antoniogoncalves.org/2012/12/13/lets-turn-integration-tests-with-maven-to-a-first-class-citizen/-->
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java
index b8c6f43..389cb74 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java
@@ -116,7 +116,7 @@ public class DataStateMachine extends StateMachineAdapter {
         PhysicalPlan plan = PhysicalPlanLogTransfer.logToOperator(planByte);
 
         LOGGER.debug("OperatorType :{}", plan.getOperatorType());
-        /** If the request is to set path and sg of the path doesn't exist, it needs to run null-read in meta group to avoid out of data sync **/
+        /** If the request is to set path and sg of the path doesn't exist, it needs to triggerAction null-read in meta group to avoid out of data sync **/
         if (plan.getOperatorType() == OperatorType.CREATE_TIMESERIES && !checkPathExistence(
             ((MetadataPlan) plan).getPath().getFullPath())) {
           RaftUtils.handleNullReadToMetaGroup(status);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
index 2032424..fc0983b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
@@ -19,12 +19,12 @@
 package org.apache.iotdb.cluster.query.manager.querynode;
 
 import com.alipay.sofa.jraft.util.OnlyForTest;
-import com.alipay.sofa.jraft.util.RepeatedTimer;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.iotdb.cluster.config.ClusterConstant;
+import org.apache.iotdb.cluster.utils.timer.RepeatedTimer;
 import org.apache.iotdb.cluster.rpc.raft.request.querydata.InitSeriesReaderRequest;
 import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataByTimestampRequest;
 import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataRequest;
@@ -141,7 +141,7 @@ public class ClusterLocalQueryManager implements IClusterLocalQueryManager {
     }
 
     @Override
-    protected void onTrigger() {
+    public void run() {
       try {
         close(taskId);
       } catch (FileNodeManagerException e) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
index 39cf614..000249c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
@@ -108,6 +108,7 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
   public ClusterLocalSingleQueryManager(long jobId, QueryRepeaterTimer queryRepeaterTimer) {
     this.jobId = jobId;
     this.queryRepeaterTimer = queryRepeaterTimer;
+    this.queryRepeaterTimer.start();
   }
 
   @Override
@@ -288,7 +289,7 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
 
   @Override
   public void close() throws FileNodeManagerException {
-    queryRepeaterTimer.destroy();
+    queryRepeaterTimer.stop();
     QueryResourceManager.getInstance().endQueryForGivenJob(jobId);
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
index fe2bdb6..10598ae 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
@@ -55,7 +55,7 @@ public class RaftNodeAsClientManager {
   private static final int TASK_TIMEOUT_MS = CLUSTER_CONFIG.getQpTaskTimeout();
 
   /**
-   * Max valid number of @NodeAsClient usage, represent the number can run simultaneously at the
+   * Max valid number of @NodeAsClient usage, represent the number can triggerAction simultaneously at the
    * same time
    */
   private static final int MAX_VALID_CLIENT_NUM = CLUSTER_CONFIG.getMaxNumOfInnerRpcClient();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/timer/RepeatedTimer.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/timer/RepeatedTimer.java
new file mode 100644
index 0000000..09f50bd
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/timer/RepeatedTimer.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.utils.timer;
+
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public abstract class RepeatedTimer {
+
+  /**
+   * Name of timerMs
+   */
+  private String timerName;
+
+  /**
+   * Lock resource
+   */
+  private Lock lock = new ReentrantLock();
+
+  /**
+   * Timer
+   */
+  private Timer timerMs;
+
+  /**
+   * Task to execute when timeout
+   */
+  private TimerTask timerTask;
+
+  /**
+   * The unit is millisecond.
+   */
+  private volatile int timeoutMs;
+
+  /**
+   * Mark whether the timerMs is stopped or not.
+   */
+  private volatile boolean stopped;
+
+  /**
+   * Mark whether the timer is running or not.
+   */
+  private volatile boolean running;
+
+  public RepeatedTimer(String name, int timeoutMs) {
+    this.timerName = name;
+    this.timeoutMs = timeoutMs;
+    this.stopped = true;
+    this.timerMs = new Timer(timerName);
+  }
+
+  /**
+   * Start repeated timer.
+   */
+  public void start() {
+    lock.lock();
+    try {
+      if (stopped && !running) {
+        stopped = false;
+        running = true;
+        schedule();
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Subclass implement this method to do actual task.
+   */
+  public abstract void run();
+
+  /**
+   * Schedule timer task
+   */
+  private void schedule() {
+    if (timerTask != null) {
+      timerTask.cancel();
+    }
+    timerTask = new TimerTask() {
+      @Override
+      public void run() {
+        RepeatedTimer.this.triggerAction();
+      }
+    };
+    timerMs.schedule(timerTask, timeoutMs);
+  }
+
+  /**
+   * Trigger action when timeout
+   */
+  public void triggerAction() {
+    lock.lock();
+    try {
+      run();
+      repeatTimerTask();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Repeat timer task.
+   */
+  private void repeatTimerTask(){
+    if(!stopped){
+      schedule();
+    }
+  }
+
+  /**
+   * Reset repeated timer with current timeout parameter.
+   */
+  public void reset() {
+    lock.lock();
+    try {
+      reset(timeoutMs);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Reset repeated timer with new timeout parameter.
+   *
+   * @param timeoutMs timeout millisecond
+   */
+  public void reset(int timeoutMs) {
+    lock.lock();
+    this.timeoutMs = timeoutMs;
+    try {
+      if (!stopped) {
+        schedule();
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Stop the timer
+   */
+  public void stop() {
+    lock.lock();
+    try {
+      if (!stopped) {
+        stopped = true;
+        if (timerTask != null) {
+          timerTask.cancel();
+          running = false;
+          timerTask = null;
+        }
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "RepeatedTimer{" +
+        "lock=" + lock +
+        ", timerMs=" + timerMs +
+        ", timerTask=" + timerTask +
+        ", stopped=" + stopped +
+        ", running=" + running +
+        ", timeoutMs=" + timeoutMs +
+        ", timerName='" + timerName + '\'' +
+        '}';
+  }
+}
+