You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2014/03/12 22:17:20 UTC
svn commit: r1576909 [5/18] - in /hbase/branches/0.89-fb/src: ./
examples/thrift/ main/java/org/apache/hadoop/hbase/
main/java/org/apache/hadoop/hbase/avro/
main/java/org/apache/hadoop/hbase/avro/generated/
main/java/org/apache/hadoop/hbase/client/ mai...
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java Wed Mar 12 21:17:13 2014
@@ -19,35 +19,46 @@
*/
package org.apache.hadoop.hbase.client;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.PreloadThreadPool;
-import org.apache.hadoop.hbase.io.hfile.histogram.HFileHistogram;
import org.apache.hadoop.hbase.io.hfile.histogram.HFileHistogram.Bucket;
import org.apache.hadoop.hbase.ipc.HBaseRPCOptions;
import org.apache.hadoop.hbase.ipc.ProfilingData;
+import org.apache.hadoop.hbase.ipc.thrift.HBaseThriftRPC;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.DaemonThreadFactory;
import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.RegionserverUtils;
import org.apache.hadoop.hbase.util.Writables;
import com.google.common.base.Preconditions;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
/**
* Used to communicate with a single HBase table.
*
@@ -73,8 +84,9 @@ public class HTable implements HTableInt
private HBaseRPCOptions options;
private boolean recordClientContext = false;
- @SuppressWarnings("unused")
private long maxScannerResultSize;
+ private HTableAsync hta;
+ private boolean doAsync;
// Share this multiaction thread pool across all the HTable instance;
// The total number of threads will be bounded #HTable * #RegionServer.
@@ -87,6 +99,13 @@ public class HTable implements HTableInt
((ThreadPoolExecutor)multiActionThreadPool).allowCoreThreadTimeOut(true);
}
+
+ /**
+ * Lazily creates the {@link HTableAsync} delegate that the blocking
+ * single-row operations of this class route through when asynchronous
+ * calls are enabled ({@code doAsync}). Does nothing when asynchronous calls
+ * are disabled or the delegate already exists.
+ *
+ * @throws IOException if the {@link HTableAsync} cannot be constructed
+ */
+ public void initHTableAsync() throws IOException {
+ if (!doAsync || hta != null) {
+ return;
+ }
+ // HTableAsync extends HTable: when this instance already is one, reuse it
+ // instead of building a second client (with its own executor) for the
+ // same table.
+ hta = (this instanceof HTableAsync)
+ ? (HTableAsync) this
+ : new HTableAsync(configuration, tableName);
+ }
+
/**
* Creates an object to access a HBase table. DO NOT USE THIS CONSTRUCTOR.
* It will make your unit tests fail due to incorrect ZK client port.
@@ -168,8 +187,15 @@ public class HTable implements HTableInt
this.options.setRxCompression(
Compression.getCompressionAlgorithmByName(compressionAlgo));
}
+ // check if we are using swift protocol too
+ this.doAsync = configuration.getBoolean(HConstants.HTABLE_ASYNC_CALLS,
+ HConstants.HTABLE_ASYNC_CALLS_DEFAULT)
+ && configuration.getBoolean(HConstants.CLIENT_TO_RS_USE_THRIFT,
+ HConstants.CLIENT_TO_RS_USE_THRIFT_DEFAULT);
+ HBaseThriftRPC.setUsePooling(conf.getBoolean("hbase.client.useconnectionpooling", true));
}
+ @Override
public Configuration getConfiguration() {
return configuration;
}
@@ -274,6 +300,7 @@ public class HTable implements HTableInt
return connection.getRegionLocation(tableName, row, reload);
}
+ @Override
public byte [] getTableName() {
return this.tableName;
}
@@ -326,6 +353,7 @@ public class HTable implements HTableInt
this.connection.clearRegionCache();
}
+ @Override
public HTableDescriptor getTableDescriptor() throws IOException {
return new UnmodifyableHTableDescriptor(
this.connection.getHTableDescriptor(this.tableName));
@@ -365,6 +393,7 @@ public class HTable implements HTableInt
final List<byte[]> startKeyList = new ArrayList<byte[]>();
final List<byte[]> endKeyList = new ArrayList<byte[]>();
MetaScannerVisitor visitor = new MetaScannerVisitor() {
+ @Override
public boolean processRow(Result rowResult) throws IOException {
HRegionInfo info = Writables.getHRegionInfo(
rowResult.getValue(HConstants.CATALOG_FAMILY,
@@ -383,7 +412,7 @@ public class HTable implements HTableInt
new byte[startKeyList.size()][]), endKeyList.toArray(
new byte[endKeyList.size()][]));
}
-
+
/**
* Gets the starting and ending row keys for every region in the currently
* open table.
@@ -396,6 +425,7 @@ public class HTable implements HTableInt
final TreeMap<byte[], byte[]> startEndKeysMap =
new TreeMap<byte[], byte[]>(new Bytes.ByteArrayComparator());
MetaScannerVisitor visitor = new MetaScannerVisitor() {
+ @Override
public boolean processRow(Result rowResult) throws IOException {
HRegionInfo info = Writables.getHRegionInfo(
rowResult.getValue(HConstants.CATALOG_FAMILY,
@@ -413,11 +443,11 @@ public class HTable implements HTableInt
}
/**
- * Returns the Array of StartKeys along with the favoredNodes
- * for a particular region. Identifying the the favoredNodes using the
- * Meta table similar to the
- * {@link org.apache.hadoop.hbase.client.HTable.getStartEndKeys()}
- * function
+ * Returns the array of start keys along with the favoredNodes
+ * for a particular region. The favoredNodes are identified using the
+ * Meta table, similar to the
+ * {@link org.apache.hadoop.hbase.client.HTable#getStartEndKeys()}
+ * function.
* @return
* @throws IOException
*/
@@ -427,6 +457,7 @@ public class HTable implements HTableInt
final List<byte[]> favoredNodes =
new ArrayList<byte[]>();
MetaScannerVisitor visitor = new MetaScannerVisitor() {
+ @Override
public boolean processRow(Result rowResult) throws IOException {
HRegionInfo info = Writables.getHRegionInfo(
rowResult.getValue(HConstants.CATALOG_FAMILY,
@@ -464,6 +495,7 @@ public class HTable implements HTableInt
new TreeMap<HRegionInfo, HServerAddress>();
MetaScannerVisitor visitor = new MetaScannerVisitor() {
+ @Override
public boolean processRow(Result rowResult) throws IOException {
HRegionInfo info = Writables.getHRegionInfo(
rowResult.getValue(HConstants.CATALOG_FAMILY,
@@ -593,29 +625,44 @@ public class HTable implements HTableInt
return allRegions;
}
- public Result getRowOrBefore(final byte[] row, final byte[] family)
- throws IOException {
- return this.getConnectionAndResetOperationContext().getRegionServerWithRetries(
- new ServerCallable<Result>(connection, tableName, row, this.options) {
- public Result call() throws IOException {
- return server.getClosestRowBefore(location.getRegionInfo().getRegionName(),
- row, family);
- }
- });
- }
+ /**
+ * Asks the region server for the closest row at or before {@code row} in
+ * {@code family}. When asynchronous calls are enabled the request goes
+ * through {@link HTableAsync#getRowOrBeforeAsync(byte[], byte[])} and this
+ * method blocks on the returned future; otherwise the retrying synchronous
+ * path is used.
+ *
+ * @param row row key to look up
+ * @param family column family to look in
+ * @return the result returned by the region server
+ * @throws IOException if the request fails, or if the calling thread is
+ * interrupted while waiting for the asynchronous result
+ */
+ @Override
+ public Result getRowOrBefore(final byte[] row, final byte[] family)
+ throws IOException {
+ initHTableAsync();
+ if (hta != null) {
+ try {
+ return hta.getRowOrBeforeAsync(row, family).get();
+ } catch (InterruptedException e) {
+ // Restore the interrupt status so callers can still observe it.
+ Thread.currentThread().interrupt();
+ throw new IOException(e);
+ } catch (ExecutionException e) {
+ // Surface the underlying failure itself, as the synchronous path does.
+ Throwable cause = e.getCause();
+ if (cause instanceof IOException) {
+ throw (IOException) cause;
+ }
+ throw new IOException(cause != null ? cause : e);
+ }
+ }
+ return this
+ .getConnectionAndResetOperationContext()
+ .getRegionServerWithRetries(
+ new ServerCallable<Result>(connection, tableName, row, this.options) {
+ @Override
+ public Result call() throws IOException {
+ return server.getClosestRowBefore(location
+ .getRegionInfo().getRegionName(), row, family);
+ }
+ });
+ }
+ // The scanner created here is handed back to the caller rather than closed
+ // in this method, hence the "resource" suppression.
+ // NOTE(review): presumably the caller is expected to close it - confirm.
+ @SuppressWarnings("resource")
+ @Override
public ResultScanner getScanner(final Scan scan) throws IOException {
- ClientScanner s = new ClientScanner(scan);
- s.initialize();
- return s;
+ return new HTableClientScanner(scan, this).initialize();
}
+ @Override
public ResultScanner getScanner(byte [] family) throws IOException {
Scan scan = new Scan();
scan.addFamily(family);
return getScanner(scan);
}
+ @Override
public ResultScanner getScanner(byte [] family, byte [] qualifier)
throws IOException {
Scan scan = new Scan();
@@ -669,9 +716,19 @@ public class HTable implements HTableInt
return this.getConnectionAndResetOperationContext().getServerConfProperty(name);
}
+ /**
+ * Fetches the data selected by {@code get}. When asynchronous calls are
+ * enabled the request goes through {@link HTableAsync#getAsync(Get)} and
+ * this method blocks on the returned future; otherwise the retrying
+ * synchronous path is used.
+ *
+ * @throws IOException if the request fails, or if the calling thread is
+ * interrupted while waiting for the asynchronous result
+ */
+ @Override
public Result get(final Get get) throws IOException {
+ initHTableAsync();
+ if (hta != null) {
+ try {
+ return hta.getAsync(get).get();
+ } catch (InterruptedException e) {
+ // Restore the interrupt status so callers can still observe it.
+ Thread.currentThread().interrupt();
+ throw new IOException(e);
+ } catch (ExecutionException e) {
+ // Surface the underlying failure itself, as the synchronous path does.
+ Throwable cause = e.getCause();
+ if (cause instanceof IOException) {
+ throw (IOException) cause;
+ }
+ throw new IOException(cause != null ? cause : e);
+ }
+ }
return this.getConnectionAndResetOperationContext().getRegionServerWithRetries(
new ServerCallable<Result>(connection, tableName, get.getRow(), this.options) {
+ @Override
public Result call() throws IOException {
return server.get(location.getRegionInfo().getRegionName(), get);
}
@@ -679,10 +736,11 @@ public class HTable implements HTableInt
);
}
+ @Override
public Result[] get(List<Get> gets) throws IOException {
return this.getConnectionAndResetOperationContext().processBatchOfGets(gets, tableName, this.options);
}
-
+
/**
* Get collected profiling data and clears it from the HTable
* @return aggregated profiling data
@@ -704,6 +762,7 @@ public class HTable implements HTableInt
this.getConnectionAndResetOperationContext().processBatchedGets(actions, tableName, multiActionThreadPool,
results, this.options);
} catch (Exception e) {
+ e.printStackTrace();
throw new IOException(e);
}
return results;
@@ -724,36 +783,58 @@ public class HTable implements HTableInt
}
/**
- * {@inheritDoc}
+ * Sends the given {@link Delete} to the region server. When asynchronous
+ * calls are enabled the request goes through
+ * {@link HTableAsync#deleteAsync(Delete)} and this method blocks until it
+ * completes; otherwise the retrying synchronous path is used.
+ *
+ * @param delete the delete to apply
+ * @throws IOException if the request fails, or if the calling thread is
+ * interrupted while waiting for the asynchronous result
*/
@Override
- public void delete(final Delete delete)
- throws IOException {
+ public void delete(final Delete delete) throws IOException {
+ initHTableAsync();
+ if (hta != null) {
+ try {
+ hta.deleteAsync(delete).get();
+ } catch (InterruptedException e) {
+ // Restore the interrupt status so callers can still observe it.
+ Thread.currentThread().interrupt();
+ throw new IOException(e);
+ } catch (ExecutionException e) {
+ // Surface the underlying failure itself, as the synchronous path does.
+ Throwable cause = e.getCause();
+ if (cause instanceof IOException) {
+ throw (IOException) cause;
+ }
+ throw new IOException(cause != null ? cause : e);
+ }
+ } else {
this.getConnectionAndResetOperationContext().getRegionServerWithRetries(
- new ServerCallable<Boolean>(connection,
- tableName, delete.getRow(), this.options) {
- public Boolean call() throws IOException {
- server.delete(location.getRegionInfo().getRegionName(), delete);
- return null; // FindBugs NP_BOOLEAN_RETURN_NULL
- }
- }
- );
+ new ServerCallable<Boolean>(connection, tableName, delete.getRow(),
+ this.options) {
+ @Override
+ public Boolean call() throws IOException {
+ server.delete(location.getRegionInfo().getRegionName(), delete);
+ return null; // FindBugs NP_BOOLEAN_RETURN_NULL
+ }
+ });
+ }
}
+ @Override
public void delete(final List<Delete> deletes)
throws IOException {
int last = 0;
try {
- last = this.getConnectionAndResetOperationContext().processBatchOfDeletes(deletes, this.tableName, this.options);
+ last = this.getConnectionAndResetOperationContext().processBatchOfDeletes(
+ deletes, this.tableName, this.options);
} finally {
deletes.subList(0, last).clear();
}
}
+ /**
+ * Applies a single {@link Put}. When asynchronous calls are enabled the
+ * request goes through {@link HTableAsync#putAsync(Put)} and this method
+ * blocks until it completes; otherwise the put is passed to
+ * {@code doPut}.
+ *
+ * @throws IOException if the request fails, or if the calling thread is
+ * interrupted while waiting for the asynchronous result
+ */
+ @Override
public void put(final Put put) throws IOException {
- doPut(Arrays.asList(put));
+ initHTableAsync();
+ if (hta != null) {
+ try {
+ hta.putAsync(put).get();
+ } catch (InterruptedException e) {
+ // Restore the interrupt status so callers can still observe it.
+ Thread.currentThread().interrupt();
+ throw new IOException(e);
+ } catch (ExecutionException e) {
+ // Surface the underlying failure itself, as the synchronous path does.
+ Throwable cause = e.getCause();
+ if (cause instanceof IOException) {
+ throw (IOException) cause;
+ }
+ throw new IOException(cause != null ? cause : e);
+ }
+ } else {
+ doPut(Arrays.asList(put));
+ }
}
+ @Override
public void put(final List<Put> puts) throws IOException {
doPut(puts);
}
@@ -769,13 +850,14 @@ public class HTable implements HTableInt
}
}
+ @Override
public long incrementColumnValue(final byte [] row, final byte [] family,
final byte [] qualifier, final long amount)
throws IOException {
return incrementColumnValue(row, family, qualifier, amount, true);
}
- @SuppressWarnings({"ThrowableInstanceNeverThrown"})
+ @Override
public long incrementColumnValue(final byte [] row, final byte [] family,
final byte [] qualifier, final long amount, final boolean writeToWAL)
throws IOException {
@@ -791,6 +873,7 @@ public class HTable implements HTableInt
}
return this.getConnectionAndResetOperationContext().getRegionServerWithRetries(
new ServerCallable<Long>(connection, tableName, row, this.options) {
+ @Override
public Long call() throws IOException {
return server.incrementColumnValue(
location.getRegionInfo().getRegionName(), row, family,
@@ -813,12 +896,14 @@ public class HTable implements HTableInt
* @throws IOException
* @return true if the new put was execute, false otherwise
*/
+ @Override
public boolean checkAndPut(final byte [] row,
final byte [] family, final byte [] qualifier, final byte [] value,
final Put put)
throws IOException {
return this.getConnectionAndResetOperationContext().getRegionServerWithRetries(
new ServerCallable<Boolean>(connection, tableName, row, this.options) {
+ @Override
public Boolean call() throws IOException {
return server.checkAndPut(location.getRegionInfo().getRegionName(),
row, family, qualifier, value, put) ? Boolean.TRUE : Boolean.FALSE;
@@ -840,12 +925,14 @@ public class HTable implements HTableInt
* @throws IOException
* @return true if the new delete was executed, false otherwise
*/
+ @Override
public boolean checkAndDelete(final byte [] row,
final byte [] family, final byte [] qualifier, final byte [] value,
final Delete delete)
throws IOException {
return this.getConnectionAndResetOperationContext().getRegionServerWithRetries(
new ServerCallable<Boolean>(connection, tableName, row, this.options) {
+ @Override
public Boolean call() throws IOException {
return server.checkAndDelete(
location.getRegionInfo().getRegionName(),
@@ -861,13 +948,25 @@ public class HTable implements HTableInt
*/
@Override
public void mutateRow(final RowMutations arm) throws IOException {
- this.getConnectionAndResetOperationContext().getRegionServerWithRetries(
- new ServerCallable<Void>(this.connection, tableName, arm.getRow(), this.options) {
- public Void call() throws IOException {
- server.mutateRow(location.getRegionInfo().getRegionName(), arm);
- return null;
- }
- });
+ // With asynchronous calls enabled, issue the mutation through the async
+ // client and block on its future; otherwise use the retrying sync path.
+ initHTableAsync();
+ if (hta != null) {
+ try {
+ hta.mutateRowAsync(arm).get();
+ } catch (InterruptedException e) {
+ // Restore the interrupt status so callers can still observe it.
+ Thread.currentThread().interrupt();
+ throw new IOException(e);
+ } catch (ExecutionException e) {
+ // Surface the underlying failure itself, as the synchronous path does.
+ Throwable cause = e.getCause();
+ if (cause instanceof IOException) {
+ throw (IOException) cause;
+ }
+ throw new IOException(cause != null ? cause : e);
+ }
+ } else {
+ this.getConnectionAndResetOperationContext().getRegionServerWithRetries(
+ new ServerCallable<Void>(this.connection, tableName, arm.getRow(),
+ this.options) {
+ @Override
+ public Void call() throws IOException, InterruptedException,
+ ExecutionException {
+ server.mutateRow(location.getRegionInfo().getRegionName(), arm);
+ return null;
+ }
+ });
+ }
}
/**
@@ -875,7 +974,8 @@ public class HTable implements HTableInt
*/
@Override
public void mutateRow(final List<RowMutations> armList) throws IOException {
- this.getConnectionAndResetOperationContext().processBatchOfRowMutations(armList, this.tableName, this.options);
+ this.getConnectionAndResetOperationContext().processBatchOfRowMutations(
+ armList, this.tableName, this.options);
}
/**
@@ -889,9 +989,11 @@ public class HTable implements HTableInt
* @return true if the specified Get matches one or more keys, false if not
* @throws IOException
*/
+ @Override
public boolean exists(final Get get) throws IOException {
return this.getConnectionAndResetOperationContext().getRegionServerWithRetries(
new ServerCallable<Boolean>(connection, tableName, get.getRow(), this.options) {
+ @Override
public Boolean call() throws IOException {
return server.
exists(location.getRegionInfo().getRegionName(), get);
@@ -900,9 +1002,11 @@ public class HTable implements HTableInt
);
}
+ @Override
public void flushCommits() throws IOException {
try {
- this.getConnectionAndResetOperationContext().processBatchOfPuts(writeBuffer, tableName, this.options);
+ this.getConnectionAndResetOperationContext().
+ processBatchOfPuts(writeBuffer, tableName, this.options);
} finally {
if (clearBufferOnFail) {
writeBuffer.clear();
@@ -941,32 +1045,53 @@ public class HTable implements HTableInt
}
}
- public RowLock lockRow(final byte [] row)
- throws IOException {
- return this.getConnectionAndResetOperationContext().getRegionServerWithRetries(
- new ServerCallable<RowLock>(connection, tableName, row, this.options) {
- public RowLock call() throws IOException {
- long lockId =
- server.lockRow(location.getRegionInfo().getRegionName(), row);
- return new RowLock(row,lockId);
- }
+ /**
+ * Requests a lock on {@code row} from the region server. When asynchronous
+ * calls are enabled the request goes through
+ * {@link HTableAsync#lockRowAsync(byte[])} and this method blocks on the
+ * returned future; otherwise the retrying synchronous path is used.
+ *
+ * @param row row key to lock
+ * @return the {@link RowLock} for the row
+ * @throws IOException if the request fails, or if the calling thread is
+ * interrupted while waiting for the asynchronous result
+ */
+ @Override
+ public RowLock lockRow(final byte[] row) throws IOException {
+ initHTableAsync();
+ if (hta != null) {
+ try {
+ return hta.lockRowAsync(row).get();
+ } catch (InterruptedException e) {
+ // Restore the interrupt status so callers can still observe it.
+ Thread.currentThread().interrupt();
+ throw new IOException(e);
+ } catch (ExecutionException e) {
+ // Surface the underlying failure itself, as the synchronous path does.
+ Throwable cause = e.getCause();
+ if (cause instanceof IOException) {
+ throw (IOException) cause;
+ }
+ throw new IOException(cause != null ? cause : e);
}
- );
+ }
+ return this.getConnectionAndResetOperationContext()
+ .getRegionServerWithRetries(
+ new ServerCallable<RowLock>(connection, tableName, row,
+ this.options) {
+ @Override
+ public RowLock call() throws IOException {
+ long lockId = server.lockRow(location.getRegionInfo()
+ .getRegionName(), row);
+ return new RowLock(row, lockId);
+ }
+ });
}
- public void unlockRow(final RowLock rl)
- throws IOException {
- this.getConnectionAndResetOperationContext().getRegionServerWithRetries(
- new ServerCallable<Boolean>(connection, tableName, rl.getRow(), this.options) {
- public Boolean call() throws IOException {
- server.unlockRow(location.getRegionInfo().getRegionName(),
- rl.getLockId());
- return null; // FindBugs NP_BOOLEAN_RETURN_NULL
- }
+ /**
+ * Releases the given row lock on the region server. When asynchronous calls
+ * are enabled the request goes through
+ * {@link HTableAsync#unlockRowAsync(RowLock)} and this method blocks until
+ * it completes; otherwise the retrying synchronous path is used.
+ *
+ * @param rl the lock to release
+ * @throws IOException if the request fails, or if the calling thread is
+ * interrupted while waiting for the asynchronous result
+ */
+ @Override
+ public void unlockRow(final RowLock rl) throws IOException {
+ initHTableAsync();
+ if (hta != null) {
+ try {
+ hta.unlockRowAsync(rl).get();
+ } catch (InterruptedException e) {
+ // Restore the interrupt status so callers can still observe it.
+ Thread.currentThread().interrupt();
+ throw new IOException(e);
+ } catch (ExecutionException e) {
+ // Surface the underlying failure itself, as the synchronous path does.
+ Throwable cause = e.getCause();
+ if (cause instanceof IOException) {
+ throw (IOException) cause;
+ }
+ throw new IOException(cause != null ? cause : e);
}
- );
+ } else {
+ this.getConnectionAndResetOperationContext().getRegionServerWithRetries(
+ new ServerCallable<Boolean>(connection, tableName, rl.getRow(),
+ this.options) {
+ @Override
+ public Boolean call() throws IOException {
+ server.unlockRow(location.getRegionInfo().getRegionName(),
+ rl.getLockId());
+ return null; // FindBugs NP_BOOLEAN_RETURN_NULL
+ }
+ });
+ }
}
+ @Override
public boolean isAutoFlush() {
return autoFlush;
}
@@ -1078,157 +1203,6 @@ public class HTable implements HTableInt
}
/**
- * Implements the scanner interface for the HBase client.
- * If there are multiple regions in a table, this scanner will iterate
- * through them all.
- */
- protected class ClientScanner extends ResultScannerImpl {
- // HEADSUP: The scan internal start row can change as we move through table.
- // Current region scanner is against. Gets cleared if current region goes
- // wonky: e.g. if it splits on us.
- private ScannerCallable callable = null;
- // Keep lastResult returned successfully in case we have to reset scanner.
- private Result lastResult = null;
-
- protected ClientScanner(final Scan scan) {
- super(scan, HTable.this);
- // Removed filter validation. We have a new format now, only one of all
- // the current filters has a validate() method. We can add it back,
- // need to decide on what we're going to do re: filter redesign.
- // Need, at the least, to break up family from qualifier as separate
- // checks, I think it's important server-side filters are optimal in that
- // respect.
- }
-
- protected Scan getScan() {
- return scan;
- }
-
- protected ScannerCallable getScannerCallable(byte [] localStartKey,
- int nbRows, HBaseRPCOptions options) {
- scan.setStartRow(localStartKey);
- ScannerCallable s = new ScannerCallable(
- getConnectionAndResetOperationContext(), getTableName(), scan,
- options);
- s.setCaching(nbRows);
- return s;
- }
-
- @Override
- protected void cleanUpPreviousScanners() throws IOException {
- // Close the previous scanner if it's open
- if (this.callable != null) {
- this.callable.setClose();
- getConnectionAndResetOperationContext().getRegionServerWithRetries(
- callable);
- this.callable = null;
- }
-
- }
-
- @Override
- protected boolean doRealOpenScanners(byte[] localStartKey, int nbRows)
- throws IOException {
- try {
- callable = getScannerCallable(localStartKey, nbRows, options);
- // Open a scanner on the region server starting at the
- // beginning of the region
- getConnectionAndResetOperationContext().getRegionServerWithRetries(
- callable);
- this.currentRegion = callable.getHRegionInfo();
- } catch (IOException e) {
- close();
- throw e;
- }
- return true;
- }
-
- @Override
- protected void cacheNextResults() throws IOException {
- Result [] values = null;
- // We need to reset it if it's a new callable that was created
- // with a countdown in nextScanner
- callable.setCaching(this.caching);
- // This flag is set when we want to skip the result returned. We do
- // this when we reset scanner because it split under us.
- boolean skipFirst = false;
- boolean foundResults = false;
- do {
- try {
- // Server returns a null values if scanning is to stop. Else,
- // returns an empty array if scanning is to go on and we've just
- // exhausted current region.
- values = getConnectionAndResetOperationContext(
- ).getRegionServerWithRetries(callable);
- if (skipFirst) {
- skipFirst = false;
- // Reget.
- values = getConnectionAndResetOperationContext()
- .getRegionServerWithRetries(callable);
- }
- } catch (DoNotRetryIOException e) {
- if (e instanceof UnknownScannerException) {
- long timeout = this.lastNextCallTimeStamp + scannerTimeout;
- // If we are over the timeout, throw this exception to the client
- // Else, it's because the region moved and we used the old id
- // against the new region server; reset the scanner.
- if (timeout < System.currentTimeMillis()) {
- long elapsed = System.currentTimeMillis()
- - this.lastNextCallTimeStamp;
- ScannerTimeoutException ex = new ScannerTimeoutException(
- elapsed + "ms passed since the last invocation, " +
- "timeout is currently set to " + scannerTimeout);
- ex.initCause(e);
- throw ex;
- }
- } else {
- Throwable cause = e.getCause();
- if (cause == null
- || !(cause instanceof NotServingRegionException)) {
- throw e;
- }
- }
- // Else, its signal from depths of ScannerCallable that we got an
- // NSRE on a next and that we need to reset the scanner.
- if (this.lastResult != null) {
- this.scan.setStartRow(this.lastResult.getRow());
- // Skip first row returned. We already let it out on previous
- // invocation.
- skipFirst = true;
- }
- // Clear region
- this.currentRegion = null;
- }
- this.lastNextCallTimeStamp = System.currentTimeMillis();
- if (values != null && values.length > 0) {
- foundResults = true;
- for (Result rs : values) {
- cache.add(rs);
- this.lastResult = rs;
- }
- }
- } while (!foundResults && nextScanner(this.caching, values == null));
- }
-
- @Override
- protected void closeCurrentScanner() {
- if (callable != null) {
- callable.setClose();
- try {
- getConnectionAndResetOperationContext().getRegionServerWithRetries(
- callable);
- } catch (IOException e) {
- // We used to catch this error, interpret, and rethrow. However, we
- // have since decided that it's not nice for a scanner's close to
- // throw exceptions. Chances are it was just an UnknownScanner
- // exception due to lease time out.
- }
- callable = null;
- }
- }
- }
-
- /**
* Enable or disable region cache prefetch for the table. It will be
* applied for the given table's all HTable instances who share the same
* connection. By default, the cache prefetch is enabled.
@@ -1280,14 +1254,14 @@ public class HTable implements HTableInt
return HConnectionManager.getConnection(HBaseConfiguration.create()).
getRegionCachePrefetch(tableName);
}
-
+
/**
* Set profiling request on/off for every subsequent RPC calls
* @param prof profiling true/false
*/
@Override
public void setProfiling(boolean prof) {
- options.setRequestProfiling (prof);
+ options.setRequestProfiling(prof);
options.profilingResult = null;
}
@@ -1305,7 +1279,7 @@ public class HTable implements HTableInt
public String getTag () {
return this.options.getTag ();
}
-
+
/**
* set compression used to send RPC calls to the server
* @param alg compression algorithm
@@ -1313,11 +1287,11 @@ public class HTable implements HTableInt
public void setTxCompression(Compression.Algorithm alg) {
this.options.setTxCompression(alg);
}
-
+
public Compression.Algorithm getTxCompression() {
return this.options.getTxCompression();
}
-
+
/**
* set compression used to receive RPC responses from the server
* @param alg compression algorithm
@@ -1325,7 +1299,7 @@ public class HTable implements HTableInt
public void setRxCompression(Compression.Algorithm alg) {
this.options.setRxCompression(alg);
}
-
+
public Compression.Algorithm getRxCompression() {
return this.options.getRxCompression();
}
@@ -1376,6 +1350,7 @@ public class HTable implements HTableInt
.getRegionServerWithRetries(
new ServerCallable<List<Bucket>>(connection,
tableName, row, this.options) {
+ @Override
public List<Bucket> call() throws IOException {
if (cf != null) {
return server.getHistogramForStore(
@@ -1415,6 +1390,7 @@ public class HTable implements HTableInt
for (final byte[] row : this.getStartKeys()) {
futures.put(row, HTable.multiActionThreadPool.submit(
new Callable<List<Bucket>>() {
+ @Override
public List<Bucket> call() throws Exception {
if (family == null) {
return getHistogram(row);
@@ -1437,4 +1413,16 @@ public class HTable implements HTableInt
}
return ret;
}
+
+ /**
+ * @return the value of this table's {@code maxScannerResultSize} field
+ */
+ public long getMaxScannerResultSize() {
+ return maxScannerResultSize;
+ }
+
+ /**
+ * @return the {@link HConnection} held by this table
+ */
+ public HConnection getConnection() {
+ return connection;
+ }
+
+ /**
+ * @return the {@link HBaseRPCOptions} instance held by this table (the live
+ * object, not a copy)
+ */
+ public HBaseRPCOptions getOptions() {
+ return options;
+ }
}
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableAsync.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableAsync.java?rev=1576909&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableAsync.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableAsync.java Wed Mar 12 21:17:13 2014
@@ -0,0 +1,248 @@
+/**
+ * Copyright 2013 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ipc.HConnectionParams;
+import org.apache.hadoop.hbase.ipc.thrift.HBaseToThriftAdapter;
+import org.apache.hadoop.hbase.thrift.SelfRetryingListenableFuture;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.DaemonThreadFactory;
+
+/**
+ * Used to communicate with a single HBase table.
+ * Provides additional asynchronous APIs as a complement to HTableInterface.
+ *
+ * Single-row operations are sent through the asynchronous methods of
+ * {@link HBaseToThriftAdapter} and wrapped in a
+ * {@link SelfRetryingListenableFuture}; batch operations and
+ * {@link #flushCommitsAsync()} run the corresponding blocking call on this
+ * table's executor.
+ *
+ * NOTE(review): the executor is never shut down by this class; its threads
+ * come from {@link DaemonThreadFactory} - presumably daemon threads, confirm.
+ */
+public class HTableAsync extends HTable implements HTableAsyncInterface {
+
+  //TODO: decide what is a good number of core threads. Max thread number seems unconfigurable.
+  private final ListeningScheduledExecutorService executorService =
+      MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(
+          HConstants.DEFAULT_HTABLE_ASYNC_CORE_THREADS,
+          new DaemonThreadFactory("htable-async-thread-")));
+
+  /** Connection parameters handed to every {@link SelfRetryingListenableFuture}. */
+  private final HConnectionParams hConnectionParams;
+
+  /**
+   * Creates an object to access a HBase table through asynchronous APIs.
+   *
+   * @param conf Configuration object to use.
+   * @param tableName Name of the table.
+   * @throws java.io.IOException if a remote or network exception occurs
+   */
+  public HTableAsync(Configuration conf, String tableName)
+      throws IOException {
+    // The byte[] constructor performs all initialisation, including
+    // hConnectionParams.
+    this(conf, Bytes.toBytes(tableName));
+  }
+
+  /**
+   * Creates an object to access a HBase table through asynchronous APIs.
+   *
+   * @param conf Configuration object to use.
+   * @param tableName Name of the table.
+   * @throws IOException if a remote or network exception occurs
+   */
+  public HTableAsync(Configuration conf, byte[] tableName)
+      throws IOException {
+    super(conf, tableName);
+
+    this.hConnectionParams = HConnectionParams.getInstance(conf);
+  }
+
+  /**
+   * Wraps a callable that issues one asynchronous region server call in a
+   * {@link SelfRetryingListenableFuture} and starts it.
+   *
+   * @param callable issues the asynchronous call against the located region
+   * @return the future produced by starting the self-retrying wrapper
+   */
+  private <V> ListenableFuture<V> startWithRetries(
+      ServerCallable<ListenableFuture<V>> callable) {
+    SelfRetryingListenableFuture<V> future = new SelfRetryingListenableFuture<V>(
+        this, callable, hConnectionParams, executorService);
+    return future.startFuture();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public ListenableFuture<Result> getAsync(final Get get) {
+    return startWithRetries(new ServerCallable<ListenableFuture<Result>>(
+        getConnection(), getTableName(), get.getRow(), getOptions()) {
+      @Override
+      public ListenableFuture<Result> call() throws Exception {
+        return ((HBaseToThriftAdapter)server).getAsync(
+            location.getRegionInfo().getRegionName(), get);
+      }
+    });
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public ListenableFuture<Result[]> batchGetAsync(final List<Get> list) {
+    // Runs the blocking batchGet on the executor.
+    return executorService.submit(new Callable<Result[]>() {
+      @Override
+      public Result[] call() throws IOException {
+        return batchGet(list);
+      }
+    });
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public ListenableFuture<Void> putAsync(final Put put) {
+    // Since put has a buffer on client side, use mutateRowAsync instead
+    try {
+      RowMutations arm = new RowMutations.Builder(put.getRow()).add(put).create();
+      return mutateRowAsync(arm);
+    } catch (IOException e) {
+      // Report the failure through the future rather than throwing.
+      return Futures.immediateFailedFuture(e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public ListenableFuture<Result> getRowOrBeforeAsync(final byte[] row, final byte[] family) {
+    return startWithRetries(new ServerCallable<ListenableFuture<Result>>(
+        getConnection(), getTableName(), row, getOptions()) {
+      @Override
+      public ListenableFuture<Result> call() throws Exception {
+        return ((HBaseToThriftAdapter)server).getClosestRowBeforeAsync(
+            location.getRegionInfo().getRegionName(), row, family);
+      }
+    });
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public ListenableFuture<Void> deleteAsync(final Delete delete) {
+    return startWithRetries(new ServerCallable<ListenableFuture<Void>>(
+        getConnection(), getTableName(), delete.getRow(), getOptions()) {
+      @Override
+      public ListenableFuture<Void> call() throws Exception {
+        return ((HBaseToThriftAdapter)server).deleteAsync(
+            location.getRegionInfo().getRegionName(), delete);
+      }
+    });
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public ListenableFuture<Void> mutateRowAsync(final RowMutations arm) {
+    return startWithRetries(new ServerCallable<ListenableFuture<Void>>(
+        getConnection(), getTableName(), arm.getRow(), getOptions()) {
+      @Override
+      public ListenableFuture<Void> call() throws Exception {
+        return ((HBaseToThriftAdapter)server).mutateRowAsync(
+            location.getRegionInfo().getRegionName(), arm);
+      }
+    });
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public ListenableFuture<Void> batchMutateAsync(final List<Mutation> mutations) {
+    // Runs the blocking batchMutate on the executor.
+    return executorService.submit(new Callable<Void>() {
+      @Override
+      public Void call() throws IOException {
+        batchMutate(mutations);
+        return null;
+      }
+    });
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public ListenableFuture<Void> flushCommitsAsync() {
+    // Runs the blocking flushCommits on the executor.
+    return executorService.submit(new Callable<Void>() {
+      @Override
+      public Void call() throws IOException {
+        flushCommits();
+        return null;
+      }
+    });
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public ListenableFuture<RowLock> lockRowAsync(final byte[] row) {
+    return startWithRetries(new ServerCallable<ListenableFuture<RowLock>>(
+        getConnection(), getTableName(), row, getOptions()) {
+      @Override
+      public ListenableFuture<RowLock> call() throws Exception {
+        return ((HBaseToThriftAdapter)server).lockRowAsync(
+            location.getRegionInfo().getRegionName(), row);
+      }
+    });
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public ListenableFuture<Void> unlockRowAsync(final RowLock rl) {
+    return startWithRetries(new ServerCallable<ListenableFuture<Void>>(
+        getConnection(), getTableName(), rl.getRow(), getOptions()) {
+      @Override
+      public ListenableFuture<Void> call() throws Exception {
+        return ((HBaseToThriftAdapter)server).unlockRowAsync(
+            location.getRegionInfo().getRegionName(), rl.getLockId());
+      }
+    });
+  }
+}
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableAsyncInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableAsyncInterface.java?rev=1576909&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableAsyncInterface.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableAsyncInterface.java Wed Mar 12 21:17:13 2014
@@ -0,0 +1,135 @@
+/**
+ * Copyright 2013 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.List;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * Used to communicate with a single HBase table.
+ * Provides additional asynchronous APIs that complement {@link HTableInterface}.
+ * Each asynchronous call returns a ListenableFuture object immediately
+ * and executes the request in the background. Call the blocking method get()
+ * on the ListenableFuture object to obtain the result.
+ *
+ * We don't provide an asynchronous API for operations that do not directly
+ * contact a region server, for example, put().
+ * NOTE(review): {@link #putAsync(Put)} is nevertheless declared below; confirm
+ * which of the two statements is intended.
+ */
+public interface HTableAsyncInterface extends HTableInterface {
+
+ /**
+ * Extracts certain cells from a given row.
+ *
+ * @param get The object that specifies what data to fetch and from which row.
+ * @return Listenable future of data coming from the specified row, if it exists.
+ * If the row specified doesn't exist, the {@link Result} instance returned won't
+ * contain any {@link KeyValue}, as indicated by {@link Result#isEmpty()}.
+ */
+ ListenableFuture<Result> getAsync(Get get);
+
+ /**
+ * Extracts certain cells from the given rows, in batch.
+ *
+ * @param list The objects that specify what data to fetch and from which rows.
+ * @return Listenable future of data coming from the specified rows, if they exist.
+ * If a specified row doesn't exist, the {@link Result} instance returned for it
+ * won't contain any {@link KeyValue}, as indicated by {@link Result#isEmpty()}.
+ * If there are any failures even after retries, there will be a null in
+ * the results array for those Gets, AND an exception will be thrown.
+ * NOTE(review): this method declares no checked exception, so the failure
+ * is presumably reported through the returned future; confirm.
+ */
+ ListenableFuture<Result[]> batchGetAsync(final List<Get> list);
+
+ /**
+ * Returns the row that matches <i>row</i> exactly, or the one that immediately precedes it.
+ *
+ * @param row A row key.
+ * @param family Column family to include in the {@link Result}.
+ * @return Listenable future of the row that matches <i>row</i> exactly, or
+ * the one that immediately precedes it.
+ */
+ ListenableFuture<Result> getRowOrBeforeAsync(byte[] row, byte[] family);
+
+ /**
+ * Puts some data in the table.
+ * <p>
+ * If {@link #isAutoFlush isAutoFlush} is false, the update is buffered
+ * until the internal buffer is full.
+ *
+ * @param put The data to put.
+ * @return Listenable future that completes when the operation is done.
+ */
+ ListenableFuture<Void> putAsync(Put put);
+
+ /**
+ * Deletes the specified cells/row.
+ *
+ * @param delete The object that specifies what to delete.
+ * @return Listenable future that completes when the operation is done.
+ */
+ ListenableFuture<Void> deleteAsync(Delete delete);
+
+ /**
+ * Performs multiple mutations atomically on a single row. Currently
+ * {@link Put} and {@link Delete} are supported.
+ *
+ * @param arm object that specifies the set of mutations to perform
+ * @return Listenable future that completes when the operation is done.
+ */
+ ListenableFuture<Void> mutateRowAsync(RowMutations arm);
+
+ /**
+ * Processes a batch of mutations on a row. Currently
+ * {@link Put} and {@link Delete} are supported.
+ *
+ * @param mutations objects that specify the set of mutations to perform.
+ * @return Listenable future that completes when the operation is done.
+ */
+ ListenableFuture<Void> batchMutateAsync(List<Mutation> mutations);
+
+ /**
+ * Executes all the buffered {@link Put} operations.
+ * <p>
+ * This method gets called once automatically for every {@link Put} or batch
+ * of {@link Put}s (when {@code put(List<Put>)} is used) when
+ * {@link #isAutoFlush} is {@code true}.
+ *
+ * @return Listenable future that completes when the operation is done.
+ */
+ ListenableFuture<Void> flushCommitsAsync();
+
+ /**
+ * Obtains a lock on a row.
+ *
+ * @param row The row to lock.
+ * @return Listenable future of a {@link RowLock} containing the row and lock id.
+ */
+ ListenableFuture<RowLock> lockRowAsync(byte[] row);
+
+ /**
+ * Releases a row lock.
+ *
+ * @param rl The row lock to release.
+ * @return Listenable future that completes when the operation is done.
+ */
+ ListenableFuture<Void> unlockRowAsync(RowLock rl);
+}
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableClientScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableClientScanner.java?rev=1576909&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableClientScanner.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableClientScanner.java Wed Mar 12 21:17:13 2014
@@ -0,0 +1,359 @@
+/**
+ * Copyright 2014 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Implements the scanner interface for the HBase client.
+ * If there are multiple regions in a table, this scanner will iterate
+ * through them all.
+ */
+public class HTableClientScanner implements ResultScanner, Runnable {
+ private static final Log LOG = LogFactory.getLog(HTableClientScanner.class);
+ // End of Scanning
+ private static final Result[] EOS = new Result[0];
+
+ // Upper bound on the number of fetching threads in the shared pool below.
+ private static final int MAX_THREADS_IN_POOL = Runtime.getRuntime()
+ .availableProcessors();
+
+ // Shared pool that runs the background fetch loop (run()) of each scanner.
+ // NOTE(review): a SynchronousQueue never buffers tasks, so once
+ // MAX_THREADS_IN_POOL scanners are fetching at the same time, the
+ // execute() call in initialize() is rejected under the executor's default
+ // policy (RejectedExecutionException). Confirm this cap is intended.
+ private static final ExecutorService executor = new ThreadPoolExecutor(1,
+ MAX_THREADS_IN_POOL, 60L, TimeUnit.SECONDS,
+ new SynchronousQueue<Runnable>());
+
+ // HEADSUP: The scan internal start row can change as we move through table.
+ protected final Scan scan;
+ // The number of prefetched and cached results
+ private final int caching;
+ // Temporary results list in main thread, may be null
+ private Result[] currentResults;
+ // The position of next unfetched results in currentResults if it is
+ // non-null.
+ private int currentPos;
+ // Whether this client has closed.
+ private boolean closed;
+ /**
+ * The queue transferring fetched Result[] to main thread.
+ * When queue.take() returns an EOS, scanning ends.
+ */
+ private final ArrayBlockingQueue<Result[]> queue;
+ // The variable informing fetching thread to stop
+ private volatile boolean closing;
+ // Contains the exception caught in fetch thread.
+ private volatile Throwable exception;
+
+ private final HTable table;
+
+ /**
+ * Creates a scanner over {@code table} driven by {@code scan}. Nothing is
+ * fetched until {@link #initialize()} is called.
+ *
+ * @param scan the scan to run; its start row is rewritten as scanning advances
+ * @param table the table to scan; also supplies the queue length and the
+ * default caching value
+ */
+ public HTableClientScanner(Scan scan, HTable table) {
+ this.scan = scan;
+ this.table = table;
+
+ int queueLength = table.getConfiguration().getInt(
+ HConstants.HBASE_CLIENT_SCANNER_QUEUE_LENGTH,
+ HConstants.DEFAULT_HBASE_CLIENT_SCANNER_QUEUE_LENGTH);
+ this.queue = new ArrayBlockingQueue<>(queueLength);
+
+ // A positive caching value on the scan wins over the table-level default.
+ this.caching = scan.getCaching() > 0
+ ? scan.getCaching()
+ : table.getScannerCaching();
+ }
+
+ // Submits this scanner's fetch loop (run()) to the shared executor and
+ // returns this instance so the call can be chained after the constructor.
+ HTableClientScanner initialize() {
+ executor.execute(this);
+ return this;
+ }
+
+ // Returns a new ResultScannerIterator wrapping this scanner.
+ @Override
+ public Iterator<Result> iterator() {
+ return new ResultScannerIterator(this);
+ }
+
+ // Rethrows e as an IOException or a RuntimeException; does nothing when e is
+ // null. Any other Throwable is wrapped in a RuntimeException.
+ private void throwIOException(Throwable e) throws IOException {
+ if (e == null) {
+ return;
+ }
+ if (e instanceof IOException) {
+ throw (IOException) e;
+ }
+ if (e instanceof RuntimeException) {
+ throw (RuntimeException) e;
+ }
+ throw new RuntimeException(e);
+ }
+
+ /**
+ * Fetches results from queue to currentResults if it is not null.
+ *
+ * @return true if more results available, false if end of scanning
+ */
+ private boolean fetchFromQueue() throws IOException {
+ if (currentResults != null) {
+ return true;
+ }
+
+ if (closed) {
+ return false;
+ }
+
+ try {
+ currentResults = queue.take();
+ if (currentResults.length == 0) {
+ // End of scanning
+ closed = true;
+ currentResults = null;
+
+ if (exception != null) {
+
+ // Failure of scanning
+ throwIOException(exception);
+ }
+
+ return false;
+ }
+
+ // Results fetched
+ currentPos = 0;
+ return true;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+
+ // Returns the next result, or null once scanning has ended.
+ @Override
+ public Result next() throws IOException {
+ if (!fetchFromQueue()) {
+ return null;
+ }
+
+ Result row = currentResults[currentPos++];
+
+ // Drop the batch once fully consumed so the next call pulls a fresh one.
+ if (currentPos >= currentResults.length) {
+ currentResults = null;
+ }
+ return row;
+ }
+
+ /**
+ * Returns up to {@code nbRows} results, pulling further batches from the
+ * queue as needed.
+ *
+ * @param nbRows maximum number of results to return
+ * @return an array of at most {@code nbRows} results (shorter when scanning
+ * ends part-way), or null when no result is available at all
+ * @throws IOException if the fetching thread recorded an IOException
+ */
+ @Override
+ public Result[] next(int nbRows) throws IOException {
+ // NOTE(review): the exhausted case yields null rather than an empty array;
+ // confirm this is what callers of ResultScanner#next(int) expect.
+ if (!fetchFromQueue()) {
+ return null;
+ }
+
+ // In case, currentResults is just the results we want, return it directly
+ // to avoid extra resource allocation and copying.
+ if (currentPos == 0 && nbRows == currentResults.length) {
+ Result[] res = currentResults;
+ currentResults = null;
+ return res;
+ }
+
+ Result[] res = new Result[nbRows];
+ int len = 0;
+
+ while (len < nbRows) {
+ // Move from currentResults
+ int n = Math.min(nbRows - len, currentResults.length - currentPos);
+ System.arraycopy(currentResults, currentPos, res, len, n);
+
+ len += n;
+ currentPos += n;
+
+ // Current batch is used up: release it and try to pull the next one.
+ if (currentPos == currentResults.length) {
+ currentResults = null;
+
+ if (!fetchFromQueue()) {
+ // Unexpected partial results, we have to make a copy.
+ return Arrays.copyOf(res, len);
+ }
+ }
+ }
+
+ return res;
+ }
+
+ /**
+ * Closes this scanner. Sets the closing flag read by the fetching thread,
+ * then discards everything left in the queue until the end-of-scan marker is
+ * seen. Any failure while draining is logged at debug level and not
+ * rethrown. Does nothing if the scanner is already closed.
+ */
+ @Override
+ public void close() {
+ if (this.closed) {
+ return;
+ }
+ // Tell the fetching thread to stop.
+ this.closing = true;
+ try {
+ // Draining frees queue slots, so a put() pending in the fetching thread
+ // can complete; fetchFromQueue() sets closed when it takes the marker.
+ while (fetchFromQueue()) {
+ // skip all results
+ currentResults = null;
+ }
+ } catch (Throwable e) {
+ LOG.debug("Exception on closing", e);
+ this.closed = true;
+ }
+ }
+
+ // Runs the callable through the table's connection using
+ // getRegionServerWithRetries and returns whatever it yields.
+ private Result[] call(ScannerCallable callable) throws IOException {
+ return table.getConnectionAndResetOperationContext()
+ .getRegionServerWithRetries(callable);
+ }
+
+ // Builds a ScannerCallable positioned at startKey, configured with this
+ // scanner's caching value. Side effect: rewrites the start row of the scan.
+ private ScannerCallable getScannerCallable(byte[] startKey) {
+ scan.setStartRow(startKey);
+
+ ScannerCallable scannerCallable = new ScannerCallable(
+ table.getConnectionAndResetOperationContext(), table.getTableName(),
+ scan, table.getOptions());
+ scannerCallable.setCaching(caching);
+
+ return scannerCallable;
+ }
+
+ // Closes a callable silently: marks it for close, invokes it once more, and
+ // logs (rather than propagates) any IOException from that call.
+ private void closeScanner(ScannerCallable callable) {
+ callable.setClose();
+ try {
+ call(callable);
+ } catch (IOException e) {
+ // We used to catch this error, interpret, and rethrow. However, we
+ // have since decided that it's not nice for a scanner's close to
+ // throw exceptions. Chances are it was just an UnknownScanner
+ // exception due to lease time out.
+ LOG.error("Exception caught during closeScanner", e);
+ }
+ }
+
+ /**
+ * Opens a scanner at {@code startKey}, fetches batches from it and puts each
+ * non-empty batch on the queue until the region is exhausted, the scan ends,
+ * or {@code closing} is set. The scanner is always closed on the way out.
+ * <p>
+ * An {@link UnknownScannerException} raised before the scanner timeout has
+ * elapsed, or a {@link DoNotRetryIOException} caused by a
+ * {@link NotServingRegionException}, is treated as retryable: the method
+ * returns the key to resume from instead of throwing.
+ *
+ * @param startKey row key to open the scanner at
+ * @return the key at which the caller should continue (the region's end key,
+ * or a resume key after a retryable failure); null when scanning is
+ * finished or was stopped by {@code closing}
+ * @throws IOException if opening or fetching fails in a non-retryable way;
+ * a ScannerTimeoutException is thrown when the scanner timed out
+ * @throws InterruptedException if interrupted while putting to the queue
+ */
+ private byte[] scanRegionServer(byte[] startKey) throws IOException,
+ InterruptedException {
+ // Open a scanner
+ ScannerCallable callable = getScannerCallable(startKey);
+ // openScanner
+ call(callable);
+ HRegionInfo currentRegion = callable.getHRegionInfo();
+
+ // Last row handed out so far; a retry resumes right after it.
+ Result lastRes = null;
+ long lastSuccNextTs = System.currentTimeMillis();
+ try {
+ while (!closing) {
+ Result[] values = call(callable);
+ if (values == null) {
+ // End of scanning
+ return null;
+ } else if (values.length == 0) {
+ // End of region
+ return currentRegion.getEndKey();
+ }
+
+ lastRes = values[values.length - 1];
+ if (!closing) {
+ queue.put(values);
+ }
+ lastSuccNextTs = System.currentTimeMillis();
+ }
+ } catch (DoNotRetryIOException e) {
+ boolean canRetry = false;
+ if (e instanceof UnknownScannerException) {
+ long timeoutTs = lastSuccNextTs + table.scannerTimeout;
+ long now = System.currentTimeMillis();
+ if (now > timeoutTs) {
+ // Scanner timeout
+ long elapsed = now - lastSuccNextTs;
+ ScannerTimeoutException ex = new ScannerTimeoutException(elapsed
+ + "ms passed since the last invocation, "
+ + "timeout is currently set to " + table.scannerTimeout);
+ ex.initCause(e);
+ throw ex;
+ }
+
+ // The scanner is unknown although the timeout has not elapsed: retry.
+ canRetry = true;
+ } else if (e.getCause() instanceof NotServingRegionException) {
+ // instanceof is false for a null cause, no separate null check needed.
+ canRetry = true;
+ }
+
+ if (!canRetry) {
+ // Cannot retry, simply throw it out
+ throw e;
+ }
+
+ // Resume just after the last row delivered, or from the same start key
+ // when nothing was delivered yet.
+ if (lastRes != null) {
+ return Bytes.nextOf(lastRes.getRow());
+ }
+
+ return startKey;
+ } finally {
+ closeScanner(callable);
+ }
+ // Only reach here when closing is true
+ return null;
+ }
+
+ /**
+ * Fetch loop executed on the shared executor. Repeatedly calls
+ * scanRegionServer, starting from the scan's start row, until it returns a
+ * null or empty key or {@code closing} is set. Any Throwable is stored in
+ * {@code exception} for the consuming thread, and the end-of-scan marker is
+ * then put on the queue.
+ */
+ @Override
+ public void run() {
+ try {
+ byte[] startKey = this.scan.getStartRow();
+ while (!closing) {
+ startKey = scanRegionServer(startKey);
+ // A null or empty key means there is nothing further to scan.
+ if (startKey == null || startKey.length == 0) {
+ break;
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ exception = e;
+ } catch (Throwable e) {
+ exception = e;
+ }
+
+ // Signal end of scanning to the consumer (fetchFromQueue treats an empty
+ // array as the marker).
+ // NOTE(review): if this put is interrupted the marker is never queued and
+ // a consumer blocked in queue.take() would keep waiting; confirm.
+ try {
+ queue.put(EOS);
+ } catch (InterruptedException e) {
+ LOG.info("Fetching thread interrupted", e);
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ // Reports the closed flag, which is set once the end-of-scan marker has been
+ // taken from the queue or close() failed while draining.
+ @Override
+ public boolean isClosed() {
+ return closed;
+ }
+}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java Wed Mar 12 21:17:13 2014
@@ -278,7 +278,7 @@ public interface HTableInterface {
public void mutateRow(final RowMutations arm) throws IOException;
public void mutateRow(List<RowMutations> armList) throws IOException;
-
+
/**
* Tells whether or not 'auto-flush' is turned on.
*
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/IntegerOrResultOrException.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/IntegerOrResultOrException.java?rev=1576909&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/IntegerOrResultOrException.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/IntegerOrResultOrException.java Wed Mar 12 21:17:13 2014
@@ -0,0 +1,190 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.ipc.thrift.exceptions.ThriftHBaseException;
+
+import com.facebook.swift.codec.ThriftConstructor;
+import com.facebook.swift.codec.ThriftField;
+import com.facebook.swift.codec.ThriftStruct;
+import com.google.common.base.Objects;
+
+/**
+ * This is used in the containers in {@link TMultiResponse}. Previously we used
+ * Object as value, but since we are moving to Thrift serialization, we need to
+ * serialize the containers. The Type specifies which value this wrapper is
+ * holding: Integer, List<Result> or MultiResponseException. Previously Object
+ * could represent either Integer, Exception or Result[].
+ *
+ */
+@ThriftStruct
+public class IntegerOrResultOrException {
+
+ public enum Type {
+ INTEGER, LIST_OF_RESULTS, EXCEPTION
+ }
+
+ private Integer integer;
+ private ThriftHBaseException ex = null;
+ private List<Result> results = null;
+ private Type type = null;
+
+ public IntegerOrResultOrException() {}
+
+ /**
+ * Thrift constructor. Only the field selected by {@code type} is stored;
+ * the other two arguments are ignored. When {@code type} is null or not one
+ * of the three known values, no value field is set.
+ */
+ @ThriftConstructor
+ public IntegerOrResultOrException(@ThriftField(1) Integer integer,
+ @ThriftField(2) ThriftHBaseException ex,
+ @ThriftField(3) List<Result> results,
+ @ThriftField(4) Type type) {
+ this.type = type;
+ if (type == Type.INTEGER) {
+ this.integer = integer;
+ } else if (type == Type.LIST_OF_RESULTS) {
+ this.results = results;
+ } else if (type == Type.EXCEPTION) {
+ this.ex = ex;
+ }
+ }
+
+ /**
+ * Wraps an untyped value, choosing the type from its runtime class:
+ * an Integer, an Exception (wrapped in a ThriftHBaseException) or a
+ * Result[] (copied into a new list).
+ *
+ * @param obj the value to wrap
+ * @throws IllegalArgumentException if obj is null or of any other type
+ */
+ public IntegerOrResultOrException(Object obj) {
+ if (obj instanceof Integer) {
+ this.integer = (Integer) obj;
+ this.type = Type.INTEGER;
+ } else if (obj instanceof Exception) {
+ this.ex = new ThriftHBaseException((Exception) obj);
+ this.type = Type.EXCEPTION;
+ } else if (obj instanceof Result[]) {
+ Result[] resultArray = (Result[]) obj;
+ this.results = new ArrayList<>();
+ Collections.addAll(results, resultArray);
+ this.type = Type.LIST_OF_RESULTS;
+ } else {
+ // Guard the null case explicitly: calling getClass() on null would
+ // raise a NullPointerException instead of the intended error.
+ String typeName =
+ (obj == null) ? "null" : obj.getClass().getCanonicalName();
+ throw new IllegalArgumentException(typeName
+ + " is not a supported type");
+ }
+ }
+
+ public IntegerOrResultOrException(Integer integer) {
+ this.integer = integer;
+ this.type = Type.INTEGER;
+ }
+
+ public IntegerOrResultOrException(ThriftHBaseException ex) {
+ this.ex = ex;
+ this.type = Type.EXCEPTION;
+ }
+
+ public IntegerOrResultOrException(List<Result> results) {
+ this.results = results;
+ this.type = Type.LIST_OF_RESULTS;
+ }
+
+ @ThriftField(1)
+ public Integer getInteger() {
+ return integer;
+ }
+
+ @ThriftField(2)
+ public ThriftHBaseException getEx() {
+ return ex;
+ }
+
+ @ThriftField(3)
+ public List<Result> getResults() {
+ return results;
+ }
+
+ @ThriftField(4)
+ public Type getType() {
+ return type;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(integer, results, ex, type);
+ }
+
+ // Two wrappers are equal when they are of the same class and their ex,
+ // integer, results and type fields are all equal (null matches null).
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ IntegerOrResultOrException that = (IntegerOrResultOrException) obj;
+ return Objects.equal(ex, that.ex)
+ && Objects.equal(integer, that.integer)
+ && Objects.equal(results, that.results)
+ && type == that.type;
+ }
+
+ @Override
+ public String toString() {
+ return "IntegerOrResultOrException [integer=" + integer + ", ex=" + ex
+ + ", results=" + results + ", type=" + type + "]";
+ }
+
+ /**
+ * Create IntegerOrResultOrException from Object. Delegates to the
+ * {@code Object} constructor.
+ * @param obj the value to wrap
+ * @return a new wrapper holding obj
+ */
+ public static IntegerOrResultOrException createFromObject(Object obj) {
+ return new IntegerOrResultOrException(obj);
+ }
+
+ /**
+ * This is used when transforming TMultiResponse to MultiResponse
+ *
+ * @param ioroe
+ */
+ public static Object createObjectFromIntegerOrResultOrException(
+ IntegerOrResultOrException ioroe) {
+ if (ioroe.getType().equals(Type.INTEGER)) {
+ return ioroe.getInteger();
+ } else if (ioroe.getType().equals(Type.LIST_OF_RESULTS)) {
+ List<Result> list = ioroe.getResults();
+ return list.toArray(new Result[list.size()]);
+ } else if (ioroe.getType().equals(Type.EXCEPTION)) {
+ return ioroe.getEx().getServerJavaException();
+ }
+ return null;
+ }
+}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java Wed Mar 12 21:17:13 2014
@@ -34,7 +34,7 @@ import java.io.IOException;
* Scanner class that contains the <code>.META.</code> table scanning logic
* and uses a Retryable scanner. Provided visitors will be called
* for each row.
- *
+ *
* Although public visibility, this is not a public-facing API and may evolve in
* minor releases.
*/
@@ -121,8 +121,7 @@ public class MetaScanner {
byte[] searchRow =
HRegionInfo.createRegionName(tableName, row, HConstants.NINES,
false);
-
- HTable metaTable = new HTable(configuration, metaTableName);
+ HTable metaTable = new HTable(configuration , metaTableName);
Result startRowResult = metaTable.getRowOrBefore(searchRow,
HConstants.CATALOG_FAMILY);
if (startRowResult == null) {
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java Wed Mar 12 21:17:13 2014
@@ -19,36 +19,85 @@
*/
package org.apache.hadoop.hbase.client;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.hbase.io.HbaseObjectWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-
+import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.io.DataInput;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.TreeMap;
+import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+
+import com.facebook.swift.codec.ThriftConstructor;
+import com.facebook.swift.codec.ThriftField;
+import com.facebook.swift.codec.ThriftStruct;
+import com.google.common.base.Objects;
+
/**
* Container for Actions (i.e. Get, Delete, or Put), which are grouped by
* regionName. Intended to be used with HConnectionManager.processBatch()
*/
+@ThriftStruct
public final class MultiAction implements Writable {
private static final int VERSION_0 = 0;
// map of regions to lists of puts/gets/deletes for that region.
- public Map<byte[], List<Get>> gets = null;
- public Map<byte[], List<Put>> puts = null;
- public Map<byte[], List<Delete>> deletes = null;
- public Map<byte[], List<Integer>> originalIndex = null;
+ private Map<byte[], List<Get>> gets = null;
+ private Map<byte[], List<Put>> puts = null;
+ private Map<byte[], List<Delete>> deletes = null;
+ private Map<byte[], List<Integer>> originalIndex = null;
public MultiAction() {
}
/**
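+ * Thrift constructor for MultiAction serialization. Each non-null map is
+ * copied into a new TreeMap ordered by Bytes.BYTES_COMPARATOR. The
+ * originalIndex map is not a constructor argument and is left unset.
+ * @param gets map of regions to lists of gets for that region, may be null
+ * @param puts map of regions to lists of puts for that region, may be null
+ * @param deletes map of regions to lists of deletes for that region, may be null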
+ */
+ @ThriftConstructor
+ public MultiAction(@ThriftField(1) Map<byte[], List<Get>> gets,
+ @ThriftField(2) Map<byte[], List<Put>> puts,
+ @ThriftField(3) Map<byte[], List<Delete>> deletes) {
+ if (gets != null) {
+ this.gets = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ this.gets.putAll(gets);
+ }
+ if (puts != null) {
+ this.puts = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ this.puts.putAll(puts);
+ }
+ if (deletes != null) {
+ this.deletes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ this.deletes.putAll(deletes);
+ }
+ }
+
+ @ThriftField(1)
+ public Map<byte[], List<Get>> getGets() {
+ return gets;
+ }
+
+ @ThriftField(2)
+ public Map<byte[], List<Put>> getPuts() {
+ return puts;
+ }
+
+ @ThriftField(3)
+ public Map<byte[], List<Delete>> getDeletes() {
+ return deletes;
+ }
+
+ public Map<byte[], List<Integer>> getOriginalIndex() {
+ return originalIndex;
+ }
+
+ /**
* Add an Action to this container based on it's regionName. If the regionName
* is wrong, the initial execution will fail, but will be automatically
* retried after looking up the correct region.
@@ -122,7 +171,6 @@ public final class MultiAction implement
deletes = readMap(in);
}
- @SuppressWarnings("unchecked")
private <R extends Row> Map<byte[], List<R>> readMap(DataInput in) throws IOException {
int mapSize = in.readInt();
@@ -134,11 +182,51 @@ public final class MultiAction implement
int listSize = in.readInt();
List<R> lst = new ArrayList<R>(listSize);
for (int j = 0; j < listSize; j++) {
- lst.add((R) HbaseObjectWritable.readObject(in, null));
+ @SuppressWarnings("unchecked")
+ R elem = (R) HbaseObjectWritable.readObject(in, null);
+ lst.add(elem);
}
map.put(key, lst);
}
return map;
}
+ /**
+ * Hash code consistent with {@link #equals(Object)}. The maps are keyed by
+ * byte[], whose own hashCode is identity based, so the keys are hashed by
+ * content here; otherwise two equal instances holding distinct key arrays
+ * would produce different hash codes.
+ */
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(contentHashCode(deletes), contentHashCode(puts),
+ contentHashCode(gets));
+ }
+
+ // Sums (content hash of key) XOR (hash of value) over all entries; 0 for a
+ // null map. Summation keeps the result independent of iteration order.
+ private static <V> int contentHashCode(Map<byte[], V> map) {
+ if (map == null) {
+ return 0;
+ }
+ int hash = 0;
+ for (Map.Entry<byte[], V> entry : map.entrySet()) {
+ V value = entry.getValue();
+ hash += java.util.Arrays.hashCode(entry.getKey())
+ ^ (value == null ? 0 : value.hashCode());
+ }
+ return hash;
+ }
+
+ // Two MultiActions are equal when they are of the same class and their
+ // deletes, gets and puts maps are equal (null matches null). The
+ // originalIndex map does not take part in the comparison.
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ MultiAction that = (MultiAction) obj;
+ return Objects.equal(deletes, that.deletes)
+ && Objects.equal(gets, that.gets)
+ && Objects.equal(puts, that.puts);
+ }
+
+ @Override
+ public String toString() {
+ return "MultiAction [gets=" + gets + ", puts=" + puts + ", deletes="
+ + deletes + "]";
+ }
+
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MultiPut.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MultiPut.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MultiPut.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MultiPut.java Wed Mar 12 21:17:13 2014
@@ -33,14 +33,19 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
+import com.facebook.swift.codec.ThriftConstructor;
+import com.facebook.swift.codec.ThriftField;
+import com.facebook.swift.codec.ThriftStruct;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
/**
* Data type class for putting multiple regions worth of puts in one RPC.
*/
+@ThriftStruct
public class MultiPut extends Operation implements Writable {
public HServerAddress address; // client code ONLY
@@ -64,6 +69,16 @@ public class MultiPut extends Operation
address = a;
}
+ /**
+ * Thrift constructor: copies the given per-region puts into this instance.
+ *
+ * @param putsSerial map of region name to the puts for that region; a null
+ * map is treated as empty
+ */
+ @ThriftConstructor
+ public MultiPut(@ThriftField(1) final Map<byte[], List<Put>> putsSerial) {
+ // putAll(null) would throw a NullPointerException; MultiAction's Thrift
+ // constructor guards its maps the same way.
+ if (putsSerial != null) {
+ this.puts.putAll(putsSerial);
+ }
+ }
+
+ @ThriftField(1)
+ public Map<byte[], List<Put>> getPuts() {
+ return puts;
+ }
+
public int size() {
int size = 0;
for( List<Put> l : puts.values()) {
@@ -103,17 +118,17 @@ public class MultiPut extends Operation
}
/**
- * Compile the table and column family (i.e. schema) information
- * into a String. Useful for parsing and aggregation by debugging,
+ * Compile the table and column family (i.e. schema) information
+ * into a String. Useful for parsing and aggregation by debugging,
* logging, and administration tools.
* @return Map
*/
@Override
public Map<String, Object> getFingerprint() {
Map<String, Object> map = new HashMap<String, Object>();
- // for extensibility, we have a map of table information that we will
+ // for extensibility, we have a map of table information that we will
// populate with only family information for each table
- Map<String, Map> tableInfo =
+ Map<String, Map> tableInfo =
new HashMap<String, Map>();
map.put("tables", tableInfo);
for (Map.Entry<byte[], List<Put>> entry : puts.entrySet()) {
@@ -121,8 +136,8 @@ public class MultiPut extends Operation
// not how many Puts touch them, so we use this Set to do just that.
Set<String> familySet;
try {
- // since the puts are stored by region, we may have already
- // recorded families for this region. if that is the case,
+ // since the puts are stored by region, we may have already
+ // recorded families for this region. if that is the case,
// we want to add to the existing Set. if not, we make a new Set.
String tableName = Bytes.toStringBinary(
HRegionInfo.parseRegionName(entry.getKey())[0]);
@@ -141,7 +156,7 @@ public class MultiPut extends Operation
table.put("families", familySet);
tableInfo.put(Bytes.toStringBinary(entry.getKey()), table);
}
- // we now iterate through each Put and keep track of which families
+ // we now iterate through each Put and keep track of which families
// are affected in this table.
for (Put p : entry.getValue()) {
for (byte[] fam : p.getFamilyMap().keySet()) {
@@ -153,8 +168,8 @@ public class MultiPut extends Operation
}
/**
- * Compile the details beyond the scope of getFingerprint (mostly
- * toMap from the Puts) into a Map along with the fingerprinted
+ * Compile the details beyond the scope of getFingerprint (mostly
+ * toMap from the Puts) into a Map along with the fingerprinted
* information. Useful for debugging, logging, and administration tools.
* @param maxCols a limit on the number of columns output prior to truncation
* @return Map
@@ -171,9 +186,9 @@ public class MultiPut extends Operation
continue;
}
List<Put> regionPuts = entry.getValue();
- List<Map<String, Object>> putSummaries =
+ List<Map<String, Object>> putSummaries =
new ArrayList<Map<String, Object>>();
- // find out how many of this region's puts we can add without busting
+ // find out how many of this region's puts we can add without busting
// the maximum
int regionPutsToAdd = regionPuts.size();
putCount += regionPutsToAdd;
@@ -192,11 +207,11 @@ public class MultiPut extends Operation
// in the case of parse error, default to labeling by region
tableName = Bytes.toStringBinary(entry.getKey());
}
- // since the puts are stored by region, we may have already
- // recorded puts for this table. if that is the case,
- // we want to add to the existing List. if not, we place a new list
+ // since the puts are stored by region, we may have already
+ // recorded puts for this table. if that is the case,
+ // we want to add to the existing List. if not, we place a new list
// in the map
- Map<String, Object> table =
+ Map<String, Object> table =
(Map<String, Object>) tableInfo.get(tableName);
if (table == null) {
// in case the Put has changed since getFingerprint's map was built
@@ -246,4 +261,22 @@ public class MultiPut extends Operation
puts.put(key, ps);
}
}
+
+  /**
+   * Compares this MultiPut with another object. Two MultiPuts are equal when
+   * they are of the same class, their put maps hold the same number of
+   * entries, and every entry of the other map is contained in this one.
+   *
+   * @param obj the object to compare against; may be null
+   * @return true if obj is a MultiPut holding the same puts, false otherwise
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    MultiPut that = (MultiPut) obj;
+    if (this.getPuts().size() != that.getPuts().size()) {
+      return false;
+    }
+    // sizes match, so containment in one direction is enough
+    return this.getPuts().entrySet().containsAll(that.getPuts().entrySet());
+  }
+
+  /**
+   * Computes a hash code that agrees with {@link #equals(Object)}.
+   * The map's own hashCode() is not usable here: its keys are byte[] region
+   * names, and arrays hash by identity, so two MultiPuts holding the same
+   * content would hash differently. Keys are therefore hashed by content.
+   * Each region contributes only the number of its puts, which stays
+   * consistent with equals() however Put implements hashCode().
+   *
+   * @return a content-based hash code for this MultiPut
+   */
+  @Override
+  public int hashCode() {
+    int hash = 0;
+    for (Map.Entry<byte[], List<Put>> entry : puts.entrySet()) {
+      List<Put> regionPuts = entry.getValue();
+      int putCount = (regionPuts == null) ? 0 : regionPuts.size();
+      // summing keeps the result independent of iteration order
+      hash += 31 * java.util.Arrays.hashCode(entry.getKey()) + putCount;
+    }
+    return hash;
+  }
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MultiPutResponse.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MultiPutResponse.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MultiPutResponse.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MultiPutResponse.java Wed Mar 12 21:17:13 2014
@@ -20,6 +20,9 @@
package org.apache.hadoop.hbase.client;
+import com.facebook.swift.codec.ThriftConstructor;
+import com.facebook.swift.codec.ThriftField;
+import com.facebook.swift.codec.ThriftStruct;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
@@ -32,6 +35,7 @@ import java.util.TreeMap;
/**
* Response class for MultiPut.
*/
+@ThriftStruct
public class MultiPutResponse implements Writable {
protected MultiPut request; // used in client code ONLY
@@ -40,6 +44,20 @@ public class MultiPutResponse implements
public MultiPutResponse() {}
+  /**
+   * Creates a response pre-populated with the given answers.
+   *
+   * @param answers entries to copy into this response; must not be null
+   */
+  @ThriftConstructor
+  public MultiPutResponse(@ThriftField(1) final Map<byte[], Integer> answers) {
+    // Copy into the map this object already owns instead of adopting the
+    // caller's map, because we want to use the BYTES_COMPARATOR.
+    this.answers.putAll(answers);
+  }
+
+  /**
+   * Exposes the answers map as Thrift field 1.
+   *
+   * @return the map held by this response itself, not a copy
+   */
+  @ThriftField(1)
+  public Map<byte[], Integer> getAnswers() {
+    return this.answers;
+  }
+
public void addResult(byte[] regionName, int result) {
answers.put(regionName, result);
}
@@ -69,4 +87,30 @@ public class MultiPutResponse implements
answers.put(key, value);
}
}
+
+  /**
+   * Compares this response with another object. Two responses are equal when
+   * they are of the same class and either both have no answers map, or their
+   * answers maps have the same size and the same entries. The request field
+   * is not part of the comparison.
+   *
+   * @param obj the object to compare against; may be null
+   * @return true if obj is a MultiPutResponse with the same answers
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    MultiPutResponse other = (MultiPutResponse) obj;
+    if (this.answers == null || other.answers == null) {
+      // equal only when neither side has an answers map
+      return this.answers == other.answers;
+    }
+    // same size plus one-way containment means the same entries
+    return this.answers.size() == other.answers.size()
+        && this.answers.entrySet().containsAll(other.answers.entrySet());
+  }
+
+  /**
+   * Computes a hash code that agrees with {@link #equals(Object)}.
+   * The keys of the answers map are byte[] values, which hash by identity,
+   * so the map's own hashCode() would differ between responses that equals()
+   * treats as equal. Keys are hashed by content instead. A null answers map,
+   * which equals() tolerates, hashes to 0.
+   *
+   * @return a content-based hash code for this response
+   */
+  @Override
+  public int hashCode() {
+    if (answers == null) {
+      return 0;
+    }
+    int hash = 0;
+    for (Map.Entry<byte[], Integer> entry : answers.entrySet()) {
+      Integer value = entry.getValue();
+      // summing keeps the result independent of iteration order
+      hash += java.util.Arrays.hashCode(entry.getKey())
+          ^ (value == null ? 0 : value.hashCode());
+    }
+    return hash;
+  }
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java Wed Mar 12 21:17:13 2014
@@ -20,25 +20,17 @@
package org.apache.hadoop.hbase.client;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.hbase.io.HbaseObjectWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.HServerAddress;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.util.StringUtils;
-
+import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.io.DataInput;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.util.Collection;
-import java.util.List;
import java.util.Map;
-import java.util.ArrayList;
+import java.util.Map.Entry;
import java.util.TreeMap;
+import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+
/**
* A container for Result objects, grouped by regionName.
*/
@@ -54,15 +46,38 @@ public class MultiResponse implements Wr
public MultiResponse() {
}
+  /**
+   * Creates a response from three result maps. Each non-null argument is
+   * copied into a new map ordered by Bytes.BYTES_COMPARATOR; a null argument
+   * leaves the corresponding field null.
+   *
+   * @param resultsForGet results of the get actions, or null
+   * @param resultsForPut results of the put actions, or null
+   * @param resultsForDelete results of the delete actions, or null
+   */
+  public MultiResponse(Map<byte[], Object> resultsForGet,
+      Map<byte[], Object> resultsForPut,
+      Map<byte[], Object> resultsForDelete) {
+    // TODO @gauravm: Change from copy to direct assignment.
+    this.resultsForGet = copySortedByBytes(resultsForGet);
+    this.resultsForPut = copySortedByBytes(resultsForPut);
+    this.resultsForDelete = copySortedByBytes(resultsForDelete);
+  }
+
+  /** Returns a BYTES_COMPARATOR-ordered copy of source, or null if source is null. */
+  private static TreeMap<byte[], Object> copySortedByBytes(
+      Map<byte[], Object> source) {
+    if (source == null) {
+      return null;
+    }
+    TreeMap<byte[], Object> copy =
+        new TreeMap<byte[], Object>(Bytes.BYTES_COMPARATOR);
+    copy.putAll(source);
+    return copy;
+  }
/**
* Add the pair to the container, grouped by the regionName
*
- * @param regionName
- * @param r
- * First item in the pair is the original index of the Action
- * (request). Second item is the Result. Result will be empty for
- * successful Put and Delete actions.
+ * @param regionName - name of the region
+ * @param rs - the response for the region: an Integer, a Result[], or an
+ * Exception if the call was not successful.
*/
public void addGetResponse(byte[] regionName, Object rs) {
if (resultsForGet == null)
@@ -98,12 +113,12 @@ public class MultiResponse implements Wr
return ((Integer)result).intValue();
}
- public Result[] getGetResult(byte []regionName) throws Exception {
+ public Result[] getGetResult(byte[] regionName) throws Exception {
Object result = resultsForGet.get(regionName);
if (result instanceof Exception) // false if result == null
- throw (Exception)result;
+ throw (Exception) result;
- return ((Result[])result);
+ return ((Result[]) result);
}
@Override
@@ -153,4 +168,91 @@ public class MultiResponse implements Wr
return map;
}
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result
+ + ((resultsForDelete == null) ? 0 : resultsForDelete.hashCode());
+ result = prime * result
+ + ((resultsForGet == null) ? 0 : resultsForGet.hashCode());
+ result = prime * result
+ + ((resultsForPut == null) ? 0 : resultsForPut.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ MultiResponse other = (MultiResponse) obj;
+ if (resultsForDelete == null) {
+ if (other.resultsForDelete != null)
+ return false;
+ } else if (!resultsForDelete.equals(other.resultsForDelete))
+ return false;
+ if (resultsForGet == null) {
+ if (other.resultsForGet != null)
+ return false;
+ } else if (!resultsForGet.equals(other.resultsForGet))
+ return false;
+ if (resultsForPut == null) {
+ if (other.resultsForPut != null)
+ return false;
+ } else if (!resultsForPut.equals(other.resultsForPut))
+ return false;
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "MultiResponse [resultsForGet=" + resultsForGet + ", resultsForPut="
+ + resultsForPut + ", resultsForDelete=" + resultsForDelete + "]";
+ }
+
+  /**
+   * @return the map of get results held by this response (not a copy);
+   *         null if none has been set
+   */
+  public Map<byte[], Object> getResultsForGet() {
+    return this.resultsForGet;
+  }
+
+  /**
+   * @return the map of put results held by this response (not a copy);
+   *         null if none has been set
+   */
+  public Map<byte[], Object> getResultsForPut() {
+    return this.resultsForPut;
+  }
+
+  /**
+   * @return the map of delete results held by this response (not a copy);
+   *         null if none has been set
+   */
+  public Map<byte[], Object> getResultsForDelete() {
+    return this.resultsForDelete;
+  }
+
+  /**
+   * Static helpers that build a MultiResponse from a TMultiResponse.
+   */
+  public static class Builder {
+
+    /**
+     * Converts a TMultiResponse into a MultiResponse by transforming each of
+     * its three result maps.
+     *
+     * @param tMultiResponse the response to convert; must not be null
+     * @return a new MultiResponse holding the converted get, put and delete
+     *         results
+     */
+    public static MultiResponse createFromTMultiResponse(
+        TMultiResponse tMultiResponse) {
+      Map<byte[], IntegerOrResultOrException> gets =
+          tMultiResponse.getResultsForGet();
+      Map<byte[], IntegerOrResultOrException> puts =
+          tMultiResponse.getResultsForPut();
+      Map<byte[], IntegerOrResultOrException> deletes =
+          tMultiResponse.getResultsForDelete();
+      Map<byte[], Object> convertedGets = transformTmultiResponseMap(gets);
+      Map<byte[], Object> convertedPuts = transformTmultiResponseMap(puts);
+      Map<byte[], Object> convertedDeletes =
+          transformTmultiResponseMap(deletes);
+      return new MultiResponse(convertedGets, convertedPuts, convertedDeletes);
+    }
+
+    /**
+     * Converts every value of the given map through
+     * IntegerOrResultOrException.createObjectFromIntegerOrResultOrException
+     * and collects the results in a map ordered by Bytes.BYTES_COMPARATOR.
+     *
+     * @param map the map to convert, or null
+     * @return the converted map, or null if map is null
+     */
+    public static Map<byte[], Object> transformTmultiResponseMap(
+        Map<byte[], IntegerOrResultOrException> map) {
+      if (map == null) {
+        return null;
+      }
+      Map<byte[], Object> converted =
+          new TreeMap<byte[], Object>(Bytes.BYTES_COMPARATOR);
+      for (Entry<byte[], IntegerOrResultOrException> item : map.entrySet()) {
+        byte[] key = item.getKey();
+        Object value = IntegerOrResultOrException
+            .createObjectFromIntegerOrResultOrException(item.getValue());
+        converted.put(key, value);
+      }
+      return converted;
+    }
+  }
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Mutation.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Mutation.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Mutation.java Wed Mar 12 21:17:13 2014
@@ -25,16 +25,13 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
-import java.util.UUID;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;
public abstract class Mutation extends OperationWithAttributes implements Row {
- // Attribute used in Mutations to indicate the originating cluster.
private static final String CLUSTER_ID_ATTR = "_c.id_";
-
protected byte [] row = null;
protected long ts = HConstants.LATEST_TIMESTAMP;
protected long lockId = -1L;
@@ -170,7 +167,7 @@ public abstract class Mutation extends O
* @return The lock ID.
*/
public long getLockId() {
- return this.lockId;
+ return this.lockId;
}
/**
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Operation.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Operation.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Operation.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Operation.java Wed Mar 12 21:17:13 2014
@@ -34,14 +34,14 @@ public abstract class Operation {
private static final int DEFAULT_MAX_COLS = 5;
/**
- * Produces a Map containing a fingerprint which identifies the type and
+ * Produces a Map containing a fingerprint which identifies the type and
* the static schema components of a query (i.e. column families)
* @return a map containing fingerprint information (i.e. column families)
*/
public abstract Map<String, Object> getFingerprint();
/**
- * Produces a Map containing a summary of the details of a query
+ * Produces a Map containing a summary of the details of a query
* beyond the scope of the fingerprint (i.e. columns, rows...)
* @param maxCols a limit on the number of columns output prior to truncation
* @return a map containing parameters of a query (i.e. rows, columns...)
@@ -57,7 +57,7 @@ public abstract class Operation {
}
/**
- * Produces a JSON object for fingerprint and details exposure in a
+ * Produces a JSON object for fingerprint and details exposure in a
* parseable format.
* @param maxCols a limit on the number of columns to include in the JSON
* @return a JSONObject containing this Operation's information, as a string
@@ -68,7 +68,7 @@ public abstract class Operation {
}
/**
- * Produces a JSON object sufficient for description of a query
+ * Produces a JSON object sufficient for description of a query
* in a debugging or logging context.
* @return the produced JSON object, as a string
*/
@@ -77,16 +77,16 @@ public abstract class Operation {
}
/**
- * Produces a string representation of this Operation. It defaults to a JSON
- * representation, but falls back to a string representation of the
+ * Produces a string representation of this Operation. It defaults to a JSON
+ * representation, but falls back to a string representation of the
* fingerprint and details in the case of a JSON encoding failure.
- * @param maxCols a limit on the number of columns output in the summary
+ * @param maxCols a limit on the number of columns output in the summary
* prior to truncation
* @return a JSON-parseable String
*/
public String toString(int maxCols) {
- /* for now this is merely a wrapper from producing a JSON string, but
- * toJSON is kept separate in case this is changed to be a less parsable
+ /* for now this is merely a wrapper from producing a JSON string, but
+ * toJSON is kept separate in case this is changed to be a less parsable
* pretty printed representation.
*/
try {
@@ -97,8 +97,8 @@ public abstract class Operation {
}
/**
- * Produces a string representation of this Operation. It defaults to a JSON
- * representation, but falls back to a string representation of the
+ * Produces a string representation of this Operation. It defaults to a JSON
+ * representation, but falls back to a string representation of the
* fingerprint and details in the case of a JSON encoding failure.
* @return String
*/
@@ -107,3 +107,4 @@ public abstract class Operation {
return toString(DEFAULT_MAX_COLS);
}
}
+