You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2007/08/01 22:10:20 UTC
svn commit: r561935 [2/2] - in /lucene/hadoop/trunk/src/contrib/hbase: ./
src/java/org/apache/hadoop/hbase/ src/test/org/apache/hadoop/hbase/
Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java?view=auto&rev=561935
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java Wed Aug 1 13:10:11 2007
@@ -0,0 +1,850 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Random;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.KeyedData;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RemoteException;
+
+/**
+ * Used to communicate with a single HBase table
+ */
+public class HTable implements HConstants {
+ protected final Log LOG = LogFactory.getLog(this.getClass().getName());
+
+ protected final HConnection connection;
+ protected final Text tableName;
+ protected final long pause;
+ protected final int numRetries;
+ protected Random rand;
+ protected volatile SortedMap<Text, HRegionLocation> tableServers;
+ protected BatchUpdate batch;
+
+ // For row mutation operations
+
+ protected volatile long currentLockId;
+ protected volatile Text currentRegion;
+ protected volatile HRegionInterface currentServer;
+ protected volatile long clientid;
+
+ protected volatile boolean closed;
+
+ /**
+ * Creates an object to access a HBase table
+ *
+ * @param conf configuration object
+ * @param tableName name of the table
+ * @throws IOException
+ */
+ public HTable(Configuration conf, Text tableName) throws IOException {
+ closed = true;
+ this.connection = HConnectionManager.getConnection(conf);
+ this.tableName = tableName;
+ this.pause = conf.getLong("hbase.client.pause", 30 * 1000);
+ this.numRetries = conf.getInt("hbase.client.retries.number", 5);
+ this.rand = new Random();
+ tableServers = connection.getTableServers(tableName);
+ this.batch = null;
+ this.currentLockId = -1L;
+ closed = false;
+ }
+
+ /**
+ * Find region location hosting passed row using cached info
+ * @param row Row to find.
+ * @return Location of row.
+ */
+ HRegionLocation getRegionLocation(Text row) {
+ if (this.tableServers == null) {
+ throw new IllegalStateException("Must open table first");
+ }
+
+ // Only one server will have the row we are looking for
+ Text serverKey = (this.tableServers.containsKey(row)) ?
+ row : this.tableServers.headMap(row).lastKey();
+ return this.tableServers.get(serverKey);
+ }
+
+ /**
+ * Verifies that no update is in progress
+ */
+ public synchronized void checkUpdateInProgress() {
+ if (batch != null || currentLockId != -1L) {
+ throw new IllegalStateException("update in progress");
+ }
+ }
+
+ /**
+ * Gets the starting row key for every region in the currently open table
+ * @return Array of region starting row keys
+ */
+ public Text[] getStartKeys() {
+ if (closed) {
+ throw new IllegalStateException("table is closed");
+ }
+ Text[] keys = new Text[tableServers.size()];
+ int i = 0;
+ for(Text key: tableServers.keySet()){
+ keys[i++] = key;
+ }
+ return keys;
+ }
+
+ /**
+ * Get a single value for the specified row and column
+ *
+ * @param row row key
+ * @param column column name
+ * @return value for specified row/column
+ * @throws IOException
+ */
+ public byte[] get(Text row, Text column) throws IOException {
+ byte [] value = null;
+ for(int tries = 0; tries < numRetries; tries++) {
+ HRegionLocation r = getRegionLocation(row);
+ HRegionInterface server =
+ connection.getHRegionConnection(r.getServerAddress());
+
+ try {
+ value = server.get(r.getRegionInfo().getRegionName(), row, column);
+ break;
+
+ } catch (IOException e) {
+ if (tries == numRetries - 1) {
+ if (e instanceof RemoteException) {
+ e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+ }
+ throw e;
+ }
+ tableServers = connection.reloadTableServers(tableName);
+ }
+ try {
+ Thread.sleep(this.pause);
+
+ } catch (InterruptedException x) {
+ // continue
+ }
+ }
+ return value;
+ }
+
+ /**
+ * Get the specified number of versions of the specified row and column
+ *
+ * @param row - row key
+ * @param column - column name
+ * @param numVersions - number of versions to retrieve
+ * @return - array byte values
+ * @throws IOException
+ */
+ public byte[][] get(Text row, Text column, int numVersions) throws IOException {
+ byte [][] values = null;
+ for (int tries = 0; tries < numRetries; tries++) {
+ HRegionLocation r = getRegionLocation(row);
+ HRegionInterface server =
+ connection.getHRegionConnection(r.getServerAddress());
+
+ try {
+ values = server.get(r.getRegionInfo().getRegionName(), row, column,
+ numVersions);
+
+ break;
+
+ } catch (IOException e) {
+ if (tries == numRetries - 1) {
+ // No more tries
+ if (e instanceof RemoteException) {
+ e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+ }
+ throw e;
+ }
+ tableServers = connection.reloadTableServers(tableName);
+ }
+ try {
+ Thread.sleep(this.pause);
+
+ } catch (InterruptedException x) {
+ // continue
+ }
+ }
+
+ if (values != null) {
+ ArrayList<byte[]> bytes = new ArrayList<byte[]>();
+ for (int i = 0 ; i < values.length; i++) {
+ bytes.add(values[i]);
+ }
+ return bytes.toArray(new byte[values.length][]);
+ }
+ return null;
+ }
+
+ /**
+ * Get the specified number of versions of the specified row and column with
+ * the specified timestamp.
+ *
+ * @param row - row key
+ * @param column - column name
+ * @param timestamp - timestamp
+ * @param numVersions - number of versions to retrieve
+ * @return - array of values that match the above criteria
+ * @throws IOException
+ */
+ public byte[][] get(Text row, Text column, long timestamp, int numVersions)
+ throws IOException {
+ byte [][] values = null;
+ for (int tries = 0; tries < numRetries; tries++) {
+ HRegionLocation r = getRegionLocation(row);
+ HRegionInterface server =
+ connection.getHRegionConnection(r.getServerAddress());
+
+ try {
+ values = server.get(r.getRegionInfo().getRegionName(), row, column,
+ timestamp, numVersions);
+
+ break;
+
+ } catch (IOException e) {
+ if (tries == numRetries - 1) {
+ // No more tries
+ if (e instanceof RemoteException) {
+ e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+ }
+ throw e;
+ }
+ tableServers = connection.reloadTableServers(tableName);
+ }
+ try {
+ Thread.sleep(this.pause);
+
+ } catch (InterruptedException x) {
+ // continue
+ }
+ }
+
+ if (values != null) {
+ ArrayList<byte[]> bytes = new ArrayList<byte[]>();
+ for (int i = 0 ; i < values.length; i++) {
+ bytes.add(values[i]);
+ }
+ return bytes.toArray(new byte[values.length][]);
+ }
+ return null;
+ }
+
+ /**
+ * Get all the data for the specified row
+ *
+ * @param row - row key
+ * @return - map of colums to values
+ * @throws IOException
+ */
+ public SortedMap<Text, byte[]> getRow(Text row) throws IOException {
+ KeyedData[] value = null;
+ for (int tries = 0; tries < numRetries; tries++) {
+ HRegionLocation r = getRegionLocation(row);
+ HRegionInterface server =
+ connection.getHRegionConnection(r.getServerAddress());
+
+ try {
+ value = server.getRow(r.getRegionInfo().getRegionName(), row);
+ break;
+
+ } catch (IOException e) {
+ if (tries == numRetries - 1) {
+ // No more tries
+ if (e instanceof RemoteException) {
+ e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+ }
+ throw e;
+ }
+ tableServers = connection.reloadTableServers(tableName);
+ }
+ try {
+ Thread.sleep(this.pause);
+
+ } catch (InterruptedException x) {
+ // continue
+ }
+ }
+ TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+ if (value != null && value.length != 0) {
+ for (int i = 0; i < value.length; i++) {
+ results.put(value[i].getKey().getColumn(), value[i].getData());
+ }
+ }
+ return results;
+ }
+
+ /**
+ * Get a scanner on the current table starting at the specified row.
+ * Return the specified columns.
+ *
+ * @param columns array of columns to return
+ * @param startRow starting row in table to scan
+ * @return scanner
+ * @throws IOException
+ */
+ public HScannerInterface obtainScanner(Text[] columns,
+ Text startRow) throws IOException {
+
+ return obtainScanner(columns, startRow, System.currentTimeMillis(), null);
+ }
+
+ /**
+ * Get a scanner on the current table starting at the specified row.
+ * Return the specified columns.
+ *
+ * @param columns array of columns to return
+ * @param startRow starting row in table to scan
+ * @param timestamp only return results whose timestamp <= this value
+ * @return scanner
+ * @throws IOException
+ */
+ public HScannerInterface obtainScanner(Text[] columns,
+ Text startRow, long timestamp) throws IOException {
+
+ return obtainScanner(columns, startRow, timestamp, null);
+ }
+
+ /**
+ * Get a scanner on the current table starting at the specified row.
+ * Return the specified columns.
+ *
+ * @param columns array of columns to return
+ * @param startRow starting row in table to scan
+ * @param filter a row filter using row-key regexp and/or column data filter.
+ * @return scanner
+ * @throws IOException
+ */
+ public HScannerInterface obtainScanner(Text[] columns,
+ Text startRow, RowFilterInterface filter) throws IOException {
+
+ return obtainScanner(columns, startRow, System.currentTimeMillis(), filter);
+ }
+
+ /**
+ * Get a scanner on the current table starting at the specified row.
+ * Return the specified columns.
+ *
+ * @param columns array of columns to return
+ * @param startRow starting row in table to scan
+ * @param timestamp only return results whose timestamp <= this value
+ * @param filter a row filter using row-key regexp and/or column data filter.
+ * @return scanner
+ * @throws IOException
+ */
+ public HScannerInterface obtainScanner(Text[] columns,
+ Text startRow, long timestamp, RowFilterInterface filter)
+ throws IOException {
+
+ return new ClientScanner(columns, startRow, timestamp, filter);
+ }
+
+ /**
+ * Start a batch of row insertions/updates.
+ *
+ * No changes are committed until the call to commitBatchUpdate returns.
+ * A call to abortBatchUpdate will abandon the entire batch.
+ *
+ * @param row name of row to be updated
+ * @return lockid to be used in subsequent put, delete and commit calls
+ */
+ public synchronized long startBatchUpdate(final Text row) {
+ if (batch != null || currentLockId != -1L) {
+ throw new IllegalStateException("update in progress");
+ }
+ batch = new BatchUpdate();
+ return batch.startUpdate(row);
+ }
+
+ /**
+ * Abort a batch mutation
+ * @param lockid lock id returned by startBatchUpdate
+ */
+ public synchronized void abortBatch(final long lockid) {
+ if (batch == null) {
+ throw new IllegalStateException("no batch update in progress");
+ }
+ if (batch.getLockid() != lockid) {
+ throw new IllegalArgumentException("invalid lock id " + lockid);
+ }
+ batch = null;
+ }
+
+ /**
+ * Finalize a batch mutation
+ *
+ * @param lockid lock id returned by startBatchUpdate
+ * @throws IOException
+ */
+ public void commitBatch(final long lockid) throws IOException {
+ commitBatch(lockid, System.currentTimeMillis());
+ }
+
+ /**
+ * Finalize a batch mutation
+ *
+ * @param lockid lock id returned by startBatchUpdate
+ * @param timestamp time to associate with all the changes
+ * @throws IOException
+ */
+ public synchronized void commitBatch(final long lockid, final long timestamp)
+ throws IOException {
+
+ if (batch == null) {
+ throw new IllegalStateException("no batch update in progress");
+ }
+ if (batch.getLockid() != lockid) {
+ throw new IllegalArgumentException("invalid lock id " + lockid);
+ }
+
+ try {
+ for (int tries = 0; tries < numRetries; tries++) {
+ HRegionLocation r = getRegionLocation(batch.getRow());
+ HRegionInterface server =
+ connection.getHRegionConnection(r.getServerAddress());
+
+ try {
+ server.batchUpdate(r.getRegionInfo().getRegionName(), timestamp, batch);
+ break;
+
+ } catch (IOException e) {
+ if (tries < numRetries -1) {
+ tableServers = connection.reloadTableServers(tableName);
+
+ } else {
+ if (e instanceof RemoteException) {
+ e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+ }
+ throw e;
+ }
+ }
+ try {
+ Thread.sleep(pause);
+
+ } catch (InterruptedException e) {
+ }
+ }
+ } finally {
+ batch = null;
+ }
+ }
+
+ /**
+ * Start an atomic row insertion/update. No changes are committed until the
+ * call to commit() returns. A call to abort() will abandon any updates in progress.
+ *
+ * Callers to this method are given a lease for each unique lockid; before the
+ * lease expires, either abort() or commit() must be called. If it is not
+ * called, the system will automatically call abort() on the client's behalf.
+ *
+ * The client can gain extra time with a call to renewLease().
+ * Start an atomic row insertion or update
+ *
+ * @param row Name of row to start update against.
+ * @return Row lockid.
+ * @throws IOException
+ */
+ public synchronized long startUpdate(final Text row) throws IOException {
+ if (currentLockId != -1L || batch != null) {
+ throw new IllegalStateException("update in progress");
+ }
+ for (int tries = 0; tries < numRetries; tries++) {
+ IOException e = null;
+ HRegionLocation info = getRegionLocation(row);
+ try {
+ currentServer =
+ connection.getHRegionConnection(info.getServerAddress());
+
+ currentRegion = info.getRegionInfo().getRegionName();
+ clientid = rand.nextLong();
+ currentLockId = currentServer.startUpdate(currentRegion, clientid, row);
+
+ break;
+
+ } catch (IOException ex) {
+ e = ex;
+ }
+ if (tries < numRetries - 1) {
+ try {
+ Thread.sleep(this.pause);
+
+ } catch (InterruptedException ex) {
+ }
+ try {
+ tableServers = connection.reloadTableServers(tableName);
+
+ } catch (IOException ex) {
+ e = ex;
+ }
+ } else {
+ if (e instanceof RemoteException) {
+ e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+ }
+ throw e;
+ }
+ }
+ return currentLockId;
+ }
+
+ /**
+ * Change a value for the specified column.
+ * Runs {@link #abort(long)} if exception thrown.
+ *
+ * @param lockid lock id returned from startUpdate
+ * @param column column whose value is being set
+ * @param val new value for column
+ * @throws IOException
+ */
+ public void put(long lockid, Text column, byte val[]) throws IOException {
+ if (val == null) {
+ throw new IllegalArgumentException("value cannot be null");
+ }
+ if (batch != null) {
+ batch.put(lockid, column, val);
+ return;
+ }
+
+ if (lockid != currentLockId) {
+ throw new IllegalArgumentException("invalid lockid");
+ }
+ try {
+ this.currentServer.put(this.currentRegion, this.clientid, lockid, column,
+ val);
+ } catch (IOException e) {
+ try {
+ this.currentServer.abort(this.currentRegion, this.clientid, lockid);
+ } catch (IOException e2) {
+ LOG.warn(e2);
+ }
+ this.currentServer = null;
+ this.currentRegion = null;
+ if (e instanceof RemoteException) {
+ e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+ }
+ throw e;
+ }
+ }
+
+ /**
+ * Delete the value for a column
+ *
+ * @param lockid - lock id returned from startUpdate
+ * @param column - name of column whose value is to be deleted
+ * @throws IOException
+ */
+ public void delete(long lockid, Text column) throws IOException {
+ if (batch != null) {
+ batch.delete(lockid, column);
+ return;
+ }
+
+ if (lockid != currentLockId) {
+ throw new IllegalArgumentException("invalid lockid");
+ }
+ try {
+ this.currentServer.delete(this.currentRegion, this.clientid, lockid,
+ column);
+ } catch (IOException e) {
+ try {
+ this.currentServer.abort(this.currentRegion, this.clientid, lockid);
+ } catch(IOException e2) {
+ LOG.warn(e2);
+ }
+ this.currentServer = null;
+ this.currentRegion = null;
+ if (e instanceof RemoteException) {
+ e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+ }
+ throw e;
+ }
+ }
+
+ /**
+ * Abort a row mutation
+ *
+ * @param lockid - lock id returned from startUpdate
+ * @throws IOException
+ */
+ public synchronized void abort(long lockid) throws IOException {
+ if (batch != null) {
+ abortBatch(lockid);
+ return;
+ }
+
+ if (lockid != currentLockId) {
+ throw new IllegalArgumentException("invalid lockid");
+ }
+
+ try {
+ try {
+ this.currentServer.abort(this.currentRegion, this.clientid, lockid);
+ } catch (IOException e) {
+ this.currentServer = null;
+ this.currentRegion = null;
+ if (e instanceof RemoteException) {
+ e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+ }
+ throw e;
+ }
+ } finally {
+ currentLockId = -1L;
+ }
+ }
+
+ /**
+ * Finalize a row mutation
+ *
+ * @param lockid - lock id returned from startUpdate
+ * @throws IOException
+ */
+ public void commit(long lockid) throws IOException {
+ commit(lockid, System.currentTimeMillis());
+ }
+
+ /**
+ * Finalize a row mutation
+ *
+ * @param lockid - lock id returned from startUpdate
+ * @param timestamp - time to associate with the change
+ * @throws IOException
+ */
+ public synchronized void commit(long lockid, long timestamp) throws IOException {
+ if (batch != null) {
+ commitBatch(lockid, timestamp);
+ return;
+ }
+
+ if (lockid != currentLockId) {
+ throw new IllegalArgumentException("invalid lockid");
+ }
+
+ try {
+ try {
+ this.currentServer.commit(this.currentRegion, this.clientid, lockid,
+ timestamp);
+
+ } catch (IOException e) {
+ this.currentServer = null;
+ this.currentRegion = null;
+ if(e instanceof RemoteException) {
+ e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+ }
+ throw e;
+ }
+ } finally {
+ currentLockId = -1L;
+ }
+ }
+
+ /**
+ * Renew lease on update
+ *
+ * @param lockid - lock id returned from startUpdate
+ * @throws IOException
+ */
+ public synchronized void renewLease(long lockid) throws IOException {
+ if (batch != null) {
+ return;
+ }
+
+ if (lockid != currentLockId) {
+ throw new IllegalArgumentException("invalid lockid");
+ }
+ try {
+ this.currentServer.renewLease(lockid, this.clientid);
+ } catch (IOException e) {
+ try {
+ this.currentServer.abort(this.currentRegion, this.clientid, lockid);
+ } catch (IOException e2) {
+ LOG.warn(e2);
+ }
+ this.currentServer = null;
+ this.currentRegion = null;
+ if (e instanceof RemoteException) {
+ e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+ }
+ throw e;
+ }
+ }
+
+ /**
+ * Implements the scanner interface for the HBase client.
+ * If there are multiple regions in a table, this scanner will iterate
+ * through them all.
+ */
+ protected class ClientScanner implements HScannerInterface {
+ private final Text EMPTY_COLUMN = new Text();
+ private Text[] columns;
+ private Text startRow;
+ private long scanTime;
+ @SuppressWarnings("hiding")
+ private boolean closed;
+ private AtomicReferenceArray<HRegionLocation> regions;
+ @SuppressWarnings("hiding")
+ private int currentRegion;
+ private HRegionInterface server;
+ private long scannerId;
+ private RowFilterInterface filter;
+
+ private void loadRegions() {
+ Text firstServer = null;
+ if (this.startRow == null || this.startRow.getLength() == 0) {
+ firstServer = tableServers.firstKey();
+
+ } else if(tableServers.containsKey(startRow)) {
+ firstServer = startRow;
+
+ } else {
+ firstServer = tableServers.headMap(startRow).lastKey();
+ }
+ Collection<HRegionLocation> info =
+ tableServers.tailMap(firstServer).values();
+
+ this.regions = new AtomicReferenceArray<HRegionLocation>(
+ info.toArray(new HRegionLocation[info.size()]));
+ }
+
+ protected ClientScanner(Text[] columns, Text startRow, long timestamp,
+ RowFilterInterface filter) throws IOException {
+
+ this.columns = columns;
+ this.startRow = startRow;
+ this.scanTime = timestamp;
+ this.closed = false;
+ this.filter = filter;
+ if (filter != null) {
+ filter.validate(columns);
+ }
+ loadRegions();
+ this.currentRegion = -1;
+ this.server = null;
+ this.scannerId = -1L;
+ nextScanner();
+ }
+
+ /*
+ * Gets a scanner for the next region.
+ * Returns false if there are no more scanners.
+ */
+ private boolean nextScanner() throws IOException {
+ if (this.scannerId != -1L) {
+ this.server.close(this.scannerId);
+ this.scannerId = -1L;
+ }
+ this.currentRegion += 1;
+ if (this.currentRegion == this.regions.length()) {
+ close();
+ return false;
+ }
+ try {
+ for (int tries = 0; tries < numRetries; tries++) {
+ HRegionLocation r = this.regions.get(currentRegion);
+ this.server =
+ connection.getHRegionConnection(r.getServerAddress());
+
+ try {
+ if (this.filter == null) {
+ this.scannerId =
+ this.server.openScanner(r.getRegionInfo().getRegionName(),
+ this.columns, currentRegion == 0 ? this.startRow
+ : EMPTY_START_ROW, scanTime, null);
+
+ } else {
+ this.scannerId =
+ this.server.openScanner(r.getRegionInfo().getRegionName(),
+ this.columns, currentRegion == 0 ? this.startRow
+ : EMPTY_START_ROW, scanTime, filter);
+ }
+
+ break;
+
+ } catch (IOException e) {
+ if (tries == numRetries - 1) {
+ // No more tries
+ if (e instanceof RemoteException) {
+ e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+ }
+ throw e;
+ }
+ tableServers = connection.reloadTableServers(tableName);
+ loadRegions();
+ }
+ }
+
+ } catch (IOException e) {
+ close();
+ if (e instanceof RemoteException) {
+ e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+ }
+ throw e;
+ }
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean next(HStoreKey key, TreeMap<Text, byte[]> results) throws IOException {
+ if (this.closed) {
+ return false;
+ }
+ KeyedData[] values = null;
+ do {
+ values = this.server.next(this.scannerId);
+ } while (values != null && values.length == 0 && nextScanner());
+
+ if (values != null && values.length != 0) {
+ for (int i = 0; i < values.length; i++) {
+ key.setRow(values[i].getKey().getRow());
+ key.setVersion(values[i].getKey().getTimestamp());
+ key.setColumn(EMPTY_COLUMN);
+ results.put(values[i].getKey().getColumn(), values[i].getData());
+ }
+ }
+ return values == null ? false : values.length != 0;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void close() throws IOException {
+ if (this.scannerId != -1L) {
+ this.server.close(this.scannerId);
+ this.scannerId = -1L;
+ }
+ this.server = null;
+ this.closed = true;
+ }
+ }
+}
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java?view=diff&rev=561935&r1=561934&r2=561935
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java Wed Aug 1 13:10:11 2007
@@ -180,10 +180,8 @@
*
* @param holderId id of lease holder
* @param resourceId id of resource being leased
- * @throws IOException
*/
- public void cancelLease(final long holderId, final long resourceId)
- throws IOException {
+ public void cancelLease(final long holderId, final long resourceId) {
LeaseName name = null;
synchronized(leases) {
synchronized(sortedLeases) {
@@ -191,9 +189,8 @@
Lease lease = leases.get(name);
if (lease == null) {
// It's possible that someone tries to renew the lease, but
- // it just expired a moment ago. So fail.
- throw new IOException("Cannot cancel lease that is not held: " +
- name);
+ // it just expired a moment ago. So just skip it.
+ return;
}
sortedLeases.remove(lease);
leases.remove(name);
@@ -206,6 +203,7 @@
/** LeaseMonitor is a thread that expires Leases that go on too long. */
class LeaseMonitor implements Runnable {
+ /** {@inheritDoc} */
public void run() {
while(running) {
synchronized(leases) {
@@ -236,6 +234,7 @@
* A Lease name.
* More lightweight than String or Text.
*/
+ @SuppressWarnings("unchecked")
class LeaseName implements Comparable {
private final long holderId;
private final long resourceId;
@@ -245,6 +244,7 @@
this.resourceId = rid;
}
+ /** {@inheritDoc} */
@Override
public boolean equals(Object obj) {
LeaseName other = (LeaseName)obj;
@@ -252,6 +252,7 @@
this.resourceId == other.resourceId;
}
+ /** {@inheritDoc} */
@Override
public int hashCode() {
// Copy OR'ing from javadoc for Long#hashCode.
@@ -260,12 +261,14 @@
return result;
}
+ /** {@inheritDoc} */
@Override
public String toString() {
return Long.toString(this.holderId) + "/" +
Long.toString(this.resourceId);
}
+ /** {@inheritDoc} */
public int compareTo(Object obj) {
LeaseName other = (LeaseName)obj;
if (this.holderId < other.holderId) {
@@ -292,6 +295,7 @@
}
/** This class tracks a single Lease. */
+ @SuppressWarnings("unchecked")
private class Lease implements Comparable {
final long holderId;
final long resourceId;
@@ -329,11 +333,13 @@
listener.leaseExpired();
}
+ /** {@inheritDoc} */
@Override
public boolean equals(Object obj) {
return compareTo(obj) == 0;
}
+ /** {@inheritDoc} */
@Override
public int hashCode() {
int result = this.getLeaseName().hashCode();
@@ -345,6 +351,7 @@
// Comparable
//////////////////////////////////////////////////////////////////////////////
+ /** {@inheritDoc} */
public int compareTo(Object o) {
Lease other = (Lease) o;
if(this.lastUpdate < other.lastUpdate) {
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java?view=diff&rev=561935&r1=561934&r2=561935
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java Wed Aug 1 13:10:11 2007
@@ -53,6 +53,7 @@
this.regionServers = 1;
}
+ /** {@inheritDoc} */
@Override
public void setUp() throws Exception {
super.setUp();
@@ -60,11 +61,13 @@
new MiniHBaseCluster(this.conf, this.regionServers, this.miniHdfs);
}
+ /** {@inheritDoc} */
@Override
public void tearDown() throws Exception {
super.tearDown();
if (this.cluster != null) {
this.cluster.shutdown();
}
+ HConnectionManager.deleteConnection(conf);
}
}
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCleanRegionServerExit.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCleanRegionServerExit.java?view=diff&rev=561935&r1=561934&r2=561935
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCleanRegionServerExit.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCleanRegionServerExit.java Wed Aug 1 13:10:11 2007
@@ -23,6 +23,8 @@
import java.util.TreeMap;
import org.apache.hadoop.io.Text;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
/**
* Tests region server failover when a region server exits.
@@ -36,6 +38,8 @@
conf.setInt("ipc.client.timeout", 5000); // reduce ipc client timeout
conf.setInt("ipc.client.connect.max.retries", 5); // and number of retries
conf.setInt("hbase.client.retries.number", 2); // reduce HBase retries
+ Logger.getRootLogger().setLevel(Level.WARN);
+ Logger.getLogger(this.getClass().getPackage().getName()).setLevel(Level.DEBUG);
}
/**
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestRegionServerAbort.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestRegionServerAbort.java?view=diff&rev=561935&r1=561934&r2=561935
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestRegionServerAbort.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestRegionServerAbort.java Wed Aug 1 13:10:11 2007
@@ -38,7 +38,8 @@
conf.setInt("ipc.client.timeout", 5000); // reduce client timeout
conf.setInt("ipc.client.connect.max.retries", 5); // and number of retries
conf.setInt("hbase.client.retries.number", 2); // reduce HBase retries
-// Logger.getLogger(this.getClass().getPackage().getName()).setLevel(Level.DEBUG);
+ Logger.getRootLogger().setLevel(Level.WARN);
+ Logger.getLogger(this.getClass().getPackage().getName()).setLevel(Level.DEBUG);
}
/**
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java?view=diff&rev=561935&r1=561934&r2=561935
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java Wed Aug 1 13:10:11 2007
@@ -69,7 +69,7 @@
// Setup colkeys to be inserted
HTableDescriptor htd = new HTableDescriptor(getName());
Text tableName = new Text(getName());
- Text[] colKeys = new Text[(int)(LAST_COLKEY - FIRST_COLKEY) + 1];
+ Text[] colKeys = new Text[(LAST_COLKEY - FIRST_COLKEY) + 1];
for (char i = 0; i < colKeys.length; i++) {
colKeys[i] = new Text(new String(new char[] {
(char)(FIRST_COLKEY + i), ':' }));
@@ -201,9 +201,9 @@
long scannerId = -1L;
try {
client.openTable(table);
- HClient.RegionLocation rl = client.getRegionLocation(table);
- regionServer = client.getHRegionConnection(rl.serverAddress);
- scannerId = regionServer.openScanner(rl.regionInfo.regionName,
+ HRegionLocation rl = client.getRegionLocation(table);
+ regionServer = client.getHRegionConnection(rl.getServerAddress());
+ scannerId = regionServer.openScanner(rl.getRegionInfo().getRegionName(),
HMaster.METACOLUMNS, new Text(), System.currentTimeMillis(), null);
while (true) {
TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();