You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/02/08 09:05:12 UTC
[11/14] storm git commit: STORM-2937: Overwrite storm-kafka-client
1.x-branch into 1.0.x-branch: time changes
STORM-2937: Overwrite storm-kafka-client 1.x-branch into 1.0.x-branch: time changes
copy Time.java from 1.x-branch to allow use of nanoTime() in storm-kafka-client, and also update SlotTest to use try-with-resources since new Time implementation ditched startSimulatingAutoAdvanceOnSleep().
This was a selective cherry-pick of a03137ed, retaining only those changes needed.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/29fc006d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/29fc006d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/29fc006d
Branch: refs/heads/1.0.x-branch
Commit: 29fc006d379c055fc69c65be47de8e4229987d6a
Parents: 6d92df3
Author: Erik Weathers <er...@gmail.com>
Authored: Tue Feb 6 20:02:43 2018 -0800
Committer: Erik Weathers <er...@gmail.com>
Committed: Wed Feb 7 18:53:24 2018 -0800
----------------------------------------------------------------------
.../src/jvm/org/apache/storm/utils/Time.java | 183 +++++++++++++------
.../storm/daemon/supervisor/SlotTest.java | 31 +---
2 files changed, 139 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/29fc006d/storm-core/src/jvm/org/apache/storm/utils/Time.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Time.java b/storm-core/src/jvm/org/apache/storm/utils/Time.java
index e501b6c..a6a4fe1 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Time.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Time.java
@@ -24,38 +24,67 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
+/**
+ * This class implements time simulation support. When time simulation is enabled, methods on this class will use fixed time.
+ * When time simulation is disabled, methods will pass through to relevant java.lang.System/java.lang.Thread calls.
+ * Methods using units higher than nanoseconds will pass through to System.currentTimeMillis(). Methods supporting nanoseconds will pass through to System.nanoTime().
+ */
public class Time {
public static final Logger LOG = LoggerFactory.getLogger(Time.class);
private static AtomicBoolean simulating = new AtomicBoolean(false);
- private static AtomicLong autoAdvanceOnSleep = new AtomicLong(0);
+ private static AtomicLong autoAdvanceNanosOnSleep = new AtomicLong(0);
//TODO: should probably use weak references here or something
- private static volatile Map<Thread, AtomicLong> threadSleepTimes;
+ private static volatile Map<Thread, AtomicLong> threadSleepTimesNanos;
private static final Object sleepTimesLock = new Object();
+ private static AtomicLong simulatedCurrTimeNanos;
- private static AtomicLong simulatedCurrTimeMs; //should this be a thread local that's allowed to keep advancing?
-
- public static void startSimulating() {
- synchronized(sleepTimesLock) {
- simulating.set(true);
- simulatedCurrTimeMs = new AtomicLong(0);
- threadSleepTimes = new ConcurrentHashMap<>();
+ public static class SimulatedTime implements AutoCloseable {
+
+ public SimulatedTime() {
+ this(null);
+ }
+
+ public SimulatedTime(Number advanceTimeMs) {
+ synchronized(Time.sleepTimesLock) {
+ Time.simulating.set(true);
+ Time.simulatedCurrTimeNanos = new AtomicLong(0);
+ Time.threadSleepTimesNanos = new ConcurrentHashMap<>();
+ if (advanceTimeMs != null) {
+ Time.autoAdvanceNanosOnSleep.set(millisToNanos(advanceTimeMs.longValue()));
+ }
+ LOG.warn("AutoCloseable Simulated Time Starting...");
+ }
+ }
+
+ @Override
+ public void close() {
+ synchronized(Time.sleepTimesLock) {
+ Time.simulating.set(false);
+ Time.autoAdvanceNanosOnSleep.set(0);
+ Time.threadSleepTimesNanos = null;
+ LOG.warn("AutoCloseable Simulated Time Ending...");
+ }
}
}
- public static void startSimulatingAutoAdvanceOnSleep(long ms) {
- synchronized(sleepTimesLock) {
- startSimulating();
- autoAdvanceOnSleep.set(ms);
+ @Deprecated
+ public static void startSimulating() {
+ synchronized(Time.sleepTimesLock) {
+ Time.simulating.set(true);
+ Time.simulatedCurrTimeNanos = new AtomicLong(0);
+ Time.threadSleepTimesNanos = new ConcurrentHashMap<>();
+ LOG.warn("Simulated Time Starting...");
}
}
+ @Deprecated
public static void stopSimulating() {
- synchronized(sleepTimesLock) {
- simulating.set(false);
- autoAdvanceOnSleep.set(0);
- threadSleepTimes = null;
+ synchronized(Time.sleepTimesLock) {
+ Time.simulating.set(false);
+ Time.autoAdvanceNanosOnSleep.set(0);
+ Time.threadSleepTimesNanos = null;
+ LOG.warn("Simulated Time Ending...");
}
}
@@ -65,44 +94,66 @@ public class Time {
public static void sleepUntil(long targetTimeMs) throws InterruptedException {
if(simulating.get()) {
- try {
- synchronized(sleepTimesLock) {
- if (threadSleepTimes == null) {
+ simulatedSleepUntilNanos(millisToNanos(targetTimeMs));
+ } else {
+ long sleepTimeMs = targetTimeMs - currentTimeMillis();
+ if(sleepTimeMs>0) {
+ Thread.sleep(sleepTimeMs);
+ }
+ }
+ }
+
+ public static void sleepUntilNanos(long targetTimeNanos) throws InterruptedException {
+ if(simulating.get()) {
+ simulatedSleepUntilNanos(targetTimeNanos);
+ } else {
+ long sleepTimeNanos = targetTimeNanos-nanoTime();
+ long sleepTimeMs = nanosToMillis(sleepTimeNanos);
+ int sleepTimeNanosSansMs = (int)(sleepTimeNanos%1_000_000);
+ if(sleepTimeNanos>0) {
+ Thread.sleep(sleepTimeMs, sleepTimeNanosSansMs);
+ }
+ }
+ }
+
+ private static void simulatedSleepUntilNanos(long targetTimeNanos) throws InterruptedException {
+ try {
+ synchronized (sleepTimesLock) {
+ if (threadSleepTimesNanos == null) {
+ LOG.debug("{} is still sleeping after simulated time disabled.", Thread.currentThread(), new RuntimeException("STACK TRACE"));
+ throw new InterruptedException();
+ }
+ threadSleepTimesNanos.put(Thread.currentThread(), new AtomicLong(targetTimeNanos));
+ }
+ while (simulatedCurrTimeNanos.get() < targetTimeNanos) {
+ synchronized (sleepTimesLock) {
+ if (threadSleepTimesNanos == null) {
LOG.debug("{} is still sleeping after simulated time disabled.", Thread.currentThread(), new RuntimeException("STACK TRACE"));
throw new InterruptedException();
}
- threadSleepTimes.put(Thread.currentThread(), new AtomicLong(targetTimeMs));
}
- while(simulatedCurrTimeMs.get() < targetTimeMs) {
- synchronized(sleepTimesLock) {
- if (threadSleepTimes == null) {
- LOG.debug("{} is still sleeping after simulated time disabled.", Thread.currentThread(), new RuntimeException("STACK TRACE"));
- throw new InterruptedException();
- }
- }
- long autoAdvance = autoAdvanceOnSleep.get();
- if (autoAdvance > 0) {
- advanceTime(autoAdvance);
- }
- Thread.sleep(10);
+ long autoAdvance = autoAdvanceNanosOnSleep.get();
+ if (autoAdvance > 0) {
+ advanceTimeNanos(autoAdvance);
}
- } finally {
- synchronized(sleepTimesLock) {
- if (simulating.get() && threadSleepTimes != null) {
- threadSleepTimes.remove(Thread.currentThread());
- }
+ Thread.sleep(10);
+ }
+ } finally {
+ synchronized (sleepTimesLock) {
+ if (simulating.get() && threadSleepTimesNanos != null) {
+ threadSleepTimesNanos.remove(Thread.currentThread());
}
}
- } else {
- long sleepTime = targetTimeMs-currentTimeMillis();
- if(sleepTime>0)
- Thread.sleep(sleepTime);
}
}
public static void sleep(long ms) throws InterruptedException {
sleepUntil(currentTimeMillis()+ms);
}
+
+ public static void sleepNanos(long nanos) throws InterruptedException {
+ sleepUntilNanos(nanoTime() + nanos);
+ }
public static void sleepSecs (long secs) throws InterruptedException {
if (secs > 0) {
@@ -110,14 +161,30 @@ public class Time {
}
}
+ public static long nanoTime() {
+ if (simulating.get()) {
+ return simulatedCurrTimeNanos.get();
+ } else {
+ return System.nanoTime();
+ }
+ }
+
public static long currentTimeMillis() {
if(simulating.get()) {
- return simulatedCurrTimeMs.get();
+ return nanosToMillis(simulatedCurrTimeNanos.get());
} else {
return System.currentTimeMillis();
}
}
+ public static long nanosToMillis(long nanos) {
+ return nanos/1_000_000;
+ }
+
+ public static long millisToNanos(long millis) {
+ return millis*1_000_000;
+ }
+
public static long secsToMillis (int secs) {
return 1000*(long) secs;
}
@@ -139,18 +206,32 @@ public class Time {
}
public static void advanceTime(long ms) {
- if (!simulating.get()) throw new IllegalStateException("Cannot simulate time unless in simulation mode");
- if (ms < 0) throw new IllegalArgumentException("advanceTime only accepts positive time as an argument");
- long newTime = simulatedCurrTimeMs.addAndGet(ms);
- LOG.warn("Advanced simulated time to {}", newTime);
+ advanceTimeNanos(millisToNanos(ms));
+ }
+
+ public static void advanceTimeNanos(long nanos) {
+ if (!simulating.get()) {
+ throw new IllegalStateException("Cannot simulate time unless in simulation mode");
+ }
+ if (nanos < 0) {
+ throw new IllegalArgumentException("advanceTime only accepts positive time as an argument");
+ }
+ long newTime = simulatedCurrTimeNanos.addAndGet(nanos);
+ LOG.debug("Advanced simulated time to {}", newTime);
+ }
+
+ public static void advanceTimeSecs(long secs) {
+ advanceTime(secs * 1_000);
}
public static boolean isThreadWaiting(Thread t) {
- if(!simulating.get()) throw new IllegalStateException("Must be in simulation mode");
+ if(!simulating.get()) {
+ throw new IllegalStateException("Must be in simulation mode");
+ }
AtomicLong time;
synchronized(sleepTimesLock) {
- time = threadSleepTimes.get(t);
+ time = threadSleepTimesNanos.get(t);
}
- return !t.isAlive() || time!=null && currentTimeMillis() < time.longValue();
+ return !t.isAlive() || time!=null && nanoTime() < time.longValue();
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/29fc006d/storm-core/test/jvm/org/apache/storm/daemon/supervisor/SlotTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/daemon/supervisor/SlotTest.java b/storm-core/test/jvm/org/apache/storm/daemon/supervisor/SlotTest.java
index 24ccda5..9cd85f8 100644
--- a/storm-core/test/jvm/org/apache/storm/daemon/supervisor/SlotTest.java
+++ b/storm-core/test/jvm/org/apache/storm/daemon/supervisor/SlotTest.java
@@ -43,6 +43,7 @@ import org.apache.storm.localizer.ILocalizer;
import org.apache.storm.scheduler.ISupervisor;
import org.apache.storm.utils.LocalState;
import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
import org.junit.Test;
public class SlotTest {
@@ -113,8 +114,7 @@ public class SlotTest {
@Test
public void testEmptyToEmpty() throws Exception {
- Time.startSimulatingAutoAdvanceOnSleep(1010);
- try {
+ try (SimulatedTime simulatedTime = new SimulatedTime(1010)) {
ILocalizer localizer = mock(ILocalizer.class);
LocalState state = mock(LocalState.class);
ContainerLauncher containerLauncher = mock(ContainerLauncher.class);
@@ -125,15 +125,12 @@ public class SlotTest {
DynamicState nextState = Slot.handleEmpty(dynamicState, staticState);
assertEquals(MachineState.EMPTY, nextState.state);
assertTrue(Time.currentTimeMillis() > 1000);
- } finally {
- Time.stopSimulating();
}
}
@Test
public void testLaunchContainerFromEmpty() throws Exception {
- Time.startSimulatingAutoAdvanceOnSleep(1010);
- try {
+ try (SimulatedTime simulatedTime = new SimulatedTime(1010)) {
int port = 8080;
String topoId = "NEW";
List<ExecutorInfo> execList = mkExecutorInfoList(1,2,3,4,5);
@@ -210,16 +207,13 @@ public class SlotTest {
assertSame(newAssignment, nextState.currentAssignment);
assertSame(container, nextState.container);
assertTrue(Time.currentTimeMillis() > 2000);
- } finally {
- Time.stopSimulating();
}
}
@Test
public void testRelaunch() throws Exception {
- Time.startSimulatingAutoAdvanceOnSleep(1010);
- try {
+ try (SimulatedTime simulatedTime = new SimulatedTime(1010)) {
int port = 8080;
String topoId = "CURRENT";
List<ExecutorInfo> execList = mkExecutorInfoList(1,2,3,4,5);
@@ -260,15 +254,12 @@ public class SlotTest {
nextState = Slot.stateMachineStep(nextState, staticState);
assertEquals(MachineState.RUNNING, nextState.state);
- } finally {
- Time.stopSimulating();
}
}
@Test
public void testReschedule() throws Exception {
- Time.startSimulatingAutoAdvanceOnSleep(1010);
- try {
+ try (SimulatedTime simulatedTime = new SimulatedTime(1010)) {
int port = 8080;
String cTopoId = "CURRENT";
List<ExecutorInfo> cExecList = mkExecutorInfoList(1,2,3,4,5);
@@ -368,16 +359,13 @@ public class SlotTest {
assertSame(nAssignment, nextState.currentAssignment);
assertSame(nContainer, nextState.container);
assertTrue(Time.currentTimeMillis() > 4000);
- } finally {
- Time.stopSimulating();
}
}
@Test
public void testRunningToEmpty() throws Exception {
- Time.startSimulatingAutoAdvanceOnSleep(1010);
- try {
+ try (SimulatedTime simulatedTime = new SimulatedTime(1010)) {
int port = 8080;
String cTopoId = "CURRENT";
List<ExecutorInfo> cExecList = mkExecutorInfoList(1,2,3,4,5);
@@ -432,15 +420,12 @@ public class SlotTest {
assertEquals(null, nextState.container);
assertEquals(null, nextState.currentAssignment);
assertTrue(Time.currentTimeMillis() > 3000);
- } finally {
- Time.stopSimulating();
}
}
@Test
public void testRunWithProfileActions() throws Exception {
- Time.startSimulatingAutoAdvanceOnSleep(1010);
- try {
+ try (SimulatedTime simulatedTime = new SimulatedTime(1010)) {
int port = 8080;
String cTopoId = "CURRENT";
List<ExecutorInfo> cExecList = mkExecutorInfoList(1,2,3,4,5);
@@ -508,8 +493,6 @@ public class SlotTest {
assertEquals(Collections.<TopoProfileAction> emptySet(), nextState.pendingStopProfileActions);
assertEquals(Collections.<TopoProfileAction> emptySet(), nextState.profileActions);
assertTrue(Time.currentTimeMillis() > 5000);
- } finally {
- Time.stopSimulating();
}
}
}