You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2011/12/01 06:05:53 UTC

svn commit: r1208951 - in /incubator/lcf/branches/CONNECTORS-286/warthog/src: main/java/org/apache/warthog/common/LongKey.java test/java/org/apache/warthog/tests/InMemAtomicNativeNonblockingKeyValueStore.java

Author: kwright
Date: Thu Dec  1 05:05:52 2011
New Revision: 1208951

URL: http://svn.apache.org/viewvc?rev=1208951&view=rev
Log:
Eliminate another source of contention: distinguish between read and write locks

Modified:
    incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/LongKey.java
    incubator/lcf/branches/CONNECTORS-286/warthog/src/test/java/org/apache/warthog/tests/InMemAtomicNativeNonblockingKeyValueStore.java

Modified: incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/LongKey.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/LongKey.java?rev=1208951&r1=1208950&r2=1208951&view=diff
==============================================================================
--- incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/LongKey.java (original)
+++ incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/LongKey.java Thu Dec  1 05:05:52 2011
@@ -25,24 +25,20 @@ import org.apache.warthog.interfaces.*;
 */
 public class LongKey extends LongValue implements WHKey
 {
-  protected long hashCodeValue;
-  
   public LongKey(long value)
   {
     super(value);
-    hashCodeValue = calculateHashCode(value);
   }
   
   public LongKey(byte[] bytes)
   {
     super(bytes);
-    hashCodeValue = calculateHashCode(value);
   }
 
   /** Calculate the hash function. */
   public long getHashCode()
   {
-    return hashCodeValue;
+    return calculateHashCode(value);
   }
   
   public static long calculateHashCode(long value)

Modified: incubator/lcf/branches/CONNECTORS-286/warthog/src/test/java/org/apache/warthog/tests/InMemAtomicNativeNonblockingKeyValueStore.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-286/warthog/src/test/java/org/apache/warthog/tests/InMemAtomicNativeNonblockingKeyValueStore.java?rev=1208951&r1=1208950&r2=1208951&view=diff
==============================================================================
--- incubator/lcf/branches/CONNECTORS-286/warthog/src/test/java/org/apache/warthog/tests/InMemAtomicNativeNonblockingKeyValueStore.java (original)
+++ incubator/lcf/branches/CONNECTORS-286/warthog/src/test/java/org/apache/warthog/tests/InMemAtomicNativeNonblockingKeyValueStore.java Thu Dec  1 05:05:52 2011
@@ -87,9 +87,12 @@ public class InMemAtomicNativeNonblockin
   {
     // First, get the identifier of the thread making the request
     Long threadID = new Long(Thread.currentThread().getId());
-    // Allocate a lock vector, to be filled in as we assert the locks
+    // Allocate a lock vector, to be filled in as we assert the read locks
     LockKey[] locks = new LockKey[(int)checkValues.size()];
     int lockPointer = 0;
+    // Allocate a write lock vector, to be filled in as we assert the write locks
+    LockKey[] writeLocks = new LockKey[(int)setValues.size()];
+    int writeLockPointer = 0;
     try
     {
       WHKeyIterator iterator = checkValues.iterator();
@@ -97,11 +100,7 @@ public class InMemAtomicNativeNonblockin
       {
         WHKey key = iterator.next();
         LockKey lockKey = new LockKey(key);
-        if (setIfNonexisting(lockKey,threadID) == false)
-        {
-          //System.out.println("Type of key in lock contention = "+key.getClass().getName());
-          throw new WHConcurrencyException();
-        }
+        grabLock(lockKey,threadID);
         locks[lockPointer++] = lockKey;
         WHValue value = get(key);
         WHValue otherValue = checkValues.get(key);
@@ -112,7 +111,18 @@ public class InMemAtomicNativeNonblockin
         //System.out.println("Type of key in check contention = "+key.getClass().getName());
         throw new WHConcurrencyException();
       }
-      // We got all the locks and passed the checks!  Do the commit
+      // We got all the read locks and passed the checks!  Now get the write locks, which represent
+      // this thread's intent to change each key in setValues.
+      iterator = setValues.iterator();
+      while (iterator.hasNext())
+      {
+        WHKey key = iterator.next();
+        LockKey writeLockKey = new LockKey(key);
+        grabWriteLock(writeLockKey,threadID);
+        writeLocks[writeLockPointer++] = writeLockKey;
+      }
+      // Finally, do the commit
+      int releaseLockPointer = 0;
       iterator = setValues.iterator();
       while (iterator.hasNext())
       {
@@ -122,15 +132,26 @@ public class InMemAtomicNativeNonblockin
           remove(key);
         else
           put(key,value);
+        // We can now release the write lock for this key
+        LockKey lockKey = writeLocks[releaseLockPointer];
+        releaseWriteLock(lockKey,threadID);
+        writeLocks[releaseLockPointer++] = null;
       }
     }
     finally
     {
+      // Undo write locks still held (entries already released during the commit loop are null)
+      while (writeLockPointer > 0)
+      {
+        LockKey lockKey = writeLocks[--writeLockPointer];
+        if (lockKey != null)
+          releaseWriteLock(lockKey,threadID);
+      }
       // Undo locks so far set
       while (lockPointer > 0)
       {
         LockKey lockKey = locks[--lockPointer];
-        remove(lockKey);
+        releaseLock(lockKey,threadID);
       }
     }
   }
@@ -155,16 +176,58 @@ public class InMemAtomicNativeNonblockin
     }
   }
   
-  protected boolean setIfNonexisting(Object key, Object value)
+  protected void grabLock(LockKey key, Long threadID)
+    throws WHConcurrencyException
   {
     Map database = databaseArray[key.hashCode() & databaseMask];
     synchronized (database)
     {
-      if (database.containsKey(key))
-        return false;
-      database.put(key,value);
+      LockValue lockValue = (LockValue)database.get(key);
+      if (lockValue == null)
+      {
+        lockValue = new LockValue();
+        database.put(key,lockValue);
+      }
+      lockValue.addReadLock(threadID);
+    }
+  }
+  
+  protected void grabWriteLock(LockKey key, Long threadID)
+    throws WHConcurrencyException
+  {
+    Map database = databaseArray[key.hashCode() & databaseMask];
+    synchronized (database)
+    {
+      LockValue lockValue = (LockValue)database.get(key);
+      if (lockValue == null)
+      {
+        lockValue = new LockValue();
+        database.put(key,lockValue);
+      }
+      lockValue.addWriteLock(threadID);
+    }
+  }
+  
+  protected void releaseLock(LockKey key, Long threadID)
+  {
+    Map database = databaseArray[key.hashCode() & databaseMask];
+    synchronized (database)
+    {
+      LockValue lockValue = (LockValue)database.get(key);
+      if (lockValue.removeReadLock(threadID))
+        database.remove(key);
+    }
+  }
+  
+  protected void releaseWriteLock(LockKey key, Long threadID)
+  {
+    Map database = databaseArray[key.hashCode() & databaseMask];
+    synchronized (database)
+    {
+      LockValue lockValue = (LockValue)database.get(key);
+      if (lockValue.removeWriteLock(threadID))
+        database.remove(key);
     }
-    return true;
   }
   
   // Lock class
@@ -191,4 +254,52 @@ public class InMemAtomicNativeNonblockin
     }
   }
   
+  // Lock value class
+  
+  protected static class LockValue
+  {
+    protected boolean writeLock = false;
+    protected boolean promotedReadLock = false;
+    protected Set<Long> threadIDs = new HashSet<Long>();
+    
+    public LockValue()
+    {
+    }
+    
+    public void addReadLock(Long threadID)
+      throws WHConcurrencyException
+    {
+      if (writeLock)
+        throw new WHConcurrencyException();
+      threadIDs.add(threadID);
+    }
+    
+    public boolean removeReadLock(Long threadID)
+    {
+      threadIDs.remove(threadID);
+      return threadIDs.size() == 0;
+    }
+    
+    public void addWriteLock(Long threadID)
+      throws WHConcurrencyException
+    {
+      if (writeLock || threadIDs.size() > 1 || (threadIDs.size() == 1 && !threadIDs.contains(threadID)))
+        throw new WHConcurrencyException();
+      writeLock = true;
+      if (threadIDs.contains(threadID))
+        promotedReadLock = true;
+      else
+        threadIDs.add(threadID);
+    }
+    
+    public boolean removeWriteLock(Long threadID)
+    {
+      writeLock = false;
+      if (promotedReadLock)
+        promotedReadLock = false;
+      else
+        threadIDs.remove(threadID);
+      return threadIDs.size() == 0;
+    }
+  }
 }