You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2007/08/01 22:10:20 UTC

svn commit: r561935 [2/2] - in /lucene/hadoop/trunk/src/contrib/hbase: ./ src/java/org/apache/hadoop/hbase/ src/test/org/apache/hadoop/hbase/

Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java?view=auto&rev=561935
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java Wed Aug  1 13:10:11 2007
@@ -0,0 +1,850 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Random;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.KeyedData;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RemoteException;
+
+/**
+ * Used to communicate with a single HBase table
+ */
+public class HTable implements HConstants {
+  protected final Log LOG = LogFactory.getLog(this.getClass().getName());
+
+  protected final HConnection connection;
+  protected final Text tableName;
+  protected final long pause;
+  protected final int numRetries;
+  protected Random rand;
+  protected volatile SortedMap<Text, HRegionLocation> tableServers;
+  protected BatchUpdate batch;
+  
+  // For row mutation operations
+  
+  protected volatile long currentLockId;
+  protected volatile Text currentRegion;
+  protected volatile HRegionInterface currentServer;
+  protected volatile long clientid;
+  
+  protected volatile boolean closed;
+
+  /**
+   * Creates an object to access a HBase table
+   * 
+   * @param conf configuration object
+   * @param tableName name of the table
+   * @throws IOException
+   */
+  public HTable(Configuration conf, Text tableName) throws IOException {
+    closed = true;
+    this.connection = HConnectionManager.getConnection(conf);
+    this.tableName = tableName;
+    this.pause = conf.getLong("hbase.client.pause", 30 * 1000);
+    this.numRetries = conf.getInt("hbase.client.retries.number", 5);
+    this.rand = new Random();
+    tableServers = connection.getTableServers(tableName);
+    this.batch = null;
+    this.currentLockId = -1L;
+    closed = false;
+  }
+
+  /**
+   * Find region location hosting passed row using cached info
+   * @param row Row to find.
+   * @return Location of row.
+   */
+  HRegionLocation getRegionLocation(Text row) {
+    if (this.tableServers == null) {
+      throw new IllegalStateException("Must open table first");
+    }
+    
+    // Only one server will have the row we are looking for
+    Text serverKey = (this.tableServers.containsKey(row)) ?
+        row : this.tableServers.headMap(row).lastKey();
+    return this.tableServers.get(serverKey);
+  }
+  
+  /**
+   * Verifies that no update is in progress
+   */
+  public synchronized void checkUpdateInProgress() {
+    if (batch != null || currentLockId != -1L) {
+      throw new IllegalStateException("update in progress");
+    }
+  }
+
+  /**
+   * Gets the starting row key for every region in the currently open table
+   * @return Array of region starting row keys
+   */
+  public Text[] getStartKeys() {
+    if (closed) {
+      throw new IllegalStateException("table is closed");
+    }
+    Text[] keys = new Text[tableServers.size()];
+    int i = 0;
+    for(Text key: tableServers.keySet()){
+      keys[i++] = key;
+    }
+    return keys;
+  }
+  
+  /** 
+   * Get a single value for the specified row and column
+   *
+   * @param row row key
+   * @param column column name
+   * @return value for specified row/column
+   * @throws IOException
+   */
+  public byte[] get(Text row, Text column) throws IOException {
+    byte [] value = null;
+    for(int tries = 0; tries < numRetries; tries++) {
+      HRegionLocation r = getRegionLocation(row);
+      HRegionInterface server =
+        connection.getHRegionConnection(r.getServerAddress());
+      
+      try {
+        value = server.get(r.getRegionInfo().getRegionName(), row, column);
+        break;
+        
+      } catch (IOException e) {
+        if (tries == numRetries - 1) {
+          if (e instanceof RemoteException) {
+            e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+          }
+          throw e;
+        }
+        tableServers = connection.reloadTableServers(tableName);
+      }
+      try {
+        Thread.sleep(this.pause);
+        
+      } catch (InterruptedException x) {
+        // continue
+      }
+    }
+    return value;
+  }
+ 
+  /** 
+   * Get the specified number of versions of the specified row and column
+   * 
+   * @param row         - row key
+   * @param column      - column name
+   * @param numVersions - number of versions to retrieve
+   * @return            - array byte values
+   * @throws IOException
+   */
+  public byte[][] get(Text row, Text column, int numVersions) throws IOException {
+    byte [][] values = null;
+    for (int tries = 0; tries < numRetries; tries++) {
+      HRegionLocation r = getRegionLocation(row);
+      HRegionInterface server = 
+        connection.getHRegionConnection(r.getServerAddress());
+      
+      try {
+        values = server.get(r.getRegionInfo().getRegionName(), row, column,
+            numVersions);
+        
+        break;
+        
+      } catch (IOException e) {
+        if (tries == numRetries - 1) {
+          // No more tries
+          if (e instanceof RemoteException) {
+            e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+          }
+          throw e;
+        }
+        tableServers = connection.reloadTableServers(tableName);
+      }
+      try {
+        Thread.sleep(this.pause);
+        
+      } catch (InterruptedException x) {
+        // continue
+      }
+    }
+
+    if (values != null) {
+      ArrayList<byte[]> bytes = new ArrayList<byte[]>();
+      for (int i = 0 ; i < values.length; i++) {
+        bytes.add(values[i]);
+      }
+      return bytes.toArray(new byte[values.length][]);
+    }
+    return null;
+  }
+  
+  /** 
+   * Get the specified number of versions of the specified row and column with
+   * the specified timestamp.
+   *
+   * @param row         - row key
+   * @param column      - column name
+   * @param timestamp   - timestamp
+   * @param numVersions - number of versions to retrieve
+   * @return            - array of values that match the above criteria
+   * @throws IOException
+   */
+  public byte[][] get(Text row, Text column, long timestamp, int numVersions)
+  throws IOException {
+    byte [][] values = null;
+    for (int tries = 0; tries < numRetries; tries++) {
+      HRegionLocation r = getRegionLocation(row);
+      HRegionInterface server =
+        connection.getHRegionConnection(r.getServerAddress());
+      
+      try {
+        values = server.get(r.getRegionInfo().getRegionName(), row, column,
+            timestamp, numVersions);
+        
+        break;
+    
+      } catch (IOException e) {
+        if (tries == numRetries - 1) {
+          // No more tries
+          if (e instanceof RemoteException) {
+            e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+          }
+          throw e;
+        }
+        tableServers = connection.reloadTableServers(tableName);
+      }
+      try {
+        Thread.sleep(this.pause);
+        
+      } catch (InterruptedException x) {
+        // continue
+      }
+    }
+
+    if (values != null) {
+      ArrayList<byte[]> bytes = new ArrayList<byte[]>();
+      for (int i = 0 ; i < values.length; i++) {
+        bytes.add(values[i]);
+      }
+      return bytes.toArray(new byte[values.length][]);
+    }
+    return null;
+  }
+    
+  /** 
+   * Get all the data for the specified row
+   * 
+   * @param row         - row key
+   * @return            - map of colums to values
+   * @throws IOException
+   */
+  public SortedMap<Text, byte[]> getRow(Text row) throws IOException {
+    KeyedData[] value = null;
+    for (int tries = 0; tries < numRetries; tries++) {
+      HRegionLocation r = getRegionLocation(row);
+      HRegionInterface server =
+        connection.getHRegionConnection(r.getServerAddress());
+      
+      try {
+        value = server.getRow(r.getRegionInfo().getRegionName(), row);
+        break;
+        
+      } catch (IOException e) {
+        if (tries == numRetries - 1) {
+          // No more tries
+          if (e instanceof RemoteException) {
+            e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+          }
+          throw e;
+        }
+        tableServers = connection.reloadTableServers(tableName);
+      }
+      try {
+        Thread.sleep(this.pause);
+        
+      } catch (InterruptedException x) {
+        // continue
+      }
+    }
+    TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+    if (value != null && value.length != 0) {
+      for (int i = 0; i < value.length; i++) {
+        results.put(value[i].getKey().getColumn(), value[i].getData());
+      }
+    }
+    return results;
+  }
+
+  /** 
+   * Get a scanner on the current table starting at the specified row.
+   * Return the specified columns.
+   *
+   * @param columns array of columns to return
+   * @param startRow starting row in table to scan
+   * @return scanner
+   * @throws IOException
+   */
+  public HScannerInterface obtainScanner(Text[] columns,
+      Text startRow) throws IOException {
+    
+    return obtainScanner(columns, startRow, System.currentTimeMillis(), null);
+  }
+  
+  /** 
+   * Get a scanner on the current table starting at the specified row.
+   * Return the specified columns.
+   *
+   * @param columns array of columns to return
+   * @param startRow starting row in table to scan
+   * @param timestamp only return results whose timestamp <= this value
+   * @return scanner
+   * @throws IOException
+   */
+  public HScannerInterface obtainScanner(Text[] columns,
+      Text startRow, long timestamp) throws IOException {
+    
+    return obtainScanner(columns, startRow, timestamp, null);
+  }
+  
+  /** 
+   * Get a scanner on the current table starting at the specified row.
+   * Return the specified columns.
+   *
+   * @param columns array of columns to return
+   * @param startRow starting row in table to scan
+   * @param filter a row filter using row-key regexp and/or column data filter.
+   * @return scanner
+   * @throws IOException
+   */
+  public HScannerInterface obtainScanner(Text[] columns,
+      Text startRow, RowFilterInterface filter) throws IOException { 
+    
+    return obtainScanner(columns, startRow, System.currentTimeMillis(), filter);
+  }
+  
+  /** 
+   * Get a scanner on the current table starting at the specified row.
+   * Return the specified columns.
+   *
+   * @param columns array of columns to return
+   * @param startRow starting row in table to scan
+   * @param timestamp only return results whose timestamp <= this value
+   * @param filter a row filter using row-key regexp and/or column data filter.
+   * @return scanner
+   * @throws IOException
+   */
+  public HScannerInterface obtainScanner(Text[] columns,
+      Text startRow, long timestamp, RowFilterInterface filter)
+  throws IOException {
+    
+    return new ClientScanner(columns, startRow, timestamp, filter);
+  }
+
+  /** 
+   * Start a batch of row insertions/updates.
+   * 
+   * No changes are committed until the call to commitBatchUpdate returns.
+   * A call to abortBatchUpdate will abandon the entire batch.
+   *
+   * @param row name of row to be updated
+   * @return lockid to be used in subsequent put, delete and commit calls
+   */
+  public synchronized long startBatchUpdate(final Text row) {
+    if (batch != null || currentLockId != -1L) {
+      throw new IllegalStateException("update in progress");
+    }
+    batch = new BatchUpdate();
+    return batch.startUpdate(row);
+  }
+  
+  /** 
+   * Abort a batch mutation
+   * @param lockid lock id returned by startBatchUpdate
+   */
+  public synchronized void abortBatch(final long lockid) {
+    if (batch == null) {
+      throw new IllegalStateException("no batch update in progress");
+    }
+    if (batch.getLockid() != lockid) {
+      throw new IllegalArgumentException("invalid lock id " + lockid);
+    }
+    batch = null;
+  }
+  
+  /** 
+   * Finalize a batch mutation
+   *
+   * @param lockid lock id returned by startBatchUpdate
+   * @throws IOException
+   */
+  public void commitBatch(final long lockid) throws IOException {
+    commitBatch(lockid, System.currentTimeMillis());
+  }
+
+  /** 
+   * Finalize a batch mutation
+   *
+   * @param lockid lock id returned by startBatchUpdate
+   * @param timestamp time to associate with all the changes
+   * @throws IOException
+   */
+  public synchronized void commitBatch(final long lockid, final long timestamp)
+  throws IOException {
+
+    if (batch == null) {
+      throw new IllegalStateException("no batch update in progress");
+    }
+    if (batch.getLockid() != lockid) {
+      throw new IllegalArgumentException("invalid lock id " + lockid);
+    }
+    
+    try {
+      for (int tries = 0; tries < numRetries; tries++) {
+        HRegionLocation r = getRegionLocation(batch.getRow());
+        HRegionInterface server =
+          connection.getHRegionConnection(r.getServerAddress());
+
+        try {
+          server.batchUpdate(r.getRegionInfo().getRegionName(), timestamp, batch);
+          break;
+
+        } catch (IOException e) {
+          if (tries < numRetries -1) {
+            tableServers = connection.reloadTableServers(tableName);
+
+          } else {
+            if (e instanceof RemoteException) {
+              e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+            }
+            throw e;
+          }
+        }
+        try {
+          Thread.sleep(pause);
+
+        } catch (InterruptedException e) {
+        }
+      }
+    } finally {
+      batch = null;
+    }
+  }
+  
+  /** 
+   * Start an atomic row insertion/update.  No changes are committed until the 
+   * call to commit() returns. A call to abort() will abandon any updates in progress.
+   *
+   * Callers to this method are given a lease for each unique lockid; before the
+   * lease expires, either abort() or commit() must be called. If it is not 
+   * called, the system will automatically call abort() on the client's behalf.
+   *
+   * The client can gain extra time with a call to renewLease().
+   * Start an atomic row insertion or update
+   * 
+   * @param row Name of row to start update against.
+   * @return Row lockid.
+   * @throws IOException
+   */
+  public synchronized long startUpdate(final Text row) throws IOException {
+    if (currentLockId != -1L || batch != null) {
+      throw new IllegalStateException("update in progress");
+    }
+    for (int tries = 0; tries < numRetries; tries++) {
+      IOException e = null;
+      HRegionLocation info = getRegionLocation(row);
+      try {
+        currentServer =
+          connection.getHRegionConnection(info.getServerAddress());
+
+        currentRegion = info.getRegionInfo().getRegionName();
+        clientid = rand.nextLong();
+        currentLockId = currentServer.startUpdate(currentRegion, clientid, row);
+
+        break;
+
+      } catch (IOException ex) {
+        e = ex;
+      }
+      if (tries < numRetries - 1) {
+        try {
+          Thread.sleep(this.pause);
+
+        } catch (InterruptedException ex) {
+        }
+        try {
+          tableServers = connection.reloadTableServers(tableName);
+
+        } catch (IOException ex) {
+          e = ex;
+        }
+      } else {
+        if (e instanceof RemoteException) {
+          e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+        }
+        throw e;
+      }
+    }
+    return currentLockId;
+  }
+  
+  /** 
+   * Change a value for the specified column.
+   * Runs {@link #abort(long)} if exception thrown.
+   *
+   * @param lockid lock id returned from startUpdate
+   * @param column column whose value is being set
+   * @param val new value for column
+   * @throws IOException
+   */
+  public void put(long lockid, Text column, byte val[]) throws IOException {
+    if (val == null) {
+      throw new IllegalArgumentException("value cannot be null");
+    }
+    if (batch != null) {
+      batch.put(lockid, column, val);
+      return;
+    }
+    
+    if (lockid != currentLockId) {
+      throw new IllegalArgumentException("invalid lockid");
+    }
+    try {
+      this.currentServer.put(this.currentRegion, this.clientid, lockid, column,
+        val);
+    } catch (IOException e) {
+      try {
+        this.currentServer.abort(this.currentRegion, this.clientid, lockid);
+      } catch (IOException e2) {
+        LOG.warn(e2);
+      }
+      this.currentServer = null;
+      this.currentRegion = null;
+      if (e instanceof RemoteException) {
+        e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+      }
+      throw e;
+    }
+  }
+  
+  /** 
+   * Delete the value for a column
+   *
+   * @param lockid              - lock id returned from startUpdate
+   * @param column              - name of column whose value is to be deleted
+   * @throws IOException
+   */
+  public void delete(long lockid, Text column) throws IOException {
+    if (batch != null) {
+      batch.delete(lockid, column);
+      return;
+    }
+    
+    if (lockid != currentLockId) {
+      throw new IllegalArgumentException("invalid lockid");
+    }
+    try {
+      this.currentServer.delete(this.currentRegion, this.clientid, lockid,
+        column);
+    } catch (IOException e) {
+      try {
+        this.currentServer.abort(this.currentRegion, this.clientid, lockid);
+      } catch(IOException e2) {
+        LOG.warn(e2);
+      }
+      this.currentServer = null;
+      this.currentRegion = null;
+      if (e instanceof RemoteException) {
+        e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+      }
+      throw e;
+    }
+  }
+  
+  /** 
+   * Abort a row mutation
+   *
+   * @param lockid              - lock id returned from startUpdate
+   * @throws IOException
+   */
+  public synchronized void abort(long lockid) throws IOException {
+    if (batch != null) {
+      abortBatch(lockid);
+      return;
+    }
+
+    if (lockid != currentLockId) {
+      throw new IllegalArgumentException("invalid lockid");
+    }
+    
+    try {
+      try {
+        this.currentServer.abort(this.currentRegion, this.clientid, lockid);
+      } catch (IOException e) {
+        this.currentServer = null;
+        this.currentRegion = null;
+        if (e instanceof RemoteException) {
+          e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+        }
+        throw e;
+      }
+    } finally {
+      currentLockId = -1L;
+    }
+  }
+  
+  /** 
+   * Finalize a row mutation
+   *
+   * @param lockid              - lock id returned from startUpdate
+   * @throws IOException
+   */
+  public void commit(long lockid) throws IOException {
+    commit(lockid, System.currentTimeMillis());
+  }
+
+  /** 
+   * Finalize a row mutation
+   *
+   * @param lockid              - lock id returned from startUpdate
+   * @param timestamp           - time to associate with the change
+   * @throws IOException
+   */
+  public synchronized void commit(long lockid, long timestamp) throws IOException {
+    if (batch != null) {
+      commitBatch(lockid, timestamp);
+      return;
+    }
+
+    if (lockid != currentLockId) {
+      throw new IllegalArgumentException("invalid lockid");
+    }
+
+    try {
+      try {
+        this.currentServer.commit(this.currentRegion, this.clientid, lockid,
+            timestamp);
+
+      } catch (IOException e) {
+        this.currentServer = null;
+        this.currentRegion = null;
+        if(e instanceof RemoteException) {
+          e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+        }
+        throw e;
+      }
+    } finally {
+      currentLockId = -1L;
+    }
+  }
+  
+  /**
+   * Renew lease on update
+   * 
+   * @param lockid              - lock id returned from startUpdate
+   * @throws IOException
+   */
+  public synchronized void renewLease(long lockid) throws IOException {
+    if (batch != null) {
+      return;
+    }
+
+    if (lockid != currentLockId) {
+      throw new IllegalArgumentException("invalid lockid");
+    }
+    try {
+      this.currentServer.renewLease(lockid, this.clientid);
+    } catch (IOException e) {
+      try {
+        this.currentServer.abort(this.currentRegion, this.clientid, lockid);
+      } catch (IOException e2) {
+        LOG.warn(e2);
+      }
+      this.currentServer = null;
+      this.currentRegion = null;
+      if (e instanceof RemoteException) {
+        e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Implements the scanner interface for the HBase client.
+   * If there are multiple regions in a table, this scanner will iterate
+   * through them all.
+   */
+  protected class ClientScanner implements HScannerInterface {
+    private final Text EMPTY_COLUMN = new Text();
+    private Text[] columns;
+    private Text startRow;
+    private long scanTime;
+    @SuppressWarnings("hiding")
+    private boolean closed;
+    private AtomicReferenceArray<HRegionLocation> regions;
+    @SuppressWarnings("hiding")
+    private int currentRegion;
+    private HRegionInterface server;
+    private long scannerId;
+    private RowFilterInterface filter;
+    
+    private void loadRegions() {
+      Text firstServer = null;
+      if (this.startRow == null || this.startRow.getLength() == 0) {
+        firstServer = tableServers.firstKey();
+
+      } else if(tableServers.containsKey(startRow)) {
+        firstServer = startRow;
+
+      } else {
+        firstServer = tableServers.headMap(startRow).lastKey();
+      }
+      Collection<HRegionLocation> info =
+        tableServers.tailMap(firstServer).values();
+      
+      this.regions = new AtomicReferenceArray<HRegionLocation>(
+          info.toArray(new HRegionLocation[info.size()]));
+    }
+    
+    protected ClientScanner(Text[] columns, Text startRow, long timestamp,
+        RowFilterInterface filter) throws IOException {
+      
+      this.columns = columns;
+      this.startRow = startRow;
+      this.scanTime = timestamp;
+      this.closed = false;
+      this.filter = filter;
+      if (filter != null) {
+        filter.validate(columns);
+      }
+      loadRegions();
+      this.currentRegion = -1;
+      this.server = null;
+      this.scannerId = -1L;
+      nextScanner();
+    }
+    
+    /*
+     * Gets a scanner for the next region.
+     * Returns false if there are no more scanners.
+     */
+    private boolean nextScanner() throws IOException {
+      if (this.scannerId != -1L) {
+        this.server.close(this.scannerId);
+        this.scannerId = -1L;
+      }
+      this.currentRegion += 1;
+      if (this.currentRegion == this.regions.length()) {
+        close();
+        return false;
+      }
+      try {
+        for (int tries = 0; tries < numRetries; tries++) {
+          HRegionLocation r = this.regions.get(currentRegion);
+          this.server =
+            connection.getHRegionConnection(r.getServerAddress());
+          
+          try {
+            if (this.filter == null) {
+              this.scannerId =
+                this.server.openScanner(r.getRegionInfo().getRegionName(),
+                    this.columns, currentRegion == 0 ? this.startRow
+                        : EMPTY_START_ROW, scanTime, null);
+              
+            } else {
+              this.scannerId =
+                this.server.openScanner(r.getRegionInfo().getRegionName(),
+                    this.columns, currentRegion == 0 ? this.startRow
+                        : EMPTY_START_ROW, scanTime, filter);
+            }
+
+            break;
+        
+          } catch (IOException e) {
+            if (tries == numRetries - 1) {
+              // No more tries
+              if (e instanceof RemoteException) {
+                e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+              }
+              throw e;
+            }
+            tableServers = connection.reloadTableServers(tableName);
+            loadRegions();
+          }
+        }
+
+      } catch (IOException e) {
+        close();
+        if (e instanceof RemoteException) {
+          e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+        }
+        throw e;
+      }
+      return true;
+    }
+    
+    /**
+     * {@inheritDoc}
+     */
+    public boolean next(HStoreKey key, TreeMap<Text, byte[]> results) throws IOException {
+      if (this.closed) {
+        return false;
+      }
+      KeyedData[] values = null;
+      do {
+        values = this.server.next(this.scannerId);
+      } while (values != null && values.length == 0 && nextScanner());
+
+      if (values != null && values.length != 0) {
+        for (int i = 0; i < values.length; i++) {
+          key.setRow(values[i].getKey().getRow());
+          key.setVersion(values[i].getKey().getTimestamp());
+          key.setColumn(EMPTY_COLUMN);
+          results.put(values[i].getKey().getColumn(), values[i].getData());
+        }
+      }
+      return values == null ? false : values.length != 0;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void close() throws IOException {
+      if (this.scannerId != -1L) {
+        this.server.close(this.scannerId);
+        this.scannerId = -1L;
+      }
+      this.server = null;
+      this.closed = true;
+    }
+  }
+}

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java?view=diff&rev=561935&r1=561934&r2=561935
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java Wed Aug  1 13:10:11 2007
@@ -180,10 +180,8 @@
    * 
    * @param holderId id of lease holder
    * @param resourceId id of resource being leased
-   * @throws IOException
    */
-  public void cancelLease(final long holderId, final long resourceId)
-  throws IOException {
+  public void cancelLease(final long holderId, final long resourceId) {
     LeaseName name = null;
     synchronized(leases) {
       synchronized(sortedLeases) {
@@ -191,9 +189,8 @@
         Lease lease = leases.get(name);
         if (lease == null) {
           // It's possible that someone tries to renew the lease, but 
-          // it just expired a moment ago.  So fail.
-          throw new IOException("Cannot cancel lease that is not held: " +
-            name);
+          // it just expired a moment ago.  So just skip it.
+          return;
         }
         sortedLeases.remove(lease);
         leases.remove(name);
@@ -206,6 +203,7 @@
 
   /** LeaseMonitor is a thread that expires Leases that go on too long. */
   class LeaseMonitor implements Runnable {
+    /** {@inheritDoc} */
     public void run() {
       while(running) {
         synchronized(leases) {
@@ -236,6 +234,7 @@
    * A Lease name.
    * More lightweight than String or Text.
    */
+  @SuppressWarnings("unchecked")
   class LeaseName implements Comparable {
     private final long holderId;
     private final long resourceId;
@@ -245,6 +244,7 @@
       this.resourceId = rid;
     }
     
+    /** {@inheritDoc} */
     @Override
     public boolean equals(Object obj) {
       LeaseName other = (LeaseName)obj;
@@ -252,6 +252,7 @@
         this.resourceId == other.resourceId;
     }
     
+    /** {@inheritDoc} */
     @Override
     public int hashCode() {
       // Copy OR'ing from javadoc for Long#hashCode.
@@ -260,12 +261,14 @@
       return result;
     }
     
+    /** {@inheritDoc} */
     @Override
     public String toString() {
       return Long.toString(this.holderId) + "/" +
         Long.toString(this.resourceId);
     }
 
+    /** {@inheritDoc} */
     public int compareTo(Object obj) {
       LeaseName other = (LeaseName)obj;
       if (this.holderId < other.holderId) {
@@ -292,6 +295,7 @@
   }
 
   /** This class tracks a single Lease. */
+  @SuppressWarnings("unchecked")
   private class Lease implements Comparable {
     final long holderId;
     final long resourceId;
@@ -329,11 +333,13 @@
       listener.leaseExpired();
     }
     
+    /** {@inheritDoc} */
     @Override
     public boolean equals(Object obj) {
       return compareTo(obj) == 0;
     }
     
+    /** {@inheritDoc} */
     @Override
     public int hashCode() {
       int result = this.getLeaseName().hashCode();
@@ -345,6 +351,7 @@
     // Comparable
     //////////////////////////////////////////////////////////////////////////////
 
+    /** {@inheritDoc} */
     public int compareTo(Object o) {
       Lease other = (Lease) o;
       if(this.lastUpdate < other.lastUpdate) {

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java?view=diff&rev=561935&r1=561934&r2=561935
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java Wed Aug  1 13:10:11 2007
@@ -53,6 +53,7 @@
     this.regionServers = 1;
   }
 
+  /** {@inheritDoc} */
   @Override
   public void setUp() throws Exception {
     super.setUp();
@@ -60,11 +61,13 @@
       new MiniHBaseCluster(this.conf, this.regionServers, this.miniHdfs);
   }
 
+  /** {@inheritDoc} */
   @Override
   public void tearDown() throws Exception {
     super.tearDown();
     if (this.cluster != null) {
       this.cluster.shutdown();
     }
+    HConnectionManager.deleteConnection(conf);
   }
 }

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCleanRegionServerExit.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCleanRegionServerExit.java?view=diff&rev=561935&r1=561934&r2=561935
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCleanRegionServerExit.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCleanRegionServerExit.java Wed Aug  1 13:10:11 2007
@@ -23,6 +23,8 @@
 import java.util.TreeMap;
 
 import org.apache.hadoop.io.Text;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
 
 /**
  * Tests region server failover when a region server exits.
@@ -36,6 +38,8 @@
     conf.setInt("ipc.client.timeout", 5000);            // reduce ipc client timeout
     conf.setInt("ipc.client.connect.max.retries", 5);   // and number of retries
     conf.setInt("hbase.client.retries.number", 2);      // reduce HBase retries
+    Logger.getRootLogger().setLevel(Level.WARN);
+    Logger.getLogger(this.getClass().getPackage().getName()).setLevel(Level.DEBUG);
   }
   
   /**

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestRegionServerAbort.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestRegionServerAbort.java?view=diff&rev=561935&r1=561934&r2=561935
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestRegionServerAbort.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestRegionServerAbort.java Wed Aug  1 13:10:11 2007
@@ -38,7 +38,8 @@
     conf.setInt("ipc.client.timeout", 5000);            // reduce client timeout
     conf.setInt("ipc.client.connect.max.retries", 5);   // and number of retries
     conf.setInt("hbase.client.retries.number", 2);      // reduce HBase retries
-//    Logger.getLogger(this.getClass().getPackage().getName()).setLevel(Level.DEBUG);
+    Logger.getRootLogger().setLevel(Level.WARN);
+    Logger.getLogger(this.getClass().getPackage().getName()).setLevel(Level.DEBUG);
   }
   
   /**

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java?view=diff&rev=561935&r1=561934&r2=561935
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java Wed Aug  1 13:10:11 2007
@@ -69,7 +69,7 @@
     // Setup colkeys to be inserted
     HTableDescriptor htd = new HTableDescriptor(getName());
     Text tableName = new Text(getName());
-    Text[] colKeys = new Text[(int)(LAST_COLKEY - FIRST_COLKEY) + 1];
+    Text[] colKeys = new Text[(LAST_COLKEY - FIRST_COLKEY) + 1];
     for (char i = 0; i < colKeys.length; i++) {
       colKeys[i] = new Text(new String(new char[] { 
         (char)(FIRST_COLKEY + i), ':' }));
@@ -201,9 +201,9 @@
     long scannerId = -1L;
     try {
       client.openTable(table);
-      HClient.RegionLocation rl = client.getRegionLocation(table);
-      regionServer = client.getHRegionConnection(rl.serverAddress);
-      scannerId = regionServer.openScanner(rl.regionInfo.regionName,
+      HRegionLocation rl = client.getRegionLocation(table);
+      regionServer = client.getHRegionConnection(rl.getServerAddress());
+      scannerId = regionServer.openScanner(rl.getRegionInfo().getRegionName(),
           HMaster.METACOLUMNS, new Text(), System.currentTimeMillis(), null);
       while (true) {
         TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();