You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by sc...@apache.org on 2015/06/22 22:22:18 UTC

svn commit: r1686928 - /uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/jcas/impl/JCasHashMap.java

Author: schor
Date: Mon Jun 22 20:22:18 2015
New Revision: 1686928

URL: http://svn.apache.org/r1686928
Log:
[UIMA-4470] tweak the default concurrency level for JCasHashMap as a function of the reported number of cores.  

Modified:
    uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/jcas/impl/JCasHashMap.java

Modified: uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/jcas/impl/JCasHashMap.java
URL: http://svn.apache.org/viewvc/uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/jcas/impl/JCasHashMap.java?rev=1686928&r1=1686927&r2=1686928&view=diff
==============================================================================
--- uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/jcas/impl/JCasHashMap.java (original)
+++ uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/jcas/impl/JCasHashMap.java Mon Jun 22 20:22:18 2015
@@ -99,6 +99,10 @@ import org.apache.uima.cas.impl.FeatureS
  *    wake-up.
  * <p>
  * All calls are of the getReserved, followed by a put if the getReserved returns null.
+ *
+ * (Experiment - disabled after no improvement was measured:
+ * To improve locality of reference, an auxiliary data structure sized to fit in one cache line of a Power7 (128 bytes)
+ * cached the latest lookups.)
  *   
  */
 public class JCasHashMap {
@@ -106,6 +110,8 @@ public class JCasHashMap {
   // set to true to collect statistics for tuning
   // you have to also put a call to jcas.showJfsFromCaddrHistogram() at the end of the run
   static final boolean TUNE = false;
+  static final boolean check = true;  // message if concurrency level reduced because initial size was small
+//  private static final boolean MEASURE_CACHE = false /* Misc.getNoValueSystemProperty("uima.measure.jcas.hashmap.cache")*/;
 
   static int nextHigherPowerOf2(int i) {
     return (i < 1) ? 1 : Integer.highestOneBit(i) << ( (Integer.bitCount(i) == 1 ? 0 : 1));
@@ -116,34 +122,42 @@ public class JCasHashMap {
    * package private for testing
    * 
    * not final to allow test case to reset it
+   * must not be changed during multi-thread operation
+   * 
    */
   static int DEFAULT_CONCURRENCY_LEVEL;
+  static final int cores;
   
   static {
-    int cores = Runtime.getRuntime().availableProcessors();
-    // cores:lvl   0:1  1:1  2:2  3-15:4  16-31:8  32-63:16  else 32  
-    DEFAULT_CONCURRENCY_LEVEL = 
-        (cores <= 1)  ? 1 :
-        (cores <= 4)  ? 2 :  // assumption: cores used for other work, too
-        (cores <= 32) ? 4 :
-        (cores <= 128) ? 8 :
-                        16;   // emprical guesswork, not scientifically set    
+    cores = Runtime.getRuntime().availableProcessors();
+    // high concurrency can increase L1 cache dumping
+    DEFAULT_CONCURRENCY_LEVEL =  // approx between 10-20% of the number of cores dropping to 5% at high core values (to control cache loading)
+        // min is 1
+        // max is 16 (the number of L1 slots is 256 in some high-performance CPUs (2015), so going higher than this
+        //  probably causes too much cache loading)
+        // 
+        1 + (int)(cores * 
+                   ((cores > 64) ? .08 : 
+                    (cores > 32) ? .1  :
+                    (cores > 16) ? .2  :
+                    (cores > 8)  ? .3  : .4));
   }
 
   static int getDEFAULT_CONCURRENCY_LEVEL() {
     return DEFAULT_CONCURRENCY_LEVEL;
   }
   
+  // used in test cases
   static void setDEFAULT_CONCURRENCY_LEVEL(int dEFAULT_CONCURRENCY_LEVEL) {
     DEFAULT_CONCURRENCY_LEVEL = nextHigherPowerOf2(dEFAULT_CONCURRENCY_LEVEL);
-//    System.out.println("JCasHashMap setting concurrency level to " + DEFAULT_CONCURRENCY_LEVEL);
   }
   
-
-    
+//  // size set to ~1 cache line 
+//  private final static int CACHE_SIZE = 32;  // running testcases: 16 -> 558,688 hits, 32 ->  850,300, 24 ->679,175
+//  private final static AtomicLong cacheHits = new AtomicLong(0);
+//  private final static AtomicLong cacheMisses = new AtomicLong(0);
     
   
-  
   private final float loadFactor = (float)0.60;
     
   private final int initialCapacity; 
@@ -163,6 +177,11 @@ public class JCasHashMap {
   // optimization for concurrency level 1
   private final JCasHashMapSubMap oneSubmap;
   
+//  // cache to improve locality of reference for lookup
+//  private final FeatureStructureImpl[] cacheFS = new FeatureStructureImpl[CACHE_SIZE];  // one cache line is 32 words, save some for length and java object overhead
+//  private final int[] cacheInt = new int[CACHE_SIZE];
+//  private int cacheNewIndex = 0;
+  
   JCasHashMap(int capacity, boolean doUseCache) {
     // reduce concurrency so that capacity / concurrency >= 32
     //   that is, minimum sub-table capacity is 32 entries
@@ -173,6 +192,11 @@ public class JCasHashMap {
             nextHigherPowerOf2(
                 Math.max(1, capacity / 32)) :
             DEFAULT_CONCURRENCY_LEVEL);
+    if (check && (capacity / DEFAULT_CONCURRENCY_LEVEL) < 32) {
+      System.out.println(String.format("JCasHashMap concurrency reduced, capacity: %,d DefaultConcur: %d, concur: %d%n",
+          capacity, DEFAULT_CONCURRENCY_LEVEL, 
+          nextHigherPowerOf2(Math.max(1, capacity / 32))));
+    }
   }
   
   JCasHashMap(int capacity, boolean doUseCache, int aConcurrencyLevel) {
@@ -204,6 +228,42 @@ public class JCasHashMap {
     }
     oneSubmap = concurrencyLevel == 1 ? subMaps[0] : null;
   }
+  
+  /**
+   * The initial capacity (other than in testing) is by default (from JCasImpl) the bigger of
+   *   256 and the CAS heap initial size (500,000) / 16 = 31K,
+   *   but users may set it lower in their uima configuration
+   *   
+   * We use the current capacity of the JCasHashMap to set the concurrency limit 
+   * @param currentConcurrencyLevel the concurrency level currently in use
+   * @param curMapSize the current capacity of the map
+   * @return true if the concurrency is limited, and could increase with reallocation
+   */
+  // method used in JCasImpl, when clearing
+  static boolean concurrencyLimitedByInitialCapacity(int currentConcurrencyLevel, int curMapSize) {
+    if (DEFAULT_CONCURRENCY_LEVEL <= currentConcurrencyLevel) {
+      return false;
+    }
+    
+    int submapSize = curMapSize / DEFAULT_CONCURRENCY_LEVEL;
+    
+    int newConcurrencyLevel = (submapSize < 32) ?
+        nextHigherPowerOf2(
+            Math.max(1, curMapSize / 32)) :
+              DEFAULT_CONCURRENCY_LEVEL;
+        
+    return newConcurrencyLevel > currentConcurrencyLevel;     
+  }
+  
+  static int sizeAdjustedConcurrency(int curMapSize) {
+    int submapSize = curMapSize / DEFAULT_CONCURRENCY_LEVEL;
+    
+    int newConcurrencyLevel = (submapSize < 32) ?
+        nextHigherPowerOf2(
+            Math.max(1, curMapSize / 32)) :
+              DEFAULT_CONCURRENCY_LEVEL;
+    return Math.max(32 * newConcurrencyLevel,   curMapSize / 2);  
+  }
       
   // cleared when cas reset
   // storage management:
@@ -216,8 +276,10 @@ public class JCasHashMap {
     for (JCasHashMapSubMap m : subMaps) {
       m.clear();
     }
+//    Arrays.fill(cacheFS,  null);
+//    Arrays.fill(cacheInt, 0);
   }
-  
+    
   private JCasHashMapSubMap getSubMap(int hash) {
     return (null != oneSubmap) ? oneSubmap : subMaps[hash & concurrencyBitmask];
   }
@@ -226,17 +288,60 @@ public class JCasHashMap {
     if (!this.useCache) {
       return null;
     }
+//    for (int i = 0; i < cacheInt.length; i++) {
+//      final int vi = cacheInt[i];
+//      if (vi == 0) {
+//        break;
+//      }
+//        
+//      if (vi == key) {
+//        if (MEASURE_CACHE) {
+//          cacheHits.incrementAndGet();
+//        }
+//        FeatureStructureImpl fsi = cacheFS[i];
+//        if (fsi.getAddress() != key) { // recheck to avoid sync 
+//          break; // entry was overwritten
+//        }
+//        return fsi;
+////          
+////          if (i != 0) {
+////            // manage lru  - measurement showed no significant boost
+////            System.arraycopy(cache, 0, cache, 2, i);
+////            cache[0] = keyI;
+////            cache[1] = r;
+////          }
+////          return r;
+////        }
+//      }
+//    }
+        
+//    if (MEASURE_CACHE) {
+//      cacheMisses.incrementAndGet();  // includes creates
+//    }
     final int hash = hashInt(key);
-    return getSubMap(hash).getReserve(key, hash >>> concurrencyLevelBits);
+    final FeatureStructureImpl r = getSubMap(hash).getReserve(key, hash >>> concurrencyLevelBits);
+    
+//    if (r != null) {
+//      updateCache(key, r);
+//    }
+    return r;
   }
 
+//  private void updateCache(int key, FeatureStructureImpl value) {
+//    final int newIdx = cacheNewIndex;
+//    cacheNewIndex = (cacheNewIndex == (CACHE_SIZE - 1)) ? 0 : cacheNewIndex + 1;
+//    cacheFS[cacheNewIndex] = value;  // update this first to avoid putting in the same value multiple times
+//    cacheInt[cacheNewIndex] = key;
+//  }
+  
   public FeatureStructureImpl put(FeatureStructureImpl value) {
     if (!this.useCache) {
       return null;
     }
     final int key = value.getAddress();
+//    updateCache(key, value);
     final int hash = hashInt(key);
-    return getSubMap(hash).put(key,  value,  hash >>> concurrencyLevelBits);
+    return getSubMap(hash).put(key, value, hash >>> concurrencyLevelBits);
   }
     
   // The hash function is derived from murmurhash3 32 bit, which
@@ -329,4 +434,15 @@ public class JCasHashMap {
   public int getConcurrencyLevel() {
     return concurrencyLevel;
   }
+    
+//  private static final Thread dumpMeasurements = MEASURE_CACHE ? new Thread(new Runnable() {
+//    @Override
+//    public void run() {
+//      System.out.println(String.format("JCasHashMap cores = %d cache hits: %,d miss: %,d percent: %d%n",
+//          cores, 
+//          cacheHits.get(), cacheMisses.get(), (100 * cacheHits.get()) / (cacheHits.get() + cacheMisses.get())));
+//    }
+//  }) : null;
+//  
+//  static {if (MEASURE_CACHE) {Runtime.getRuntime().addShutdownHook(dumpMeasurements);}}
 }