You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/04/26 12:42:17 UTC
[incubator-iotdb] branch cluster_read updated: add abstract timer
class
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch cluster_read
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster_read by this push:
new a01dbc8 add abstract timer class
a01dbc8 is described below
commit a01dbc88b729750de65afa3f2ed4cee396609ddf
Author: lta <li...@163.com>
AuthorDate: Fri Apr 26 20:41:56 2019 +0800
add abstract timer class
---
cluster/pom.xml | 2 +-
.../cluster/entity/raft/DataStateMachine.java | 2 +-
.../querynode/ClusterLocalQueryManager.java | 4 +-
.../querynode/ClusterLocalSingleQueryManager.java | 3 +-
.../rpc/raft/impl/RaftNodeAsClientManager.java | 2 +-
.../iotdb/cluster/utils/timer/RepeatedTimer.java | 190 +++++++++++++++++++++
6 files changed, 197 insertions(+), 6 deletions(-)
diff --git a/cluster/pom.xml b/cluster/pom.xml
index fc94d1d..25d13ea 100644
--- a/cluster/pom.xml
+++ b/cluster/pom.xml
@@ -142,7 +142,7 @@
<outputDirectory>${project.basedir}/../iotdb/iotdb/lib_cluster</outputDirectory>
</configuration>
</plugin>
- <!--using `mvn test` to run UT, `mvn verify` to run ITs
+ <!--using `mvn test` to triggerAction UT, `mvn verify` to triggerAction ITs
Reference: https://antoniogoncalves.org/2012/12/13/lets-turn-integration-tests-with-maven-to-a-first-class-citizen/-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java
index b8c6f43..389cb74 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java
@@ -116,7 +116,7 @@ public class DataStateMachine extends StateMachineAdapter {
PhysicalPlan plan = PhysicalPlanLogTransfer.logToOperator(planByte);
LOGGER.debug("OperatorType :{}", plan.getOperatorType());
- /** If the request is to set path and sg of the path doesn't exist, it needs to run null-read in meta group to avoid out of data sync **/
+ /** If the request is to set path and sg of the path doesn't exist, it needs to triggerAction null-read in meta group to avoid out of data sync **/
if (plan.getOperatorType() == OperatorType.CREATE_TIMESERIES && !checkPathExistence(
((MetadataPlan) plan).getPath().getFullPath())) {
RaftUtils.handleNullReadToMetaGroup(status);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
index 2032424..fc0983b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
@@ -19,12 +19,12 @@
package org.apache.iotdb.cluster.query.manager.querynode;
import com.alipay.sofa.jraft.util.OnlyForTest;
-import com.alipay.sofa.jraft.util.RepeatedTimer;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.iotdb.cluster.config.ClusterConstant;
+import org.apache.iotdb.cluster.utils.timer.RepeatedTimer;
import org.apache.iotdb.cluster.rpc.raft.request.querydata.InitSeriesReaderRequest;
import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataByTimestampRequest;
import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataRequest;
@@ -141,7 +141,7 @@ public class ClusterLocalQueryManager implements IClusterLocalQueryManager {
}
@Override
- protected void onTrigger() {
+ public void run() {
try {
close(taskId);
} catch (FileNodeManagerException e) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
index 39cf614..000249c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
@@ -108,6 +108,7 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
public ClusterLocalSingleQueryManager(long jobId, QueryRepeaterTimer queryRepeaterTimer) {
this.jobId = jobId;
this.queryRepeaterTimer = queryRepeaterTimer;
+ this.queryRepeaterTimer.start();
}
@Override
@@ -288,7 +289,7 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
@Override
public void close() throws FileNodeManagerException {
- queryRepeaterTimer.destroy();
+ queryRepeaterTimer.stop();
QueryResourceManager.getInstance().endQueryForGivenJob(jobId);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
index fe2bdb6..10598ae 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
@@ -55,7 +55,7 @@ public class RaftNodeAsClientManager {
private static final int TASK_TIMEOUT_MS = CLUSTER_CONFIG.getQpTaskTimeout();
/**
- * Max valid number of @NodeAsClient usage, represent the number can run simultaneously at the
+ * Max valid number of @NodeAsClient usage, represent the number can triggerAction simultaneously at the
* same time
*/
private static final int MAX_VALID_CLIENT_NUM = CLUSTER_CONFIG.getMaxNumOfInnerRpcClient();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/timer/RepeatedTimer.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/timer/RepeatedTimer.java
new file mode 100644
index 0000000..09f50bd
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/timer/RepeatedTimer.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.utils.timer;
+
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public abstract class RepeatedTimer {
+
+ /**
+ * Name of timerMs
+ */
+ private String timerName;
+
+ /**
+ * Lock resource
+ */
+ private Lock lock = new ReentrantLock();
+
+ /**
+ * Timer
+ */
+ private Timer timerMs;
+
+ /**
+ * Task to execute when timeout
+ */
+ private TimerTask timerTask;
+
+ /**
+ * The unit is millisecond.
+ */
+ private volatile int timeoutMs;
+
+ /**
+ * Mark whether the timerMs is stopped or not.
+ */
+ private volatile boolean stopped;
+
+ /**
+ * Mark whether the timer is running or not.
+ */
+ private volatile boolean running;
+
+ public RepeatedTimer(String name, int timeoutMs) {
+ this.timerName = name;
+ this.timeoutMs = timeoutMs;
+ this.stopped = true;
+ this.timerMs = new Timer(timerName);
+ }
+
+ /**
+ * Start repeated timer.
+ */
+ public void start() {
+ lock.lock();
+ try {
+ if (stopped && !running) {
+ stopped = false;
+ running = true;
+ schedule();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Subclass implement this method to do actual task.
+ */
+ public abstract void run();
+
+ /**
+ * Schedule timer task
+ */
+ private void schedule() {
+ if (timerTask != null) {
+ timerTask.cancel();
+ }
+ timerTask = new TimerTask() {
+ @Override
+ public void run() {
+ RepeatedTimer.this.triggerAction();
+ }
+ };
+ timerMs.schedule(timerTask, timeoutMs);
+ }
+
+ /**
+ * Trigger action when timeout
+ */
+ public void triggerAction() {
+ lock.lock();
+ try {
+ run();
+ repeatTimerTask();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Repeat timer task.
+ */
+ private void repeatTimerTask(){
+ if(!stopped){
+ schedule();
+ }
+ }
+
+ /**
+ * Reset repeated timer with current timeout parameter.
+ */
+ public void reset() {
+ lock.lock();
+ try {
+ reset(timeoutMs);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Reset repeated timer with new timeout parameter.
+ *
+ * @param timeoutMs timeout millisecond
+ */
+ public void reset(int timeoutMs) {
+ lock.lock();
+ this.timeoutMs = timeoutMs;
+ try {
+ if (!stopped) {
+ schedule();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Stop the timer
+ */
+ public void stop() {
+ lock.lock();
+ try {
+ if (!stopped) {
+ stopped = true;
+ if (timerTask != null) {
+ timerTask.cancel();
+ running = false;
+ timerTask = null;
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "RepeatedTimer{" +
+ "lock=" + lock +
+ ", timerMs=" + timerMs +
+ ", timerTask=" + timerTask +
+ ", stopped=" + stopped +
+ ", running=" + running +
+ ", timeoutMs=" + timeoutMs +
+ ", timerName='" + timerName + '\'' +
+ '}';
+ }
+}
+