Posted to common-commits@hadoop.apache.org by st...@apache.org on 2007/12/20 06:35:14 UTC

svn commit: r605811 - in /lucene/hadoop/trunk/src/contrib/hbase: ./ src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/util/ src/test/org/apache/hadoop/hbase/

Author: stack
Date: Wed Dec 19 21:35:07 2007
New Revision: 605811

URL: http://svn.apache.org/viewvc?rev=605811&view=rev
Log:
HADOOP-2467 scanner truncates resultset when > 1 column families

Modified:
    lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreFile.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/Writables.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java

Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?rev=605811&r1=605810&r2=605811&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Wed Dec 19 21:35:07 2007
@@ -89,6 +89,7 @@
    HADOOP-2465 When split parent regions are cleaned up, not all the columns are
                deleted
    HADOOP-2468 TestRegionServerExit failed in Hadoop-Nightly #338
+   HADOOP-2467 scanner truncates resultset when > 1 column families
    
   IMPROVEMENTS
    HADOOP-2401 Add convenience put method that takes writable

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java?rev=605811&r1=605810&r2=605811&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java Wed Dec 19 21:35:07 2007
@@ -592,7 +592,7 @@
     /** {@inheritDoc} */
     @Override
     public String toString() {
-      return "regionname: " + this.regionName.toString() + ", startKey: <" +
+      return "{regionname: " + this.regionName.toString() + ", startKey: <" +
         this.startKey.toString() + ">, server: " + this.server.toString() + "}";
     }
 

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java?rev=605811&r1=605810&r2=605811&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java Wed Dec 19 21:35:07 2007
@@ -1113,6 +1113,7 @@
           continue;
         }
         storelist.add(stores.get(family));
+        
       }
       return new HScanner(cols, firstRow, timestamp,
         storelist.toArray(new HStore [storelist.size()]), filter);
@@ -1296,7 +1297,6 @@
     
     try {
       // find the HStore for the column family
-      LOG.info(family);
       HStore store = stores.get(HStoreKey.extractFamily(family));
       // find all the keys that match our criteria
       List<HStoreKey> keys = store.getKeys(new HStoreKey(row, timestamp), ALL_VERSIONS);
@@ -1422,8 +1422,8 @@
    * @throws IOException
    */
   private void checkColumn(Text columnName) throws IOException {
-    Text family = new Text(HStoreKey.extractFamily(columnName) + ":");
-    if(! regionInfo.getTableDesc().hasFamily(family)) {
+    Text family = HStoreKey.extractFamily(columnName, true);
+    if (!regionInfo.getTableDesc().hasFamily(family)) {
       throw new IOException("Requested column family " + family 
           + " does not exist in HRegion " + regionInfo.getRegionName()
           + " for table " + regionInfo.getTableDesc().getName());
@@ -1529,14 +1529,21 @@
     /** Create an HScanner with a handle on many HStores. */
     @SuppressWarnings("unchecked")
     HScanner(Text[] cols, Text firstRow, long timestamp, HStore[] stores,
-        RowFilterInterface filter) throws IOException {
-
+        RowFilterInterface filter)
+    throws IOException {
       this.scanners = new HInternalScannerInterface[stores.length];
       try {
         for (int i = 0; i < stores.length; i++) {
-          scanners[i] = stores[i].getScanner(timestamp, cols, firstRow, filter);
+          // TODO: The cols passed in here can include columns from other
+          // stores; add filter so only pertinent columns are passed.
+          //
+          // Also, if more than one store is involved, the filter must be
+          // replicated: a stateful filter like WhileMatchRowFilter will mess
+          // up the scan if a single instance is shared across stores. See HADOOP-2467.
+          scanners[i] = stores[i].getScanner(timestamp, cols, firstRow,
+            (i > 0 && filter != null)?
+              (RowFilterInterface)Writables.clone(filter, conf): filter);
         }
-
       } catch(IOException e) {
         for (int i = 0; i < this.scanners.length; i++) {
           if(scanners[i] != null) {
@@ -1546,9 +1553,8 @@
         throw e;
       }
 
-//       Advance to the first key in each store.
-//       All results will match the required column-set and scanTime.
-      
+      // Advance to the first key in each store.
+      // All results will match the required column-set and scanTime.
       this.resultSets = new TreeMap[scanners.length];
       this.keys = new HStoreKey[scanners.length];
       for (int i = 0; i < scanners.length; i++) {
@@ -1616,7 +1622,6 @@
         // row label, then its timestamp is bad. We need to advance it.
         while ((scanners[i] != null) &&
             (keys[i].getRow().compareTo(chosenRow) <= 0)) {
-          
           resultSets[i].clear();
           if (!scanners[i].next(keys[i], resultSets[i])) {
             closeScanner(i);

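The filter cloning above is the heart of the HADOOP-2467 fix: every store
scanner after the first gets its own copy of the row filter. A minimal sketch
of the failure mode, not part of this commit; the filter method names are
illustrative of this era's RowFilterInterface:

    // One stateful filter shared by two store scanners (hypothetical):
    RowFilterInterface shared =
      new WhileMatchRowFilter(new StopRowFilter(new Text("aad")));
    // Store scanner A runs ahead to row "bbb" and trips the stop row,
    // flipping the filter's internal filter-all-remaining switch...
    shared.filter(new Text("bbb"));
    // ...so store scanner B, sharing the same instance, sees
    // filterAllRemaining() == true and returns nothing further.
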
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java?rev=605811&r1=605810&r2=605811&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java Wed Dec 19 21:35:07 2007
@@ -1848,7 +1848,6 @@
         this.readers = new MapFile.Reader[storefiles.size()];
         
         // Most recent map file should be first
-        
         int i = readers.length - 1;
         for(HStoreFile curHSF: storefiles.values()) {
           readers[i--] = curHSF.getReader(fs, bloomFilter);

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreFile.java?rev=605811&r1=605810&r2=605811&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreFile.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreFile.java Wed Dec 19 21:35:07 2007
@@ -815,6 +815,15 @@
       throws IOException {
         super(fs, dirName, conf);
         this.bloomFilter = filter;
+        // Force reading of the mapfile index by calling midKey.
+        // Reading the index brings it into memory here on the
+        // client and then closes the index file, freeing up a
+        // socket connection and resources in the datanode.
+        // Usually the first access on a MapFile.Reader loads the
+        // index; we force the issue for HStoreFile MapFiles because
+        // an access may not happen for some time, and meantime we
+        // would be tying up datanode resources.  See HADOOP-2341.
+        midKey();
       }
       
       /** {@inheritDoc} */

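The midKey() call above piggybacks on MapFile.Reader's lazy index load. A
standalone sketch of the same trick, assuming an existing MapFile under a
directory named "parent/mapfile" (imports elided):

    // Opening the reader does not read the index file; the first
    // positional call does, after which the index is in memory and the
    // index file, and its datanode resources, can be released.
    MapFile.Reader reader = new MapFile.Reader(fs, "parent/mapfile", conf);
    reader.midKey();  // Force the index read up front.  See HADOOP-2341.
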
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java?rev=605811&r1=605810&r2=605811&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java Wed Dec 19 21:35:07 2007
@@ -27,6 +27,8 @@
  * A Key for a stored row
  */
 public class HStoreKey implements WritableComparable {
+  public static final char COLUMN_FAMILY_DELIMITER = ':';
+  
   // TODO: Move these utility methods elsewhere (To a Column class?).
   /**
    * Extracts the column family name from a column
@@ -83,7 +85,13 @@
   
   private static int getColonOffset(final Text col)
   throws InvalidColumnNameException {
-    int offset = col.find(":");
+    int offset = -1;
+    for (int i = 0; i < col.getLength(); i++) {
+      if (col.charAt(i) == COLUMN_FAMILY_DELIMITER) {
+        offset = i;
+        break;
+      }
+    }
     if(offset < 0) {
       throw new InvalidColumnNameException(col + " is missing the colon " +
         "family/qualifier separator");
@@ -294,23 +302,24 @@
 
   // Comparable
 
-  /** {@inheritDoc} */
   public int compareTo(Object o) {
     HStoreKey other = (HStoreKey) o;
     int result = this.row.compareTo(other.row);
-    if(result == 0) {
-      result = this.column.compareTo(other.column);
-      if(result == 0) {
-        // The below older timestamps sorting ahead of newer timestamps looks
-        // wrong but it is intentional.  This way, newer timestamps are first
-        // found when we iterate over a memcache and newer versions are the
-        // first we trip over when reading from a store file.
-        if(this.timestamp < other.timestamp) {
-          result = 1;
-        } else if(this.timestamp > other.timestamp) {
-          result = -1;
-        }
-      }
+    if (result != 0) {
+      return result;
+    }
+    result = this.column.compareTo(other.column);
+    if (result != 0) {
+      return result;
+    }
+    // The below older timestamps sorting ahead of newer timestamps looks
+    // wrong but it is intentional. This way, newer timestamps are first
+    // found when we iterate over a memcache and newer versions are the
+    // first we trip over when reading from a store file.
+    if (this.timestamp < other.timestamp) {
+      result = 1;
+    } else if (this.timestamp > other.timestamp) {
+      result = -1;
     }
     return result;
   }

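The reworked compareTo above preserves the intentional inverted timestamp
order while flattening the nesting with early returns. A small illustration,
assuming HStoreKey's (row, column, timestamp) constructor:

    HStoreKey newer = new HStoreKey(new Text("row"), new Text("info:a"), 20L);
    HStoreKey older = new HStoreKey(new Text("row"), new Text("info:a"), 10L);
    // Row and column compare equal, so ordering falls to the timestamps:
    // the newer key sorts first, so iteration meets fresh versions first.
    assert newer.compareTo(older) < 0;
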
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java?rev=605811&r1=605810&r2=605811&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java Wed Dec 19 21:35:07 2007
@@ -426,7 +426,7 @@
    */
   public HScannerInterface obtainScanner(Text[] columns, Text startRow)
   throws IOException {
-    return obtainScanner(columns, startRow, System.currentTimeMillis(), null);
+    return obtainScanner(columns, startRow, HConstants.LATEST_TIMESTAMP, null);
   }
   
   /** 
@@ -466,7 +466,7 @@
   public HScannerInterface obtainScanner(Text[] columns, Text startRow,
       RowFilterInterface filter)
   throws IOException { 
-    return obtainScanner(columns, startRow, System.currentTimeMillis(), filter);
+    return obtainScanner(columns, startRow, HConstants.LATEST_TIMESTAMP, filter);
   }
 
   /** 
@@ -490,7 +490,7 @@
       final Text startRow, final Text stopRow)
   throws IOException {
     return obtainScanner(columns, startRow, stopRow,
-      System.currentTimeMillis());
+      HConstants.LATEST_TIMESTAMP);
   }
 
   /** 

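The switch from System.currentTimeMillis() to HConstants.LATEST_TIMESTAMP
changes what a default scanner can see: a wall-clock cutoff silently hides
cells stamped after the scanner was created, while LATEST_TIMESTAMP imposes
effectively no upper bound. A sketch of the difference, reusing the
four-argument obtainScanner from the diff above:

    // Before: cells with timestamps later than 'snapshot' are excluded.
    long snapshot = System.currentTimeMillis();
    HScannerInterface old = table.obtainScanner(columns, startRow, snapshot, null);
    // After: every version up to the newest is admitted.
    HScannerInterface current = table.obtainScanner(columns, startRow,
      HConstants.LATEST_TIMESTAMP, null);
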
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/Writables.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/Writables.java?rev=605811&r1=605810&r2=605811&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/Writables.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/Writables.java Wed Dec 19 21:35:07 2007
@@ -27,7 +27,10 @@
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.ReflectionUtils;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 
@@ -90,7 +93,25 @@
       in.close();
     }
   }
-  
+
+  /**
+   * Make a copy of a writable object using serialization to a buffer.
+   * Copied from WritableUtils, except <code>conf</code> is a Configuration
+   * rather than a JobConf (it need not be a JobConf -- HADOOP-2469).
+   * @param orig The object to copy
+   * @return The copied object
+   */
+  public static Writable clone(Writable orig, Configuration conf) {
+    try {
+      Writable newInst =
+        (Writable)ReflectionUtils.newInstance(orig.getClass(), conf);
+      WritableUtils.cloneInto(newInst, orig);
+      return newInst;
+    } catch (IOException e) {
+      throw new RuntimeException("Error writing/reading clone buffer", e);
+    }
+  }
+
   /**
    * @param bytes
    * @return A HRegionInfo instance built out of passed <code>bytes</code>.

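A usage sketch for the new helper, mirroring the HRegion change earlier in
this commit: deep-copy a Writable by round-tripping it through a
serialization buffer, so per-store filter state cannot leak across store
scanners:

    HBaseConfiguration conf = new HBaseConfiguration();
    RowFilterInterface original =
      new WhileMatchRowFilter(new StopRowFilter(new Text("aad")));
    RowFilterInterface copy =
      (RowFilterInterface) Writables.clone(original, conf);
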
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java?rev=605811&r1=605810&r2=605811&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java Wed Dec 19 21:35:07 2007
@@ -77,8 +77,7 @@
   private void init() {
     conf = new HBaseConfiguration();
     try {
-      START_KEY =
-        new String(START_KEY_BYTES, HConstants.UTF8_ENCODING) + PUNCTUATION;
+      START_KEY = new String(START_KEY_BYTES, HConstants.UTF8_ENCODING) + PUNCTUATION;
     } catch (UnsupportedEncodingException e) {
       fail();
     }
@@ -125,10 +124,23 @@
           null), fs, conf, info, null, null);
   }
   
+  /**
+   * Create a table descriptor of name <code>name</code> with {@link #COLUMNS}
+   * for families.
+   * @param name Name to give table.
+   * @return Table descriptor.
+   */
   protected HTableDescriptor createTableDescriptor(final String name) {
     return createTableDescriptor(name, MAXVERSIONS);
   }
   
+  /**
+   * Create a table descriptor of name <code>name</code> with {@link #COLUMNS}
+   * for families.
+   * @param name Name to give table.
+   * @param versions How many versions to allow per column.
+   * @return Table descriptor.
+   */
   protected HTableDescriptor createTableDescriptor(final String name,
       final int versions) {
     HTableDescriptor htd = new HTableDescriptor(name);

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java?rev=605811&r1=605810&r2=605811&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java Wed Dec 19 21:35:07 2007
@@ -72,6 +72,54 @@
   }
 
   /**
+   * Test for the HADOOP-2467 fix.  If scanning more than one column family,
+   * filters such as {@link WhileMatchRowFilter} could prematurely shut down
+   * the scan if one of the stores ran ahead and flipped its filter to true.
+   * @throws MasterNotRunningException
+   * @throws IOException
+   */
+  public void testScanningMultipleFamiliesOfDifferentVintage()
+  throws MasterNotRunningException, IOException {
+    Text tableName = new Text(getName());
+    final Text [] families = createTable(new HBaseAdmin(this.conf), tableName);
+    HTable table = new HTable(this.conf, tableName);
+    HScannerInterface scanner = null;
+    try {
+      long time = System.currentTimeMillis();
+      LOG.info("Current time " + time);
+      for (int i = 0; i < families.length; i++) {
+        final byte [] lastKey = new byte [] {'a', 'a', (byte)('b' + i)};
+        Incommon inc = new HTableIncommon(table);
+        addContent(inc, families[i].toString(),
+          START_KEY_BYTES, new Text(lastKey), time + (1000 * i));
+        // Add in to the first store a record that is in excess of the stop
+        // row specified below setting up the scanner filter.  Add 'bbb'.
+        // Use a stop filter of 'aad'.  The store scanner going to 'bbb' was
+        // flipping the switch in StopRowFilter stopping us returning all
+        // of the rest of the other store content.
+        if (i == 0) {
+          long id = inc.startBatchUpdate(new Text("bbb"));
+          inc.put(id, families[0], "bbb".getBytes());
+          inc.commit(id);
+        }
+      }
+      RowFilterInterface f =
+        new WhileMatchRowFilter(new StopRowFilter(new Text("aad")));
+      scanner = table.obtainScanner(families, HConstants.EMPTY_START_ROW,
+        HConstants.LATEST_TIMESTAMP, f);
+      int count = 0;
+      for (Map.Entry<HStoreKey, SortedMap<Text, byte []>> e: scanner) {
+        count++;
+      }
+      // Should get back 3 rows: aaa, aab, and aac.
+      assertEquals(3, count);
+    } finally {
+      if (scanner != null) scanner.close();
+      table.close();
+    }
+  }
+
+  /**
    * @throws Exception
    */
   public void testStopRow() throws Exception {