You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ra...@apache.org on 2010/11/10 02:54:14 UTC
svn commit: r1033321 - in /hbase/trunk: ./
src/main/java/org/apache/hadoop/hbase/client/
src/main/java/org/apache/hadoop/hbase/regionserver/
src/main/java/org/apache/hadoop/hbase/rest/client/
src/test/java/org/apache/hadoop/hbase/client/
Author: rawson
Date: Wed Nov 10 01:54:13 2010
New Revision: 1033321
URL: http://svn.apache.org/viewvc?rev=1033321&view=rev
Log:
HBASE-2898 MultiPut makes proper error handling impossible and leads to corrupted data
Added:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerStoppedException.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1033321&r1=1033320&r2=1033321&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Wed Nov 10 01:54:13 2010
@@ -667,6 +667,8 @@ Release 0.90.0 - Unreleased
HBASE-3199 large response handling: some fixups and cleanups
HBASE-3212 More testing of enable/disable uncovered base condition not in
place; i.e. that only one enable/disable runs at a time
+ HBASE-2898 MultiPut makes proper error handling impossible and leads to
+ corrupted data
IMPROVEMENTS
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java?rev=1033321&r1=1033320&r2=1033321&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java Wed Nov 10 01:54:13 2010
@@ -20,7 +20,6 @@
package org.apache.hadoop.hbase.client;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
@@ -243,45 +242,25 @@ public interface HConnection extends Abo
/**
* Process a mixed batch of Get, Put and Delete actions. All actions for a
* RegionServer are forwarded in one RPC call.
- *
+ *
+ *
* @param actions The collection of actions.
* @param tableName Name of the hbase table
* @param pool thread pool for parallel execution
* @param results An empty array, same size as list. If an exception is thrown,
* you can test here for partial results, and to determine which actions
* processed successfully.
- * @throws IOException
+ * @throws IOException if there are problems talking to META. Per-item
+ * exceptions are stored in the results array.
*/
public void processBatch(List<Row> actions, final byte[] tableName,
- ExecutorService pool, Result[] results)
- throws IOException;
-
- /**
- * Process a batch of Puts. Does the retries.
- * @param list A batch of Puts to process.
- * @param tableName The name of the table
- * @return Count of committed Puts. On fault, < list.size().
- * @throws IOException if a remote or network exception occurs
- * @deprecated Use HConnectionManager::processBatch instead.
- */
- public int processBatchOfRows(ArrayList<Put> list, byte[] tableName, ExecutorService pool)
- throws IOException;
-
- /**
- * Process a batch of Deletes. Does the retries.
- * @param list A batch of Deletes to process.
- * @param tableName The name of the table
- * @return Count of committed Deletes. On fault, < list.size().
- * @throws IOException if a remote or network exception occurs
- * @deprecated Use HConnectionManager::processBatch instead.
- */
- public int processBatchOfDeletes(List<Delete> list, byte[] tableName, ExecutorService pool)
- throws IOException;
+ ExecutorService pool, Object[] results)
+ throws IOException, InterruptedException;
/**
* Process a batch of Puts.
*
- * @param list The collection of actions. The list is mutated: all successful Puts
+ * @param list The collection of actions. The list is mutated: all successful Puts
* are removed from the list.
* @param tableName Name of the hbase table
* @param pool Thread pool for parallel execution
@@ -289,7 +268,8 @@ public interface HConnection extends Abo
* @deprecated Use HConnectionManager::processBatch instead.
*/
public void processBatchOfPuts(List<Put> list,
- final byte[] tableName, ExecutorService pool) throws IOException;
+ final byte[] tableName, ExecutorService pool)
+ throws IOException;
/**
* Enable or disable region cache prefetch for the table. It will be
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1033321&r1=1033320&r2=1033321&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Wed Nov 10 01:54:13 2010
@@ -1029,39 +1029,6 @@ public class HConnectionManager {
}
}
- /**
- * @deprecated Use HConnectionManager::processBatch instead.
- */
- public int processBatchOfRows(final ArrayList<Put> list, final byte[] tableName, ExecutorService pool)
- throws IOException {
- Result[] results = new Result[list.size()];
- processBatch((List) list, tableName, pool, results);
- int count = 0;
- for (Result r : results) {
- if (r != null) {
- count++;
- }
- }
- return (count == list.size() ? -1 : count);
- }
-
- /**
- * @deprecated Use HConnectionManager::processBatch instead.
- */
- public int processBatchOfDeletes(final List<Delete> list,
- final byte[] tableName, ExecutorService pool)
- throws IOException {
- Result[] results = new Result[list.size()];
- processBatch((List) list, tableName, pool, results);
- int count = 0;
- for (Result r : results) {
- if (r != null) {
- count++;
- }
- }
- return (count == list.size() ? -1 : count);
- }
-
void close(boolean stopProxy) {
if (master != null) {
if (stopProxy) {
@@ -1088,28 +1055,28 @@ public class HConnectionManager {
final HServerAddress address,
final MultiAction multi,
final byte [] tableName) {
- final HConnection connection = this;
- return new Callable<MultiResponse>() {
- public MultiResponse call() throws IOException {
- return getRegionServerWithoutRetries(
- new ServerCallable<MultiResponse>(connection, tableName, null) {
- public MultiResponse call() throws IOException {
- return server.multi(multi);
- }
- @Override
- public void instantiateServer(boolean reload) throws IOException {
- server = connection.getHRegionConnection(address);
- }
- }
- );
- }
- };
- }
+ final HConnection connection = this;
+ return new Callable<MultiResponse>() {
+ public MultiResponse call() throws IOException {
+ return getRegionServerWithoutRetries(
+ new ServerCallable<MultiResponse>(connection, tableName, null) {
+ public MultiResponse call() throws IOException {
+ return server.multi(multi);
+ }
+ @Override
+ public void instantiateServer(boolean reload) throws IOException {
+ server = connection.getHRegionConnection(address);
+ }
+ }
+ );
+ }
+ };
+ }
- public void processBatch(List<Row> list,
+ public void processBatch(List<Row> list,
final byte[] tableName,
ExecutorService pool,
- Result[] results) throws IOException {
+ Object[] results) throws IOException, InterruptedException {
// results must be the same size as list
if (results.length != list.size()) {
@@ -1120,8 +1087,10 @@ public class HConnectionManager {
return;
}
+ // Keep track of the most recent servers for any given item for better
+ // exceptional reporting.
+ HServerAddress [] lastServers = new HServerAddress[results.length];
List<Row> workingList = new ArrayList<Row>(list);
- final boolean singletonList = (list.size() == 1);
boolean retry = true;
Throwable singleRowCause = null;
@@ -1131,19 +1100,13 @@ public class HConnectionManager {
if (tries >= 1) {
long sleepTime = getPauseTime(tries);
LOG.debug("Retry " +tries+ ", sleep for " +sleepTime+ "ms!");
- try {
- Thread.sleep(sleepTime);
- } catch (InterruptedException ignore) {
- LOG.debug("Interupted");
- Thread.currentThread().interrupt();
- break;
- }
+ Thread.sleep(sleepTime);
}
// step 1: break up into regionserver-sized chunks and build the data structs
Map<HServerAddress, MultiAction> actionsByServer = new HashMap<HServerAddress, MultiAction>();
- for (int i=0; i<workingList.size(); i++) {
+ for (int i = 0; i < workingList.size(); i++) {
Row row = workingList.get(i);
if (row != null) {
HRegionLocation loc = locateRegion(tableName, row.getRow(), true);
@@ -1157,6 +1120,7 @@ public class HConnectionManager {
}
Action action = new Action(regionName, row, i);
+ lastServers[i] = address;
actions.add(regionName, action);
}
}
@@ -1176,58 +1140,50 @@ public class HConnectionManager {
HServerAddress address = responsePerServer.getKey();
try {
- // Gather the results for one server
Future<MultiResponse> future = responsePerServer.getValue();
-
- // Not really sure what a reasonable timeout value is. Here's a first try.
-
MultiResponse resp = future.get();
if (resp == null) {
// Entire server failed
LOG.debug("Failed all for server: " + address + ", removing from cache");
- } else {
- // For each region
- for (Entry<byte[], List<Pair<Integer,Result>>> e : resp.getResults().entrySet()) {
- byte[] regionName = e.getKey();
- List<Pair<Integer, Result>> regionResults = e.getValue();
- for (Pair<Integer, Result> regionResult : regionResults) {
- if (regionResult == null) {
- // if the first/only record is 'null' the entire region failed.
- LOG.debug("Failures for region: " + Bytes.toStringBinary(regionName) + ", removing from cache");
- } else {
- // success
- results[regionResult.getFirst()] = regionResult.getSecond();
- }
+ continue;
+ }
+
+ for (Entry<byte[], List<Pair<Integer,Object>>> e : resp.getResults().entrySet()) {
+ byte[] regionName = e.getKey();
+ List<Pair<Integer, Object>> regionResults = e.getValue();
+ for (Pair<Integer, Object> regionResult : regionResults) {
+ if (regionResult == null) {
+ // if the first/only record is 'null' the entire region failed.
+ LOG.debug("Failures for region: " +
+ Bytes.toStringBinary(regionName) +
+ ", removing from cache");
+ } else {
+ // Result might be an Exception, including DNRIOE
+ results[regionResult.getFirst()] = regionResult.getSecond();
}
}
}
- } catch (InterruptedException e) {
- LOG.debug("Failed all from " + address, e);
- Thread.currentThread().interrupt();
- break;
} catch (ExecutionException e) {
LOG.debug("Failed all from " + address, e);
-
- // Just give up, leaving the batch incomplete
- if (e.getCause() instanceof DoNotRetryIOException) {
- throw (DoNotRetryIOException) e.getCause();
- }
-
- if (singletonList) {
- // be richer for reporting in a 1 row case.
- singleRowCause = e.getCause();
- }
}
}
+ // step 4: identify failures and prep for a retry (if applicable).
+
// Find failures (i.e. null Result), and add them to the workingList (in
// order), so they can be retried.
retry = false;
workingList.clear();
for (int i = 0; i < results.length; i++) {
- if (results[i] == null) {
+ // if null (fail) or instanceof Throwable && not instanceof DNRIOE
+ // then retry that row. else dont.
+ if (results[i] == null ||
+ (results[i] instanceof Throwable &&
+ !(results[i] instanceof DoNotRetryIOException))) {
+
retry = true;
+
Row row = list.get(i);
workingList.add(row);
deleteCachedLocation(tableName, row.getRow());
@@ -1238,19 +1194,31 @@ public class HConnectionManager {
}
}
- if (Thread.currentThread().isInterrupted()) {
- throw new IOException("Aborting attempt because of a thread interruption");
- }
-
if (retry) {
- // ran out of retries and didn't successfully finish everything!
+ // Simple little check for 1 item failures.
if (singleRowCause != null) {
throw new IOException(singleRowCause);
- } else {
- throw new RetriesExhaustedException("Still had " + workingList.size()
- + " actions left after retrying " + numRetries + " times.");
}
}
+
+
+ List<Throwable> exceptions = new ArrayList<Throwable>();
+ List<Row> actions = new ArrayList<Row>();
+ List<HServerAddress> addresses = new ArrayList<HServerAddress>();
+
+ for (int i = 0 ; i < results.length; i++) {
+ if (results[i] == null || results[i] instanceof Throwable) {
+ exceptions.add((Throwable)results[i]);
+ actions.add(list.get(i));
+ addresses.add(lastServers[i]);
+ }
+ }
+
+ if (!exceptions.isEmpty()) {
+ throw new RetriesExhaustedWithDetailsException(exceptions,
+ actions,
+ addresses);
+ }
}
/**
@@ -1259,16 +1227,21 @@ public class HConnectionManager {
public void processBatchOfPuts(List<Put> list,
final byte[] tableName,
ExecutorService pool) throws IOException {
- Result[] results = new Result[list.size()];
- processBatch((List) list, tableName, pool, results);
-
- // mutate list so that it is empty for complete success, or contains only failed records
- // results are returned in the same order as the requests in list
- // walk the list backwards, so we can remove from list without impacting the indexes of earlier members
- for (int i = results.length - 1; i>=0; i--) {
- // if result is not null, it succeeded
- if (results[i] != null) {
- list.remove(i);
+ Object[] results = new Object[list.size()];
+ try {
+ processBatch((List) list, tableName, pool, results);
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ } finally {
+
+ // mutate list so that it is empty for complete success, or contains only failed records
+ // results are returned in the same order as the requests in list
+ // walk the list backwards, so we can remove from list without impacting the indexes of earlier members
+ for (int i = results.length - 1; i>=0; i--) {
+ if (results[i] instanceof Result) {
+ // successful Puts are removed from the list here.
+ list.remove(i);
+ }
}
}
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1033321&r1=1033320&r2=1033321&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java Wed Nov 10 01:54:13 2010
@@ -553,8 +553,22 @@ public class HTable implements HTableInt
}
public Result[] get(List<Get> gets) throws IOException {
- return batch((List) gets);
- }
+ try {
+ Object [] r1 = batch((List)gets);
+
+ // translate.
+ Result [] results = new Result[r1.length];
+ int i=0;
+ for (Object o : r1) {
+ // batch ensures if there is a failure we get an exception instead
+ results[i++] = (Result) o;
+ }
+
+ return results;
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
/**
* Method that does a batch call on Deletes, Gets and Puts. The ordering of
@@ -563,13 +577,15 @@ public class HTable implements HTableInt
* guaranteed that the Get returns what the Put had put.
*
* @param actions list of Get, Put, Delete objects
- * @param results Empty Result[], same size as actions. Provides access to partial
- * results, in case an exception is thrown. A null in the result array means that
- * the call for that action failed, even after retries
+ * @param results Empty Result[], same size as actions. Provides access to
+ * partial results, in case an exception is thrown. If there are any failures,
+ * there will be a null or Throwable will be in the results array, AND an
+ * exception will be thrown.
* @throws IOException
*/
@Override
- public synchronized void batch(final List<Row> actions, final Result[] results) throws IOException {
+ public synchronized void batch(final List<Row> actions, final Object[] results)
+ throws InterruptedException, IOException {
connection.processBatch(actions, tableName, pool, results);
}
@@ -582,8 +598,8 @@ public class HTable implements HTableInt
* @throws IOException
*/
@Override
- public synchronized Result[] batch(final List<Row> actions) throws IOException {
- Result[] results = new Result[actions.size()];
+ public synchronized Object[] batch(final List<Row> actions) throws InterruptedException, IOException {
+ Object[] results = new Object[actions.size()];
connection.processBatch(actions, tableName, pool, results);
return results;
}
@@ -616,20 +632,25 @@ public class HTable implements HTableInt
* the {@code deletes} argument will contain the {@link Delete} instances
* that have not be successfully applied.
* @since 0.20.1
+ * @see {@link #batch(java.util.List, Object[])}
*/
@Override
public void delete(final List<Delete> deletes)
throws IOException {
- Result[] results = new Result[deletes.size()];
- connection.processBatch((List) deletes, tableName, pool, results);
-
- // mutate list so that it is empty for complete success, or contains only failed records
- // results are returned in the same order as the requests in list
- // walk the list backwards, so we can remove from list without impacting the indexes of earlier members
- for (int i = results.length - 1; i>=0; i--) {
- // if result is not null, it succeeded
- if (results[i] != null) {
- deletes.remove(i);
+ Object[] results = new Object[deletes.size()];
+ try {
+ connection.processBatch((List) deletes, tableName, pool, results);
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ } finally {
+ // mutate list so that it is empty for complete success, or contains only failed records
+ // results are returned in the same order as the requests in list
+ // walk the list backwards, so we can remove from list without impacting the indexes of earlier members
+ for (int i = results.length - 1; i>=0; i--) {
+ // if result is not null, it succeeded
+ if (results[i] instanceof Result) {
+ deletes.remove(i);
+ }
}
}
}
@@ -806,7 +827,7 @@ public class HTable implements HTableInt
}
@Override
- public void close() throws IOException{
+ public void close() throws IOException {
flushCommits();
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java?rev=1033321&r1=1033320&r2=1033321&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java Wed Nov 10 01:54:13 2010
@@ -74,24 +74,25 @@ public interface HTableInterface {
* Method that does a batch call on Deletes, Gets and Puts.
*
* @param actions list of Get, Put, Delete objects
- * @param results Empty Result[], same size as actions. Provides access to partial
+ * @param results Empty Object[], same size as actions. Provides access to partial
* results, in case an exception is thrown. A null in the result array means that
* the call for that action failed, even after retries
* @throws IOException
* @since 0.90.0
*/
- void batch(final List<Row> actions, final Result[] results) throws IOException;
+ void batch(final List<Row> actions, final Object[] results) throws IOException, InterruptedException;
/**
* Method that does a batch call on Deletes, Gets and Puts.
*
+ *
* @param actions list of Get, Put, Delete objects
* @return the results from the actions. A null in the return array means that
* the call for that action failed, even after retries
* @throws IOException
* @since 0.90.0
*/
- Result[] batch(final List<Row> actions) throws IOException;
+ Object[] batch(final List<Row> actions) throws IOException, InterruptedException;
/**
* Extracts certain cells from a given row.
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java?rev=1033321&r1=1033320&r2=1033321&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java Wed Nov 10 01:54:13 2010
@@ -25,10 +25,14 @@ import org.apache.hadoop.hbase.io.HbaseO
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.StringUtils;
import java.io.DataOutput;
import java.io.IOException;
import java.io.DataInput;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -42,8 +46,8 @@ public class MultiResponse implements Wr
// map of regionName to list of (Results paired to the original index for that
// Result)
- private Map<byte[], List<Pair<Integer, Result>>> results =
- new TreeMap<byte[], List<Pair<Integer, Result>>>(Bytes.BYTES_COMPARATOR);
+ private Map<byte[], List<Pair<Integer, Object>>> results =
+ new TreeMap<byte[], List<Pair<Integer, Object>>>(Bytes.BYTES_COMPARATOR);
public MultiResponse() {
}
@@ -68,32 +72,52 @@ public class MultiResponse implements Wr
* (request). Second item is the Result. Result will be empty for
* successful Put and Delete actions.
*/
- public void add(byte[] regionName, Pair<Integer, Result> r) {
- List<Pair<Integer, Result>> rs = results.get(regionName);
+ public void add(byte[] regionName, Pair<Integer, Object> r) {
+ List<Pair<Integer, Object>> rs = results.get(regionName);
if (rs == null) {
- rs = new ArrayList<Pair<Integer, Result>>();
+ rs = new ArrayList<Pair<Integer, Object>>();
results.put(regionName, rs);
}
rs.add(r);
}
- public Map<byte[], List<Pair<Integer, Result>>> getResults() {
+ public void add(byte []regionName, int originalIndex, Object resOrEx) {
+ add(regionName, new Pair<Integer,Object>(originalIndex, resOrEx));
+ }
+
+ public Map<byte[], List<Pair<Integer, Object>>> getResults() {
return results;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(results.size());
- for (Map.Entry<byte[], List<Pair<Integer, Result>>> e : results.entrySet()) {
+ for (Map.Entry<byte[], List<Pair<Integer, Object>>> e : results.entrySet()) {
Bytes.writeByteArray(out, e.getKey());
- List<Pair<Integer, Result>> lst = e.getValue();
+ List<Pair<Integer, Object>> lst = e.getValue();
out.writeInt(lst.size());
- for (Pair<Integer, Result> r : lst) {
+ for (Pair<Integer, Object> r : lst) {
if (r == null) {
out.writeInt(-1); // Cant have index -1; on other side we recognize -1 as 'null'
} else {
out.writeInt(r.getFirst()); // Can this can npe!?!
- HbaseObjectWritable.writeObject(out, r.getSecond(), Result.class, null);
+ Object obj = r.getSecond();
+ if (obj instanceof Throwable) {
+ out.writeBoolean(true); // true, Throwable/exception.
+
+ Throwable t = (Throwable) obj;
+ // serialize exception
+ WritableUtils.writeString(out, t.getClass().getName());
+ WritableUtils.writeString(out,
+ StringUtils.stringifyException(t));
+
+ } else {
+ out.writeBoolean(false); // no exception
+
+ if (! (obj instanceof Writable))
+ obj = null; // squash all non-writables to null.
+ HbaseObjectWritable.writeObject(out, obj, Result.class, null);
+ }
}
}
}
@@ -106,15 +130,33 @@ public class MultiResponse implements Wr
for (int i = 0; i < mapSize; i++) {
byte[] key = Bytes.readByteArray(in);
int listSize = in.readInt();
- List<Pair<Integer, Result>> lst = new ArrayList<Pair<Integer, Result>>(
+ List<Pair<Integer, Object>> lst = new ArrayList<Pair<Integer, Object>>(
listSize);
for (int j = 0; j < listSize; j++) {
Integer idx = in.readInt();
if (idx == -1) {
lst.add(null);
} else {
- Result r = (Result) HbaseObjectWritable.readObject(in, null);
- lst.add(new Pair<Integer, Result>(idx, r));
+ boolean isException = in.readBoolean();
+ Object o = null;
+ if (isException) {
+ String klass = WritableUtils.readString(in);
+ String desc = WritableUtils.readString(in);
+ try {
+ // the type-unsafe insertion, but since we control what klass is..
+ Class<? extends Throwable> c = (Class<? extends Throwable>) Class.forName(klass);
+ Constructor<? extends Throwable> cn = c.getDeclaredConstructor(String.class);
+ o = cn.newInstance(desc);
+ } catch (ClassNotFoundException ignored) {
+ } catch (NoSuchMethodException ignored) {
+ } catch (InvocationTargetException ignored) {
+ } catch (InstantiationException ignored) {
+ } catch (IllegalAccessException ignored) {
+ }
+ } else {
+ o = HbaseObjectWritable.readObject(in, null);
+ }
+ lst.add(new Pair<Integer, Object>(idx, o));
}
}
results.put(key, lst);
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java?rev=1033321&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java Wed Nov 10 01:54:13 2010
@@ -0,0 +1,137 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HServerAddress;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This subclass of {@link org.apache.hadoop.hbase.client.RetriesExhaustedException}
+ * is thrown when we have more information about which rows were causing which
+ * exceptions on what servers. You can call {@link #mayHaveClusterIssues()}
+ * and if the result is false, you have input error problems, otherwise you
+ * may have cluster issues. You can iterate over the causes, rows and last
+ * known server addresses via {@link #getNumExceptions()} and
+ * {@link #getCause(int)}, {@link #getRow(int)} and {@link #getAddress(int)}.
+ */
+public class RetriesExhaustedWithDetailsException extends RetriesExhaustedException {
+
+ List<Throwable> exceptions;
+ List<Row> actions;
+ List<HServerAddress> addresses;
+
+ public RetriesExhaustedWithDetailsException(List<Throwable> exceptions,
+ List<Row> actions,
+ List<HServerAddress> addresses) {
+ super("Failed " + exceptions.size() + " action" +
+ pluralize(exceptions) + ": " +
+ getDesc(exceptions,actions,addresses));
+
+ this.exceptions = exceptions;
+ this.actions = actions;
+ this.addresses = addresses;
+ }
+
+ public List<Throwable> getCauses() {
+ return exceptions;
+ }
+
+ public int getNumExceptions() {
+ return exceptions.size();
+ }
+
+ public Throwable getCause(int i) {
+ return exceptions.get(i);
+ }
+
+ public Row getRow(int i) {
+ return actions.get(i);
+ }
+
+ public HServerAddress getAddress(int i) {
+ return addresses.get(i);
+ }
+
+ public boolean mayHaveClusterIssues() {
+ boolean res = false;
+
+ // If all of the exceptions are DNRIOE not exception
+ for (Throwable t : exceptions) {
+ if ( !(t instanceof DoNotRetryIOException)) {
+ res = true;
+ }
+ }
+ return res;
+ }
+
+
+ public static String pluralize(Collection<?> c) {
+ return pluralize(c.size());
+ }
+
+ public static String pluralize(int c) {
+ return c > 1 ? "s" : "";
+ }
+
+ public static String getDesc(List<Throwable> exceptions,
+ List<Row> actions,
+ List<HServerAddress> addresses) {
+ String s = getDesc(classifyExs(exceptions));
+ s += "servers with issues: ";
+ Set<HServerAddress> uniqAddr = new HashSet<HServerAddress>();
+ uniqAddr.addAll(addresses);
+ for(HServerAddress addr : uniqAddr) {
+ s += addr + ", ";
+ }
+ return s;
+ }
+
+ public static Map<String, Integer> classifyExs(List<Throwable> ths) {
+ Map<String, Integer> cls = new HashMap<String, Integer>();
+ for (Throwable t : ths) {
+ String name = t.getClass().getSimpleName();
+ Integer i = cls.get(name);
+ if (i == null) {
+ i = 0;
+ }
+ i += 1;
+ cls.put(name, i);
+ }
+ return cls;
+ }
+
+ public static String getDesc(Map<String,Integer> classificaton) {
+ String s = "";
+ for (Map.Entry<String, Integer> e : classificaton.entrySet()) {
+ s += e.getKey() + ": " + e.getValue() + " time" +
+ pluralize(e.getValue()) + ", ";
+ }
+ return s;
+ }
+
+}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1033321&r1=1033320&r2=1033321&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed Nov 10 01:54:13 2010
@@ -46,12 +46,14 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Chore;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
@@ -1884,7 +1886,7 @@ public class HRegionServer implements HR
String lockName = String.valueOf(lockId);
Integer rl = rowlocks.get(lockName);
if (rl == null) {
- throw new IOException("Invalid row lock");
+ throw new UnknownRowLockException("Invalid row lock");
}
this.leases.renewLease(lockName);
return rl;
@@ -2374,7 +2376,9 @@ public class HRegionServer implements HR
@SuppressWarnings("unchecked")
@Override
public MultiResponse multi(MultiAction multi) throws IOException {
+
MultiResponse response = new MultiResponse();
+
for (Map.Entry<byte[], List<Action>> e : multi.actions.entrySet()) {
byte[] regionName = e.getKey();
List<Action> actionsForRegion = e.getValue();
@@ -2382,71 +2386,81 @@ public class HRegionServer implements HR
// end of a region, so that we don't have to try the rest of the
// actions in the list.
Collections.sort(actionsForRegion);
- Row action = null;
+ Row action;
List<Action> puts = new ArrayList<Action>();
- try {
- for (Action a : actionsForRegion) {
- action = a.getAction();
- // TODO catch exceptions so we can report them on a per-item basis.
+ for (Action a : actionsForRegion) {
+ action = a.getAction();
+ int originalIndex = a.getOriginalIndex();
+
+ try {
if (action instanceof Delete) {
delete(regionName, (Delete) action);
- response.add(regionName, new Pair<Integer, Result>(
- a.getOriginalIndex(), new Result()));
+ response.add(regionName, originalIndex, new Result());
} else if (action instanceof Get) {
- response.add(regionName, new Pair<Integer, Result>(
- a.getOriginalIndex(), get(regionName, (Get) action)));
+ response.add(regionName, originalIndex, get(regionName, (Get) action));
} else if (action instanceof Put) {
- puts.add(a);
+            puts.add(a); // won't throw.
} else {
LOG.debug("Error: invalid Action, row must be a Get, Delete or Put.");
- throw new IllegalArgumentException("Invalid Action, row must be a Get, Delete or Put.");
+ throw new DoNotRetryIOException("Invalid Action, row must be a Get, Delete or Put.");
}
+ } catch (IOException ex) {
+ response.add(regionName, originalIndex, ex);
}
+ }
- // We do the puts with result.put so we can get the batching efficiency
- // we so need. All this data munging doesn't seem great, but at least
- // we arent copying bytes or anything.
- if (!puts.isEmpty()) {
+ // We do the puts with result.put so we can get the batching efficiency
+ // we so need. All this data munging doesn't seem great, but at least
+      // we aren't copying bytes or anything.
+ if (!puts.isEmpty()) {
+ try {
HRegion region = getRegion(regionName);
+
if (!region.getRegionInfo().isMetaTable()) {
this.cacheFlusher.reclaimMemStoreMemory();
}
- Pair<Put,Integer> [] putsWithLocks = new Pair[puts.size()];
- int i = 0;
+ List<Pair<Put,Integer>> putsWithLocks =
+ Lists.newArrayListWithCapacity(puts.size());
for (Action a : puts) {
Put p = (Put) a.getAction();
- Integer lock = getLockFromId(p.getLockId());
- putsWithLocks[i++] = new Pair<Put, Integer>(p, lock);
+ Integer lock;
+ try {
+ lock = getLockFromId(p.getLockId());
+ } catch (UnknownRowLockException ex) {
+ response.add(regionName, a.getOriginalIndex(), ex);
+ continue;
+ }
+ putsWithLocks.add(new Pair<Put, Integer>(p, lock));
}
this.requestCount.addAndGet(puts.size());
- OperationStatusCode[] codes = region.put(putsWithLocks);
- for( i = 0 ; i < codes.length ; i++) {
+ OperationStatusCode[] codes =
+ region.put(putsWithLocks.toArray(new Pair[]{}));
+
+ for( int i = 0 ; i < codes.length ; i++) {
OperationStatusCode code = codes[i];
Action theAction = puts.get(i);
- Result result = null;
+ Object result = null;
if (code == OperationStatusCode.SUCCESS) {
result = new Result();
+ } else if (code == OperationStatusCode.BAD_FAMILY) {
+ result = new NoSuchColumnFamilyException();
}
- // TODO turning the alternate exception into a different result
+ // FAILURE && NOT_RUN becomes null, aka: need to run again.
- response.add(regionName,
- new Pair<Integer, Result>(
- theAction.getOriginalIndex(), result));
+ response.add(regionName, theAction.getOriginalIndex(), result);
+ }
+ } catch (IOException ioe) {
+ // fail all the puts with the ioe in question.
+ for (Action a: puts) {
+ response.add(regionName, a.getOriginalIndex(), ioe);
}
}
- } catch (IOException ioe) {
- if (multi.size() == 1) throw ioe;
- LOG.debug("Exception processing " +
- org.apache.commons.lang.StringUtils.abbreviate(action.toString(), 64) +
- "; " + ioe.getMessage());
- response.add(regionName,null);
- // stop processing on this region, continue to the next.
}
}
return response;
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerStoppedException.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerStoppedException.java?rev=1033321&r1=1033320&r2=1033321&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerStoppedException.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerStoppedException.java Wed Nov 10 01:54:13 2010
@@ -23,6 +23,9 @@ import org.apache.hadoop.hbase.DoNotRetr
/**
* Thrown by the region server when it is in a shutting-down state.
+ *
+ * Should NEVER be thrown to HBase clients; they will abort the call chain
+ * and not retry even though regions will transition to new servers.
*/
@SuppressWarnings("serial")
public class RegionServerStoppedException extends DoNotRetryIOException {
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java?rev=1033321&r1=1033320&r2=1033321&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java Wed Nov 10 01:54:13 2010
@@ -604,12 +604,12 @@ public class RemoteHTable implements HTa
}
@Override
- public void batch(List<Row> actions, Result[] results) throws IOException {
+ public void batch(List<Row> actions, Object[] results) throws IOException {
throw new IOException("batch not supported");
}
@Override
- public Result[] batch(List<Row> actions) throws IOException {
+ public Object[] batch(List<Row> actions) throws IOException {
throw new IOException("batch not supported");
}
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java?rev=1033321&r1=1033320&r2=1033321&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java Wed Nov 10 01:54:13 2010
@@ -35,6 +35,8 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import static org.junit.Assert.*;
+
public class TestMultiParallel {
private static final Log LOG = LogFactory.getLog(TestMultiParallel.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
@@ -137,6 +139,35 @@ public class TestMultiParallel {
}
}
+ @Test
+ public void testBadFam() throws Exception {
+ LOG.info("test=testBadFam");
+ HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
+
+ List<Row> actions = new ArrayList<Row>();
+ Put p = new Put(Bytes.toBytes("row1"));
+ p.add(Bytes.toBytes("bad_family"), Bytes.toBytes("qual"), Bytes.toBytes("value"));
+ actions.add(p);
+ p = new Put(Bytes.toBytes("row2"));
+ p.add(BYTES_FAMILY, Bytes.toBytes("qual"), Bytes.toBytes("value"));
+ actions.add(p);
+
+ // row1 and row2 should be in the same region.
+
+ Object [] r = new Object[actions.size()];
+ try {
+ table.batch(actions, r);
+ fail();
+ } catch (RetriesExhaustedWithDetailsException ex) {
+ LOG.debug(ex);
+ // good!
+ assertFalse(ex.mayHaveClusterIssues());
+ }
+ assertEquals(2, r.length);
+ assertTrue(r[0] instanceof Throwable);
+ assertTrue(r[1] instanceof Result);
+ }
+
/**
* Only run one Multi test with a forced RegionServer abort. Otherwise, the
* unit tests will take an unnecessarily long time to run.
@@ -208,7 +239,7 @@ public class TestMultiParallel {
// put multiple rows using a batch
List<Row> puts = constructPutRequests();
- Result[] results = table.batch(puts);
+ Object[] results = table.batch(puts);
validateSizeAndEmpty(results, KEYS.length);
if (true) {
@@ -228,7 +259,7 @@ public class TestMultiParallel {
// Load some data
List<Row> puts = constructPutRequests();
- Result[] results = table.batch(puts);
+ Object[] results = table.batch(puts);
validateSizeAndEmpty(results, KEYS.length);
// Deletes
@@ -256,7 +287,7 @@ public class TestMultiParallel {
// Load some data
List<Row> puts = constructPutRequests();
- Result[] results = table.batch(puts);
+ Object[] results = table.batch(puts);
validateSizeAndEmpty(results, KEYS.length);
// Deletes
@@ -289,7 +320,7 @@ public class TestMultiParallel {
put.add(BYTES_FAMILY, qual, VALUE);
puts.add(put);
}
- Result[] results = table.batch(puts);
+ Object[] results = table.batch(puts);
// validate
validateSizeAndEmpty(results, 100);
@@ -303,10 +334,10 @@ public class TestMultiParallel {
gets.add(get);
}
- Result[] multiRes = table.batch(gets);
+ Object[] multiRes = table.batch(gets);
int idx = 0;
- for (Result r : multiRes) {
+ for (Object r : multiRes) {
byte[] qual = Bytes.toBytes("column" + idx);
validateResult(r, qual, VALUE);
idx++;
@@ -319,7 +350,7 @@ public class TestMultiParallel {
HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
// Load some data to start
- Result[] results = table.batch(constructPutRequests());
+ Object[] results = table.batch(constructPutRequests());
validateSizeAndEmpty(results, KEYS.length);
// Batch: get, get, put(new col), delete, get, get of put, get of deleted,
@@ -383,11 +414,13 @@ public class TestMultiParallel {
// // Helper methods ////
- private void validateResult(Result r) {
+ private void validateResult(Object r) {
validateResult(r, QUALIFIER, VALUE);
}
- private void validateResult(Result r, byte[] qual, byte[] val) {
+ private void validateResult(Object r1, byte[] qual, byte[] val) {
+ // TODO provide nice assert here or something.
+ Result r = (Result)r1;
Assert.assertTrue(r.containsColumn(BYTES_FAMILY, qual));
Assert.assertEquals(0, Bytes.compareTo(val, r.getValue(BYTES_FAMILY, qual)));
}
@@ -415,16 +448,17 @@ public class TestMultiParallel {
}
}
- private void validateEmpty(Result result) {
+ private void validateEmpty(Object r1) {
+ Result result = (Result)r1;
Assert.assertTrue(result != null);
Assert.assertTrue(result.getRow() == null);
Assert.assertEquals(0, result.raw().length);
}
- private void validateSizeAndEmpty(Result[] results, int expectedSize) {
+ private void validateSizeAndEmpty(Object[] results, int expectedSize) {
// Validate got back the same number of Result objects, all empty
Assert.assertEquals(expectedSize, results.length);
- for (Result result : results) {
+ for (Object result : results) {
validateEmpty(result);
}
}