You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/06/27 17:35:50 UTC
[1/2] storm git commit: STORM-3120: Clean up leftover null checks in Time, ensure idle threads get to run when cluster time is advanced
Repository: storm
Updated Branches:
refs/heads/master 69801887c -> d88076716
STORM-3120: Clean up leftover null checks in Time, ensure idle threads get to run when cluster time is advanced
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e5ca0c97
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e5ca0c97
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e5ca0c97
Branch: refs/heads/master
Commit: e5ca0c97f31f4d2e92427c9ed2b00519985fc43c
Parents: d6f8afb
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Sun Jun 24 12:47:42 2018 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Sun Jun 24 23:02:41 2018 +0200
----------------------------------------------------------------------
.../src/jvm/org/apache/storm/utils/Time.java | 78 +++++++++-----------
.../java/org/apache/storm/LocalCluster.java | 1 +
.../src/main/java/org/apache/storm/Testing.java | 4 +-
3 files changed, 39 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/e5ca0c97/storm-client/src/jvm/org/apache/storm/utils/Time.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Time.java b/storm-client/src/jvm/org/apache/storm/utils/Time.java
index e47c521..0c7bfab 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Time.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Time.java
@@ -12,10 +12,13 @@
package org.apache.storm.utils;
+import java.util.Iterator;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,26 +33,7 @@ public class Time {
private static final AtomicLong AUTO_ADVANCE_NANOS_ON_SLEEP = new AtomicLong(0);
private static final Map<Thread, AtomicLong> THREAD_SLEEP_TIMES_NANOS = new ConcurrentHashMap<>();
private static final Object SLEEP_TIMES_LOCK = new Object();
- private static final AtomicLong SIMULATED_CURR_TIME_NANOS = new AtomicLong(0);
-
- @Deprecated
- public static void startSimulating() {
- synchronized (Time.SLEEP_TIMES_LOCK) {
- Time.SIMULATING.set(true);
- Time.SIMULATED_CURR_TIME_NANOS.set(0);
- Time.THREAD_SLEEP_TIMES_NANOS.clear();
- Time.AUTO_ADVANCE_NANOS_ON_SLEEP.set(0);
- LOG.warn("Simulated Time Starting...");
- }
- }
-
- @Deprecated
- public static void stopSimulating() {
- synchronized (Time.SLEEP_TIMES_LOCK) {
- Time.SIMULATING.set(false);
- LOG.warn("Simulated Time Ending...");
- }
- }
+ private static final AtomicLong SIMULATED_CURR_TIME_NANOS = new AtomicLong(0);
public static boolean isSimulating() {
return SIMULATING.get();
@@ -82,45 +66,49 @@ public class Time {
private static void simulatedSleepUntilNanos(long targetTimeNanos) throws InterruptedException {
try {
synchronized (SLEEP_TIMES_LOCK) {
- if (THREAD_SLEEP_TIMES_NANOS == null) {
+ if (!SIMULATING.get()) {
LOG.debug("{} is still sleeping after simulated time disabled.", Thread.currentThread(),
- new RuntimeException("STACK TRACE"));
+ new RuntimeException("STACK TRACE"));
throw new InterruptedException();
}
THREAD_SLEEP_TIMES_NANOS.put(Thread.currentThread(), new AtomicLong(targetTimeNanos));
}
while (SIMULATED_CURR_TIME_NANOS.get() < targetTimeNanos) {
synchronized (SLEEP_TIMES_LOCK) {
- if (THREAD_SLEEP_TIMES_NANOS == null) {
+ if (!SIMULATING.get()) {
LOG.debug("{} is still sleeping after simulated time disabled.", Thread.currentThread(),
new RuntimeException("STACK TRACE"));
throw new InterruptedException();
}
- }
- long autoAdvance = AUTO_ADVANCE_NANOS_ON_SLEEP.get();
- if (autoAdvance > 0) {
- advanceTimeNanos(autoAdvance);
+ long autoAdvance = AUTO_ADVANCE_NANOS_ON_SLEEP.get();
+ if (autoAdvance > 0) {
+ advanceTimeNanos(autoAdvance);
+ }
}
Thread.sleep(10);
}
} finally {
- synchronized (SLEEP_TIMES_LOCK) {
- if (SIMULATING.get() && THREAD_SLEEP_TIMES_NANOS != null) {
- THREAD_SLEEP_TIMES_NANOS.remove(Thread.currentThread());
- }
- }
+ THREAD_SLEEP_TIMES_NANOS.remove(Thread.currentThread());
}
}
-
+
public static void sleep(long ms) throws InterruptedException {
if (ms > 0) {
- sleepUntil(currentTimeMillis() + ms);
+ if (SIMULATING.get()) {
+ simulatedSleepUntilNanos(millisToNanos(currentTimeMillis() + ms));
+ } else {
+ Thread.sleep(ms);
+ }
}
}
- public static void sleepNanos(long nanos) throws InterruptedException {
+ public static void parkNanos(long nanos) throws InterruptedException {
if (nanos > 0) {
- sleepUntilNanos(nanoTime() + nanos);
+ if (SIMULATING.get()) {
+ simulatedSleepUntilNanos(nanoTime() + nanos);
+ } else {
+ LockSupport.parkNanos(nanos);
+ }
}
}
@@ -185,8 +173,17 @@ public class Time {
if (nanos < 0) {
throw new IllegalArgumentException("advanceTime only accepts positive time as an argument");
}
- long newTime = SIMULATED_CURR_TIME_NANOS.addAndGet(nanos);
- LOG.debug("Advanced simulated time to {}", newTime);
+ synchronized (SLEEP_TIMES_LOCK) {
+ long newTime = SIMULATED_CURR_TIME_NANOS.addAndGet(nanos);
+ Iterator<AtomicLong> sleepTimesIter = THREAD_SLEEP_TIMES_NANOS.values().iterator();
+ while (sleepTimesIter.hasNext()) {
+ AtomicLong curr = sleepTimesIter.next();
+ if (SIMULATED_CURR_TIME_NANOS.get() >= curr.get()) {
+ sleepTimesIter.remove();
+ }
+ }
+ LOG.debug("Advanced simulated time to {}", newTime);
+ }
}
public static void advanceTimeSecs(long secs) {
@@ -197,10 +194,7 @@ public class Time {
if (!SIMULATING.get()) {
throw new IllegalStateException("Must be in simulation mode");
}
- AtomicLong time;
- synchronized (SLEEP_TIMES_LOCK) {
- time = THREAD_SLEEP_TIMES_NANOS.get(t);
- }
+ AtomicLong time = THREAD_SLEEP_TIMES_NANOS.get(t);
return !t.isAlive() || time != null && nanoTime() < time.longValue();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/e5ca0c97/storm-server/src/main/java/org/apache/storm/LocalCluster.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/LocalCluster.java b/storm-server/src/main/java/org/apache/storm/LocalCluster.java
index 15679a8..c160dc9 100644
--- a/storm-server/src/main/java/org/apache/storm/LocalCluster.java
+++ b/storm-server/src/main/java/org/apache/storm/LocalCluster.java
@@ -700,6 +700,7 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
@Override
public void advanceClusterTime(int secs, int incSecs) throws InterruptedException {
+ waitForIdle();
for (int amountLeft = secs; amountLeft > 0; amountLeft -= incSecs) {
int diff = Math.min(incSecs, amountLeft);
Time.advanceTimeSecs(diff);
http://git-wip-us.apache.org/repos/asf/storm/blob/e5ca0c97/storm-server/src/main/java/org/apache/storm/Testing.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/Testing.java b/storm-server/src/main/java/org/apache/storm/Testing.java
index 6ac6370..4bb47b0 100644
--- a/storm-server/src/main/java/org/apache/storm/Testing.java
+++ b/storm-server/src/main/java/org/apache/storm/Testing.java
@@ -82,7 +82,7 @@ public class Testing {
* passed
* @param condition what we are waiting for
* @param body what to run in the loop
- * @throws AssertionError if teh loop timed out.
+ * @throws AssertionError if the loop timed out.
*/
public static void whileTimeout(Condition condition, Runnable body) {
whileTimeout(TEST_TIMEOUT_MS, condition, body);
@@ -94,7 +94,7 @@ public class Testing {
* @param timeoutMs the number of ms to wait before timing out.
* @param condition what we are waiting for
* @param body what to run in the loop
- * @throws AssertionError if teh loop timed out.
+ * @throws AssertionError if the loop timed out.
*/
public static void whileTimeout(long timeoutMs, Condition condition, Runnable body) {
long endTime = System.currentTimeMillis() + timeoutMs;
[2/2] storm git commit: Merge branch 'STORM-3120' of https://github.com/srdo/storm into STORM-3120
Posted by bo...@apache.org.
Merge branch 'STORM-3120' of https://github.com/srdo/storm into STORM-3120
STORM-3120: Clean up leftover null checks in Time, ensure idle threads get to run when cluster time is advanced
This closes #2734
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d8807671
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d8807671
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d8807671
Branch: refs/heads/master
Commit: d8807671627ee4add1bbe67e6ae19adbf8515938
Parents: 6980188 e5ca0c9
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Wed Jun 27 12:03:27 2018 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Wed Jun 27 12:03:27 2018 -0500
----------------------------------------------------------------------
.../src/jvm/org/apache/storm/utils/Time.java | 78 +++++++++-----------
.../java/org/apache/storm/LocalCluster.java | 1 +
.../src/main/java/org/apache/storm/Testing.java | 4 +-
3 files changed, 39 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/d8807671/storm-server/src/main/java/org/apache/storm/LocalCluster.java
----------------------------------------------------------------------