You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/06/27 17:35:50 UTC

[1/2] storm git commit: STORM-3120: Clean up leftover null checks in Time, ensure idle threads get to run when cluster time is advanced

Repository: storm
Updated Branches:
  refs/heads/master 69801887c -> d88076716


STORM-3120: Clean up leftover null checks in Time, ensure idle threads get to run when cluster time is advanced


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e5ca0c97
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e5ca0c97
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e5ca0c97

Branch: refs/heads/master
Commit: e5ca0c97f31f4d2e92427c9ed2b00519985fc43c
Parents: d6f8afb
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Sun Jun 24 12:47:42 2018 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Sun Jun 24 23:02:41 2018 +0200

----------------------------------------------------------------------
 .../src/jvm/org/apache/storm/utils/Time.java    | 78 +++++++++-----------
 .../java/org/apache/storm/LocalCluster.java     |  1 +
 .../src/main/java/org/apache/storm/Testing.java |  4 +-
 3 files changed, 39 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e5ca0c97/storm-client/src/jvm/org/apache/storm/utils/Time.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Time.java b/storm-client/src/jvm/org/apache/storm/utils/Time.java
index e47c521..0c7bfab 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Time.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Time.java
@@ -12,10 +12,13 @@
 
 package org.apache.storm.utils;
 
+import java.util.Iterator;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,26 +33,7 @@ public class Time {
     private static final AtomicLong AUTO_ADVANCE_NANOS_ON_SLEEP = new AtomicLong(0);
     private static final Map<Thread, AtomicLong> THREAD_SLEEP_TIMES_NANOS = new ConcurrentHashMap<>();
     private static final Object SLEEP_TIMES_LOCK = new Object();
-    private static final AtomicLong SIMULATED_CURR_TIME_NANOS = new AtomicLong(0);
-
-    @Deprecated
-    public static void startSimulating() {
-        synchronized (Time.SLEEP_TIMES_LOCK) {
-            Time.SIMULATING.set(true);
-            Time.SIMULATED_CURR_TIME_NANOS.set(0);
-            Time.THREAD_SLEEP_TIMES_NANOS.clear();
-            Time.AUTO_ADVANCE_NANOS_ON_SLEEP.set(0);
-            LOG.warn("Simulated Time Starting...");
-        }
-    }
-
-    @Deprecated
-    public static void stopSimulating() {
-        synchronized (Time.SLEEP_TIMES_LOCK) {
-            Time.SIMULATING.set(false);
-            LOG.warn("Simulated Time Ending...");
-        }
-    }
+    private static final AtomicLong SIMULATED_CURR_TIME_NANOS = new AtomicLong(0); 
 
     public static boolean isSimulating() {
         return SIMULATING.get();
@@ -82,45 +66,49 @@ public class Time {
     private static void simulatedSleepUntilNanos(long targetTimeNanos) throws InterruptedException {
         try {
             synchronized (SLEEP_TIMES_LOCK) {
-                if (THREAD_SLEEP_TIMES_NANOS == null) {
+                if (!SIMULATING.get()) {
                     LOG.debug("{} is still sleeping after simulated time disabled.", Thread.currentThread(),
-                              new RuntimeException("STACK TRACE"));
+                        new RuntimeException("STACK TRACE"));
                     throw new InterruptedException();
                 }
                 THREAD_SLEEP_TIMES_NANOS.put(Thread.currentThread(), new AtomicLong(targetTimeNanos));
             }
             while (SIMULATED_CURR_TIME_NANOS.get() < targetTimeNanos) {
                 synchronized (SLEEP_TIMES_LOCK) {
-                    if (THREAD_SLEEP_TIMES_NANOS == null) {
+                    if (!SIMULATING.get()) {
                         LOG.debug("{} is still sleeping after simulated time disabled.", Thread.currentThread(),
                                   new RuntimeException("STACK TRACE"));
                         throw new InterruptedException();
                     }
-                }
-                long autoAdvance = AUTO_ADVANCE_NANOS_ON_SLEEP.get();
-                if (autoAdvance > 0) {
-                    advanceTimeNanos(autoAdvance);
+                    long autoAdvance = AUTO_ADVANCE_NANOS_ON_SLEEP.get();
+                    if (autoAdvance > 0) {
+                        advanceTimeNanos(autoAdvance);
+                    }
                 }
                 Thread.sleep(10);
             }
         } finally {
-            synchronized (SLEEP_TIMES_LOCK) {
-                if (SIMULATING.get() && THREAD_SLEEP_TIMES_NANOS != null) {
-                    THREAD_SLEEP_TIMES_NANOS.remove(Thread.currentThread());
-                }
-            }
+            THREAD_SLEEP_TIMES_NANOS.remove(Thread.currentThread());
         }
     }
-
+    
     public static void sleep(long ms) throws InterruptedException {
         if (ms > 0) {
-            sleepUntil(currentTimeMillis() + ms);
+            if (SIMULATING.get()) {
+                simulatedSleepUntilNanos(millisToNanos(currentTimeMillis() + ms));
+            } else {
+                Thread.sleep(ms);
+            }
         }
     }
 
-    public static void sleepNanos(long nanos) throws InterruptedException {
+    public static void parkNanos(long nanos) throws InterruptedException {
         if (nanos > 0) {
-            sleepUntilNanos(nanoTime() + nanos);
+            if (SIMULATING.get()) {
+                simulatedSleepUntilNanos(nanoTime() + nanos);
+            } else {
+                LockSupport.parkNanos(nanos);
+            }
         }
     }
 
@@ -185,8 +173,17 @@ public class Time {
         if (nanos < 0) {
             throw new IllegalArgumentException("advanceTime only accepts positive time as an argument");
         }
-        long newTime = SIMULATED_CURR_TIME_NANOS.addAndGet(nanos);
-        LOG.debug("Advanced simulated time to {}", newTime);
+        synchronized (SLEEP_TIMES_LOCK) {
+            long newTime = SIMULATED_CURR_TIME_NANOS.addAndGet(nanos);
+            Iterator<AtomicLong> sleepTimesIter = THREAD_SLEEP_TIMES_NANOS.values().iterator();
+            while (sleepTimesIter.hasNext()) {
+                AtomicLong curr = sleepTimesIter.next();
+                if (SIMULATED_CURR_TIME_NANOS.get() >= curr.get()) {
+                    sleepTimesIter.remove();
+                }
+            }
+            LOG.debug("Advanced simulated time to {}", newTime);
+        }
     }
 
     public static void advanceTimeSecs(long secs) {
@@ -197,10 +194,7 @@ public class Time {
         if (!SIMULATING.get()) {
             throw new IllegalStateException("Must be in simulation mode");
         }
-        AtomicLong time;
-        synchronized (SLEEP_TIMES_LOCK) {
-            time = THREAD_SLEEP_TIMES_NANOS.get(t);
-        }
+        AtomicLong time = THREAD_SLEEP_TIMES_NANOS.get(t);
         return !t.isAlive() || time != null && nanoTime() < time.longValue();
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/e5ca0c97/storm-server/src/main/java/org/apache/storm/LocalCluster.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/LocalCluster.java b/storm-server/src/main/java/org/apache/storm/LocalCluster.java
index 15679a8..c160dc9 100644
--- a/storm-server/src/main/java/org/apache/storm/LocalCluster.java
+++ b/storm-server/src/main/java/org/apache/storm/LocalCluster.java
@@ -700,6 +700,7 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
 
     @Override
     public void advanceClusterTime(int secs, int incSecs) throws InterruptedException {
+        waitForIdle();
         for (int amountLeft = secs; amountLeft > 0; amountLeft -= incSecs) {
             int diff = Math.min(incSecs, amountLeft);
             Time.advanceTimeSecs(diff);

http://git-wip-us.apache.org/repos/asf/storm/blob/e5ca0c97/storm-server/src/main/java/org/apache/storm/Testing.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/Testing.java b/storm-server/src/main/java/org/apache/storm/Testing.java
index 6ac6370..4bb47b0 100644
--- a/storm-server/src/main/java/org/apache/storm/Testing.java
+++ b/storm-server/src/main/java/org/apache/storm/Testing.java
@@ -82,7 +82,7 @@ public class Testing {
      * passed
      * @param condition what we are waiting for
      * @param body what to run in the loop
-     * @throws AssertionError if teh loop timed out.
+     * @throws AssertionError if the loop timed out.
      */
     public static void whileTimeout(Condition condition, Runnable body) {
         whileTimeout(TEST_TIMEOUT_MS, condition, body);
@@ -94,7 +94,7 @@ public class Testing {
      * @param timeoutMs the number of ms to wait before timing out.
      * @param condition what we are waiting for
      * @param body what to run in the loop
-     * @throws AssertionError if teh loop timed out.
+     * @throws AssertionError if the loop timed out.
      */
     public static void whileTimeout(long timeoutMs, Condition condition, Runnable body) {
         long endTime = System.currentTimeMillis() + timeoutMs;


[2/2] storm git commit: Merge branch 'STORM-3120' of https://github.com/srdo/storm into STORM-3120

Posted by bo...@apache.org.
Merge branch 'STORM-3120' of https://github.com/srdo/storm into STORM-3120

STORM-3120: Clean up leftover null checks in Time, ensure idle threads get to run when cluster time is advanced

This closes #2734


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d8807671
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d8807671
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d8807671

Branch: refs/heads/master
Commit: d8807671627ee4add1bbe67e6ae19adbf8515938
Parents: 6980188 e5ca0c9
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Wed Jun 27 12:03:27 2018 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Wed Jun 27 12:03:27 2018 -0500

----------------------------------------------------------------------
 .../src/jvm/org/apache/storm/utils/Time.java    | 78 +++++++++-----------
 .../java/org/apache/storm/LocalCluster.java     |  1 +
 .../src/main/java/org/apache/storm/Testing.java |  4 +-
 3 files changed, 39 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d8807671/storm-server/src/main/java/org/apache/storm/LocalCluster.java
----------------------------------------------------------------------