You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by sc...@apache.org on 2014/04/30 22:07:44 UTC

svn commit: r1591466 - in /uima/uimaj/trunk/uimaj-core/src: main/java/org/apache/uima/jcas/impl/JCasHashMap.java test/java/org/apache/uima/jcas/impl/JCasHashMapCompareTest.java test/java/org/apache/uima/jcas/impl/JCasHashMapTest.java

Author: schor
Date: Wed Apr 30 20:07:43 2014
New Revision: 1591466

URL: http://svn.apache.org/r1591466
Log:
[UIMA-3774] [UIMA-3784] fix size calc, rework design for less sync contention, better lock design, add test comparing to plain concurrenthashmap pseudo design (pseudo in that it doesn't really quite work)

Added:
    uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/jcas/impl/JCasHashMapCompareTest.java
Modified:
    uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/jcas/impl/JCasHashMap.java
    uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/jcas/impl/JCasHashMapTest.java

Modified: uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/jcas/impl/JCasHashMap.java
URL: http://svn.apache.org/viewvc/uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/jcas/impl/JCasHashMap.java?rev=1591466&r1=1591465&r2=1591466&view=diff
==============================================================================
--- uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/jcas/impl/JCasHashMap.java (original)
+++ uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/jcas/impl/JCasHashMap.java Wed Apr 30 20:07:43 2014
@@ -20,10 +20,12 @@
 package org.apache.uima.jcas.impl;
 
 import java.util.Arrays;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.uima.cas.impl.FeatureStructureImpl;
 import org.apache.uima.jcas.cas.TOP;
+import org.apache.uima.jcas.cas.TOP_Type;
 
 /**
  * Version 2 (2014) of map between CAS addr and JCasCover Objects
@@ -34,7 +36,9 @@ import org.apache.uima.jcas.cas.TOP;
  * 
  * Table always a power of 2 in size - permits faster hashing
  * 
- * Accesses to this table are not threadsafe.
+ * Accesses to this table are threadsafe, in order to support
+ * read-only CASes being shared by multiple threads. 
+ * Multiple iterators in different threads could be accessing the map and updating it.
  * 
  * Load factor tuning. 2,000,000 random inserts, 50 reps (for JIT)
  *   .5 (2x to 4x entries)  99%  5 probes   250 ms
@@ -45,43 +49,59 @@ import org.apache.uima.jcas.cas.TOP;
  *    version 1 at load factor .5 ran about 570 ms * 1.x 
  *      (did 2 lookups for fetches if not found,) 
  *   
- * Other changes: 
- *   remember finding empty slot on "miss" in get, 
- *     reuse on next put 
- *   change put to assume adding new item not already in table
- * 
- * Multi-threading:  For read-only CASes, multiple iterators in 
- * different threads could be accessing the map and updating it.
+ * No "get" method, only getReserve.  This method, if it doesn't
+ * find the key, eventually finds an empty (null) slot - it then
+ * stores a special "reserve" item with the same key value in that slot.
+ *    Other threads doing getReserve calls, upon encountering this 
+ *    reserve item, wait until the reserve is converted to a
+ *    real value (a notifyAll happens when this is done), and
+ *    then the getReserve returns the real item.
+ *    
+ * getReserve calls that find a real (non-reserve) item operate without any locking
+ *    
+ * Locking:
+ *   There is one lock per sub-map, used for reading and updating that sub-map's table
+ *     -- not used for reading when item found, only if item not found or is reserved  
  * 
  * Strategy: have 1 outer implementation delegating to multiple inner ones
  *   number = concurrency level (a power of 2)
  *   
  *   The hash uses some # of low order bits to address the right inner one.
  *   
- *  This table is used to hold JCas cover classes for CAS feature structures.  There is one instance associated with each CAS that is using it.
+ *  This table is used to hold JCas cover classes for CAS feature structures.  
+ *  There is one instance of this table associated with each CAS that is using it.
  * <p> 
  * The update occurs in the code in JCasGenerated classes, which do:
  *        a call to get the value of the map for a key
  *        if that is "null", it creates the new JCas cover object, and does a "put" to add the value.
  * <p> 
- * The creation of the new JCas cover object can, in turn, run arbitrary user code, which can result in updates to the JCasHashMap which occur before this original update occurs.
+ * The creation of the new JCas cover object can, in turn, run arbitrary user code, 
+ * which can result in updates to the JCasHashMap which occur before this original update occurs.
  * <p>
- * In a multi-threaded environment, multiple threads can do a "get" for the same Feature Structure instance.  If it's not in the Map, the correct behavior is:
+ * In a multi-threaded environment, multiple threads can do a "get" 
+ * for the same Feature Structure instance.  If it's not in the Map, the correct behavior is:
  * <p>
- * one of the threads needs to add it
- * the other threads need to wait for the one thread to finish adding, and then return the object that the one thread added.
+ * one of the threads adds the new element
+ * the other threads wait for the one thread to finish adding, and then return the object that the one thread added.
  * <p>
  * The implementation works as follows:
  * <p>
- * 1) The JCasHashMap is split into "n" sub-maps.   The number is the number of cores, but grows more slowly as the # of cores > 16. :
- *    getReserve and put and clear are synchronized on the sub-maps, using ordinary synchronized keywords; the outer get/put are not synchronized
- * 1a) The outer size (aggregated from the sub sizes) is kept as an atomic integer, updated only on puts and clear
- * 2) The number of sub maps is rounded to a power of 2, to allow the low order bits of the hash of the key to be used to pick the map (via masking).
+ * 1) The JCasHashMap is split into "n" sub-maps.   
+ *    The number is the number of cores, but grows more slowly as the # of cores > 16. 
+ *    This number can be specified, but this is not currently exposed in the tuning parameters
+ *    Locking occurs on the sub-maps; the outer method calls are not synchronized
+ * 2) The number of sub maps is rounded to a power of 2, to allow the low order bits of the hash of the key 
+ *     to be used to pick the map (via masking).
  * 3) A getReserve that results in not-found returns a null, but adds to the table a special reserved element.
- * 4) A get that finds a special reserved element knows that some other thread is in the process of adding an entry for that key, so it waits.
- * 5) A put, if it finds a reserved-for-that-key element, replaces that with the real element, and then does a notifyAll to wake up any threads that were waiting (on this sub-map), and these threads then re-do the get
+ * 3a) This adding may result in table resizing
+ * 4) A getReserve that finds a special reserved element, knows that some other thread 
+ *    is in the process of adding an entry for that key, so it waits.
+ * 5) A put, if it finds a reserved-for-that-key element, replaces that with the real element, 
+ *    and then does a notifyAll to wake up any threads that were waiting (on this sub-map), 
+ *    and these threads then re-do the get.  Multiple threads could be waiting on this, and they will all 
+ *    wake-up.
  * <p>
- * All calls are of the getReserved, followed by a put if the getReserved returns null.  I removed the plain "get".  
+ * All calls are a getReserve, followed by a put if the getReserve returns null.
  *   
  */
 public class JCasHashMap {
@@ -91,6 +111,10 @@ public class JCasHashMap {
   private static final boolean TUNE = false;
 
   private static final int DEFAULT_CONCURRENCY_LEVEL;
+      
+  private static final int PROBE_ADDR_INDEX = 0;
+  private static final int PROBE_DELTA_INDEX = 1;
+  
   static {
     int cores = Runtime.getRuntime().availableProcessors();
     DEFAULT_CONCURRENCY_LEVEL = (cores < 17) ? cores :
@@ -98,12 +122,35 @@ public class JCasHashMap {
                                                24 + (cores - 24) / 4;
   }
     
-  //These are for tuning measurements
-  private int histogram [];
-  private int maxProbe = 0;
-
+  private static class ReserveTopType extends TOP_Type {
+    public ReserveTopType() {
+      super();
+    }
+  }
+  
+  // public for test case use
+  public static final TOP_Type RESERVE_TOP_TYPE_INSTANCE = new ReserveTopType(); 
+  private static boolean isReserve(FeatureStructureImpl m) {
+    return m != null && ((TOP)m).jcasType == RESERVE_TOP_TYPE_INSTANCE;
+  }
+  private static boolean isReal(FeatureStructureImpl m) {
+    return m != null && ((TOP)m).jcasType != RESERVE_TOP_TYPE_INSTANCE;
+  }
+  
+  private static int[] resetProbeInfo(int[] probeInfo) {
+    probeInfo[PROBE_ADDR_INDEX] = -1;
+    probeInfo[PROBE_DELTA_INDEX] = 1;
+    return probeInfo;
+  }
+  
+  private static int[] setProbeInfo(int[] probeInfo, int probeAddr, int probeDelta) {
+    probeInfo[PROBE_ADDR_INDEX] = probeAddr;
+    probeInfo[PROBE_DELTA_INDEX] = probeDelta;
+    return probeInfo;    
+  }
+  
   private final float loadFactor = (float)0.60;
-
+    
   private final int initialCapacity; 
 
   private final boolean useCache;
@@ -116,257 +163,288 @@ public class JCasHashMap {
   
   private final SubMap[] subMaps;
   
-  private final AtomicInteger aggregate_size = new AtomicInteger(0);
-
   private final int subMapInitialCapacity;
 
   private class SubMap {
+
+    //These are for tuning measurements
+    private int histogram [];
+    private int maxProbe = 0;
+    private int maxProbeAfterContinue = 0;
+    private int continues = 0;;
+
+    private final ReentrantLock lock = new ReentrantLock();
+    private final Condition lockCondition = lock.newCondition();
+    
     private int sizeWhichTriggersExpansion;
     private int size; // number of elements in the table  
-    private FeatureStructureImpl [] table;
+    private volatile FeatureStructureImpl [] table;
     private boolean secondTimeShrinkable = false;
-    private int bitsMask;  // 1's to "and" with result to keep in range   
-  
-    // for testing only:
-    int getbitsMask() {return bitsMask;}
     
     private SubMap newTable(int capacity) {
-      assert(Integer.bitCount(capacity) == 1);
-      table = new FeatureStructureImpl[capacity];
-      bitsMask = capacity - 1;
+      table = newTableKeepSize(capacity);
       size = 0;
-      sizeWhichTriggersExpansion = (int)(capacity * loadFactor);
+      if (TUNE) {
+        histogram = new int[200];
+        Arrays.fill(histogram, 0);
+      }
       return this;
     }
     
-    private synchronized void clear() {
-      // see if size is less than the 1/2 size that triggers expansion
-      if (size <  (sizeWhichTriggersExpansion >>> 1)) {
-        // if 2nd time then shrink by 50%
-        //   this is done to avoid thrashing around the threshold
-        if (secondTimeShrinkable) {
-          secondTimeShrinkable = false;
-          final int newCapacity = Math.max(subMapInitialCapacity, table.length >>> 1);
-          if (newCapacity < table.length) { 
-            newTable(newCapacity);  // shrink table by 50%
-          } else { // don't shrink below minimum
-            Arrays.fill(table,  null);
+    private FeatureStructureImpl[] newTableKeepSize(int capacity) {
+      assert(Integer.bitCount(capacity) == 1);
+      FeatureStructureImpl[] t = new FeatureStructureImpl[capacity];
+      sizeWhichTriggersExpansion = (int)(capacity * loadFactor);
+      return t;
+    }
+    
+    private void incrementSize() {
+      assert(lock.getHoldCount() > 0);
+      if (size >= sizeWhichTriggersExpansion) {
+        increaseTableCapacity();
+      }
+      size++;
+    }
+    
+    // Does size management - shrinking overly large tables after the 2nd time
+    private void clear() {
+      lock.lock();
+      try {
+        // see if size is less than the 1/2 size that triggers expansion
+        if (size <  (sizeWhichTriggersExpansion >>> 1)) {
+          // if 2nd time then shrink by 50%
+          //   this is done to avoid thrashing around the threshold
+          if (secondTimeShrinkable) {
+            secondTimeShrinkable = false;
+            final int newCapacity = Math.max(subMapInitialCapacity, table.length >>> 1);
+            if (newCapacity < table.length) { 
+              newTable(newCapacity);  // shrink table by 50%
+            } else { // don't shrink below minimum
+              Arrays.fill(table,  null);
+            }
+            size = 0;
+            return;
+          } else {
+            secondTimeShrinkable = true;
           }
-          size = 0;
-          return;
         } else {
-          secondTimeShrinkable = true;
+          secondTimeShrinkable = false; // reset this to require 2 triggers in a row
         }
-      } else {
-        secondTimeShrinkable = false; // reset this to require 2 triggers in a row
+        size = 0;
+        Arrays.fill(table, null);
+      } finally {
+        lock.unlock();
       }
-      size = 0;
-      Arrays.fill(table, null);
     }      
-    
-//    private synchronized FeatureStructureImpl get(int key, int hash) {
-//      int nbrProbes = 1;
-//      int probeAddr = hash & bitsMask;
-//      int probeDelta = 1;
-//      FeatureStructureImpl maybe = table[probeAddr];
-//      while ((null != maybe) && (maybe.getAddress() != key)) {
-//        if (TUNE) {
-//          nbrProbes++;
-//        }
-//        probeAddr = bitsMask & (probeAddr + (probeDelta++));
-//        maybe = table[probeAddr];
-//      }  
-//
-//      if (TUNE) {
-//        histogram[Math.min(histogram.length - 1, nbrProbes)]++;
-//        maxProbe = Math.max(maxProbe, nbrProbes);
-//      }
-//      return maybe;    
-//    }
-    
+     
     /**
-     * Gets a value, but if the value isn't there, it reserves the slot where it will go
-     * with a new instance where the key matches, but the type is null
-     * @param key - the addr in the heap
-     * @param hash - the hash that was already computed from the key
-     * @return - the found fs, or null
+     * Can be called under lock or not.
+     * @param key -
+     * @param hash -
+     * @param probeInfo - used to get/receive multiple int values;
+     *    0: (in/out) startProbe or -1, 
+     *    1: (in/out) probeDelta (starts at 1)
+     * @return the entry whose address equals key (real or reserve), or null if an empty slot was reached; the final probe position is passed back in probeInfo
      */
-    private synchronized FeatureStructureImpl getReserve(final int key, final int hash) {
+    private FeatureStructureImpl find(final int key, final int hash, final int[] probeInfo) {
       int nbrProbes = 1;
-      int probeAddr;
-      FeatureStructureImpl maybe;
-      retryAfterWait: do {
-        probeAddr = hash & bitsMask;
-        int probeDelta = 1;
-        maybe = table[probeAddr];
-        while (null != maybe) {
-          if (maybe.getAddress() == key) {
-            while (((TOP) maybe).jcasType == null) {
-              // we hit a reserve marker - there is another thread in the process of creating an instance of this,
-              // so wait for it to finish and then return it
-              final int sizeNow = size;
-              try {
-                wait();  // releases the synchronized monitor, otherwise this segment blocked for others while waiting
-              } catch (InterruptedException e) {
+      final boolean isContinue = TUNE && (probeInfo[PROBE_ADDR_INDEX] != -1); 
+      FeatureStructureImpl[] localTable = table;
+      final int bitMask = localTable.length - 1;
+      final int startProbe = probeInfo[PROBE_ADDR_INDEX];      
+      int probeAddr = (startProbe < 0) ? (hash & bitMask) : startProbe; 
+      assert((startProbe < 0) ? probeInfo[PROBE_DELTA_INDEX] == 1 : true);
+      int probeDelta = probeInfo[PROBE_DELTA_INDEX];
+      FeatureStructureImpl m = localTable[probeAddr];
+
+      while (null != m) {  // loop to traverse bucket chain
+        if (m.getAddress() == key) {
+          setProbeInfo(probeInfo, probeAddr, probeDelta);
+          if (TUNE) {
+            final boolean needUnlock;
+            if (!lock.isHeldByCurrentThread()) {
+              lock.lock();
+              needUnlock = true;
+            } else {
+              needUnlock = false;
+            }
+            try {
+              histogram[nbrProbes] += 1;
+              if (maxProbe < nbrProbes) {
+                maxProbe = nbrProbes;
               }
-              if (size != sizeNow) {
-                // at this point, the table may have grown
-                // so start over
-                continue retryAfterWait;
+              if (isContinue) {
+                if (maxProbeAfterContinue < nbrProbes) {
+                  maxProbeAfterContinue = nbrProbes;
+                }
+                continues ++;
+              }
+            } finally {
+              if (needUnlock) {
+                lock.unlock();
               }
             }
-            if (TUNE) {
-              histogram[Math.min(histogram.length - 1, nbrProbes)]++;
-              maxProbe = Math.max(maxProbe, nbrProbes);
-            }
-            return maybe;
           }
-          // is not null, but is wrong key
-          if (TUNE) {
-            nbrProbes++;
-          }
-          probeAddr = bitsMask & (probeAddr + (probeDelta++));
-          maybe = table[probeAddr];
+          return m;
         }
-        break;
-      } while (true);  // must be true to have label of continue used
-      // "maybe" is null
-      // reserve this slot to prevent other "getReserved" calls for this same instance from succeeding,
-      // causing them to wait until this slot gets filled in with a FS value
-      table[probeAddr] = new TOP(key, null);  // null indicates its a RESERVE marker
-
-      if (TUNE) {
-        histogram[Math.min(histogram.length - 1, nbrProbes)]++;
-        maxProbe = Math.max(maxProbe, nbrProbes);
+        nbrProbes++;
+        probeAddr = bitMask & (probeAddr + (probeDelta++));
+        m = localTable[probeAddr];
       }
-      return maybe;
+      setProbeInfo(probeInfo, probeAddr, probeDelta);
+      return m;
     }
-    
-    private synchronized void put(int key, FeatureStructureImpl value, int hash) {
-      if (size >= sizeWhichTriggersExpansion) {
-        increaseTableCapacity();
-      }
-      size++;
-      
-      int probeAddr = hash & bitsMask;
-      int probeDelta = 1;
-      int nbrProbes = 1;
-      FeatureStructureImpl m = table[probeAddr];
-      while (null != m) {
-        if (((TOP)m).jcasType == null) { 
-          // this slot was previously reserved - check to see if the key matches
-          // (must be same key, otherwise, impl could deadlock)
-          if (m.getAddress() == key) {
-            // found the previously reserved slot
-            table[probeAddr] = value;
-            aggregate_size.incrementAndGet();
-            notifyAll();
-            if (TUNE) {
-              histogram[Math.min(histogram.length - 1, nbrProbes)]++;
-              maxProbe = Math.max(maxProbe, nbrProbes);
-            }
-            return;
-          }
-        }
         
-        // skip if adding the same element to the table
-        // probably never happens, though
-        if (m.getAddress() == key) {
-          if (TUNE) {
-            System.err.format("JCasHashMap found already existing cover instance for key %,d, ignoring put%n", key);
-            throw new RuntimeException(); //to get stack trace
+    /**
+     * Gets a value, but if the value isn't there, it reserves the slot where it will go
+     * with a new instance where the key matches, but the type is a unique value.
+     * 
+     * Threading: not synchronized for main path where get is finding an element.
+     *   Since elements are never updated, there is no race if an element is found.
+     *   And it doesn't matter if the table is resized (if the element is found).
+     *   If it is not found, or a reserve is found, need to get the lock, and
+     *     start over if resized, or
+     *     continue from reserved or null spot if not    
+     *
+     * @param key - the addr in the heap
+     * @param hash - the hash that was already computed from the key
+     * @return - the found fs, or null
+     */
+    private FeatureStructureImpl getReserve(final int key, final int hash) {
+
+      boolean isLocked = false;
+      int[] probeInfo = resetProbeInfo(new int[2]);
+      try {
+ 
+     retry:
+        while (true) { // loop back point after locking against updates, to re-traverse the bucket chain from the beginning
+          FeatureStructureImpl m;
+          final FeatureStructureImpl[] localTable = table;
+          
+          if (isReal(m = find(key, hash, probeInfo))) {
+            return m;  // fast path for found item       
+          }
+          
+          // is reserve or null. Redo or continue search under lock
+          // need to do this for reserve-case because otherwise, could 
+          //   wait, but notify could come before wait - hence, wait forever
+          lock.lock();
+          isLocked = true;
+          /*****************
+           *    LOCKED     *
+           *****************/
+          if (localTable != table) { // redo search from top, because table resized
+            resetProbeInfo(probeInfo);
           }
+          // note: localTable not used from this point, unless reset
+          
+          if (isReal(m = find(key, hash, probeInfo))) {
+            return m;  // fast path for found item       
+          }
+          
+          while (isReserve(m)) {
+            final FeatureStructureImpl[] localTable2 = table;  // to see if table gets resized
+            // can't wait on reserved item because would need to do lock.unlock() followed by wait, but
+            //   in between these, another thread could already do the notify.
+            try {
+              lockCondition.await();  // wait on object that needs to be unlocked
+            } catch (InterruptedException e) {
+            }
+
+            if (localTable2 != table) { // table was resized
+              resetProbeInfo(probeInfo);       
+              continue retry;
+            }
+            m = localTable2[probeInfo[PROBE_ADDR_INDEX]];
+            if (isReserve(m)) {
+              continue;  // still reserved and no resize (e.g. woken by InterruptedException or by a signalAll for another key): wait again
+            }
+            return m; // return real item
+          }
+      
+          // is null. Reserve this slot to prevent other "getReserve" calls for this same instance from succeeding,
+          // causing them to wait until this slot gets filled in with a FS value
+          // Use table, not localTable, because resize may have occurred
+          table[probeInfo[PROBE_ADDR_INDEX]] = new TOP(key, RESERVE_TOP_TYPE_INSTANCE);
+          incrementSize();          
+          return null;
         }
-        if (TUNE) {
-          nbrProbes++;
+      } finally {
+        if (isLocked) {
+          lock.unlock();
         }
-        probeAddr = bitsMask & (probeAddr + (probeDelta++));
-        m = table[probeAddr];
       }
-      if (TUNE) {
-        histogram[Math.min(histogram.length - 1, nbrProbes)]++;
-        maxProbe = Math.max(maxProbe, nbrProbes);
+    }
+      
+        
+    private FeatureStructureImpl put(int key, FeatureStructureImpl value, int hash) {
+
+      lock.lock();
+      try {
+        final int[] probeInfo = resetProbeInfo(new int[2]);
+        FeatureStructureImpl m = find(key, hash, probeInfo);
+        table[probeInfo[PROBE_ADDR_INDEX]] = value;
+        if (isReserve(m)) {
+          lockCondition.signalAll();
+          // don't update size - was updated when reserve was added
+          return null;
+        } else if (m == null) {
+            incrementSize();  // otherwise, replacing an existing item, don't update size
+        }
+        return m;
+      } finally {
+        lock.unlock();
       }
-      table[probeAddr] = value;
-      aggregate_size.incrementAndGet();
     }
-    
-//    private synchronized FeatureStructureImpl putIfAbsent(
-//        int key, 
-//        Callable<FeatureStructureImpl> valueProducer, 
-//        int hash) throws Exception {
-//      int nbrProbes = 1;
-//      int probeAddr = hash & bitsMask;
-//      int probeDelta = 1;
-//      FeatureStructureImpl maybe = table[probeAddr];
-//      while ((null != maybe) && (maybe.getAddress() != key)) {
-//        if (TUNE) {
-//          nbrProbes++;
-//        }
-//        probeAddr = bitsMask & (probeAddr + (probeDelta++));
-//        maybe = table[probeAddr];
-//      }
-//      
-//      if (TUNE) {
-//        histogram[Math.min(histogram.length - 1, nbrProbes)]++;
-//        maxProbe = Math.max(maxProbe, nbrProbes);
-//      }
-//      
-//      if (null == maybe) {
-//        table[probeAddr] = maybe = valueProducer.call();
-//        aggregate_size.incrementAndGet();
-//      }
-//      return maybe;    
-//    }
-    
-//    private int findEmptySlot(int key, int hash) {
-//      int probeAddr = hash & bitsMask;
-//      int probeDelta = 1;
-//      int nbrProbes = 1;
-//      FeatureStructureImpl m = table[probeAddr];
-//      while (null != m) {
-//        if (((TOP)m).jcasType == null) { 
-//          // this slot was previously reserved - check to see if the key matches
-//          // (must be same key, otherwise, impl could deadlock)
-//          if (m.getAddress() == key) {
-//            
-//          }
-//        if (TUNE) {
-//          nbrProbes++;
-//        }
-//        probeAddr = bitsMask & (probeAddr + (probeDelta++));
-//      }
-//      if (TUNE) {
-//        histogram[Math.min(histogram.length - 1, nbrProbes)]++;
-//        maxProbe = Math.max(maxProbe, nbrProbes);
-//      }
-//      return probeAddr;
-//    }
+     
+       
+   /**
+    * Only used to fill in newly expanded table
+    * @param key -
+    * @param value -
+    * @param hash -
+    */
+    
+    private void putInner(int key, FeatureStructureImpl value, int hash) {
+      assert(lock.getHoldCount() > 0);
+      final int[] probeInfo = resetProbeInfo(new int[2]);
+      
+      FeatureStructureImpl m = find(key, hash, probeInfo);
+      assert(m == null);  // no dups in original table imply no hits in new one
+      table[probeInfo[PROBE_ADDR_INDEX]] = value;
+    }
+      
 
+    // called under lock
     private void increaseTableCapacity() {
       final FeatureStructureImpl [] oldTable = table; 
       final int oldCapacity = oldTable.length;
-    
       int newCapacity = 2 * oldCapacity;
       
       if (TUNE) {
-        System.out.println("Size increasing from " + oldCapacity + " to " + newCapacity);
+        System.out.println("Capacity increasing from " + oldCapacity + " to " + newCapacity);
       }
-      newTable(newCapacity);
-      size = 0;
+      table = newTableKeepSize(newCapacity);
       for (int i = 0; i < oldCapacity; i++) {
         FeatureStructureImpl fs = oldTable[i];
         if (fs != null) {
-          int key = fs.getAddress();
-          int hash = hashInt(key);
-          put(key, fs, hash >>> concurrencyLevelBits);
+          final int key = fs.getAddress();
+          final int hash = hashInt(key);
+          putInner(key, fs, hash >>> concurrencyLevelBits);
         }   
       }
     }
   }
   
   JCasHashMap(int capacity, boolean doUseCache) {
-    this(capacity, doUseCache, DEFAULT_CONCURRENCY_LEVEL);
+    // reduce concurrency so that capacity / concurrency >= 32
+    //   that is, minimum sub-table capacity is 32 entries
+    // if capacity/concurrency < 32,
+    //   concurrency = capacity / 32
+    this(capacity, doUseCache,
+        ((capacity / DEFAULT_CONCURRENCY_LEVEL) < 32) ?
+            Math.max(1, capacity / 32) :
+            DEFAULT_CONCURRENCY_LEVEL);
   }
   
   JCasHashMap(int capacity, boolean doUseCache, int aConcurrencyLevel) {
@@ -390,10 +468,6 @@ public class JCasHashMap {
     for (int i = 0; i < concurrencyLevel; i++) {
       subMaps[i] = (new SubMap()).newTable(subMapInitialCapacity);
     }
-    if (TUNE) {
-      histogram = new int[200];
-      Arrays.fill(histogram, 0);
-    }
   }
       
   // cleared when cas reset
@@ -407,61 +481,29 @@ public class JCasHashMap {
     for (SubMap m : subMaps) {
       m.clear();
     }
-    aggregate_size.set(0);
   }
-
-//  public FeatureStructureImpl get(int key) {
-//    if (!this.useCache) {
-//      return null;
-//    }
-//    int hash = hashInt(key);
-//    int subMapIndex = hash & concurrencyBitmask;
-//    
-//    SubMap m = subMaps[subMapIndex];
-//    return m.get(key, hash >>> concurrencyLevelBits);    
-//  }
+  
+  private SubMap getSubMap(int hash) {
+    return subMaps[hash & concurrencyBitmask];
+  }
   
   public FeatureStructureImpl getReserve(int key) {
     if (!this.useCache) {
       return null;
     }
-    int hash = hashInt(key);
-    int subMapIndex = hash & concurrencyBitmask;
-    
-    SubMap m = subMaps[subMapIndex];
-    return m.getReserve(key, hash >>> concurrencyLevelBits);    
+    final int hash = hashInt(key);
+    return getSubMap(hash).getReserve(key, hash >>> concurrencyLevelBits);
   }
 
-  
-//  public FeatureStructureImpl putIfAbsent(int key, Callable<FeatureStructureImpl> valueProducer) throws Exception {
-//    if (!this.useCache) {
-//      return valueProducer.call();
-//    }
-//    int hash = hashInt(key);
-//    int subMapIndex = hash & concurrencyBitmask;
-//    
-//    SubMap m = subMaps[subMapIndex];
-//    return m.putIfAbsent(key, valueProducer, hash >>> concurrencyLevelBits);   
-//  }
-//  
-
-  
-  public void put(FeatureStructureImpl value) {
+  public FeatureStructureImpl put(FeatureStructureImpl value) {
     if (!this.useCache) {
-      return;
+      return null;
     }
-    int key = value.getAddress();
-    int hash = hashInt(key);
-    int subMapIndex = hash & concurrencyBitmask;
-    
-    SubMap m = subMaps[subMapIndex];
-    m.put(key, value, hash >>> concurrencyLevelBits);
+    final int key = value.getAddress();
+    final int hash = hashInt(key);
+    return getSubMap(hash).put(key,  value,  hash >>> concurrencyLevelBits);
   }
-  
-  public int size() {
-    return aggregate_size.get();
-  }
-
+    
   // The hash function is derived from murmurhash3 32 bit, which
   // carries this statement:
   
@@ -491,31 +533,57 @@ public class JCasHashMap {
     return h1;
   }
      
+  //test case use
   int[] getCapacities() {
     int[] r = new int[subMaps.length];
     int i = 0;
     for (SubMap subMap : subMaps) {
-      r[i++] = subMap.bitsMask + 1;
+      r[i++] = subMap.table.length;
     }
     return r;
   }
   
+  //test case use
+  int getApproximateSize() {
+    int s = 0;
+    for (SubMap subMap : subMaps) {
+      synchronized (subMap) {
+        s += subMap.size;
+      }
+    }
+    return s;
+  }
+  
   public void showHistogram() {
     if (TUNE) {
-      System.out.println("Histogram of number of probes, factor = " + loadFactor + ", max = "
-              + maxProbe);
-      for (int i = 0; i <= maxProbe; i++) {
-        System.out.println(i + ": " + histogram[i]);
-      }     
+      int sm = -1;
       int agg_tableLength = 0;
       for (SubMap m : subMaps) {
+        sm++;
+        int sumI = 0;
+        
+        for (int i : m.histogram) {
+          sumI += i;
+        }
+        
+        System.out.format(
+            "Histogram %d of number of probes, loadfactor = %.1f, maxProbe=%,d afterContinue=%,d nbr regs=%,d nbrContinues=%,d%n",
+            sm, loadFactor, m.maxProbe, m.maxProbeAfterContinue, sumI, m.continues);
+        for (int i = 0; i <= m.maxProbe; i++) {
+          System.out.println(i + ": " + m.histogram[i]);
+        }     
         agg_tableLength += m.table.length;
       }
-      System.out.println("bytes / entry = " + (float) (agg_tableLength) * 4 / size());
+      
+      System.out.println("bytes / entry = " + (float) (agg_tableLength) * 4 / getApproximateSize());
       System.out.format("size = %,d, prevExpansionTriggerSize = %,d, next = %,d%n",
-          size(),
+          getApproximateSize(),
           (int) ((agg_tableLength >>> 1) * loadFactor),
           (int) (agg_tableLength * loadFactor));
     }
   }
+  
+  public int getConcurrencyLevel() {
+    return concurrencyLevel;
+  }
 }

Added: uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/jcas/impl/JCasHashMapCompareTest.java
URL: http://svn.apache.org/viewvc/uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/jcas/impl/JCasHashMapCompareTest.java?rev=1591466&view=auto
==============================================================================
--- uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/jcas/impl/JCasHashMapCompareTest.java (added)
+++ uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/jcas/impl/JCasHashMapCompareTest.java Wed Apr 30 20:07:43 2014
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.jcas.impl;
+
+import java.util.HashSet;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.uima.cas.impl.FeatureStructureImpl;
+import org.apache.uima.internal.util.MultiThreadUtils;
+import org.apache.uima.jcas.cas.TOP;
+import org.apache.uima.jcas.cas.TOP_Type;
+
+import junit.framework.TestCase;
+
+/**
+ * Run this as a single test under the YourKit profiler, and compare the retained size of the two maps.
+ * 
+ *   Java 8 test: ConcurrentHashMap impl showed ~2.25 MB * 8 (concurrency level)
+ *                JCasHashMap            showed ~0.835 MB * 8 
+ *
+ */
+public class JCasHashMapCompareTest extends TestCase {
+  
+  private static class FakeTopType extends TOP_Type {  // bare TOP_Type subclass used to tag test entries
+    public FakeTopType() {
+      super();
+    }
+  }
+  
+  private static final long rm =  0x5deece66dL;  // multiplier in hash(); also java.util.Random's LCG multiplier
+
+  private static int sizeOfTest = 1024 * 8;  // workload scale: call(threadNumber, ..) loops sizeOfTest*threadNumber times
+//  private static final int SIZEm1 = SIZE - 1;
+  private static final TOP_Type FAKE_TOP_TYPE_INSTANCE = new FakeTopType();  // type given to every real (non-reserve) TOP entry
+  private JCasHashMap jhm;  // map created by the most recent runCustom call
+  private ConcurrentMap<Integer, FeatureStructureImpl> chm;  // map created by the most recent runConCur call
+
+  
+  /**
+   * Runs the JCasHashMap and ConcurrentHashMap workloads alternately, three times each, then
+   * checks the map left by the last runConCur call: keys distinct, each value's address == key.
+   */
+  public void testComp() throws Exception {
+    Thread.sleep(0000);
+    int numberOfThreads =  MultiThreadUtils.PROCESSORS;    
+    System.out.format("test JCasHashMapComp with %d threads%n", numberOfThreads);
+    runCustom(numberOfThreads);
+    runConCur(numberOfThreads);
+    runCustom(numberOfThreads);
+    runConCur(numberOfThreads);
+    runCustom(numberOfThreads);
+    runConCur(numberOfThreads);
+//    stats("custom", runCustom(numberOfThreads));  // not accurate, use yourkit retained size instead
+//    stats("concur", runConCur(numberOfThreads));
+    Set<Integer> ints = new HashSet<Integer>();
+    for (Entry<Integer, FeatureStructureImpl> e : chm.entrySet()) {
+      assertTrue(ints.add(e.getKey()));  // add() returns false if the key was already seen
+      assertEquals(e.getKey().intValue(), e.getValue().getAddress());  // (expected, actual)
+    }
+    
+//    System.out.println("Found " + ints.size());
+    // launch yourkit profiler and look at retained sizes for both
+//    Thread.sleep(1000000);
+  }
+
+  private static final Object waiter = new Object();  // monitor for threads blocked on a reserved key
+  
+  /**
+   * ConcurrentHashMap counterpart of runCustom: each thread reserves a key with a placeholder
+   * TOP, then replaces it with the real entry; threads meeting a placeholder wait on waiter.
+   * @param numberOfThreads thread count, also passed as the map's concurrencyLevel argument
+   * @return m.size() once MultiThreadUtils.tstMultiThread has returned
+   */
+  private int runConCur(int numberOfThreads) throws Exception {
+    final ConcurrentMap<Integer, FeatureStructureImpl> m = 
+        new ConcurrentHashMap<Integer, FeatureStructureImpl>(200, 0.75F, numberOfThreads);
+    chm = m;
+    MultiThreadUtils.Run2isb run2isb= new MultiThreadUtils.Run2isb() {
+      public void call(int threadNumber, int repeatNumber, StringBuilder sb) {
+//        int founds = 0, puts = 0;
+        for (int i = 0; i < sizeOfTest*threadNumber; i++) {
+          final int key = hash(i, threadNumber) / 2;
+          FeatureStructureImpl fs = m.putIfAbsent(key, new TOP(key, JCasHashMap.RESERVE_TOP_TYPE_INSTANCE));
+          while (fs != null && ((TOP)fs).jcasType == JCasHashMap.RESERVE_TOP_TYPE_INSTANCE) {
+            // someone else reserved this: wait for notify
+            synchronized (waiter) {
+              fs = m.get(key);
+              if (((TOP)fs).jcasType == JCasHashMap.RESERVE_TOP_TYPE_INSTANCE) {
+                try {
+                  waiter.wait();
+                } catch (InterruptedException ignored) {
+                  // deliberately ignored: the enclosing while loop re-reads the entry
+                }
+              }
+            }
+          }
+          if (null == fs) {
+//            puts ++;
+            FeatureStructureImpl prev = m.put(key,  new TOP(key, FAKE_TOP_TYPE_INSTANCE));
+            if (((TOP)prev).jcasType == JCasHashMap.RESERVE_TOP_TYPE_INSTANCE) {
+              synchronized (waiter) {
+                waiter.notifyAll();
+              }
+            }
+//              puts --;  // someone beat us 
+//              founds ++;
+          }
+        }
+//        System.out.println("concur Puts = " + puts + ", founds = " + founds);
+      }
+    };  
+    long start = System.currentTimeMillis();
+    MultiThreadUtils.tstMultiThread("JCasHashMapTestCompConcur",  numberOfThreads, 10, run2isb);
+    System.out.format("JCasCompTest - concur, time = %,f seconds%n", (System.currentTimeMillis() - start) / 1000.f);
+    return m.size();
+  }
+  
+  /**
+   * Times the getReserve/put workload on one JCasHashMap captured by the Run2isb task.
+   * Also prints the elapsed time and calls the map's showHistogram().
+   * @param numberOfThreads thread count handed to MultiThreadUtils.tstMultiThread
+   * @return the map's getApproximateSize() once tstMultiThread has returned
+   */
+  private int runCustom(int numberOfThreads) throws Exception {
+    final JCasHashMap map = new JCasHashMap(256, true); // true = do use cache
+    jhm = map;
+
+    MultiThreadUtils.Run2isb worker = new MultiThreadUtils.Run2isb() {
+      public void call(int threadNumber, int repeatNumber, StringBuilder sb) {
+//        int founds = 0, puts = 0;
+        final int limit = sizeOfTest * threadNumber;
+        for (int n = 0; n < limit; n++) {
+          final int addr = hash(n, threadNumber) / 2;
+          if (map.getReserve(addr) == null) {
+            // nothing came back for this key, so this thread adds the entry
+            map.put(new TOP(addr, FAKE_TOP_TYPE_INSTANCE));  // puts++;
+          }  // else founds ++;
+        }
+//        System.out.println("custom Puts = " + puts + ", founds = " + founds);
+      }
+    };
+    final long startMillis = System.currentTimeMillis();
+    MultiThreadUtils.tstMultiThread("JCasHashMapTestComp0",  numberOfThreads,  10, worker);
+    final float seconds = (System.currentTimeMillis() - startMillis) / 1000.f;
+    System.out.format("JCasCompTest - custom, time = %,f seconds%n", seconds);
+    map.showHistogram();
+    return map.getApproximateSize();
+  }
+  
+  // Prints the JVM heap in use (totalMemory - freeMemory) next to the given size.
+  // not accurate, use yourkit retained size instead
+  private void stats(String m, int size) {
+    System.gc();
+    System.gc();  // requested twice, then the heap figures are sampled
+    final Runtime rt = Runtime.getRuntime();
+    final long freeBytes = rt.freeMemory();
+    final long totalBytes = rt.totalMemory();
+    final long usedBytes = totalBytes - freeBytes;
+    System.out.format("JCasHashMapComp %s used = %,d  size = %,d%n", m, usedBytes, size);
+  }
+
+  /** Scrambles (i, threadNumber) with multiplier rm, then masks with sizeOfTest*threadNumber - 1. */
+  private int hash(int i, int threadNumber) {    
+    final long seed = i + (threadNumber << 4);  // summed as int, then widened, as before
+    final long mixed = seed * rm + 11 + (threadNumber << 1);
+    return (int) ((mixed >>> 16) & (sizeOfTest * threadNumber - 1));
+  }
+}

Modified: uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/jcas/impl/JCasHashMapTest.java
URL: http://svn.apache.org/viewvc/uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/jcas/impl/JCasHashMapTest.java?rev=1591466&r1=1591465&r2=1591466&view=diff
==============================================================================
--- uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/jcas/impl/JCasHashMapTest.java (original)
+++ uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/jcas/impl/JCasHashMapTest.java Wed Apr 30 20:07:43 2014
@@ -30,10 +30,6 @@ import org.apache.uima.jcas.JCas;
 import org.apache.uima.jcas.cas.TOP;
 import org.apache.uima.jcas.cas.TOP_Type;
 
-/**
- * 
- *
- */
 public class JCasHashMapTest extends TestCase {
   static private class FakeTopType extends TOP_Type {
     public FakeTopType() {
@@ -48,10 +44,12 @@ public class JCasHashMapTest extends Tes
   static private int[] addrs = new int[SIZE];
   static int prev = 0;
   
-  static {   
+  static {  
+    // unique numbers
     for (int i = 0; i < SIZE; i++) { 
       addrs[i] = prev = prev + r.nextInt(14) + 1;
     }
+    // shuffled
     for (int i = SIZE - 1; i >= 1; i--) {
       int ir = r.nextInt(i+1);
       int temp = addrs[i];
@@ -60,6 +58,22 @@ public class JCasHashMapTest extends Tes
     }
   }
 
+  
+  public void testBasic() {
+    int p = MultiThreadUtils.PROCESSORS;
+    if (p < 1 || Integer.bitCount(p) != 1) {
+      System.out.println("JCasHashMap  skipping basic, nbr of processors is " + p);
+      return;
+    }
+    // assertEquals (not assertTrue(a == b)) so a failure reports expected vs. actual
+    JCasHashMap m = new JCasHashMap(32 * p, true);
+    assertEquals(p, m.getConcurrencyLevel());
+    m = new JCasHashMap(31 * p,  true);
+    assertEquals(p, m.getConcurrencyLevel()); // default is 7, but rounded up to 8
+    m = new JCasHashMap(16 * p,  true);
+    assertEquals(p / 2, m.getConcurrencyLevel());
+  }
+  
   public void testWithPerf()  {
     
     for (int i = 0; i <  5; i++ ) {
@@ -200,7 +214,7 @@ public class JCasHashMapTest extends Tes
    
   private void arun(int n) {
     JCasHashMap m = new JCasHashMap(200, true); // true = do use cache 
-    assertTrue(m.size() == 0);
+    assertTrue(m.getApproximateSize() == 0);
        
     long start = System.currentTimeMillis();
     for (int i = 0; i < n; i++) {
@@ -212,6 +226,9 @@ public class JCasHashMapTest extends Tes
         m.put(fs);
 //      }
     }
+    
+    assertEquals(m.getApproximateSize(), n);
+    
     System.out.format("time for v1 %,d is %,d ms%n",
         n, System.currentTimeMillis() - start);
     m.showHistogram();
@@ -250,7 +267,7 @@ public class JCasHashMapTest extends Tes
     int sub_capacity = 64;
     int agg_capacity = cores * sub_capacity;
     JCasHashMap m = new JCasHashMap(agg_capacity, true); // true = do use cache 
-    assertTrue(m.size() == 0);
+    assertTrue(m.getApproximateSize() == 0);
      
     int switchpoint = (int)Math.floor(agg_capacity * loadfactor);
     fill(switchpoint, m);