You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by er...@apache.org on 2012/08/30 23:30:30 UTC
svn commit: r1379162 - in /giraph/trunk: ./
src/main/java/org/apache/giraph/utils/ src/main/java/org/apache/giraph/zk/
src/test/java/org/apache/giraph/
Author: ereisman
Date: Thu Aug 30 21:30:29 2012
New Revision: 1379162
URL: http://svn.apache.org/viewvc?rev=1379162&view=rev
Log:
GIRAPH-291: PredicateLock should have a constructor to take in a custom waiting time and additional testing
Added:
giraph/trunk/src/main/java/org/apache/giraph/utils/FakeTime.java
giraph/trunk/src/main/java/org/apache/giraph/utils/SystemTime.java
giraph/trunk/src/main/java/org/apache/giraph/utils/Time.java
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/src/main/java/org/apache/giraph/zk/PredicateLock.java
giraph/trunk/src/test/java/org/apache/giraph/TestPredicateLock.java
Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1379162&r1=1379161&r2=1379162&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Thu Aug 30 21:30:29 2012
@@ -1,6 +1,8 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-291: PredicateLock should have a constructor to take in a
+ custom waiting time and additional testing (aching via ereisman)
GIRAPH-318: New Iterator in LocalityInfoSorter is not working.
(Eli Reisman via apresta)
Added: giraph/trunk/src/main/java/org/apache/giraph/utils/FakeTime.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/utils/FakeTime.java?rev=1379162&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/utils/FakeTime.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/utils/FakeTime.java Thu Aug 30 21:30:29 2012
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import java.util.Date;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Thread-safe fake implementation of Time for testing. The clock starts at
+ * zero and advances only when sleep() is called; sleep() itself never blocks.
+ */
+public class FakeTime implements Time {
+ /** Fake nanosecond clock; starts at 0 and only sleep() advances it */
+ private final AtomicLong nanosecondsSinceEpoch = new AtomicLong();
+
+ @Override
+ public long getMilliseconds() {
+ return nanosecondsSinceEpoch.get() / NS_PER_MS;
+ }
+
+ @Override
+ public long getNanoseconds() {
+ return nanosecondsSinceEpoch.get();
+ }
+
+ @Override
+ public int getSeconds() {
+ return (int) (nanosecondsSinceEpoch.get() / NS_PER_SECOND);
+ }
+
+ @Override
+ public Date getCurrentDate() {
+ return new Date(getMilliseconds());
+ }
+
+ @Override
+ public void sleep(long milliseconds) throws InterruptedException {
+ nanosecondsSinceEpoch.getAndAdd(milliseconds * NS_PER_MS);
+ }
+}
Added: giraph/trunk/src/main/java/org/apache/giraph/utils/SystemTime.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/utils/SystemTime.java?rev=1379162&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/utils/SystemTime.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/utils/SystemTime.java Thu Aug 30 21:30:29 2012
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import java.util.Date;
+
+/**
+ * Thread-safe implementation of Time backed by the JVM system clock; this
+ * is the implementation that should be used in production.
+ */
+public class SystemTime implements Time {
+ /**
+ * Shared instance handed out by getInstance()
+ */
+ private static final SystemTime SINGLE_TIME = new SystemTime();
+
+ @Override
+ public long getMilliseconds() {
+ return System.currentTimeMillis();
+ }
+
+ @Override
+ public long getNanoseconds() {
+ return System.nanoTime(); // arbitrary origin, not epoch-based
+ }
+
+ @Override
+ public int getSeconds() {
+ return (int) (getMilliseconds() / MS_PER_SECOND);
+ }
+
+ @Override
+ public Date getCurrentDate() {
+ return new Date();
+ }
+
+ @Override
+ public void sleep(long milliseconds) throws InterruptedException {
+ Thread.sleep(milliseconds);
+ }
+
+ /**
+ * Get the shared instance of this class
+ *
+ * @return The single shared SystemTime instance, typed as Time
+ */
+ public static Time getInstance() {
+ return SINGLE_TIME;
+ }
+}
Added: giraph/trunk/src/main/java/org/apache/giraph/utils/Time.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/utils/Time.java?rev=1379162&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/utils/Time.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/utils/Time.java Thu Aug 30 21:30:29 2012
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import java.util.Date;
+
+/**
+ * Interface for handling Time related operations so that they can be mocked
+ * (or faked) for testing.
+ */
+public interface Time {
+ /** Microseconds per millisecond */
+ long US_PER_MS = 1000;
+ /** Nanoseconds per microsecond */
+ long NS_PER_US = 1000;
+ /** Nanoseconds per millisecond */
+ long NS_PER_MS = US_PER_MS * NS_PER_US;
+ /** Milliseconds per second */
+ long MS_PER_SECOND = 1000;
+ /** Microseconds per second */
+ long US_PER_SECOND = US_PER_MS * MS_PER_SECOND;
+ /** Nanoseconds per second */
+ long NS_PER_SECOND = NS_PER_US * US_PER_SECOND;
+ /** Seconds per hour */
+ long SECONDS_PER_HOUR = 60 * 60;
+ /** Seconds per day */
+ long SECONDS_PER_DAY = 24 * SECONDS_PER_HOUR;
+ /** Milliseconds per hour */
+ long MS_PER_HOUR = SECONDS_PER_HOUR * MS_PER_SECOND;
+ /** Milliseconds per day */
+ long MS_PER_DAY = SECONDS_PER_DAY * MS_PER_SECOND;
+
+ /**
+ * Get the current milliseconds
+ *
+ * @return The difference, measured in milliseconds, between
+ * the current time and midnight, January 1, 1970 UTC
+ * (FakeTime instead counts from its own zero-based clock).
+ */
+ long getMilliseconds();
+
+ /**
+ * Get the current nanoseconds
+ *
+ * @return Current nanosecond reading. NOTE(review): SystemTime returns
+ * System.nanoTime(), which is not relative to January 1, 1970 UTC.
+ */
+ long getNanoseconds();
+
+ /**
+ * Get the current seconds
+ *
+ * @return The difference, measured in seconds, between
+ * the current time and midnight, January 1, 1970 UTC.
+ */
+ int getSeconds();
+
+ /**
+ * Get the current date
+ *
+ * @return Current date
+ */
+ Date getCurrentDate();
+
+ /**
+ * Current thread should sleep for some number of milliseconds
+ *
+ * @param milliseconds Milliseconds to sleep for
+ * @throws InterruptedException May be thrown if the sleep is interrupted
+ */
+ void sleep(long milliseconds) throws InterruptedException;
+}
Modified: giraph/trunk/src/main/java/org/apache/giraph/zk/PredicateLock.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/zk/PredicateLock.java?rev=1379162&r1=1379161&r2=1379162&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/zk/PredicateLock.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/zk/PredicateLock.java Thu Aug 30 21:30:29 2012
@@ -23,6 +23,8 @@ import java.util.concurrent.locks.Condit
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.giraph.utils.SystemTime;
+import org.apache.giraph.utils.Time;
import org.apache.hadoop.util.Progressable;
import org.apache.log4j.Logger;
@@ -33,24 +35,41 @@ import org.apache.log4j.Logger;
public class PredicateLock implements BspEvent {
/** Class logger */
private static final Logger LOG = Logger.getLogger(PredicateLock.class);
- /** Msecs to refresh the progress meter */
- private static final int MSEC_PERIOD = 10000;
+ /** Default msecs to refresh the progress meter */
+ private static final int DEFAULT_MSEC_PERIOD = 10000;
/** Progressable for reporting progress (Job context) */
protected final Progressable progressable;
+ /** Actual msecs to refresh the progress meter */
+ private final int msecPeriod;
/** Lock */
private Lock lock = new ReentrantLock();
/** Condition associated with lock */
private Condition cond = lock.newCondition();
/** Predicate */
private boolean eventOccurred = false;
+ /** Keeps track of the time */
+ private final Time time;
/**
- * Constructor.
+ * Constructor with default values.
*
* @param progressable used to report progress() (usually a Mapper.Context)
*/
public PredicateLock(Progressable progressable) {
+ this(progressable, DEFAULT_MSEC_PERIOD, SystemTime.getInstance());
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param progressable used to report progress() (usually a Mapper.Context)
+ * @param msecPeriod Msecs between progress reports
+ * @param time Time implementation
+ */
+ public PredicateLock(Progressable progressable, int msecPeriod, Time time) {
this.progressable = progressable;
+ this.msecPeriod = msecPeriod;
+ this.time = time;
}
@Override
@@ -77,15 +96,15 @@ public class PredicateLock implements Bs
@Override
public boolean waitMsecs(int msecs) {
if (msecs < 0) {
- throw new RuntimeException("msecs cannot be negative!");
+ throw new RuntimeException("waitMsecs: msecs cannot be negative!");
}
- long maxMsecs = System.currentTimeMillis() + msecs;
+ long maxMsecs = time.getMilliseconds() + msecs;
int curMsecTimeout = 0;
lock.lock();
try {
while (!eventOccurred) {
curMsecTimeout =
- Math.min(msecs, MSEC_PERIOD);
+ Math.min(msecs, msecPeriod);
if (LOG.isDebugEnabled()) {
LOG.debug("waitMsecs: Wait for " + curMsecTimeout);
}
@@ -102,7 +121,7 @@ public class PredicateLock implements Bs
"exception on cond.await() " +
curMsecTimeout, e);
}
- if (System.currentTimeMillis() > maxMsecs) {
+ if (time.getMilliseconds() > maxMsecs) {
return false;
}
msecs = Math.max(0, msecs - curMsecTimeout);
@@ -116,7 +135,7 @@ public class PredicateLock implements Bs
@Override
public void waitForever() {
- while (!waitMsecs(MSEC_PERIOD)) {
+ while (!waitMsecs(msecPeriod)) {
progressable.progress();
}
}
Modified: giraph/trunk/src/test/java/org/apache/giraph/TestPredicateLock.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/TestPredicateLock.java?rev=1379162&r1=1379161&r2=1379162&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/TestPredicateLock.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/TestPredicateLock.java Thu Aug 30 21:30:29 2012
@@ -18,71 +18,101 @@
package org.apache.giraph;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.giraph.utils.Time;
import org.apache.giraph.zk.BspEvent;
import org.apache.giraph.zk.PredicateLock;
import org.apache.hadoop.util.Progressable;
+import org.junit.Before;
import org.junit.Test;
+import org.mockito.stubbing.OngoingStubbing;
/**
* Ensure that PredicateLock objects work correctly.
*/
public class TestPredicateLock {
- private static class SignalThread extends Thread {
- private final BspEvent event;
- public SignalThread(BspEvent event) {
- this.event = event;
- }
- public void run() {
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- }
- event.signal();
- }
+ /** How many times was progress called? */
+ private AtomicInteger progressCalled = new AtomicInteger(0);
+
+ private static class SignalThread extends Thread {
+ private final BspEvent event;
+ public SignalThread(BspEvent event) {
+ this.event = event;
+ }
+ public void run() {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ }
+ event.signal();
}
+ }
- private Progressable stubContext;
+ private Progressable stubContext;
- private Progressable getStubProgressable() {
- if (stubContext == null)
- stubContext = new Progressable() {
- @Override
- public void progress() {
- System.out.println("progress received");
- }
- };
- return stubContext;
- }
+ private Progressable getStubProgressable() {
+ if (stubContext == null)
+ stubContext = new Progressable() {
+ @Override
+ public void progress() {
+ progressCalled.incrementAndGet();
+ }
+ };
+ return stubContext;
+ }
- /**
- * Make sure the the event is not signaled.
- */
+ @Before
+ public void setUp() {
+ progressCalled.set(0);
+ }
+
+ /**
+ * Make sure the event is not signaled.
+ */
@Test
- public void testWaitMsecsNoEvent() {
- BspEvent event = new PredicateLock(getStubProgressable());
- boolean gotPredicate = event.waitMsecs(50);
- assertFalse(gotPredicate);
- }
+ public void testWaitMsecsNoEvent() {
+ Time mockTime = mock(Time.class);
+ when(mockTime.getMilliseconds()).
+ thenReturn(0L).thenReturn(2L);
+ BspEvent event = new PredicateLock(getStubProgressable(), 1, mockTime);
+ boolean gotPredicate = event.waitMsecs(1);
+ assertFalse(gotPredicate);
+ assertEquals(0, progressCalled.get());
+ when(mockTime.getMilliseconds()).
+ thenReturn(0L).thenReturn(0L).thenReturn(2L);
+ gotPredicate = event.waitMsecs(1);
+ assertFalse(gotPredicate);
+ assertEquals(1, progressCalled.get());
+ }
- /**
- * Single threaded case
- */
+ /**
+ * Single threaded case where the event is signaled.
+ */
@Test
- public void testEvent() {
- BspEvent event = new PredicateLock(getStubProgressable());
- event.signal();
- boolean gotPredicate = event.waitMsecs(50);
- assertTrue(gotPredicate);
- event.reset();
- gotPredicate = event.waitMsecs(0);
- assertFalse(gotPredicate);
- }
+ public void testEvent() {
+ Time mockTime = mock(Time.class);
+ when(mockTime.getMilliseconds()).
+ thenReturn(0L).thenReturn(2L);
+ BspEvent event = new PredicateLock(getStubProgressable(), 1, mockTime);
+ event.signal();
+ boolean gotPredicate = event.waitMsecs(2);
+ assertTrue(gotPredicate);
+ event.reset();
+ when(mockTime.getMilliseconds()).
+ thenReturn(0L).thenReturn(2L);
+ gotPredicate = event.waitMsecs(0);
+ assertFalse(gotPredicate);
+ }
/**
- * Simple test for {@link PredicateLock#waitForever()}
+ * Thread signaled test for {@link PredicateLock#waitForever()}
*/
@Test
public void testWaitForever() {
@@ -97,23 +127,23 @@ public class TestPredicateLock {
assertTrue(event.waitMsecs(0));
}
- /**
- * Make sure the the event is signaled correctly
- * @throws InterruptedException
- */
+ /**
+ * Thread signaled test to make sure the event is signaled correctly
+ *
+ * @throws InterruptedException
+ */
@Test
- public void testWaitMsecs() {
- System.out.println("testWaitMsecs:");
- BspEvent event = new PredicateLock(getStubProgressable());
- Thread signalThread = new SignalThread(event);
- signalThread.start();
- boolean gotPredicate = event.waitMsecs(2000);
- assertTrue(gotPredicate);
- try {
- signalThread.join();
- } catch (InterruptedException e) {
- }
- gotPredicate = event.waitMsecs(0);
- assertTrue(gotPredicate);
+ public void testWaitMsecs() {
+ BspEvent event = new PredicateLock(getStubProgressable());
+ Thread signalThread = new SignalThread(event);
+ signalThread.start();
+ boolean gotPredicate = event.waitMsecs(2000);
+ assertTrue(gotPredicate);
+ try {
+ signalThread.join();
+ } catch (InterruptedException e) {
}
+ gotPredicate = event.waitMsecs(0);
+ assertTrue(gotPredicate);
+ }
}