You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by sc...@apache.org on 2014/04/30 22:07:44 UTC
svn commit: r1591466 - in /uima/uimaj/trunk/uimaj-core/src:
main/java/org/apache/uima/jcas/impl/JCasHashMap.java
test/java/org/apache/uima/jcas/impl/JCasHashMapCompareTest.java
test/java/org/apache/uima/jcas/impl/JCasHashMapTest.java
Author: schor
Date: Wed Apr 30 20:07:43 2014
New Revision: 1591466
URL: http://svn.apache.org/r1591466
Log:
[UIMA-3774] [UIMA-3784] Fix size calculation; rework the design to reduce synchronization contention; improve the lock design; add a test comparing against a pseudo-design based on a plain ConcurrentHashMap ("pseudo" because it does not fully work)
Added:
uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/jcas/impl/JCasHashMapCompareTest.java
Modified:
uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/jcas/impl/JCasHashMap.java
uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/jcas/impl/JCasHashMapTest.java
Modified: uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/jcas/impl/JCasHashMap.java
URL: http://svn.apache.org/viewvc/uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/jcas/impl/JCasHashMap.java?rev=1591466&r1=1591465&r2=1591466&view=diff
==============================================================================
--- uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/jcas/impl/JCasHashMap.java (original)
+++ uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/jcas/impl/JCasHashMap.java Wed Apr 30 20:07:43 2014
@@ -20,10 +20,12 @@
package org.apache.uima.jcas.impl;
import java.util.Arrays;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.uima.cas.impl.FeatureStructureImpl;
import org.apache.uima.jcas.cas.TOP;
+import org.apache.uima.jcas.cas.TOP_Type;
/**
* Version 2 (2014) of map between CAS addr and JCasCover Objects
@@ -34,7 +36,9 @@ import org.apache.uima.jcas.cas.TOP;
*
* Table always a power of 2 in size - permits faster hashing
*
- * Accesses to this table are not threadsafe.
+ * Accesses to this table are threadsafe, in order to support
+ * read-only CASes being shared by multiple threads.
+ * Multiple iterators in different threads could be accessing the map and updating it.
*
* Load factor tuning. 2,000,000 random inserts, 50 reps (for JIT)
* .5 (2x to 4x entries) 99% 5 probes 250 ms
@@ -45,43 +49,59 @@ import org.apache.uima.jcas.cas.TOP;
* version 1 at load factor .5 ran about 570 ms * 1.x
* (did 2 lookups for fetches if not found,)
*
- * Other changes:
- * remember finding empty slot on "miss" in get,
- * reuse on next put
- * change put to assume adding new item not already in table
- *
- * Multi-threading: For read-only CASes, multiple iterators in
- * different threads could be accessing the map and updating it.
+ * No "get" method, only getReserve. This method, if it doesn't
+ * find the key, eventually finds an empty (null) slot - it then
+ * stores a special "reserve" item with the same key value in that slot.
+ * Other threads doing getReserve calls, upon encountering this
+ * reserve item, wait until the reserve is converted to a
+ * real value (a signalAll on the sub-map's lock condition happens when this is done), and
+ * then the getReserve returns the real item.
+ *
+ * getReserve calls that find the item operate without any locking.
+ *
+ * Locking:
+ * Each sub-map has one lock, used for reading and updating that sub-map's table
+ * -- not used for reading when the item is found, only if the item is not found or is reserved
*
* Strategy: have 1 outer implementation delegating to multiple inner ones
* number = concurrency level (a power of 2)
*
* The hash uses some # of low order bits to address the right inner one.
*
- * This table is used to hold JCas cover classes for CAS feature structures. There is one instance associated with each CAS that is using it.
+ * This table is used to hold JCas cover classes for CAS feature structures.
+ * There is one instance of this table associated with each CAS that is using it.
* <p>
* The update occurs in the code in JCasGenerated classes, which do:
* a call to get the value of the map for a key
* if that is "null", it creates the new JCas cover object, and does a "put" to add the value.
* <p>
- * The creation of the new JCas cover object can, in turn, run arbitrary user code, which can result in updates to the JCasHashMap which occur before this original update occurs.
+ * The creation of the new JCas cover object can, in turn, run arbitrary user code,
+ * which can result in updates to the JCasHashMap which occur before this original update occurs.
* <p>
- * In a multi-threaded environment, multiple threads can do a "get" for the same Feature Structure instance. If it's not in the Map, the correct behavior is:
+ * In a multi-threaded environment, multiple threads can do a "get"
+ * for the same Feature Structure instance. If it's not in the Map, the correct behavior is:
* <p>
- * one of the threads needs to add it
- * the other threads need to wait for the one thread to finish adding, and then return the object that the one thread added.
+ * one of the threads adds the new element
+ * the other threads wait for the one thread to finish adding, and then return the object that the one thread added.
* <p>
* The implementation works as follows:
* <p>
- * 1) The JCasHashMap is split into "n" sub-maps. The number is the number of cores, but grows more slowly as the # of cores > 16. :
- * getReserve and put and clear are synchronized on the sub-maps, using ordinary synchronized keywords; the outer get/put are not synchronized
- * 1a) The outer size (aggregated from the sub sizes) is kept as an atomic integer, updated only on puts and clear
- * 2) The number of sub maps is rounded to a power of 2, to allow the low order bits of the hash of the key to be used to pick the map (via masking).
+ * 1) The JCasHashMap is split into "n" sub-maps.
+ * The number is the number of cores, but grows more slowly as the # of cores > 16.
+ * This number can be specified, but this is not currently exposed in the tuning parameters.
+ * Locking occurs on the sub-maps; the outer method calls are not synchronized.
+ * 2) The number of sub maps is rounded to a power of 2, to allow the low order bits of the hash of the key
+ * to be used to pick the map (via masking).
* 3) A getReserve that results in not-found returns a null, but adds to the table a special reserved element.
- * 4) A get that finds a special reserved element knows that some other thread is in the process of adding an entry for that key, so it waits.
- * 5) A put, if it finds a reserved-for-that-key element, replaces that with the real element, and then does a notifyAll to wake up any threads that were waiting (on this sub-map), and these threads then re-do the get
+ * 3a) This adding may result in table resizing
+ * 4) A getReserve that finds a special reserved element, knows that some other thread
+ * is in the process of adding an entry for that key, so it waits.
+ * 5) A put, if it finds a reserved-for-that-key element, replaces that with the real element,
+ * and then calls signalAll on the sub-map's lock condition to wake up any threads that were waiting
+ * (on this sub-map); these threads then re-check the slot, or redo the search if the table was resized.
+ * Multiple threads could be waiting on this, and they will all wake up.
* <p>
- * All calls are of the getReserved, followed by a put if the getReserved returns null. I removed the plain "get".
+ * All calls are of the form getReserve, followed by a put if the getReserve returns null.
*
*/
public class JCasHashMap {
@@ -91,6 +111,10 @@ public class JCasHashMap {
+ // TUNE: when true, per-sub-map probe statistics are collected (printed by showHistogram)
private static final boolean TUNE = false;
private static final int DEFAULT_CONCURRENCY_LEVEL;
+
+ // Indices into the 2-element int[] "probeInfo" array passed to SubMap.find:
+ // PROBE_ADDR_INDEX holds the slot to resume probing from (-1 = start from the hash),
+ // PROBE_DELTA_INDEX holds the next probe increment (1 at the start of a search).
+ private static final int PROBE_ADDR_INDEX = 0;
+ private static final int PROBE_DELTA_INDEX = 1;
+
static {
int cores = Runtime.getRuntime().availableProcessors();
DEFAULT_CONCURRENCY_LEVEL = (cores < 17) ? cores :
@@ -98,12 +122,35 @@ public class JCasHashMap {
24 + (cores - 24) / 4;
}
- //These are for tuning measurements
- private int histogram [];
- private int maxProbe = 0;
-
+ // Marker type: a TOP whose jcasType is RESERVE_TOP_TYPE_INSTANCE is a "reserve" placeholder,
+ // stored by getReserve in an empty slot while another thread creates the real cover object.
+ private static class ReserveTopType extends TOP_Type {
+ public ReserveTopType() {
+ super();
+ }
+ }
+
+ // public for test case use
+ public static final TOP_Type RESERVE_TOP_TYPE_INSTANCE = new ReserveTopType();
+ // true if m is non-null and is a reserve placeholder (identity comparison on jcasType)
+ private static boolean isReserve(FeatureStructureImpl m) {
+ return m != null && ((TOP)m).jcasType == RESERVE_TOP_TYPE_INSTANCE;
+ }
+ // true if m is non-null and is a real (non-reserve) entry
+ private static boolean isReal(FeatureStructureImpl m) {
+ return m != null && ((TOP)m).jcasType != RESERVE_TOP_TYPE_INSTANCE;
+ }
+
+ // Puts probeInfo into the "start a new search" state (slot -1, delta 1); returns it for chaining.
+ private static int[] resetProbeInfo(int[] probeInfo) {
+ probeInfo[PROBE_ADDR_INDEX] = -1;
+ probeInfo[PROBE_DELTA_INDEX] = 1;
+ return probeInfo;
+ }
+
+ // Records where a probe sequence stopped, so a later find() can resume from there; returns probeInfo.
+ private static int[] setProbeInfo(int[] probeInfo, int probeAddr, int probeDelta) {
+ probeInfo[PROBE_ADDR_INDEX] = probeAddr;
+ probeInfo[PROBE_DELTA_INDEX] = probeDelta;
+ return probeInfo;
+ }
+
private final float loadFactor = (float)0.60;
-
+
private final int initialCapacity;
private final boolean useCache;
@@ -116,257 +163,288 @@ public class JCasHashMap {
private final SubMap[] subMaps;
- private final AtomicInteger aggregate_size = new AtomicInteger(0);
-
private final int subMapInitialCapacity;
private class SubMap {
+
+ //These are for tuning measurements (histogram is only allocated and updated when TUNE is true)
+ private int[] histogram;
+ private int maxProbe = 0;
+ private int maxProbeAfterContinue = 0;
+ private int continues = 0;
+
+ // Guards all updates of this sub-map; lockCondition is signalled when a reserve is replaced by a real value.
+ private final ReentrantLock lock = new ReentrantLock();
+ private final Condition lockCondition = lock.newCondition();
+
private int sizeWhichTriggersExpansion;
private int size; // number of elements in the table
+ // note: size also counts reserve placeholders (counted when reserved, not again when filled in by put)
- private FeatureStructureImpl [] table;
+ // volatile: the table reference is read without the lock on the getReserve fast path
+ private volatile FeatureStructureImpl [] table;
private boolean secondTimeShrinkable = false;
- private int bitsMask; // 1's to "and" with result to keep in range
-
- // for testing only:
- int getbitsMask() {return bitsMask;}
+ // Replaces the table with an empty one of the given capacity (must be a power of 2) and resets
+ // size (and, when TUNE is on, the probe histogram); returns this for chaining.
private SubMap newTable(int capacity) {
- assert(Integer.bitCount(capacity) == 1);
- table = new FeatureStructureImpl[capacity];
- bitsMask = capacity - 1;
+ table = newTableKeepSize(capacity);
size = 0;
- sizeWhichTriggersExpansion = (int)(capacity * loadFactor);
+ if (TUNE) {
+ histogram = new int[200];
+ Arrays.fill(histogram, 0);
+ }
return this;
}
- private synchronized void clear() {
- // see if size is less than the 1/2 size that triggers expansion
- if (size < (sizeWhichTriggersExpansion >>> 1)) {
- // if 2nd time then shrink by 50%
- // this is done to avoid thrashing around the threshold
- if (secondTimeShrinkable) {
- secondTimeShrinkable = false;
- final int newCapacity = Math.max(subMapInitialCapacity, table.length >>> 1);
- if (newCapacity < table.length) {
- newTable(newCapacity); // shrink table by 50%
- } else { // don't shrink below minimum
- Arrays.fill(table, null);
+ // Allocates an empty array of the given capacity (asserted to be a power of 2) and recomputes
+ // the expansion threshold from it. Does not change "size" or the "table" field; the caller
+ // decides where the returned array goes.
+ private FeatureStructureImpl[] newTableKeepSize(int capacity) {
+ assert(Integer.bitCount(capacity) == 1);
+ FeatureStructureImpl[] t = new FeatureStructureImpl[capacity];
+ sizeWhichTriggersExpansion = (int)(capacity * loadFactor);
+ return t;
+ }
+
+ // Must be called with the lock held (asserted). Grows the table first if the expansion
+ // threshold has already been reached, then counts one more entry.
+ private void incrementSize() {
+ assert(lock.getHoldCount() > 0);
+ if (size >= sizeWhichTriggersExpansion) {
+ increaseTableCapacity();
+ }
+ size++;
+ }
+
+ // Does size management - shrinking overly large tables after the 2nd time
+ // Empties this sub-map while holding its lock. If size was below half of the expansion threshold
+ // on two consecutive clears, the capacity is halved (never below subMapInitialCapacity);
+ // otherwise the capacity is kept and the slots are just set to null.
+ // NOTE(review): clear() does not signal lockCondition, so a thread waiting in getReserve on a
+ // reserve that is removed here is not woken by this call - confirm clear() never runs while
+ // cover objects are being created.
+ private void clear() {
+ lock.lock();
+ try {
+ // see if size is less than the 1/2 size that triggers expansion
+ if (size < (sizeWhichTriggersExpansion >>> 1)) {
+ // if 2nd time then shrink by 50%
+ // this is done to avoid thrashing around the threshold
+ if (secondTimeShrinkable) {
+ secondTimeShrinkable = false;
+ final int newCapacity = Math.max(subMapInitialCapacity, table.length >>> 1);
+ if (newCapacity < table.length) {
+ newTable(newCapacity); // shrink table by 50%
+ } else { // don't shrink below minimum
+ Arrays.fill(table, null);
+ }
+ size = 0;
+ return;
+ } else {
+ secondTimeShrinkable = true;
}
- size = 0;
- return;
} else {
- secondTimeShrinkable = true;
+ secondTimeShrinkable = false; // reset this to require 2 triggers in a row
}
- } else {
- secondTimeShrinkable = false; // reset this to require 2 triggers in a row
+ size = 0;
+ Arrays.fill(table, null);
+ } finally {
+ lock.unlock();
}
- size = 0;
- Arrays.fill(table, null);
}
-
-// private synchronized FeatureStructureImpl get(int key, int hash) {
-// int nbrProbes = 1;
-// int probeAddr = hash & bitsMask;
-// int probeDelta = 1;
-// FeatureStructureImpl maybe = table[probeAddr];
-// while ((null != maybe) && (maybe.getAddress() != key)) {
-// if (TUNE) {
-// nbrProbes++;
-// }
-// probeAddr = bitsMask & (probeAddr + (probeDelta++));
-// maybe = table[probeAddr];
-// }
-//
-// if (TUNE) {
-// histogram[Math.min(histogram.length - 1, nbrProbes)]++;
-// maxProbe = Math.max(maxProbe, nbrProbes);
-// }
-// return maybe;
-// }
-
+
/**
- * Gets a value, but if the value isn't there, it reserves the slot where it will go
- * with a new instance where the key matches, but the type is null
- * @param key - the addr in the heap
- * @param hash - the hash that was already computed from the key
- * @return - the found fs, or null
+ * Probes the current table for key, either from the start of its probe sequence or
+ * resuming where an earlier call stopped.
+ * Can be called under lock or not.
+ * @param key - the addr in the heap
+ * @param hash - the hash already computed from the key, with the sub-map selection bits shifted off
+ * @param probeInfo - used to get/receive multiple int values;
+ * 0: (in/out) startProbe or -1,
+ * 1: (in/out) probeDelta (starts at 1)
+ * On return it holds the slot where the search stopped and the next probe delta.
+ * @return the entry whose key matches (a real value or a reserve), or null if an empty slot was
+ * reached first; probeInfo[0] is then that slot's index in the table that was searched,
+ * which might since have been replaced by a resize
*/
- private synchronized FeatureStructureImpl getReserve(final int key, final int hash) {
+ private FeatureStructureImpl find(final int key, final int hash, final int[] probeInfo) {
int nbrProbes = 1;
- int probeAddr;
- FeatureStructureImpl maybe;
- retryAfterWait: do {
- probeAddr = hash & bitsMask;
- int probeDelta = 1;
- maybe = table[probeAddr];
- while (null != maybe) {
- if (maybe.getAddress() == key) {
- while (((TOP) maybe).jcasType == null) {
- // we hit a reserve marker - there is another thread in the process of creating an instance of this,
- // so wait for it to finish and then return it
- final int sizeNow = size;
- try {
- wait(); // releases the synchronized monitor, otherwise this segment blocked for others while waiting
- } catch (InterruptedException e) {
+ final boolean isContinue = TUNE && (probeInfo[PROBE_ADDR_INDEX] != -1);
+ FeatureStructureImpl[] localTable = table;
+ final int bitMask = localTable.length - 1;
+ final int startProbe = probeInfo[PROBE_ADDR_INDEX];
+ int probeAddr = (startProbe < 0) ? (hash & bitMask) : startProbe;
+ assert((startProbe < 0) ? probeInfo[PROBE_DELTA_INDEX] == 1 : true);
+ int probeDelta = probeInfo[PROBE_DELTA_INDEX];
+ FeatureStructureImpl m = localTable[probeAddr];
+
+ while (null != m) { // loop to traverse bucket chain
+ if (m.getAddress() == key) {
+ setProbeInfo(probeInfo, probeAddr, probeDelta);
+ if (TUNE) {
+ final boolean needUnlock;
+ if (!lock.isHeldByCurrentThread()) {
+ lock.lock();
+ needUnlock = true;
+ } else {
+ needUnlock = false;
+ }
+ try {
+ // clamp: the histogram has a fixed length, long probe chains must not index past it
+ histogram[Math.min(histogram.length - 1, nbrProbes)] += 1;
+ if (maxProbe < nbrProbes) {
+ maxProbe = nbrProbes;
}
- if (size != sizeNow) {
- // at this point, the table may have grown
- // so start over
- continue retryAfterWait;
+ if (isContinue) {
+ if (maxProbeAfterContinue < nbrProbes) {
+ maxProbeAfterContinue = nbrProbes;
+ }
+ continues ++;
+ }
+ } finally {
+ if (needUnlock) {
+ lock.unlock();
}
}
- if (TUNE) {
- histogram[Math.min(histogram.length - 1, nbrProbes)]++;
- maxProbe = Math.max(maxProbe, nbrProbes);
- }
- return maybe;
}
- // is not null, but is wrong key
- if (TUNE) {
- nbrProbes++;
- }
- probeAddr = bitsMask & (probeAddr + (probeDelta++));
- maybe = table[probeAddr];
+ return m;
}
- break;
- } while (true); // must be true to have label of continue used
- // "maybe" is null
- // reserve this slot to prevent other "getReserved" calls for this same instance from succeeding,
- // causing them to wait until this slot gets filled in with a FS value
- table[probeAddr] = new TOP(key, null); // null indicates its a RESERVE marker
-
- if (TUNE) {
- histogram[Math.min(histogram.length - 1, nbrProbes)]++;
- maxProbe = Math.max(maxProbe, nbrProbes);
+ nbrProbes++;
+ probeAddr = bitMask & (probeAddr + (probeDelta++));
+ m = localTable[probeAddr];
}
- return maybe;
+ setProbeInfo(probeInfo, probeAddr, probeDelta);
+ return m;
}
-
- private synchronized void put(int key, FeatureStructureImpl value, int hash) {
- if (size >= sizeWhichTriggersExpansion) {
- increaseTableCapacity();
- }
- size++;
-
- int probeAddr = hash & bitsMask;
- int probeDelta = 1;
- int nbrProbes = 1;
- FeatureStructureImpl m = table[probeAddr];
- while (null != m) {
- if (((TOP)m).jcasType == null) {
- // this slot was previously reserved - check to see if the key matches
- // (must be same key, otherwise, impl could deadlock)
- if (m.getAddress() == key) {
- // found the previously reserved slot
- table[probeAddr] = value;
- aggregate_size.incrementAndGet();
- notifyAll();
- if (TUNE) {
- histogram[Math.min(histogram.length - 1, nbrProbes)]++;
- maxProbe = Math.max(maxProbe, nbrProbes);
- }
- return;
- }
- }
- // skip if adding the same element to the table
- // probably never happens, though
- if (m.getAddress() == key) {
- if (TUNE) {
- System.err.format("JCasHashMap found already existing cover instance for key %,d, ignoring put%n", key);
- throw new RuntimeException(); //to get stack trace
+ /**
+ * Gets a value, but if the value isn't there, it reserves the slot where it will go
+ * with a new instance where the key matches, but the type is a unique value.
+ *
+ * Threading: not synchronized for main path where get is finding an element.
+ * Since elements are never updated, there is no race if an element is found.
+ * And it doesn't matter if the table is resized (if the element is found).
+ * If it is not found, or a reserve is found, need to get the lock, and
+ * start over if resized, or
+ * continue from reserved or null spot if not
+ *
+ * A caller that receives null owns the reserved slot and is expected to fill it with put;
+ * other threads asking for the same key wait on lockCondition until that happens.
+ * The lock is acquired at most once per call (hold count never exceeds 1) and is always
+ * released in the finally clause. Interrupts do not end the wait, but the thread's
+ * interrupted status is preserved for the caller.
+ *
+ * @param key - the addr in the heap
+ * @param hash - the hash that was already computed from the key
+ * @return - the found fs, or null if this call reserved the slot
+ */
+ private FeatureStructureImpl getReserve(final int key, final int hash) {
+
+ boolean isLocked = false;
+ int[] probeInfo = resetProbeInfo(new int[2]);
+ try {
+
+ retry:
+ while (true) { // loop back point after locking against updates, to re-traverse the bucket chain from the beginning
+ FeatureStructureImpl m;
+ final FeatureStructureImpl[] localTable = table;
+
+ if (isReal(m = find(key, hash, probeInfo))) {
+ return m; // fast path for found item
+ }
+
+ // is reserve or null. Redo or continue search under lock
+ // need to do this for reserve-case because otherwise, could
+ // wait, but the signal could come before the wait - hence, wait forever
+ // The lock is already held when arriving here via "continue retry" below. Locking again
+ // would raise the ReentrantLock hold count to 2, and the single unlock() in the finally
+ // clause would then leave this sub-map locked forever.
+ if (!isLocked) {
+ lock.lock();
+ isLocked = true;
+ }
+ /*****************
+ * LOCKED *
+ *****************/
+ if (localTable != table) { // redo search from top, because table resized
+ resetProbeInfo(probeInfo);
}
+ // note: localTable not used from this point, unless reset
+
+ if (isReal(m = find(key, hash, probeInfo))) {
+ return m; // fast path for found item
+ }
+
+ while (isReserve(m)) {
+ final FeatureStructureImpl[] localTable2 = table; // to see if table gets resized
+ // can't wait on reserved item because would need to do lock.unlock() followed by wait, but
+ // inbetween these, another thread could already do the signal.
+ // awaitUninterruptibly releases the lock while waiting, keeps waiting through interrupts,
+ // and leaves the thread's interrupted status set on return instead of discarding it.
+ lockCondition.awaitUninterruptibly();
+
+ if (localTable2 != table) { // table was resized
+ resetProbeInfo(probeInfo);
+ continue retry;
+ }
+ m = localTable2[probeInfo[PROBE_ADDR_INDEX]];
+ if (isReserve(m)) {
+ continue; // still reserved: woken by a put of another key in this sub-map, or spuriously
+ }
+ if (m == null) { // slot was emptied (e.g. by clear()) - search again from the start
+ resetProbeInfo(probeInfo);
+ continue retry;
+ }
+ return m; // return real item
+ }
+
+ // is null. Reserve this slot to prevent other "getReserve" calls for this same instance from succeeding,
+ // causing them to wait until this slot gets filled in with a FS value
+ // Use table, not localTable, because resize may have occurred
+ table[probeInfo[PROBE_ADDR_INDEX]] = new TOP(key, RESERVE_TOP_TYPE_INSTANCE);
+ incrementSize();
+ return null;
}
- if (TUNE) {
- nbrProbes++;
+ } finally {
+ if (isLocked) {
+ lock.unlock();
}
- probeAddr = bitsMask & (probeAddr + (probeDelta++));
- m = table[probeAddr];
}
- if (TUNE) {
- histogram[Math.min(histogram.length - 1, nbrProbes)]++;
- maxProbe = Math.max(maxProbe, nbrProbes);
+ }
+
+
+ /**
+ * Stores value for key while holding this sub-map's lock.
+ * If the slot held this key's reserve, the reserve is replaced, all threads waiting on
+ * lockCondition are woken, and size is not changed (it was counted when the reserve was added).
+ * If the search ended at an empty slot, size is incremented (which may grow the table).
+ * If a real entry with the same key was present, it is overwritten and size is unchanged.
+ * @return the previous real entry for key, or null if there was none (empty slot or reserve)
+ */
+ private FeatureStructureImpl put(int key, FeatureStructureImpl value, int hash) {
+
+ lock.lock();
+ try {
+ final int[] probeInfo = resetProbeInfo(new int[2]);
+ FeatureStructureImpl m = find(key, hash, probeInfo);
+ table[probeInfo[PROBE_ADDR_INDEX]] = value;
+ if (isReserve(m)) {
+ lockCondition.signalAll();
+ // dont update size - was updated when reserve was added
+ return null;
+ } else if (m == null) {
+ incrementSize(); // otherwise, replacing an existing item, don't update size
+ }
+ return m;
+ } finally {
+ lock.unlock();
}
- table[probeAddr] = value;
- aggregate_size.incrementAndGet();
}
-
-// private synchronized FeatureStructureImpl putIfAbsent(
-// int key,
-// Callable<FeatureStructureImpl> valueProducer,
-// int hash) throws Exception {
-// int nbrProbes = 1;
-// int probeAddr = hash & bitsMask;
-// int probeDelta = 1;
-// FeatureStructureImpl maybe = table[probeAddr];
-// while ((null != maybe) && (maybe.getAddress() != key)) {
-// if (TUNE) {
-// nbrProbes++;
-// }
-// probeAddr = bitsMask & (probeAddr + (probeDelta++));
-// maybe = table[probeAddr];
-// }
-//
-// if (TUNE) {
-// histogram[Math.min(histogram.length - 1, nbrProbes)]++;
-// maxProbe = Math.max(maxProbe, nbrProbes);
-// }
-//
-// if (null == maybe) {
-// table[probeAddr] = maybe = valueProducer.call();
-// aggregate_size.incrementAndGet();
-// }
-// return maybe;
-// }
-
-// private int findEmptySlot(int key, int hash) {
-// int probeAddr = hash & bitsMask;
-// int probeDelta = 1;
-// int nbrProbes = 1;
-// FeatureStructureImpl m = table[probeAddr];
-// while (null != m) {
-// if (((TOP)m).jcasType == null) {
-// // this slot was previously reserved - check to see if the key matches
-// // (must be same key, otherwise, impl could deadlock)
-// if (m.getAddress() == key) {
-//
-// }
-// if (TUNE) {
-// nbrProbes++;
-// }
-// probeAddr = bitsMask & (probeAddr + (probeDelta++));
-// }
-// if (TUNE) {
-// histogram[Math.min(histogram.length - 1, nbrProbes)]++;
-// maxProbe = Math.max(maxProbe, nbrProbes);
-// }
-// return probeAddr;
-// }
+
+
+ /**
+ * Only used to fill in newly expanded table
+ * (called by increaseTableCapacity, with the lock held, for each entry of the old table).
+ * Unlike put, it neither updates size nor signals waiters.
+ * @param key - the addr in the heap
+ * @param value - the entry being moved to the new table (a real value or a reserve)
+ * @param hash - the hash already computed from the key, with the sub-map selection bits shifted off
+ */
+
+ private void putInner(int key, FeatureStructureImpl value, int hash) {
+ assert(lock.getHoldCount() > 0);
+ final int[] probeInfo = resetProbeInfo(new int[2]);
+
+ FeatureStructureImpl m = find(key, hash, probeInfo);
+ assert(m == null); // no dups in original table imply no hits in new one
+ table[probeInfo[PROBE_ADDR_INDEX]] = value;
+ }
+
+ // called under lock
+ // Doubles the capacity and rehashes every non-null entry (real values and reserves) into the
+ // new table; size is unchanged because newTableKeepSize does not touch it.
+ // NOTE(review): the new, still-empty array is assigned to "table" before the rehash loop runs,
+ // so a lock-free reader in getReserve may miss an existing key meanwhile; such a reader then
+ // repeats the search under the lock, which this thread holds until the rehash is complete.
private void increaseTableCapacity() {
final FeatureStructureImpl [] oldTable = table;
final int oldCapacity = oldTable.length;
-
int newCapacity = 2 * oldCapacity;
if (TUNE) {
- System.out.println("Size increasing from " + oldCapacity + " to " + newCapacity);
+ System.out.println("Capacity increasing from " + oldCapacity + " to " + newCapacity);
}
- newTable(newCapacity);
- size = 0;
+ table = newTableKeepSize(newCapacity);
for (int i = 0; i < oldCapacity; i++) {
FeatureStructureImpl fs = oldTable[i];
if (fs != null) {
- int key = fs.getAddress();
- int hash = hashInt(key);
- put(key, fs, hash >>> concurrencyLevelBits);
+ final int key = fs.getAddress();
+ final int hash = hashInt(key);
+ putInner(key, fs, hash >>> concurrencyLevelBits);
}
}
}
}
+ /**
+ * Creates a map using DEFAULT_CONCURRENCY_LEVEL sub-maps, reduced for small capacities so that
+ * each sub-map gets at least 32 of the requested capacity.
+ * NOTE(review): capacity / 32 need not be a power of 2; this relies on the 3-argument
+ * constructor (not visible here) rounding the concurrency level - confirm.
+ * @param capacity - requested initial capacity
+ * @param doUseCache - if false, getReserve and put do nothing and return null
+ */
JCasHashMap(int capacity, boolean doUseCache) {
- this(capacity, doUseCache, DEFAULT_CONCURRENCY_LEVEL);
+ // reduce concurrency so that capacity / concurrency >= 32
+ // that is, minimum sub-table capacity is 32 entries
+ // if capacity/concurrency < 32,
+ // concurrency = capacity / 32
+ this(capacity, doUseCache,
+ ((capacity / DEFAULT_CONCURRENCY_LEVEL) < 32) ?
+ Math.max(1, capacity / 32) :
+ DEFAULT_CONCURRENCY_LEVEL);
}
JCasHashMap(int capacity, boolean doUseCache, int aConcurrencyLevel) {
@@ -390,10 +468,6 @@ public class JCasHashMap {
for (int i = 0; i < concurrencyLevel; i++) {
subMaps[i] = (new SubMap()).newTable(subMapInitialCapacity);
}
- if (TUNE) {
- histogram = new int[200];
- Arrays.fill(histogram, 0);
- }
}
// cleared when cas reset
@@ -407,61 +481,29 @@ public class JCasHashMap {
for (SubMap m : subMaps) {
m.clear();
}
- aggregate_size.set(0);
}
-
-// public FeatureStructureImpl get(int key) {
-// if (!this.useCache) {
-// return null;
-// }
-// int hash = hashInt(key);
-// int subMapIndex = hash & concurrencyBitmask;
-//
-// SubMap m = subMaps[subMapIndex];
-// return m.get(key, hash >>> concurrencyLevelBits);
-// }
+
+ // Picks the sub-map from the low-order bits of the hash; the sub-map itself is then
+ // given the remaining bits (hash >>> concurrencyLevelBits).
+ private SubMap getSubMap(int hash) {
+ return subMaps[hash & concurrencyBitmask];
+ }
+ /**
+ * Returns the cover object stored for key. If there is none, reserves its slot and returns null;
+ * the caller is then expected to create the object and call put. Other threads asking for the
+ * same key block until that put happens. Returns null without reserving when the cache is disabled.
+ */
public FeatureStructureImpl getReserve(int key) {
if (!this.useCache) {
return null;
}
- int hash = hashInt(key);
- int subMapIndex = hash & concurrencyBitmask;
-
- SubMap m = subMaps[subMapIndex];
- return m.getReserve(key, hash >>> concurrencyLevelBits);
+ final int hash = hashInt(key);
+ return getSubMap(hash).getReserve(key, hash >>> concurrencyLevelBits);
}
-
-// public FeatureStructureImpl putIfAbsent(int key, Callable<FeatureStructureImpl> valueProducer) throws Exception {
-// if (!this.useCache) {
-// return valueProducer.call();
-// }
-// int hash = hashInt(key);
-// int subMapIndex = hash & concurrencyBitmask;
-//
-// SubMap m = subMaps[subMapIndex];
-// return m.putIfAbsent(key, valueProducer, hash >>> concurrencyLevelBits);
-// }
-//
-
-
- public void put(FeatureStructureImpl value) {
+ /**
+ * Stores value, keyed by its CAS address, replacing a reserve for that address (and waking the
+ * threads waiting on it) or a previous real entry.
+ * @return the previous real entry with the same address, or null (also null when the cache is disabled)
+ */
+ public FeatureStructureImpl put(FeatureStructureImpl value) {
if (!this.useCache) {
- return;
+ return null;
}
- int key = value.getAddress();
- int hash = hashInt(key);
- int subMapIndex = hash & concurrencyBitmask;
-
- SubMap m = subMaps[subMapIndex];
- m.put(key, value, hash >>> concurrencyLevelBits);
+ final int key = value.getAddress();
+ final int hash = hashInt(key);
+ return getSubMap(hash).put(key, value, hash >>> concurrencyLevelBits);
}
-
- public int size() {
- return aggregate_size.get();
- }
-
+
// The hash function is derived from murmurhash3 32 bit, which
// carries this statement:
@@ -491,31 +533,57 @@ public class JCasHashMap {
return h1;
}
+ //test case use
int[] getCapacities() {
int[] r = new int[subMaps.length];
int i = 0;
for (SubMap subMap : subMaps) {
- r[i++] = subMap.bitsMask + 1;
+ r[i++] = subMap.table.length;
}
return r;
}
+ //test case use
+ // Sums the sub-map sizes, reading each one under that sub-map's own lock - the ReentrantLock
+ // that its writers use. (A synchronized(subMap) block would take the object's monitor, which
+ // no writer holds, so it would exclude nothing.)
+ // The total is approximate: sub-maps are read one at a time, and size also counts reserves.
+ int getApproximateSize() {
+ int s = 0;
+ for (SubMap subMap : subMaps) {
+ subMap.lock.lock();
+ try {
+ s += subMap.size;
+ } finally {
+ subMap.lock.unlock();
+ }
+ }
+ return s;
+ }
+
public void showHistogram() {
if (TUNE) {
- System.out.println("Histogram of number of probes, factor = " + loadFactor + ", max = "
- + maxProbe);
- for (int i = 0; i <= maxProbe; i++) {
- System.out.println(i + ": " + histogram[i]);
- }
+ int sm = -1;
int agg_tableLength = 0;
for (SubMap m : subMaps) {
+ sm++;
+ int sumI = 0;
+
+ for (int i : m.histogram) {
+ sumI += i;
+ }
+
+ System.out.format(
+ "Histogram %d of number of probes, loadfactor = %.1f, maxProbe=%,d afterContinue=%,d nbr regs=%,d nbrContinues=%,d%n",
+ sm, loadFactor, m.maxProbe, m.maxProbeAfterContinue, sumI, m.continues);
+ for (int i = 0; i <= m.maxProbe; i++) {
+ System.out.println(i + ": " + m.histogram[i]);
+ }
agg_tableLength += m.table.length;
}
- System.out.println("bytes / entry = " + (float) (agg_tableLength) * 4 / size());
+
+ System.out.println("bytes / entry = " + (float) (agg_tableLength) * 4 / getApproximateSize());
System.out.format("size = %,d, prevExpansionTriggerSize = %,d, next = %,d%n",
- size(),
+ getApproximateSize(),
(int) ((agg_tableLength >>> 1) * loadFactor),
(int) (agg_tableLength * loadFactor));
}
}
+
+ /** @return the number of sub-maps this map is split into */
+ public int getConcurrencyLevel() {
+ return concurrencyLevel;
+ }
}
Added: uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/jcas/impl/JCasHashMapCompareTest.java
URL: http://svn.apache.org/viewvc/uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/jcas/impl/JCasHashMapCompareTest.java?rev=1591466&view=auto
==============================================================================
--- uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/jcas/impl/JCasHashMapCompareTest.java (added)
+++ uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/jcas/impl/JCasHashMapCompareTest.java Wed Apr 30 20:07:43 2014
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.jcas.impl;
+
+import java.util.HashSet;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.uima.cas.impl.FeatureStructureImpl;
+import org.apache.uima.internal.util.MultiThreadUtils;
+import org.apache.uima.jcas.cas.TOP;
+import org.apache.uima.jcas.cas.TOP_Type;
+
+import junit.framework.TestCase;
+
+/**
+ * Run this as a single test with yourkit, and look at the retained storage for both maps.
+ *
+ * Java 8 test: Concurrent Hash Map impl showed ~2.25 MB * 8 (concurrency level)
+ * JCasHashmap showed ~0.835 MB * 8
+ *
+ */
+public class JCasHashMapCompareTest extends TestCase {
+
+ private static class FakeTopType extends TOP_Type {
+ public FakeTopType() {
+ super();
+ }
+ }
+
+ private static final long rm = 0x5deece66dL;
+
+ private static int sizeOfTest = 1024 * 8;
+// private static final int SIZEm1 = SIZE - 1;
+ private static final TOP_Type FAKE_TOP_TYPE_INSTANCE = new FakeTopType();
+ private JCasHashMap jhm;
+ private ConcurrentMap<Integer, FeatureStructureImpl> chm;
+
+
+ public void testComp() throws Exception {
+ Thread.sleep(0000);
+ int numberOfThreads = MultiThreadUtils.PROCESSORS;
+ System.out.format("test JCasHashMapComp with %d threads%n", numberOfThreads);
+ runCustom(numberOfThreads);
+ runConCur(numberOfThreads);
+ runCustom(numberOfThreads);
+ runConCur(numberOfThreads);
+ runCustom(numberOfThreads);
+ runConCur(numberOfThreads);
+// stats("custom", runCustom(numberOfThreads)); // not accurate, use yourkit retained size instead
+// stats("concur", runConCur(numberOfThreads));
+ Set<Integer> ints = new HashSet<Integer>();
+ int i = 0;
+ for (Entry<Integer, FeatureStructureImpl> e : chm.entrySet()) {
+ assertFalse(ints.contains(Integer.valueOf(e.getKey())));
+ assertEquals(e.getValue().getAddress(), (int)(e.getKey()));
+ ints.add(e.getKey());
+ i ++;
+ }
+
+// System.out.println("Found " + i);
+
+ // launch yourkit profiler and look at retained sizes for both
+// Thread.sleep(1000000);
+ }
+
+ private static Object waiter = new Object();
+
+ private int runConCur(int numberOfThreads) throws Exception {
+ final ConcurrentMap<Integer, FeatureStructureImpl> m =
+ new ConcurrentHashMap<Integer, FeatureStructureImpl>(200, 0.75F, numberOfThreads);
+ chm = m;
+ MultiThreadUtils.Run2isb run2isb= new MultiThreadUtils.Run2isb() {
+
+ public void call(int threadNumber, int repeatNumber, StringBuilder sb) {
+// int founds = 0, puts = 0;
+ for (int i = 0; i < sizeOfTest*threadNumber; i++) {
+ final int key = hash(i, threadNumber
+ ) / 2;
+ FeatureStructureImpl fs = m.putIfAbsent(key, new TOP(key, JCasHashMap.RESERVE_TOP_TYPE_INSTANCE));
+ while (fs != null && ((TOP)fs).jcasType == JCasHashMap.RESERVE_TOP_TYPE_INSTANCE) {
+ // someone else reserved this
+
+ // wait for notify
+ synchronized (waiter) {
+ fs = m.get(key);
+ if (((TOP)fs).jcasType == JCasHashMap.RESERVE_TOP_TYPE_INSTANCE) {
+ try {
+ waiter.wait();
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ }
+
+// FeatureStructureImpl fs = m.get(key);
+ if (null == fs) {
+// puts ++;
+ FeatureStructureImpl prev = m.put(key, new TOP(key, FAKE_TOP_TYPE_INSTANCE));
+ if (((TOP)prev).jcasType == JCasHashMap.RESERVE_TOP_TYPE_INSTANCE) {
+ synchronized (waiter) {
+ waiter.notifyAll();
+ }
+ }
+// puts --; // someone beat us
+// founds ++;
+ }
+
+ }
+// System.out.println("concur Puts = " + puts + ", founds = " + founds);
+ }
+ };
+ long start = System.currentTimeMillis();
+ MultiThreadUtils.tstMultiThread("JCasHashMapTestCompConcur", numberOfThreads, 10, run2isb);
+ System.out.format("JCasCompTest - concur, time = %,f seconds%n", (System.currentTimeMillis() - start) / 1000.f);
+ return m.size();
+ }
+
+ private int runCustom(int numberOfThreads) throws Exception {
+ final JCasHashMap m = new JCasHashMap(256, true); // true = do use cache
+ jhm = m;
+
+ MultiThreadUtils.Run2isb run2isb= new MultiThreadUtils.Run2isb() {
+
+ public void call(int threadNumber, int repeatNumber, StringBuilder sb) {
+// int founds = 0, puts = 0;
+ for (int i = 0; i < sizeOfTest*threadNumber; i++) {
+ final int key = hash(i, threadNumber
+ ) / 2;
+// if (key == 456551)
+// System.out.println("debug");
+ FeatureStructureImpl fs = m.getReserve(key);
+
+ if (null == fs) {
+// puts++;
+ m.put(new TOP(key, FAKE_TOP_TYPE_INSTANCE));
+ } else {
+// founds ++;
+ }
+ }
+// System.out.println("custom Puts = " + puts + ", founds = " + founds);
+ }
+ };
+ long start = System.currentTimeMillis();
+ MultiThreadUtils.tstMultiThread("JCasHashMapTestComp0", numberOfThreads, 10, run2isb);
+ System.out.format("JCasCompTest - custom, time = %,f seconds%n", (System.currentTimeMillis() - start) / 1000.f);
+ m.showHistogram();
+ return m.getApproximateSize();
+ }
+
+ // not accurate, use yourkit retained size instead
+ private void stats(String m, int size) {
+ for (int i = 0; i < 2; i++) {
+ System.gc();
+ }
+ Runtime r = Runtime.getRuntime();
+ long free =r.freeMemory();
+ long total = r.totalMemory();
+ System.out.format("JCasHashMapComp %s used = %,d size = %,d%n",
+ m, total - free, size);
+ }
+
+ private int hash(int i, int threadNumber) {
+ return (int)(((
+ (i + (threadNumber << 4)) * rm + 11 +
+ (threadNumber << 1))
+ >>> 16) & (sizeOfTest*threadNumber - 1));
+ }
+}
Modified: uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/jcas/impl/JCasHashMapTest.java
URL: http://svn.apache.org/viewvc/uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/jcas/impl/JCasHashMapTest.java?rev=1591466&r1=1591465&r2=1591466&view=diff
==============================================================================
--- uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/jcas/impl/JCasHashMapTest.java (original)
+++ uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/jcas/impl/JCasHashMapTest.java Wed Apr 30 20:07:43 2014
@@ -30,10 +30,6 @@ import org.apache.uima.jcas.JCas;
import org.apache.uima.jcas.cas.TOP;
import org.apache.uima.jcas.cas.TOP_Type;
-/**
- *
- *
- */
public class JCasHashMapTest extends TestCase {
static private class FakeTopType extends TOP_Type {
public FakeTopType() {
@@ -48,10 +44,12 @@ public class JCasHashMapTest extends Tes
static private int[] addrs = new int[SIZE];
static int prev = 0;
- static {
+ static {
+ // unique numbers
for (int i = 0; i < SIZE; i++) {
addrs[i] = prev = prev + r.nextInt(14) + 1;
}
+ // shuffled
for (int i = SIZE - 1; i >= 1; i--) {
int ir = r.nextInt(i+1);
int temp = addrs[i];
@@ -60,6 +58,22 @@ public class JCasHashMapTest extends Tes
}
}
+
+  /**
+   * Checks how the concurrency level is derived from the requested capacity.
+   * The expectations assume the processor count p is a power of two. The last one expects
+   * p / 2, which is only meaningful for p >= 2, so machines that don't fit are skipped.
+   */
+  public void testBasic() {
+    final int p = MultiThreadUtils.PROCESSORS;
+    if (p < 2 || Integer.bitCount(p) != 1) {
+      System.out.println("JCasHashMap skipping basic, nbr of processors is " + p);
+      return;
+    }
+    JCasHashMap m = new JCasHashMap(32 * p, true);
+    assertEquals(p, m.getConcurrencyLevel());
+    m = new JCasHashMap(31 * p, true);
+    assertEquals(p, m.getConcurrencyLevel()); // default is 7, but rounded up to 8 (when p == 8)
+    m = new JCasHashMap(16 * p, true);
+    assertEquals(p / 2, m.getConcurrencyLevel());
+  }
+
public void testWithPerf() {
for (int i = 0; i < 5; i++ ) {
@@ -200,7 +214,7 @@ public class JCasHashMapTest extends Tes
private void arun(int n) {
JCasHashMap m = new JCasHashMap(200, true); // true = do use cache
- assertTrue(m.size() == 0);
+ assertTrue(m.getApproximateSize() == 0);
long start = System.currentTimeMillis();
for (int i = 0; i < n; i++) {
@@ -212,6 +226,9 @@ public class JCasHashMapTest extends Tes
m.put(fs);
// }
}
+
+ assertEquals(m.getApproximateSize(), n);
+
System.out.format("time for v1 %,d is %,d ms%n",
n, System.currentTimeMillis() - start);
m.showHistogram();
@@ -250,7 +267,7 @@ public class JCasHashMapTest extends Tes
int sub_capacity = 64;
int agg_capacity = cores * sub_capacity;
JCasHashMap m = new JCasHashMap(agg_capacity, true); // true = do use cache
- assertTrue(m.size() == 0);
+ assertTrue(m.getApproximateSize() == 0);
int switchpoint = (int)Math.floor(agg_capacity * loadfactor);
fill(switchpoint, m);