You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by er...@apache.org on 2012/08/30 23:30:30 UTC

svn commit: r1379162 - in /giraph/trunk: ./ src/main/java/org/apache/giraph/utils/ src/main/java/org/apache/giraph/zk/ src/test/java/org/apache/giraph/

Author: ereisman
Date: Thu Aug 30 21:30:29 2012
New Revision: 1379162

URL: http://svn.apache.org/viewvc?rev=1379162&view=rev
Log:
GIRAPH-291: PredicateLock should have a constructor to take in a custom waiting time and additional testing

Added:
    giraph/trunk/src/main/java/org/apache/giraph/utils/FakeTime.java
    giraph/trunk/src/main/java/org/apache/giraph/utils/SystemTime.java
    giraph/trunk/src/main/java/org/apache/giraph/utils/Time.java
Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/src/main/java/org/apache/giraph/zk/PredicateLock.java
    giraph/trunk/src/test/java/org/apache/giraph/TestPredicateLock.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1379162&r1=1379161&r2=1379162&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Thu Aug 30 21:30:29 2012
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-291: PredicateLock should have a constructor to take in a
+  custom waiting time and additional testing (aching via ereisman)
 
   GIRAPH-318: New Iterator in LocalityInfoSorter is not working.
   (Eli Reisman via apresta)

Added: giraph/trunk/src/main/java/org/apache/giraph/utils/FakeTime.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/utils/FakeTime.java?rev=1379162&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/utils/FakeTime.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/utils/FakeTime.java Thu Aug 30 21:30:29 2012
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import java.util.Date;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Thread-safe implementation of Time for testing that can help get time based
+ * ordering of events when desired.
+ */
+public class FakeTime implements Time {
+  /** Nanoseconds from the fake epoch */
+  private final AtomicLong nanosecondsSinceEpoch = new AtomicLong();
+
+  @Override
+  public long getMilliseconds() {
+    return nanosecondsSinceEpoch.get() / NS_PER_MS;
+  }
+
+  @Override
+  public long getNanoseconds() {
+    return nanosecondsSinceEpoch.get();
+  }
+
+  @Override
+  public int getSeconds() {
+    return (int) (nanosecondsSinceEpoch.get() / NS_PER_SECOND);
+  }
+
+  @Override
+  public Date getCurrentDate() {
+    return new Date(getMilliseconds());
+  }
+
+  @Override
+  public void sleep(long milliseconds) throws InterruptedException {
+    nanosecondsSinceEpoch.getAndAdd(milliseconds * NS_PER_MS);
+  }
+}

Added: giraph/trunk/src/main/java/org/apache/giraph/utils/SystemTime.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/utils/SystemTime.java?rev=1379162&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/utils/SystemTime.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/utils/SystemTime.java Thu Aug 30 21:30:29 2012
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import java.util.Date;
+
+/**
+ * Implementation of Time that is thread-safe and should be used in
+ * production.
+ */
+public class SystemTime implements Time {
+  /**
+   * Single instance of this object
+   */
+  private static final SystemTime SINGLE_TIME = new SystemTime();
+
+  @Override
+  public long getMilliseconds() {
+    return System.currentTimeMillis();
+  }
+
+  @Override
+  public long getNanoseconds() {
+    return System.nanoTime();
+  }
+
+  @Override
+  public int getSeconds() {
+    return (int) (getMilliseconds() / MS_PER_SECOND);
+  }
+
+  @Override
+  public Date getCurrentDate() {
+    return new Date();
+  }
+
+  @Override
+  public void sleep(long milliseconds) throws InterruptedException {
+    Thread.sleep(milliseconds);
+  }
+
+  /**
+   * Get an instance (shared) of this object
+   *
+   * @return Instance of this object
+   */
+  public static Time getInstance() {
+    return SINGLE_TIME;
+  }
+}

Added: giraph/trunk/src/main/java/org/apache/giraph/utils/Time.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/utils/Time.java?rev=1379162&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/utils/Time.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/utils/Time.java Thu Aug 30 21:30:29 2012
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import java.util.Date;
+
+/**
+ * Interface for handling Time related operations so that they can be mocked
+ * for testing.
+ */
+public interface Time {
+  /** Microseconds per millisecond */
+  long US_PER_MS = 1000;
+  /** Nanoseconds per microsecond */
+  long NS_PER_US = 1000;
+  /** Nanoseconds per millisecond */
+  long NS_PER_MS = US_PER_MS * NS_PER_US;
+  /** Milliseconds per second */
+  long MS_PER_SECOND = 1000;
+  /** Microseconds per second */
+  long US_PER_SECOND = US_PER_MS * MS_PER_SECOND;
+  /** Nanoseconds per second */
+  long NS_PER_SECOND = NS_PER_US * US_PER_SECOND;
+  /** Seconds per hour */
+  long SECONDS_PER_HOUR = 60 * 60;
+  /** Seconds per day */
+  long SECONDS_PER_DAY = 24 * SECONDS_PER_HOUR;
+  /** Milliseconds per hour */
+  long MS_PER_HOUR = SECONDS_PER_HOUR * MS_PER_SECOND;
+  /** Milliseconds per day */
+  long MS_PER_DAY = SECONDS_PER_DAY * MS_PER_SECOND;
+
+  /**
+   *
+   * Get the current milliseconds
+   *
+   * @return The difference, measured in milliseconds, between
+   *         the current time and midnight, January 1, 1970 UTC.
+   */
+  long getMilliseconds();
+
+  /**
+   * Get the current nanoseconds
+   *
+   * @return The difference, measured in nanoseconds, between
+   *         the current time and midnight, January 1, 1970 UTC.
+   */
+  long getNanoseconds();
+
+  /**
+   * Get the current seconds
+   *
+   * @return The difference, measured in seconds, between
+   *         the current time and midnight, January 1, 1970 UTC.
+   */
+  int getSeconds();
+
+  /**
+   * Get the current date
+   *
+   * @return Current date
+   */
+  Date getCurrentDate();
+
+  /**
+   * Current thread should sleep for some number of milliseconds
+   *
+   * @param milliseconds Milliseconds to sleep for
+   * @throws InterruptedException
+   */
+  void sleep(long milliseconds) throws InterruptedException;
+}

Modified: giraph/trunk/src/main/java/org/apache/giraph/zk/PredicateLock.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/zk/PredicateLock.java?rev=1379162&r1=1379161&r2=1379162&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/zk/PredicateLock.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/zk/PredicateLock.java Thu Aug 30 21:30:29 2012
@@ -23,6 +23,8 @@ import java.util.concurrent.locks.Condit
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.giraph.utils.SystemTime;
+import org.apache.giraph.utils.Time;
 import org.apache.hadoop.util.Progressable;
 import org.apache.log4j.Logger;
 
@@ -33,24 +35,41 @@ import org.apache.log4j.Logger;
 public class PredicateLock implements BspEvent {
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(PredicateLock.class);
-  /** Msecs to refresh the progress meter */
-  private static final int MSEC_PERIOD = 10000;
+  /** Default msecs to refresh the progress meter */
+  private static final int DEFAULT_MSEC_PERIOD = 10000;
   /** Progressable for reporting progress (Job context) */
   protected final Progressable progressable;
+  /** Actual mses to refresh the progress meter */
+  private final int msecPeriod;
   /** Lock */
   private Lock lock = new ReentrantLock();
   /** Condition associated with lock */
   private Condition cond = lock.newCondition();
   /** Predicate */
   private boolean eventOccurred = false;
+  /** Keeps track of the time */
+  private final Time time;
 
   /**
-   * Constructor.
+   * Constructor with default values.
    *
    * @param progressable used to report progress() (usually a Mapper.Context)
    */
   public PredicateLock(Progressable progressable) {
+    this(progressable, DEFAULT_MSEC_PERIOD, SystemTime.getInstance());
+  }
+
+  /**
+   * Constructor.
+   *
+   * @param progressable used to report progress() (usually a Mapper.Context)
+   * @param msecPeriod Msecs between progress reports
+   * @param time Time implementation
+   */
+  public PredicateLock(Progressable progressable, int msecPeriod, Time time) {
     this.progressable = progressable;
+    this.msecPeriod = msecPeriod;
+    this.time = time;
   }
 
   @Override
@@ -77,15 +96,15 @@ public class PredicateLock implements Bs
   @Override
   public boolean waitMsecs(int msecs) {
     if (msecs < 0) {
-      throw new RuntimeException("msecs cannot be negative!");
+      throw new RuntimeException("waitMsecs: msecs cannot be negative!");
     }
-    long maxMsecs = System.currentTimeMillis() + msecs;
+    long maxMsecs = time.getMilliseconds() + msecs;
     int curMsecTimeout = 0;
     lock.lock();
     try {
       while (!eventOccurred) {
         curMsecTimeout =
-            Math.min(msecs, MSEC_PERIOD);
+            Math.min(msecs, msecPeriod);
         if (LOG.isDebugEnabled()) {
           LOG.debug("waitMsecs: Wait for " + curMsecTimeout);
         }
@@ -102,7 +121,7 @@ public class PredicateLock implements Bs
             "exception on cond.await() " +
             curMsecTimeout, e);
         }
-        if (System.currentTimeMillis() > maxMsecs) {
+        if (time.getMilliseconds() > maxMsecs) {
           return false;
         }
         msecs = Math.max(0, msecs - curMsecTimeout);
@@ -116,7 +135,7 @@ public class PredicateLock implements Bs
 
   @Override
   public void waitForever() {
-    while (!waitMsecs(MSEC_PERIOD)) {
+    while (!waitMsecs(msecPeriod)) {
       progressable.progress();
     }
   }

Modified: giraph/trunk/src/test/java/org/apache/giraph/TestPredicateLock.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/TestPredicateLock.java?rev=1379162&r1=1379161&r2=1379162&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/TestPredicateLock.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/TestPredicateLock.java Thu Aug 30 21:30:29 2012
@@ -18,71 +18,101 @@
 
 package org.apache.giraph;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.giraph.utils.Time;
 import org.apache.giraph.zk.BspEvent;
 import org.apache.giraph.zk.PredicateLock;
 import org.apache.hadoop.util.Progressable;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.stubbing.OngoingStubbing;
 
 /**
  * Ensure that PredicateLock objects work correctly.
  */
 public class TestPredicateLock {
-    private static class SignalThread extends Thread {
-        private final BspEvent event;
-        public SignalThread(BspEvent event) {
-            this.event = event;
-        }
-        public void run() {
-            try {
-                Thread.sleep(500);
-            } catch (InterruptedException e) {
-            }
-            event.signal();
-        }
+  /** How many times was progress called? */
+  private AtomicInteger progressCalled = new AtomicInteger(0);
+
+  private static class SignalThread extends Thread {
+    private final BspEvent event;
+    public SignalThread(BspEvent event) {
+      this.event = event;
+    }
+    public void run() {
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException e) {
+      }
+      event.signal();
     }
+  }
 
-    private Progressable stubContext;
+  private Progressable stubContext;
 
-    private Progressable getStubProgressable() {
-        if (stubContext == null)
-            stubContext = new Progressable() {
-                @Override
-                    public void progress() {
-                        System.out.println("progress received");
-                    }
-            };
-        return stubContext;
-    }
+  private Progressable getStubProgressable() {
+    if (stubContext == null)
+      stubContext = new Progressable() {
+        @Override
+        public void progress() {
+          progressCalled.incrementAndGet();
+        }
+      };
+    return stubContext;
+  }
 
-    /**
-     * Make sure the the event is not signaled.
-     */
+  @Before
+  public void setUp() {
+    progressCalled.set(0);
+  }
+
+  /**
+   * SMake sure the the event is not signaled.
+   */
   @Test
-    public void testWaitMsecsNoEvent() {
-        BspEvent event = new PredicateLock(getStubProgressable());
-        boolean gotPredicate = event.waitMsecs(50);
-        assertFalse(gotPredicate);
-    }
+  public void testWaitMsecsNoEvent() {
+    Time mockTime = mock(Time.class);
+    when(mockTime.getMilliseconds()).
+        thenReturn(0L).thenReturn(2L);
+    BspEvent event = new PredicateLock(getStubProgressable(), 1, mockTime);
+    boolean gotPredicate = event.waitMsecs(1);
+    assertFalse(gotPredicate);
+    assertEquals(0, progressCalled.get());
+    when(mockTime.getMilliseconds()).
+        thenReturn(0L).thenReturn(0L).thenReturn(2L);
+    gotPredicate = event.waitMsecs(1);
+    assertFalse(gotPredicate);
+    assertEquals(1, progressCalled.get());
+  }
 
-    /**
-     * Single threaded case
-     */
+  /**
+   * Single threaded case where the event is signaled.
+   */
   @Test
-    public void testEvent() {
-        BspEvent event = new PredicateLock(getStubProgressable());
-        event.signal();
-        boolean gotPredicate = event.waitMsecs(50);
-        assertTrue(gotPredicate);
-        event.reset();
-        gotPredicate = event.waitMsecs(0);
-        assertFalse(gotPredicate);
-    }
+  public void testEvent() {
+    Time mockTime = mock(Time.class);
+    when(mockTime.getMilliseconds()).
+        thenReturn(0L).thenReturn(2L);
+    BspEvent event = new PredicateLock(getStubProgressable(), 1, mockTime);
+    event.signal();
+    boolean gotPredicate = event.waitMsecs(2);
+    assertTrue(gotPredicate);
+    event.reset();
+    when(mockTime.getMilliseconds()).
+        thenReturn(0L).thenReturn(2L);
+    gotPredicate = event.waitMsecs(0);
+    assertFalse(gotPredicate);
+  }
 
   /**
-   * Simple test for {@link PredicateLock#waitForever()}
+   * Thread signaled test for {@link PredicateLock#waitForever()}
    */
   @Test
   public void testWaitForever() {
@@ -97,23 +127,23 @@ public class TestPredicateLock {
     assertTrue(event.waitMsecs(0));
   }
 
-    /**
-     * Make sure the the event is signaled correctly
-     * @throws InterruptedException
-     */
+  /**
+   * Thread signaled test to make sure the the event is signaled correctly
+   *
+   * @throws InterruptedException
+   */
   @Test
-    public void testWaitMsecs() {
-        System.out.println("testWaitMsecs:");
-        BspEvent event = new PredicateLock(getStubProgressable());
-        Thread signalThread = new SignalThread(event);
-        signalThread.start();
-        boolean gotPredicate = event.waitMsecs(2000);
-        assertTrue(gotPredicate);
-        try {
-            signalThread.join();
-        } catch (InterruptedException e) {
-        }
-        gotPredicate = event.waitMsecs(0);
-        assertTrue(gotPredicate);
+  public void testWaitMsecs() {
+    BspEvent event = new PredicateLock(getStubProgressable());
+    Thread signalThread = new SignalThread(event);
+    signalThread.start();
+    boolean gotPredicate = event.waitMsecs(2000);
+    assertTrue(gotPredicate);
+    try {
+      signalThread.join();
+    } catch (InterruptedException e) {
     }
+    gotPredicate = event.waitMsecs(0);
+    assertTrue(gotPredicate);
+  }
 }