You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/02/08 09:05:12 UTC

[11/14] storm git commit: STORM-2937: Overwrite storm-kafka-client 1.x-branch into 1.0.x-branch: time changes

STORM-2937: Overwrite storm-kafka-client 1.x-branch into 1.0.x-branch: time changes

Copy Time.java from 1.x-branch to allow use of nanoTime() in storm-kafka-client, and update SlotTest to use try-with-resources with Time.SimulatedTime, since the new Time implementation removed startSimulatingAutoAdvanceOnSleep().

This was a selective cherry-pick of a03137ed, retaining only those changes needed.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/29fc006d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/29fc006d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/29fc006d

Branch: refs/heads/1.0.x-branch
Commit: 29fc006d379c055fc69c65be47de8e4229987d6a
Parents: 6d92df3
Author: Erik Weathers <er...@gmail.com>
Authored: Tue Feb 6 20:02:43 2018 -0800
Committer: Erik Weathers <er...@gmail.com>
Committed: Wed Feb 7 18:53:24 2018 -0800

----------------------------------------------------------------------
 .../src/jvm/org/apache/storm/utils/Time.java    | 183 +++++++++++++------
 .../storm/daemon/supervisor/SlotTest.java       |  31 +---
 2 files changed, 139 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/29fc006d/storm-core/src/jvm/org/apache/storm/utils/Time.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Time.java b/storm-core/src/jvm/org/apache/storm/utils/Time.java
index e501b6c..a6a4fe1 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Time.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Time.java
@@ -24,38 +24,67 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
+/**
+ * This class implements time simulation support. When time simulation is enabled, methods on this class will use fixed time.
+ * When time simulation is disabled, methods will pass through to relevant java.lang.System/java.lang.Thread calls.
+ * Methods using units higher than nanoseconds will pass through to System.currentTimeMillis(). Methods supporting nanoseconds will pass through to System.nanoTime().
+ */
 public class Time {
     public static final Logger LOG = LoggerFactory.getLogger(Time.class);
     
     private static AtomicBoolean simulating = new AtomicBoolean(false);
-    private static AtomicLong autoAdvanceOnSleep = new AtomicLong(0);
+    private static AtomicLong autoAdvanceNanosOnSleep = new AtomicLong(0);
     //TODO: should probably use weak references here or something
-    private static volatile Map<Thread, AtomicLong> threadSleepTimes;
+    private static volatile Map<Thread, AtomicLong> threadSleepTimesNanos;
     private static final Object sleepTimesLock = new Object();
+    private static AtomicLong simulatedCurrTimeNanos;
     
-    private static AtomicLong simulatedCurrTimeMs; //should this be a thread local that's allowed to keep advancing?
-    
-    public static void startSimulating() {
-        synchronized(sleepTimesLock) {
-            simulating.set(true);
-            simulatedCurrTimeMs = new AtomicLong(0);
-            threadSleepTimes = new ConcurrentHashMap<>();
+    public static class SimulatedTime implements AutoCloseable {
+
+        public SimulatedTime() {
+            this(null);
+        }
+        
+        public SimulatedTime(Number advanceTimeMs) {
+            synchronized(Time.sleepTimesLock) {
+                Time.simulating.set(true);
+                Time.simulatedCurrTimeNanos = new AtomicLong(0);
+                Time.threadSleepTimesNanos = new ConcurrentHashMap<>();
+                if (advanceTimeMs != null) {
+                    Time.autoAdvanceNanosOnSleep.set(millisToNanos(advanceTimeMs.longValue()));
+                }
+                LOG.warn("AutoCloseable Simulated Time Starting...");
+            }
+        }
+        
+        @Override
+        public void close() {
+            synchronized(Time.sleepTimesLock) {
+                Time.simulating.set(false);    
+                Time.autoAdvanceNanosOnSleep.set(0);
+                Time.threadSleepTimesNanos = null;
+                LOG.warn("AutoCloseable Simulated Time Ending...");
+            }
         }
     }
     
-    public static void startSimulatingAutoAdvanceOnSleep(long ms) {
-        synchronized(sleepTimesLock) {
-            startSimulating();
-            autoAdvanceOnSleep.set(ms);
+    @Deprecated
+    public static void startSimulating() {
+        synchronized(Time.sleepTimesLock) {
+            Time.simulating.set(true);
+            Time.simulatedCurrTimeNanos = new AtomicLong(0);
+            Time.threadSleepTimesNanos = new ConcurrentHashMap<>();
+            LOG.warn("Simulated Time Starting...");
         }
     }
     
+    @Deprecated
     public static void stopSimulating() {
-        synchronized(sleepTimesLock) {
-            simulating.set(false);    
-            autoAdvanceOnSleep.set(0);
-            threadSleepTimes = null;
+        synchronized(Time.sleepTimesLock) {
+            Time.simulating.set(false);    
+            Time.autoAdvanceNanosOnSleep.set(0);
+            Time.threadSleepTimesNanos = null;
+            LOG.warn("Simulated Time Ending...");
         }
     }
     
@@ -65,44 +94,66 @@ public class Time {
     
     public static void sleepUntil(long targetTimeMs) throws InterruptedException {
         if(simulating.get()) {
-            try {
-                synchronized(sleepTimesLock) {
-                    if (threadSleepTimes == null) {
+            simulatedSleepUntilNanos(millisToNanos(targetTimeMs));
+        } else {
+            long sleepTimeMs = targetTimeMs - currentTimeMillis();
+            if(sleepTimeMs>0) {
+                Thread.sleep(sleepTimeMs);
+            }
+        }
+    }
+    
+    public static void sleepUntilNanos(long targetTimeNanos) throws InterruptedException {
+        if(simulating.get()) {
+            simulatedSleepUntilNanos(targetTimeNanos);
+        } else {
+            long sleepTimeNanos = targetTimeNanos-nanoTime();
+            long sleepTimeMs = nanosToMillis(sleepTimeNanos);
+            int sleepTimeNanosSansMs = (int)(sleepTimeNanos%1_000_000);
+            if(sleepTimeNanos>0) {
+                Thread.sleep(sleepTimeMs, sleepTimeNanosSansMs);
+            } 
+        }
+    }
+    
+    private static void simulatedSleepUntilNanos(long targetTimeNanos) throws InterruptedException {
+        try {
+            synchronized (sleepTimesLock) {
+                if (threadSleepTimesNanos == null) {
+                    LOG.debug("{} is still sleeping after simulated time disabled.", Thread.currentThread(), new RuntimeException("STACK TRACE"));
+                    throw new InterruptedException();
+                }
+                threadSleepTimesNanos.put(Thread.currentThread(), new AtomicLong(targetTimeNanos));
+            }
+            while (simulatedCurrTimeNanos.get() < targetTimeNanos) {
+                synchronized (sleepTimesLock) {
+                    if (threadSleepTimesNanos == null) {
                         LOG.debug("{} is still sleeping after simulated time disabled.", Thread.currentThread(), new RuntimeException("STACK TRACE"));
                         throw new InterruptedException();
                     }
-                    threadSleepTimes.put(Thread.currentThread(), new AtomicLong(targetTimeMs));
                 }
-                while(simulatedCurrTimeMs.get() < targetTimeMs) {
-                    synchronized(sleepTimesLock) {
-                        if (threadSleepTimes == null) {
-                            LOG.debug("{} is still sleeping after simulated time disabled.", Thread.currentThread(), new RuntimeException("STACK TRACE"));
-                            throw new InterruptedException();
-                        }
-                    }
-                    long autoAdvance = autoAdvanceOnSleep.get();
-                    if (autoAdvance > 0) {
-                        advanceTime(autoAdvance);
-                    }
-                    Thread.sleep(10);
+                long autoAdvance = autoAdvanceNanosOnSleep.get();
+                if (autoAdvance > 0) {
+                    advanceTimeNanos(autoAdvance);
                 }
-            } finally {
-                synchronized(sleepTimesLock) {
-                    if (simulating.get() && threadSleepTimes != null) {
-                        threadSleepTimes.remove(Thread.currentThread());
-                    }
+                Thread.sleep(10);
+            }
+        } finally {
+            synchronized (sleepTimesLock) {
+                if (simulating.get() && threadSleepTimesNanos != null) {
+                    threadSleepTimesNanos.remove(Thread.currentThread());
                 }
             }
-        } else {
-            long sleepTime = targetTimeMs-currentTimeMillis();
-            if(sleepTime>0) 
-                Thread.sleep(sleepTime);
         }
     }
 
     public static void sleep(long ms) throws InterruptedException {
         sleepUntil(currentTimeMillis()+ms);
     }
+    
+    public static void sleepNanos(long nanos) throws InterruptedException {
+        sleepUntilNanos(nanoTime() + nanos);
+    }
 
     public static void sleepSecs (long secs) throws InterruptedException {
         if (secs > 0) {
@@ -110,14 +161,30 @@ public class Time {
         }
     }
     
+    public static long nanoTime() {
+        if (simulating.get()) {
+            return simulatedCurrTimeNanos.get();
+        } else {
+            return System.nanoTime();
+        }
+    }
+    
     public static long currentTimeMillis() {
         if(simulating.get()) {
-            return simulatedCurrTimeMs.get();
+            return nanosToMillis(simulatedCurrTimeNanos.get());
         } else {
             return System.currentTimeMillis();
         }
     }
 
+    public static long nanosToMillis(long nanos) {
+        return nanos/1_000_000;
+    }
+    
+    public static long millisToNanos(long millis) {
+        return millis*1_000_000;
+    }
+    
     public static long secsToMillis (int secs) {
         return 1000*(long) secs;
     }
@@ -139,18 +206,32 @@ public class Time {
     }
     
     public static void advanceTime(long ms) {
-        if (!simulating.get()) throw new IllegalStateException("Cannot simulate time unless in simulation mode");
-        if (ms < 0) throw new IllegalArgumentException("advanceTime only accepts positive time as an argument");
-        long newTime = simulatedCurrTimeMs.addAndGet(ms);
-        LOG.warn("Advanced simulated time to {}", newTime);
+        advanceTimeNanos(millisToNanos(ms));
+    }
+    
+    public static void advanceTimeNanos(long nanos) {
+        if (!simulating.get()) {
+            throw new IllegalStateException("Cannot simulate time unless in simulation mode");
+        }
+        if (nanos < 0) {
+            throw new IllegalArgumentException("advanceTime only accepts positive time as an argument");
+        }
+        long newTime = simulatedCurrTimeNanos.addAndGet(nanos);
+        LOG.debug("Advanced simulated time to {}", newTime);
+    }
+    
+    public static void advanceTimeSecs(long secs) {
+        advanceTime(secs * 1_000);
     }
     
     public static boolean isThreadWaiting(Thread t) {
-        if(!simulating.get()) throw new IllegalStateException("Must be in simulation mode");
+        if(!simulating.get()) {
+            throw new IllegalStateException("Must be in simulation mode");
+        }
         AtomicLong time;
         synchronized(sleepTimesLock) {
-            time = threadSleepTimes.get(t);
+            time = threadSleepTimesNanos.get(t);
         }
-        return !t.isAlive() || time!=null && currentTimeMillis() < time.longValue();
+        return !t.isAlive() || time!=null && nanoTime() < time.longValue();
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/29fc006d/storm-core/test/jvm/org/apache/storm/daemon/supervisor/SlotTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/daemon/supervisor/SlotTest.java b/storm-core/test/jvm/org/apache/storm/daemon/supervisor/SlotTest.java
index 24ccda5..9cd85f8 100644
--- a/storm-core/test/jvm/org/apache/storm/daemon/supervisor/SlotTest.java
+++ b/storm-core/test/jvm/org/apache/storm/daemon/supervisor/SlotTest.java
@@ -43,6 +43,7 @@ import org.apache.storm.localizer.ILocalizer;
 import org.apache.storm.scheduler.ISupervisor;
 import org.apache.storm.utils.LocalState;
 import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
 import org.junit.Test;
 
 public class SlotTest {
@@ -113,8 +114,7 @@ public class SlotTest {
     
     @Test
     public void testEmptyToEmpty() throws Exception {
-        Time.startSimulatingAutoAdvanceOnSleep(1010);
-        try {
+        try (SimulatedTime simulatedTime = new SimulatedTime(1010)) {
             ILocalizer localizer = mock(ILocalizer.class);
             LocalState state = mock(LocalState.class);
             ContainerLauncher containerLauncher = mock(ContainerLauncher.class);
@@ -125,15 +125,12 @@ public class SlotTest {
             DynamicState nextState = Slot.handleEmpty(dynamicState, staticState);
             assertEquals(MachineState.EMPTY, nextState.state);
             assertTrue(Time.currentTimeMillis() > 1000);
-        } finally {
-            Time.stopSimulating();
         }
     }
     
     @Test
     public void testLaunchContainerFromEmpty() throws Exception {
-        Time.startSimulatingAutoAdvanceOnSleep(1010);
-        try {
+        try (SimulatedTime simulatedTime = new SimulatedTime(1010)) {
             int port = 8080;
             String topoId = "NEW";
             List<ExecutorInfo> execList =  mkExecutorInfoList(1,2,3,4,5);
@@ -210,16 +207,13 @@ public class SlotTest {
             assertSame(newAssignment, nextState.currentAssignment);
             assertSame(container, nextState.container);
             assertTrue(Time.currentTimeMillis() > 2000);
-        } finally {
-            Time.stopSimulating();
         }
     }
 
 
     @Test
     public void testRelaunch() throws Exception {
-        Time.startSimulatingAutoAdvanceOnSleep(1010);
-        try {
+        try (SimulatedTime simulatedTime = new SimulatedTime(1010)) {
             int port = 8080;
             String topoId = "CURRENT";
             List<ExecutorInfo> execList =  mkExecutorInfoList(1,2,3,4,5);
@@ -260,15 +254,12 @@ public class SlotTest {
             
             nextState = Slot.stateMachineStep(nextState, staticState);
             assertEquals(MachineState.RUNNING, nextState.state);
-        } finally {
-            Time.stopSimulating();
         }
     }
     
     @Test
     public void testReschedule() throws Exception {
-        Time.startSimulatingAutoAdvanceOnSleep(1010);
-        try {
+        try (SimulatedTime simulatedTime = new SimulatedTime(1010)) {
             int port = 8080;
             String cTopoId = "CURRENT";
             List<ExecutorInfo> cExecList =  mkExecutorInfoList(1,2,3,4,5);
@@ -368,16 +359,13 @@ public class SlotTest {
             assertSame(nAssignment, nextState.currentAssignment);
             assertSame(nContainer, nextState.container);
             assertTrue(Time.currentTimeMillis() > 4000);
-        } finally {
-            Time.stopSimulating();
         }
     }
 
     
     @Test
     public void testRunningToEmpty() throws Exception {
-        Time.startSimulatingAutoAdvanceOnSleep(1010);
-        try {
+        try (SimulatedTime simulatedTime = new SimulatedTime(1010)) {
             int port = 8080;
             String cTopoId = "CURRENT";
             List<ExecutorInfo> cExecList =  mkExecutorInfoList(1,2,3,4,5);
@@ -432,15 +420,12 @@ public class SlotTest {
             assertEquals(null, nextState.container);
             assertEquals(null, nextState.currentAssignment);
             assertTrue(Time.currentTimeMillis() > 3000);
-        } finally {
-            Time.stopSimulating();
         }
     }
     
     @Test
     public void testRunWithProfileActions() throws Exception {
-        Time.startSimulatingAutoAdvanceOnSleep(1010);
-        try {
+        try (SimulatedTime simulatedTime = new SimulatedTime(1010)) {
             int port = 8080;
             String cTopoId = "CURRENT";
             List<ExecutorInfo> cExecList =  mkExecutorInfoList(1,2,3,4,5);
@@ -508,8 +493,6 @@ public class SlotTest {
             assertEquals(Collections.<TopoProfileAction> emptySet(), nextState.pendingStopProfileActions);
             assertEquals(Collections.<TopoProfileAction> emptySet(), nextState.profileActions);
             assertTrue(Time.currentTimeMillis() > 5000);
-        } finally {
-            Time.stopSimulating();
         }
     }
 }