You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2011/12/01 06:05:53 UTC
svn commit: r1208951 - in /incubator/lcf/branches/CONNECTORS-286/warthog/src:
main/java/org/apache/warthog/common/LongKey.java
test/java/org/apache/warthog/tests/InMemAtomicNativeNonblockingKeyValueStore.java
Author: kwright
Date: Thu Dec 1 05:05:52 2011
New Revision: 1208951
URL: http://svn.apache.org/viewvc?rev=1208951&view=rev
Log:
Eliminate another source of contention: distinguish between read and write locks
Modified:
incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/LongKey.java
incubator/lcf/branches/CONNECTORS-286/warthog/src/test/java/org/apache/warthog/tests/InMemAtomicNativeNonblockingKeyValueStore.java
Modified: incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/LongKey.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/LongKey.java?rev=1208951&r1=1208950&r2=1208951&view=diff
==============================================================================
--- incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/LongKey.java (original)
+++ incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/common/LongKey.java Thu Dec 1 05:05:52 2011
@@ -25,24 +25,20 @@ import org.apache.warthog.interfaces.*;
*/
public class LongKey extends LongValue implements WHKey
{
- protected long hashCodeValue;
-
public LongKey(long value)
{
super(value);
- hashCodeValue = calculateHashCode(value);
}
public LongKey(byte[] bytes)
{
super(bytes);
- hashCodeValue = calculateHashCode(value);
}
/** Calculate the hash function. */
public long getHashCode()
{
- return hashCodeValue;
+ return calculateHashCode(value);
}
public static long calculateHashCode(long value)
Modified: incubator/lcf/branches/CONNECTORS-286/warthog/src/test/java/org/apache/warthog/tests/InMemAtomicNativeNonblockingKeyValueStore.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-286/warthog/src/test/java/org/apache/warthog/tests/InMemAtomicNativeNonblockingKeyValueStore.java?rev=1208951&r1=1208950&r2=1208951&view=diff
==============================================================================
--- incubator/lcf/branches/CONNECTORS-286/warthog/src/test/java/org/apache/warthog/tests/InMemAtomicNativeNonblockingKeyValueStore.java (original)
+++ incubator/lcf/branches/CONNECTORS-286/warthog/src/test/java/org/apache/warthog/tests/InMemAtomicNativeNonblockingKeyValueStore.java Thu Dec 1 05:05:52 2011
@@ -87,9 +87,12 @@ public class InMemAtomicNativeNonblockin
{
// First, get the identifier of the thread making the request
Long threadID = new Long(Thread.currentThread().getId());
- // Allocate a lock vector, to be filled in as we assert the locks
+ // Allocate a lock vector, to be filled in as we assert the read locks
LockKey[] locks = new LockKey[(int)checkValues.size()];
int lockPointer = 0;
+ // Allocate a write lock vector, same deal
+ LockKey[] writeLocks = new LockKey[(int)setValues.size()];
+ int writeLockPointer = 0;
try
{
WHKeyIterator iterator = checkValues.iterator();
@@ -97,11 +100,7 @@ public class InMemAtomicNativeNonblockin
{
WHKey key = iterator.next();
LockKey lockKey = new LockKey(key);
- if (setIfNonexisting(lockKey,threadID) == false)
- {
- //System.out.println("Type of key in lock contention = "+key.getClass().getName());
- throw new WHConcurrencyException();
- }
+ grabLock(lockKey,threadID);
locks[lockPointer++] = lockKey;
WHValue value = get(key);
WHValue otherValue = checkValues.get(key);
@@ -112,7 +111,18 @@ public class InMemAtomicNativeNonblockin
//System.out.println("Type of key in check contention = "+key.getClass().getName());
throw new WHConcurrencyException();
}
- // We got all the locks and passed the checks! Do the commit
+ // We got all the read locks and passed the checks! Get the write locks now. These represent
+ // intent to change...
+ iterator = setValues.iterator();
+ while (iterator.hasNext())
+ {
+ WHKey key = iterator.next();
+ LockKey writeLockKey = new LockKey(key);
+ grabWriteLock(writeLockKey,threadID);
+ writeLocks[writeLockPointer++] = writeLockKey;
+ }
+ // Finally, do the commit
+ int releaseLockPointer = 0;
iterator = setValues.iterator();
while (iterator.hasNext())
{
@@ -122,15 +132,26 @@ public class InMemAtomicNativeNonblockin
remove(key);
else
put(key,value);
+ // We can now release the write lock for this key
+ LockKey lockKey = writeLocks[releaseLockPointer];
+ releaseWriteLock(lockKey,threadID);
+ writeLocks[releaseLockPointer++] = null;
}
}
finally
{
+ // Undo write locks so far set
+ while (writeLockPointer > 0)
+ {
+ LockKey lockKey = writeLocks[--writeLockPointer];
+ if (lockKey != null)
+ releaseWriteLock(lockKey,threadID);
+ }
// Undo locks so far set
while (lockPointer > 0)
{
LockKey lockKey = locks[--lockPointer];
- remove(lockKey);
+ releaseLock(lockKey,threadID);
}
}
}
@@ -155,16 +176,58 @@ public class InMemAtomicNativeNonblockin
}
}
- protected boolean setIfNonexisting(Object key, Object value)
+ protected void grabLock(LockKey key, Long threadID)
+ throws WHConcurrencyException
{
Map database = databaseArray[key.hashCode() & databaseMask];
synchronized (database)
{
- if (database.containsKey(key))
- return false;
- database.put(key,value);
+ LockValue lockValue = (LockValue)database.get(key);
+ if (lockValue == null)
+ {
+ lockValue = new LockValue();
+ database.put(key,lockValue);
+ }
+ lockValue.addReadLock(threadID);
+ }
+ }
+
+ protected void grabWriteLock(LockKey key, Long threadID)
+ throws WHConcurrencyException
+ {
+ Map database = databaseArray[key.hashCode() & databaseMask];
+ synchronized (database)
+ {
+ LockValue lockValue = (LockValue)database.get(key);
+ if (lockValue == null)
+ {
+ lockValue = new LockValue();
+ database.put(key,lockValue);
+ }
+ lockValue.addWriteLock(threadID);
+ }
+ }
+
+ protected void releaseLock(LockKey key, Long threadID)
+ {
+ Map database = databaseArray[key.hashCode() & databaseMask];
+ synchronized (database)
+ {
+ LockValue lockValue = (LockValue)database.get(key);
+ if (lockValue.removeReadLock(threadID))
+ database.remove(key);
+ }
+ }
+
+ protected void releaseWriteLock(LockKey key, Long threadID)
+ {
+ Map database = databaseArray[key.hashCode() & databaseMask];
+ synchronized (database)
+ {
+ LockValue lockValue = (LockValue)database.get(key);
+ if (lockValue.removeWriteLock(threadID))
+ database.remove(key);
}
- return true;
}
// Lock class
@@ -191,4 +254,52 @@ public class InMemAtomicNativeNonblockin
}
}
+ // Lock value class
+
+ protected static class LockValue
+ {
+ protected boolean writeLock = false;
+ protected boolean promotedReadLock = false;
+ protected Set<Long> threadIDs = new HashSet<Long>();
+
+ public LockValue()
+ {
+ }
+
+ public void addReadLock(Long threadID)
+ throws WHConcurrencyException
+ {
+ if (writeLock)
+ throw new WHConcurrencyException();
+ threadIDs.add(threadID);
+ }
+
+ public boolean removeReadLock(Long threadID)
+ {
+ threadIDs.remove(threadID);
+ return threadIDs.size() == 0;
+ }
+
+ public void addWriteLock(Long threadID)
+ throws WHConcurrencyException
+ {
+ if (writeLock || threadIDs.size() > 1 || (threadIDs.size() == 1 && !threadIDs.contains(threadID)))
+ throw new WHConcurrencyException();
+ writeLock = true;
+ if (threadIDs.contains(threadID))
+ promotedReadLock = true;
+ else
+ threadIDs.add(threadID);
+ }
+
+ public boolean removeWriteLock(Long threadID)
+ {
+ writeLock = false;
+ if (promotedReadLock)
+ promotedReadLock = false;
+ else
+ threadIDs.remove(threadID);
+ return threadIDs.size() == 0;
+ }
+ }
}