You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2007/12/08 07:54:33 UTC

svn commit: r602334 - in /lucene/hadoop/trunk/src/contrib/hbase: ./ src/java/org/apache/hadoop/hbase/ src/test/org/apache/hadoop/hbase/

Author: jimk
Date: Fri Dec  7 22:54:31 2007
New Revision: 602334

URL: http://svn.apache.org/viewvc?rev=602334&view=rev
Log:
HADOOP-2350 Scanner API returns null row names, or skips rows, when different column families do not have entries for some rows

Added:
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScannerAPI.java
Modified:
    lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java

Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?rev=602334&r1=602333&r2=602334&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Fri Dec  7 22:54:31 2007
@@ -64,6 +64,8 @@
    HADOOP-2338 Fix NullPointerException in master server.
    HADOOP-2380 REST servlet throws NPE when any value node has an empty string
                (Bryan Duxbury via Stack)
+   HADOOP-2350 Scanner api returns null row names, or skips row names if
+               different column families do not have entries for some rows
 
   IMPROVEMENTS
    HADOOP-2401 Add convenience put method that takes writable

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java?rev=602334&r1=602333&r2=602334&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java Fri Dec  7 22:54:31 2007
@@ -332,7 +332,7 @@
       HRegion root =
         new HRegion(dir, hlog,fs, conf, HRegionInfo.rootRegionInfo, null, null);
 
-      HInternalScannerInterface rootScanner =
+      HScannerInterface rootScanner =
         root.getScanner(META_COLS, new Text(), System.currentTimeMillis(), null);
       
       try {

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java?rev=602334&r1=602333&r2=602334&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java Fri Dec  7 22:54:31 2007
@@ -1088,7 +1088,7 @@
    * @return HScannerInterface
    * @throws IOException
    */
-  public HInternalScannerInterface getScanner(Text[] cols, Text firstRow,
+  public HScannerInterface getScanner(Text[] cols, Text firstRow,
       long timestamp, RowFilterInterface filter) throws IOException {
     lock.readLock().lock();
     try {
@@ -1485,33 +1485,21 @@
   /**
    * HScanner is an iterator through a bunch of rows in an HRegion.
    */
-  private class HScanner implements HInternalScannerInterface {
+  private class HScanner implements HScannerInterface {
     private HInternalScannerInterface[] scanners;
-    private boolean wildcardMatch = false;
-    private boolean multipleMatchers = false;
+    private TreeMap<Text, byte []>[] resultSets;
+    private HStoreKey[] keys;
 
     /** Create an HScanner with a handle on many HStores. */
     @SuppressWarnings("unchecked")
     HScanner(Text[] cols, Text firstRow, long timestamp, HStore[] stores,
         RowFilterInterface filter) throws IOException {
-      this.scanners = new HInternalScannerInterface[stores.length];
 
-//       Advance to the first key in each store.
-//       All results will match the required column-set and scanTime.
-      
+      this.scanners = new HInternalScannerInterface[stores.length];
       try {
         for (int i = 0; i < stores.length; i++) {
-          HInternalScannerInterface scanner =
-          scanners[i] =
-            stores[i].getScanner(timestamp, cols, firstRow, filter);
-          
-            if (scanner.isWildcardScanner()) {
-              this.wildcardMatch = true;
-            }
-            if (scanner.isMultipleMatchScanner()) {
-              this.multipleMatchers = true;
-            }
-          }
+          scanners[i] = stores[i].getScanner(timestamp, cols, firstRow, filter);
+        }
 
       } catch(IOException e) {
         for (int i = 0; i < this.scanners.length; i++) {
@@ -1521,35 +1509,100 @@
         }
         throw e;
       }
+
+//       Advance to the first key in each store.
+//       All results will match the required column-set and scanTime.
+      
+      this.resultSets = new TreeMap[scanners.length];
+      this.keys = new HStoreKey[scanners.length];
+      for (int i = 0; i < scanners.length; i++) {
+        keys[i] = new HStoreKey();
+        resultSets[i] = new TreeMap<Text, byte []>();
+        if(scanners[i] != null && !scanners[i].next(keys[i], resultSets[i])) {
+          closeScanner(i);
+        }
+      }
+
       // As we have now successfully completed initialization, increment the
       // activeScanner count.
       activeScannerCount.incrementAndGet();
     }
 
-    /** @return true if the scanner is a wild card scanner */
-    public boolean isWildcardScanner() {
-      return wildcardMatch;
-    }
-
-    /** @return true if the scanner is a multiple match scanner */
-    public boolean isMultipleMatchScanner() {
-      return multipleMatchers;
-    }
-
     /** {@inheritDoc} */
     public boolean next(HStoreKey key, SortedMap<Text, byte[]> results)
     throws IOException {
-      boolean haveResults = false;
+      boolean moreToFollow = false;
+
+      // Find the lowest-possible key.
+
+      Text chosenRow = null;
+      long chosenTimestamp = -1;
+      for (int i = 0; i < this.keys.length; i++) {
+        if (scanners[i] != null &&
+            (chosenRow == null ||
+                (keys[i].getRow().compareTo(chosenRow) < 0) ||
+                ((keys[i].getRow().compareTo(chosenRow) == 0) &&
+                    (keys[i].getTimestamp() > chosenTimestamp)))) {
+          chosenRow = new Text(keys[i].getRow());
+          chosenTimestamp = keys[i].getTimestamp();
+        }
+      }
+
+      // Store the key and results for each sub-scanner. Merge them as
+      // appropriate.
+      if (chosenTimestamp >= 0) {
+        // Here we are setting the passed in key with current row+timestamp
+        key.setRow(chosenRow);
+        key.setVersion(chosenTimestamp);
+        key.setColumn(HConstants.EMPTY_TEXT);
+
+        for (int i = 0; i < scanners.length; i++) {
+          if (scanners[i] != null && keys[i].getRow().compareTo(chosenRow) == 0) {
+            // NOTE: We used to do results.putAll(resultSets[i]);
+            // but this had the effect of overwriting newer
+            // values with older ones. So now we only insert
+            // a result if the map does not contain the key.
+            for (Map.Entry<Text, byte[]> e : resultSets[i].entrySet()) {
+              if (!results.containsKey(e.getKey())) {
+                results.put(e.getKey(), e.getValue());
+              }
+            }
+            resultSets[i].clear();
+            if (!scanners[i].next(keys[i], resultSets[i])) {
+              closeScanner(i);
+            }
+          }
+        }
+      }
+
       for (int i = 0; i < scanners.length; i++) {
-        if (scanners[i] != null) {
-          if (scanners[i].next(key, results)) {
-            haveResults = true;
-          } else {
+        // If the current scanner is non-null AND has a lower-or-equal
+        // row label, then its timestamp is bad. We need to advance it.
+        while ((scanners[i] != null) &&
+            (keys[i].getRow().compareTo(chosenRow) <= 0)) {
+          
+          resultSets[i].clear();
+          if (!scanners[i].next(keys[i], resultSets[i])) {
+            closeScanner(i);
+          }
+        }
+      }
+
+      moreToFollow = chosenTimestamp >= 0;
+      if (results == null || results.size() <= 0) {
+        // If we got no results, then there is no more to follow.
+        moreToFollow = false;
+      }
+
+      // Make sure scanners closed if no more results
+      if (!moreToFollow) {
+        for (int i = 0; i < scanners.length; i++) {
+          if (null != scanners[i]) {
             closeScanner(i);
           }
         }
       }
-      return haveResults;
+      return moreToFollow;
     }
 
     
@@ -1563,6 +1616,8 @@
         }
       } finally {
         scanners[i] = null;
+        resultSets[i] = null;
+        keys[i] = null;
       }
     }
 

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java?rev=602334&r1=602333&r2=602334&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java Fri Dec  7 22:54:31 2007
@@ -1373,7 +1373,7 @@
     requestCount.incrementAndGet();
     try {
       String scannerName = String.valueOf(scannerId);
-      HInternalScannerInterface s = scanners.get(scannerName);
+      HScannerInterface s = scanners.get(scannerName);
       if (s == null) {
         throw new UnknownScannerException("Name: " + scannerName);
       }
@@ -1433,7 +1433,7 @@
     try {
       HRegion r = getRegion(regionName);
       long scannerId = -1L;
-      HInternalScannerInterface s =
+      HScannerInterface s =
         r.getScanner(cols, firstRow, timestamp, filter);
       scannerId = rand.nextLong();
       String scannerName = String.valueOf(scannerId);
@@ -1457,7 +1457,7 @@
     requestCount.incrementAndGet();
     try {
       String scannerName = String.valueOf(scannerId);
-      HInternalScannerInterface s = null;
+      HScannerInterface s = null;
       synchronized(scanners) {
         s = scanners.remove(scannerName);
       }
@@ -1472,9 +1472,8 @@
     }
   }
 
-  Map<String, HInternalScannerInterface> scanners =
-    Collections.synchronizedMap(new HashMap<String,
-      HInternalScannerInterface>());
+  Map<String, HScannerInterface> scanners =
+    Collections.synchronizedMap(new HashMap<String, HScannerInterface>());
 
   /** 
    * Instantiated as a scanner lease.
@@ -1490,7 +1489,7 @@
     /** {@inheritDoc} */
     public void leaseExpired() {
       LOG.info("Scanner " + this.scannerName + " lease expired");
-      HInternalScannerInterface s = null;
+      HScannerInterface s = null;
       synchronized(scanners) {
         s = scanners.remove(this.scannerName);
       }

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java?rev=602334&r1=602333&r2=602334&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java Fri Dec  7 22:54:31 2007
@@ -283,7 +283,7 @@
     
     startTime = System.currentTimeMillis();
 
-    HInternalScannerInterface s =
+    HScannerInterface s =
       r.getScanner(cols, new Text(), System.currentTimeMillis(), null);
     int numFetched = 0;
     try {
@@ -630,7 +630,7 @@
     
     long startTime = System.currentTimeMillis();
     
-    HInternalScannerInterface s =
+    HScannerInterface s =
       r.getScanner(cols, new Text(), System.currentTimeMillis(), null);
 
     try {

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner.java?rev=602334&r1=602333&r2=602334&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner.java Fri Dec  7 22:54:31 2007
@@ -69,7 +69,7 @@
   private void scan(boolean validateStartcode, String serverName)
       throws IOException {
     
-    HInternalScannerInterface scanner = null;
+    HScannerInterface scanner = null;
     TreeMap<Text, byte []> results = new TreeMap<Text, byte []>();
     HStoreKey key = new HStoreKey();
 
@@ -108,7 +108,7 @@
         }
 
       } finally {
-        HInternalScannerInterface s = scanner;
+        HScannerInterface s = scanner;
         scanner = null;
         if(s != null) {
           s.close();

Added: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScannerAPI.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScannerAPI.java?rev=602334&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScannerAPI.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScannerAPI.java Fri Dec  7 22:54:31 2007
@@ -0,0 +1,183 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * Tests the scanner API at all levels: the region scanner, the client
+ * scanner driven by next(), and the client scanner's iterator.
+ */
+public class TestScannerAPI extends HBaseClusterTestCase {
+  private final Text[] columns = new Text[] {
+    new Text("a:"),
+    new Text("b:")
+  };
+  private final Text startRow = new Text("0");
+
+  /** Expected table contents: row key -> (column -> value). */
+  private final TreeMap<Text, SortedMap<Text, byte[]>> values =
+    new TreeMap<Text, SortedMap<Text, byte[]>>();
+
+  /**
+   * Builds the expected table contents. Row "1" has a value only in family
+   * "a:" while row "2" has values in both "a:" and "b:", so not every
+   * column family has an entry for every row (the HADOOP-2350 scenario).
+   * @throws Exception
+   */
+  public TestScannerAPI() throws Exception {
+    super();
+    try {
+      TreeMap<Text, byte[]> rowValues = new TreeMap<Text, byte[]>();
+      rowValues.put(new Text("a:1"), "1".getBytes(HConstants.UTF8_ENCODING));
+      values.put(new Text("1"), rowValues);
+      rowValues = new TreeMap<Text, byte[]>();
+      rowValues.put(new Text("a:2"), "2".getBytes(HConstants.UTF8_ENCODING));
+      rowValues.put(new Text("b:2"), "2".getBytes(HConstants.UTF8_ENCODING));
+      // Without this put, row "2" is never inserted and never verified.
+      values.put(new Text("2"), rowValues);
+    } catch (Exception e) {
+      e.printStackTrace();
+      throw e;
+    }
+  }
+
+  /**
+   * Creates the table, inserts the expected rows, then checks that each of
+   * the three scanner paths returns the expected rows, columns and values.
+   * @throws IOException
+   */
+  public void testApi() throws IOException {
+    final String tableName = getName();
+
+    // Create table
+
+    HBaseAdmin admin = new HBaseAdmin(conf);
+    HTableDescriptor tableDesc = new HTableDescriptor(tableName);
+    for (int i = 0; i < columns.length; i++) {
+      tableDesc.addFamily(new HColumnDescriptor(columns[i].toString()));
+    }
+    admin.createTable(tableDesc);
+
+    // Insert values
+
+    HTable table = new HTable(conf, new Text(tableName));
+
+    for (Map.Entry<Text, SortedMap<Text, byte[]>> row: values.entrySet()) {
+      long lockid = table.startUpdate(row.getKey());
+      for (Map.Entry<Text, byte[]> val: row.getValue().entrySet()) {
+        table.put(lockid, val.getKey(), val.getValue());
+      }
+      table.commit(lockid);
+    }
+
+    // Find a user (non-meta) region on the first region server.
+    HRegion region = null;
+    try {
+      SortedMap<Text, HRegion> regions =
+        cluster.getRegionThreads().get(0).getRegionServer().getOnlineRegions();
+      for (Map.Entry<Text, HRegion> e: regions.entrySet()) {
+        if (!e.getValue().getRegionInfo().isMetaRegion()) {
+          region = e.getValue();
+        }
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      IOException iox = new IOException("error finding region");
+      iox.initCause(e);
+      throw iox;
+    }
+    assertNotNull("no user region found", region);
+    @SuppressWarnings("null")
+    HScannerInterface scanner =
+      region.getScanner(columns, startRow, System.currentTimeMillis(), null);
+    try {
+      verify(scanner);
+    } finally {
+      scanner.close();
+    }
+
+    scanner = table.obtainScanner(columns, startRow);
+    try {
+      verify(scanner);
+    } finally {
+      scanner.close();
+    }
+
+    scanner = table.obtainScanner(columns, startRow);
+    try {
+      int rowCount = 0;
+      for (Iterator<Map.Entry<HStoreKey, SortedMap<Text, byte[]>>> iterator =
+        scanner.iterator();
+      iterator.hasNext();
+      ) {
+        Map.Entry<HStoreKey, SortedMap<Text, byte[]>> row = iterator.next();
+        verifyRow(row.getKey().getRow(), row.getValue());
+        rowCount++;
+      }
+      assertEquals("row count", values.size(), rowCount);
+    } finally {
+      scanner.close();
+    }
+  }
+
+  /**
+   * Drains the scanner with next(), checking every returned row and that
+   * the number of rows returned matches the number of expected rows.
+   * @param scanner scanner opened at startRow over all test columns
+   * @throws IOException
+   */
+  private void verify(HScannerInterface scanner) throws IOException {
+    HStoreKey key = new HStoreKey();
+    SortedMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+    int rowCount = 0;
+    while (scanner.next(key, results)) {
+      verifyRow(key.getRow(), results);
+      rowCount++;
+      results.clear();
+    }
+    assertEquals("row count", values.size(), rowCount);
+  }
+
+  /**
+   * Asserts that the given row is an expected row and that the results
+   * hold exactly its expected columns with the expected values.
+   * @param row row key returned by a scanner
+   * @param results column -> value map returned for that row
+   */
+  private void verifyRow(Text row, SortedMap<Text, byte[]> results) {
+    assertTrue("row key", values.containsKey(row));
+    SortedMap<Text, byte[]> columnValues = values.get(row);
+    assertEquals(columnValues.size(), results.size());
+    for (Map.Entry<Text, byte[]> e: columnValues.entrySet()) {
+      Text column = e.getKey();
+      assertTrue("column", results.containsKey(column));
+      assertTrue("value", Arrays.equals(e.getValue(), results.get(column)));
+    }
+  }
+}

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java?rev=602334&r1=602333&r2=602334&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java Fri Dec  7 22:54:31 2007
@@ -228,7 +228,7 @@
       final Text firstValue)
   throws IOException {
     Text [] cols = new Text[] {new Text(column)};
-    HInternalScannerInterface s = r.getScanner(cols,
+    HScannerInterface s = r.getScanner(cols,
       HConstants.EMPTY_START_ROW, System.currentTimeMillis(), null);
     try {
       HStoreKey curKey = new HStoreKey();