You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2007/09/15 23:27:11 UTC

svn commit: r575986 - in /lucene/hadoop/trunk/src/contrib/hbase: ./ src/java/org/apache/hadoop/hbase/ src/test/org/apache/hadoop/hbase/

Author: jimk
Date: Sat Sep 15 14:27:10 2007
New Revision: 575986

URL: http://svn.apache.org/viewvc?rev=575986&view=rev
Log:
HADOOP-1888 NullPointerException in HMemcacheScanner (reprise)

Modified:
    lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHMemcache.java

Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?rev=575986&r1=575985&r2=575986&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Sat Sep 15 14:27:10 2007
@@ -47,7 +47,7 @@
     HADOOP-1847 Many HBase tests do not fail well. (phase 2)
     HADOOP-1870 Once file system failure has been detected, don't check it again
                 and get on with shutting down the hbase cluster.
-    HADOOP-1888 NullPointerException in HMemcacheScanner
+    HADOOP-1888 NullPointerException in HMemcacheScanner (reprise)
     HADOOP-1903 Possible data loss if Exception happens between snapshot and flush
                 to disk.
 

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java?rev=575986&r1=575985&r2=575986&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java Sat Sep 15 14:27:10 2007
@@ -21,6 +21,7 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -43,10 +44,10 @@
   // Note that since these structures are always accessed with a lock held,
   // no additional synchronization is required.
   
-  TreeMap<HStoreKey, byte []> memcache = new TreeMap<HStoreKey, byte []>();
-  final ArrayList<TreeMap<HStoreKey, byte []>> history =
-    new ArrayList<TreeMap<HStoreKey, byte []>>();
-  TreeMap<HStoreKey, byte []> snapshot = null;
+  volatile SortedMap<HStoreKey, byte []> memcache;
+  List<SortedMap<HStoreKey, byte []>> history =
+    Collections.synchronizedList(new ArrayList<SortedMap<HStoreKey, byte []>>());
+  volatile SortedMap<HStoreKey, byte []> snapshot = null;
 
   final HLocking lock = new HLocking();
   
@@ -62,14 +63,16 @@
    */
   public HMemcache() {
     super();
+    memcache  =
+      Collections.synchronizedSortedMap(new TreeMap<HStoreKey, byte []>());
   }
 
   /** represents the state of the memcache at a specified point in time */
   static class Snapshot {
-    final TreeMap<HStoreKey, byte []> memcacheSnapshot;
+    final SortedMap<HStoreKey, byte []> memcacheSnapshot;
     final long sequenceId;
     
-    Snapshot(final TreeMap<HStoreKey, byte[]> memcache, final Long i) {
+    Snapshot(final SortedMap<HStoreKey, byte[]> memcache, final Long i) {
       super();
       this.memcacheSnapshot = memcache;
       this.sequenceId = i.longValue();
@@ -103,8 +106,11 @@
         new Snapshot(memcache, Long.valueOf(log.startCacheFlush()));
       // From here on, any failure is catastrophic requiring replay of hlog
       this.snapshot = memcache;
-      history.add(memcache);
-      memcache = new TreeMap<HStoreKey, byte []>();
+      synchronized (history) {
+        history.add(memcache);
+      }
+      memcache =
+        Collections.synchronizedSortedMap(new TreeMap<HStoreKey, byte []>());
       // Reset size of this memcache.
       this.size.set(0);
       return retval;
@@ -126,14 +132,8 @@
       if(snapshot == null) {
         throw new IOException("Snapshot not present!");
       }
-      for (Iterator<TreeMap<HStoreKey, byte []>> it = history.iterator(); 
-        it.hasNext(); ) {
-        
-        TreeMap<HStoreKey, byte []> cur = it.next();
-        if (snapshot == cur) {
-          it.remove();
-          break;
-        }
+      synchronized (history) {
+        history.remove(snapshot);
       }
       this.snapshot = null;
     } finally {
@@ -182,12 +182,14 @@
     this.lock.obtainReadLock();
     try {
       ArrayList<byte []> results = get(memcache, key, numVersions);
-      for (int i = history.size() - 1; i >= 0; i--) {
-        if (numVersions > 0 && results.size() >= numVersions) {
-          break;
+      synchronized (history) {
+        for (int i = history.size() - 1; i >= 0; i--) {
+          if (numVersions > 0 && results.size() >= numVersions) {
+            break;
+          }
+          results.addAll(results.size(),
+              get(history.get(i), key, numVersions - results.size()));
         }
-        results.addAll(results.size(),
-            get(history.get(i), key, numVersions - results.size()));
       }
       return (results.size() == 0) ? null :
         ImmutableBytesWritable.toArray(results);
@@ -210,9 +212,11 @@
     this.lock.obtainReadLock();
     try {
       internalGetFull(memcache, key, results);
-      for (int i = history.size() - 1; i >= 0; i--) {
-        TreeMap<HStoreKey, byte []> cur = history.get(i);
-        internalGetFull(cur, key, results);
+      synchronized (history) {
+        for (int i = history.size() - 1; i >= 0; i--) {
+          SortedMap<HStoreKey, byte []> cur = history.get(i);
+          internalGetFull(cur, key, results);
+        }
       }
       return results;
       
@@ -221,7 +225,7 @@
     }
   }
   
-  void internalGetFull(TreeMap<HStoreKey, byte []> map, HStoreKey key, 
+  void internalGetFull(SortedMap<HStoreKey, byte []> map, HStoreKey key, 
       TreeMap<Text, byte []> results) {
     SortedMap<HStoreKey, byte []> tailMap = map.tailMap(key);
     for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) {
@@ -252,7 +256,7 @@
    * @return Ordered list of items found in passed <code>map</code>.  If no
    * matching values, returns an empty list (does not return null).
    */
-  ArrayList<byte []> get(final TreeMap<HStoreKey, byte []> map,
+  ArrayList<byte []> get(final SortedMap<HStoreKey, byte []> map,
       final HStoreKey key, final int numVersions) {
     ArrayList<byte []> result = new ArrayList<byte []>();
     // TODO: If get is of a particular version -- numVersions == 1 -- we
@@ -289,10 +293,12 @@
     this.lock.obtainReadLock();
     try {
       List<HStoreKey> results = getKeys(this.memcache, origin, versions);
-      for (int i = history.size() - 1; i >= 0; i--) {
-        results.addAll(results.size(), getKeys(history.get(i), origin,
-            versions == HConstants.ALL_VERSIONS ? versions :
-              (versions - results.size())));
+      synchronized (history) {
+        for (int i = history.size() - 1; i >= 0; i--) {
+          results.addAll(results.size(), getKeys(history.get(i), origin,
+              versions == HConstants.ALL_VERSIONS ? versions :
+                (versions - results.size())));
+        }
       }
       return results;
     } finally {
@@ -308,7 +314,7 @@
    * equal or older timestamp.  If no keys, returns an empty List. Does not
    * return null.
    */
-  private List<HStoreKey> getKeys(final TreeMap<HStoreKey, byte []> map,
+  private List<HStoreKey> getKeys(final SortedMap<HStoreKey, byte []> map,
       final HStoreKey origin, final int versions) {
     List<HStoreKey> result = new ArrayList<HStoreKey>();
     SortedMap<HStoreKey, byte []> tailMap = map.tailMap(origin);
@@ -360,7 +366,7 @@
   //////////////////////////////////////////////////////////////////////////////
 
   class HMemcacheScanner extends HAbstractScanner {
-    final TreeMap<HStoreKey, byte []> backingMaps[];
+    SortedMap<HStoreKey, byte []> backingMaps[];
     final Iterator<HStoreKey> keyIterators[];
 
     @SuppressWarnings("unchecked")
@@ -370,14 +376,16 @@
       super(timestamp, targetCols);
       lock.obtainReadLock();
       try {
-        this.backingMaps = new TreeMap[history.size() + 1];
+        synchronized (history) {
+          this.backingMaps = new SortedMap[history.size() + 1];
 
-        // Note that since we iterate through the backing maps from 0 to n, we
-        // need to put the memcache first, the newest history second, ..., etc.
+          // Note that since we iterate through the backing maps from 0 to n, we
+          // need to put the memcache first, the newest history second, ..., etc.
 
-        backingMaps[0] = memcache;
-        for (int i = history.size() - 1; i > 0; i--) {
-          backingMaps[i + 1] = history.get(i);
+          backingMaps[0] = memcache;
+          for (int i = history.size() - 1; i >= 0; i--) {
+            backingMaps[i + 1] = history.get(i);
+          }
         }
 
         this.keyIterators = new Iterator[backingMaps.length];
@@ -388,9 +396,13 @@
         
         HStoreKey firstKey = new HStoreKey(firstRow);
         for (int i = 0; i < backingMaps.length; i++) {
-          keyIterators[i] = firstRow.getLength() != 0 ?
-              backingMaps[i].tailMap(firstKey).keySet().iterator() :
-                backingMaps[i].keySet().iterator();
+          if (firstRow != null && firstRow.getLength() != 0) {
+            keyIterators[i] =
+              backingMaps[i].tailMap(firstKey).keySet().iterator();
+            
+          } else {
+            keyIterators[i] = backingMaps[i].keySet().iterator();
+          }
 
           while (getNext(i)) {
             if (!findFirstRow(i, firstRow)) {

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java?rev=575986&r1=575985&r2=575986&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java Sat Sep 15 14:27:10 2007
@@ -1615,6 +1615,7 @@
       return multipleMatchers;
     }
 
+    /** {@inheritDoc} */
     public boolean next(HStoreKey key, SortedMap<Text, byte[]> results)
     throws IOException {
       // Filtered flag is set by filters.  If a cell has been 'filtered out'

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java?rev=575986&r1=575985&r2=575986&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java Sat Sep 15 14:27:10 2007
@@ -31,6 +31,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.Vector;
 import java.util.Map.Entry;
@@ -439,13 +440,13 @@
    * @param logCacheFlushId flush sequence number
    * @throws IOException
    */
-  void flushCache(final TreeMap<HStoreKey, byte []> inputCache,
+  void flushCache(final SortedMap<HStoreKey, byte []> inputCache,
     final long logCacheFlushId)
   throws IOException {
     flushCacheHelper(inputCache, logCacheFlushId, true);
   }
   
-  void flushCacheHelper(TreeMap<HStoreKey, byte []> inputCache,
+  void flushCacheHelper(SortedMap<HStoreKey, byte []> inputCache,
       long logCacheFlushId, boolean addToAvailableMaps)
   throws IOException {
     synchronized(flushLock) {
@@ -1123,7 +1124,7 @@
    * @param key
    * @param numVersions Number of versions to fetch.  Must be > 0.
    * @param memcache Checked for deletions
-   * @return
+   * @return values for the specified versions
    * @throws IOException
    */
   byte [][] get(HStoreKey key, int numVersions, final HMemcache memcache)
@@ -1171,10 +1172,11 @@
               break;
             }
           }
-          while ((readval = new ImmutableBytesWritable()) != null &&
+          for (readval = new ImmutableBytesWritable();
               map.next(readkey, readval) &&
               readkey.matchesRowCol(key) &&
-              !hasEnoughVersions(numVersions, results)) {
+              !hasEnoughVersions(numVersions, results);
+              readval = new ImmutableBytesWritable()) {
             if (!isDeleted(readkey, readval.get(), memcache, deletes)) {
               results.add(readval.get());
             }
@@ -1212,10 +1214,11 @@
    * @throws IOException
    */
   List<HStoreKey> getKeys(final HStoreKey origin, List<HStoreKey> allKeys,
-      final int versions)
-  throws IOException {
-    if (allKeys == null) {
-      allKeys = new ArrayList<HStoreKey>();
+      final int versions) throws IOException {
+    
+    List<HStoreKey> keys = allKeys;
+    if (keys == null) {
+      keys = new ArrayList<HStoreKey>();
     }
     // This code below is very close to the body of the get method.
     this.lock.obtainReadLock();
@@ -1238,23 +1241,24 @@
             continue;
           }
           if (!isDeleted(readkey, readval.get(), null, null) &&
-              !allKeys.contains(readkey)) {
-            allKeys.add(new HStoreKey(readkey));
+              !keys.contains(readkey)) {
+            keys.add(new HStoreKey(readkey));
           }
-          while ((readval = new ImmutableBytesWritable()) != null &&
+          for (readval = new ImmutableBytesWritable();
               map.next(readkey, readval) &&
-              readkey.matchesRowCol(origin)) {
+              readkey.matchesRowCol(origin);
+              readval = new ImmutableBytesWritable()) {
             if (!isDeleted(readkey, readval.get(), null, null) &&
-                !allKeys.contains(readkey)) {
-              allKeys.add(new HStoreKey(readkey));
-              if (versions != ALL_VERSIONS && allKeys.size() >= versions) {
+                !keys.contains(readkey)) {
+              keys.add(new HStoreKey(readkey));
+              if (versions != ALL_VERSIONS && keys.size() >= versions) {
                 break;
               }
             }
           }
         }
       }
-      return allKeys;
+      return keys;
     } finally {
       this.lock.releaseReadLock();
     }

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java?rev=575986&r1=575985&r2=575986&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java Sat Sep 15 14:27:10 2007
@@ -50,7 +50,8 @@
   private FileSystem fs;
   private Path parentdir;
   private MasterThread masterThread = null;
-  ArrayList<RegionServerThread> regionThreads;
+  ArrayList<RegionServerThread> regionThreads =
+    new ArrayList<RegionServerThread>();
   private boolean deleteOnExit = true;
 
   /**
@@ -125,7 +126,7 @@
       this.parentdir = new Path(conf.get(HBASE_DIR, DEFAULT_HBASE_DIR));
       fs.mkdirs(parentdir);
       this.masterThread = startMaster(this.conf);
-      this.regionThreads = startRegionServers(this.conf, nRegionNodes);
+      this.regionThreads.addAll(startRegionServers(this.conf, nRegionNodes));
     } catch(IOException e) {
       shutdown();
       throw e;
@@ -357,17 +358,15 @@
     if(masterThread != null) {
       masterThread.getMaster().shutdown();
     }
-    if (regionServerThreads != null) {
-      synchronized(regionServerThreads) {
-        if (regionServerThreads != null) {
-          for(Thread t: regionServerThreads) {
-            if (t.isAlive()) {
-              try {
-                t.join();
-              } catch (InterruptedException e) {
-                // continue
-              }
-            }
+    // regionServerThreads can never be null because they are initialized when
+    // the class is constructed.
+    synchronized(regionServerThreads) {
+      for(Thread t: regionServerThreads) {
+        if (t.isAlive()) {
+          try {
+            t.join();
+          } catch (InterruptedException e) {
+            // continue
           }
         }
       }
@@ -381,8 +380,7 @@
     }
     LOG.info("Shutdown " +
       ((masterThread != null)? masterThread.getName(): "0 masters") + " " +
-      ((regionServerThreads == null)? 0: regionServerThreads.size()) +
-      " region server(s)");
+      regionServerThreads.size() + " region server(s)");
   }
   
   void shutdown() {

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHMemcache.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHMemcache.java?rev=575986&r1=575985&r2=575986&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHMemcache.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHMemcache.java Sat Sep 15 14:27:10 2007
@@ -22,6 +22,7 @@
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.Map;
+import java.util.SortedMap;
 import java.util.TreeMap;
 
 import junit.framework.TestCase;
@@ -97,7 +98,7 @@
     
     // Save off old state.
     int oldHistorySize = hmc.history.size();
-    TreeMap<HStoreKey, byte []> oldMemcache = hmc.memcache;
+    SortedMap<HStoreKey, byte []> oldMemcache = hmc.memcache;
     // Run snapshot.
     Snapshot s = hmc.snapshotMemcacheForLog(log);
     // Make some assertions about what just happened.