You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2015/06/26 18:19:32 UTC
[2/2] accumulo git commit: ACCUMULO-3795 add scanner batch timeout
ACCUMULO-3795 add scanner batch timeout
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/81994647
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/81994647
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/81994647
Branch: refs/heads/master
Commit: 819946477c7e9f6604332c3828021fe8db52cddb
Parents: d65f514
Author: Eric C. Newton <er...@gmail.com>
Authored: Fri Jun 26 12:19:23 2015 -0400
Committer: Eric C. Newton <er...@gmail.com>
Committed: Fri Jun 26 12:19:23 2015 -0400
----------------------------------------------------------------------
.../core/client/ClientSideIteratorScanner.java | 2 +
.../accumulo/core/client/IsolatedScanner.java | 1 +
.../accumulo/core/client/ScannerBase.java | 22 ++
.../core/client/impl/ScannerOptions.java | 20 ++
.../impl/TabletServerBatchReaderIterator.java | 2 +-
.../core/client/impl/ThriftScanner.java | 18 +-
.../core/metadata/MetadataLocationObtainer.java | 4 +-
.../thrift/TabletClientService.java | 252 ++++++++++++++++---
core/src/main/thrift/tabletserver.thrift | 8 +-
.../client/impl/TableOperationsImplTest.java | 1 +
.../server/util/VerifyTabletAssignments.java | 2 +-
.../monitor/servlets/trace/NullScanner.java | 11 +
.../apache/accumulo/tserver/TabletServer.java | 15 +-
.../accumulo/tserver/scan/LookupTask.java | 2 +-
.../tserver/session/MultiScanSession.java | 4 +-
.../accumulo/tserver/session/ScanSession.java | 4 +-
.../accumulo/tserver/tablet/ScanDataSource.java | 4 +-
.../accumulo/tserver/tablet/ScanOptions.java | 8 +-
.../apache/accumulo/tserver/tablet/Scanner.java | 2 +-
.../apache/accumulo/tserver/tablet/Tablet.java | 36 ++-
.../accumulo/test/ScanFlushWithTimeIT.java | 107 ++++++++
.../test/performance/thrift/NullTserver.java | 4 +-
22 files changed, 461 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/81994647/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java b/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
index f077573..10931f5 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
@@ -155,6 +155,7 @@ public class ClientSideIteratorScanner extends ScannerOptions implements Scanner
this.range = scanner.getRange();
this.size = scanner.getBatchSize();
this.timeOut = scanner.getTimeout(TimeUnit.MILLISECONDS);
+ // copy the batch timeout from the wrapped scanner; getTimeout() is the overall scan timeout and must not be reused here
+ this.batchTimeOut = scanner.getBatchTimeout(TimeUnit.MILLISECONDS);
this.readaheadThreshold = scanner.getReadaheadThreshold();
}
@@ -169,6 +170,7 @@ public class ClientSideIteratorScanner extends ScannerOptions implements Scanner
public Iterator<Entry<Key,Value>> iterator() {
smi.scanner.setBatchSize(size);
smi.scanner.setTimeout(timeOut, TimeUnit.MILLISECONDS);
+ smi.scanner.setBatchTimeout(batchTimeOut, TimeUnit.MILLISECONDS);
smi.scanner.setReadaheadThreshold(readaheadThreshold);
if (isolated)
smi.scanner.enableIsolation();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/81994647/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java b/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java
index e530100..2e9f1d5 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java
@@ -226,6 +226,7 @@ public class IsolatedScanner extends ScannerOptions implements Scanner {
this.scanner = scanner;
this.range = scanner.getRange();
this.timeOut = scanner.getTimeout(TimeUnit.MILLISECONDS);
+ this.batchTimeOut = scanner.getBatchTimeout(TimeUnit.MILLISECONDS);
this.batchSize = scanner.getBatchSize();
this.readaheadThreshold = scanner.getReadaheadThreshold();
this.bufferFactory = bufferFactory;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/81994647/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java b/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
index 92ab551..d33df03 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
@@ -154,4 +154,26 @@ public interface ScannerBase extends Iterable<Entry<Key,Value>> {
* @return The authorizations set on the scanner instance
*/
Authorizations getAuthorizations();
+
+ /**
+ * This setting determines how long a scanner will wait to fill the returned batch. By default, a scanner will wait until the batch is full.
+ *
+ * <p>
+ * Setting the timeout to zero (with any time unit) or {@link Long#MAX_VALUE} (with {@link TimeUnit#MILLISECONDS}) means no timeout.
+ *
+ * @param timeout
+ * the length of the timeout
+ * @param timeUnit
+ * the units of the timeout
+ * @since 1.8.0
+ */
+ void setBatchTimeout(long timeout, TimeUnit timeUnit);
+
+ /**
+ * Returns the timeout to fill a batch in the given TimeUnit.
+ *
+ * @param timeUnit
+ * the unit in which the batch timeout is returned
+ * @return the batch timeout configured for this scanner
+ * @since 1.8.0
+ */
+ long getBatchTimeout(TimeUnit timeUnit);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/81994647/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java
index e455d5a..cc337dd 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java
@@ -49,6 +49,8 @@ public class ScannerOptions implements ScannerBase {
protected long timeOut = Long.MAX_VALUE;
+ protected long batchTimeOut = Long.MAX_VALUE;
+
private String regexIterName = null;
protected ScannerOptions() {}
@@ -166,6 +168,7 @@ public class ScannerOptions implements ScannerBase {
Set<Entry<String,Map<String,String>>> es = src.serverSideIteratorOptions.entrySet();
for (Entry<String,Map<String,String>> entry : es)
dst.serverSideIteratorOptions.put(entry.getKey(), new HashMap<String,String>(entry.getValue()));
+ dst.batchTimeOut = src.batchTimeOut;
}
}
}
@@ -201,4 +204,21 @@ public class ScannerOptions implements ScannerBase {
public Authorizations getAuthorizations() {
throw new UnsupportedOperationException("No authorizations to return");
}
+
+ /**
+ * Stores the batch timeout in milliseconds. A timeout of zero (in any unit) is recorded as {@link Long#MAX_VALUE}, which means no timeout.
+ *
+ * @param timeout
+ * the length of the timeout; must not be negative
+ * @param timeUnit
+ * the units of the timeout
+ * @throws IllegalArgumentException
+ * if timeout is negative
+ */
+ @Override
+ public void setBatchTimeout(long timeout, TimeUnit timeUnit) {
+ // validate the argument itself, not the separate scan-timeout field timeOut
+ if (timeout < 0) {
+ throw new IllegalArgumentException("Batch timeout must be positive : " + timeout);
+ }
+ if (timeout == 0) {
+ this.batchTimeOut = Long.MAX_VALUE;
+ } else {
+ this.batchTimeOut = timeUnit.toMillis(timeout);
+ }
+ }
+
+ @Override
+ public long getBatchTimeout(TimeUnit timeUnit) {
+ // batchTimeOut is kept in milliseconds; express it in the unit the caller asked for
+ final long batchTimeoutMillis = batchTimeOut;
+ return timeUnit.convert(batchTimeoutMillis, TimeUnit.MILLISECONDS);
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/81994647/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
index 053f2b3..f263581 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
@@ -635,7 +635,7 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
Translators.RT));
InitialMultiScan imsr = client.startMultiScan(Tracer.traceInfo(), context.rpcCreds(), thriftTabletRanges,
Translator.translate(columns, Translators.CT), options.serverSideIteratorList, options.serverSideIteratorOptions,
- ByteBufferUtil.toByteBuffers(authorizations.getAuthorizations()), waitForWrites);
+ ByteBufferUtil.toByteBuffers(authorizations.getAuthorizations()), waitForWrites, options.batchTimeOut);
if (waitForWrites)
ThriftScanner.serversWaitedForWrites.get(ttype).add(server.toString());
http://git-wip-us.apache.org/repos/asf/accumulo/blob/81994647/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
index 39d3b32..2116cf2 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
@@ -80,7 +80,7 @@ public class ThriftScanner {
public static boolean getBatchFromServer(ClientContext context, Range range, KeyExtent extent, String server, SortedMap<Key,Value> results,
SortedSet<Column> fetchedColumns, List<IterInfo> serverSideIteratorList, Map<String,Map<String,String>> serverSideIteratorOptions, int size,
- Authorizations authorizations, boolean retry) throws AccumuloException, AccumuloSecurityException, NotServingTabletException {
+ Authorizations authorizations, boolean retry, long batchTimeOut) throws AccumuloException, AccumuloSecurityException, NotServingTabletException {
if (server == null)
throw new AccumuloException(new IOException());
@@ -91,13 +91,13 @@ public class ThriftScanner {
try {
// not reading whole rows (or stopping on row boundries) so there is no need to enable isolation below
ScanState scanState = new ScanState(context, extent.getTableId(), authorizations, range, fetchedColumns, size, serverSideIteratorList,
- serverSideIteratorOptions, false);
+ serverSideIteratorOptions, false, batchTimeOut);
TabletType ttype = TabletType.type(extent);
boolean waitForWrites = !serversWaitedForWrites.get(ttype).contains(server);
InitialScan isr = client.startScan(tinfo, scanState.context.rpcCreds(), extent.toThrift(), scanState.range.toThrift(),
Translator.translate(scanState.columns, Translators.CT), scanState.size, scanState.serverSideIteratorList, scanState.serverSideIteratorOptions,
- scanState.authorizations.getAuthorizationsBB(), waitForWrites, scanState.isolated, scanState.readaheadThreshold);
+ scanState.authorizations.getAuthorizationsBB(), waitForWrites, scanState.isolated, scanState.readaheadThreshold, scanState.batchTimeOut);
if (waitForWrites)
serversWaitedForWrites.get(ttype).add(server);
@@ -133,6 +133,7 @@ public class ThriftScanner {
Text startRow;
boolean skipStartRow;
long readaheadThreshold;
+ long batchTimeOut;
Range range;
@@ -152,13 +153,14 @@ public class ThriftScanner {
Map<String,Map<String,String>> serverSideIteratorOptions;
public ScanState(ClientContext context, Text tableId, Authorizations authorizations, Range range, SortedSet<Column> fetchedColumns, int size,
- List<IterInfo> serverSideIteratorList, Map<String,Map<String,String>> serverSideIteratorOptions, boolean isolated) {
+ List<IterInfo> serverSideIteratorList, Map<String,Map<String,String>> serverSideIteratorOptions, boolean isolated, long batchTimeOut) {
this(context, tableId, authorizations, range, fetchedColumns, size, serverSideIteratorList, serverSideIteratorOptions, isolated,
- Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD);
+ Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD, batchTimeOut);
}
public ScanState(ClientContext context, Text tableId, Authorizations authorizations, Range range, SortedSet<Column> fetchedColumns, int size,
- List<IterInfo> serverSideIteratorList, Map<String,Map<String,String>> serverSideIteratorOptions, boolean isolated, long readaheadThreshold) {
+ List<IterInfo> serverSideIteratorList, Map<String,Map<String,String>> serverSideIteratorOptions, boolean isolated, long readaheadThreshold,
+ long batchTimeOut) {
this.context = context;
;
this.authorizations = authorizations;
@@ -186,7 +188,7 @@ public class ThriftScanner {
this.isolated = isolated;
this.readaheadThreshold = readaheadThreshold;
-
+ this.batchTimeOut = batchTimeOut;
}
}
@@ -409,7 +411,7 @@ public class ThriftScanner {
boolean waitForWrites = !serversWaitedForWrites.get(ttype).contains(loc.tablet_location);
InitialScan is = client.startScan(tinfo, scanState.context.rpcCreds(), loc.tablet_extent.toThrift(), scanState.range.toThrift(),
Translator.translate(scanState.columns, Translators.CT), scanState.size, scanState.serverSideIteratorList, scanState.serverSideIteratorOptions,
- scanState.authorizations.getAuthorizationsBB(), waitForWrites, scanState.isolated, scanState.readaheadThreshold);
+ scanState.authorizations.getAuthorizationsBB(), waitForWrites, scanState.isolated, scanState.readaheadThreshold, scanState.batchTimeOut);
if (waitForWrites)
serversWaitedForWrites.get(ttype).add(loc.tablet_location);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/81994647/core/src/main/java/org/apache/accumulo/core/metadata/MetadataLocationObtainer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/MetadataLocationObtainer.java b/core/src/main/java/org/apache/accumulo/core/metadata/MetadataLocationObtainer.java
index c8c61aa..0d294b8 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/MetadataLocationObtainer.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/MetadataLocationObtainer.java
@@ -93,7 +93,7 @@ public class MetadataLocationObtainer implements TabletLocationObtainer {
serverSideIteratorList.add(new IterInfo(10000, WholeRowIterator.class.getName(), "WRI"));
Map<String,Map<String,String>> serverSideIteratorOptions = Collections.emptyMap();
boolean more = ThriftScanner.getBatchFromServer(context, range, src.tablet_extent, src.tablet_location, encodedResults, locCols, serverSideIteratorList,
- serverSideIteratorOptions, Constants.SCAN_BATCH_SIZE, Authorizations.EMPTY, false);
+ serverSideIteratorOptions, Constants.SCAN_BATCH_SIZE, Authorizations.EMPTY, false, 0L);
decodeRows(encodedResults, results);
@@ -101,7 +101,7 @@ public class MetadataLocationObtainer implements TabletLocationObtainer {
range = new Range(results.lastKey().followingKey(PartialKey.ROW_COLFAM_COLQUAL_COLVIS_TIME), true, new Key(stopRow).followingKey(PartialKey.ROW), false);
encodedResults.clear();
more = ThriftScanner.getBatchFromServer(context, range, src.tablet_extent, src.tablet_location, encodedResults, locCols, serverSideIteratorList,
- serverSideIteratorOptions, Constants.SCAN_BATCH_SIZE, Authorizations.EMPTY, false);
+ serverSideIteratorOptions, Constants.SCAN_BATCH_SIZE, Authorizations.EMPTY, false, 0L);
decodeRows(encodedResults, results);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/81994647/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java b/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java
index 02bd4e1..bd0f79c 100644
--- a/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java
+++ b/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java
@@ -52,13 +52,13 @@ import org.slf4j.LoggerFactory;
public interface Iface extends org.apache.accumulo.core.client.impl.thrift.ClientService.Iface {
- public org.apache.accumulo.core.data.thrift.InitialScan startScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, long readaheadThreshold) throws org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException, NotServingTabletException, TooManyFilesException, org.apache.thrift.TException;
+ public org.apache.accumulo.core.data.thrift.InitialScan startScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, long readaheadThreshold, long batchTimeOut) throws org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException, NotServingTabletException, TooManyFilesException, org.apache.thrift.TException;
public org.apache.accumulo.core.data.thrift.ScanResult continueScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, long scanID) throws NoSuchScanIDException, NotServingTabletException, TooManyFilesException, org.apache.thrift.TException;
public void closeScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, long scanID) throws org.apache.thrift.TException;
- public org.apache.accumulo.core.data.thrift.InitialMultiScan startMultiScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, Map<org.apache.accumulo.core.data.thrift.TKeyExtent,List<org.apache.accumulo.core.data.thrift.TRange>> batch, List<org.apache.accumulo.core.data.thrift.TColumn> columns, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites) throws org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException, org.apache.thrift.TException;
+ public org.apache.accumulo.core.data.thrift.InitialMultiScan startMultiScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, Map<org.apache.accumulo.core.data.thrift.TKeyExtent,List<org.apache.accumulo.core.data.thrift.TRange>> batch, List<org.apache.accumulo.core.data.thrift.TColumn> columns, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, long batchTimeOut) throws org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException, org.apache.thrift.TException;
public org.apache.accumulo.core.data.thrift.MultiScanResult continueMultiScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, long scanID) throws NoSuchScanIDException, org.apache.thrift.TException;
@@ -118,13 +118,13 @@ import org.slf4j.LoggerFactory;
public interface AsyncIface extends org.apache.accumulo.core.client.impl.thrift.ClientService .AsyncIface {
- public void startScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, long readaheadThreshold, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
+ public void startScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, long readaheadThreshold, long batchTimeOut, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
public void continueScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, long scanID, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
public void closeScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, long scanID, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
- public void startMultiScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, Map<org.apache.accumulo.core.data.thrift.TKeyExtent,List<org.apache.accumulo.core.data.thrift.TRange>> batch, List<org.apache.accumulo.core.data.thrift.TColumn> columns, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
+ public void startMultiScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, Map<org.apache.accumulo.core.data.thrift.TKeyExtent,List<org.apache.accumulo.core.data.thrift.TRange>> batch, List<org.apache.accumulo.core.data.thrift.TColumn> columns, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, long batchTimeOut, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
public void continueMultiScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, long scanID, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
@@ -202,13 +202,13 @@ import org.slf4j.LoggerFactory;
super(iprot, oprot);
}
- public org.apache.accumulo.core.data.thrift.InitialScan startScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, long readaheadThreshold) throws org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException, NotServingTabletException, TooManyFilesException, org.apache.thrift.TException
+ public org.apache.accumulo.core.data.thrift.InitialScan startScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, long readaheadThreshold, long batchTimeOut) throws org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException, NotServingTabletException, TooManyFilesException, org.apache.thrift.TException
{
- send_startScan(tinfo, credentials, extent, range, columns, batchSize, ssiList, ssio, authorizations, waitForWrites, isolated, readaheadThreshold);
+ send_startScan(tinfo, credentials, extent, range, columns, batchSize, ssiList, ssio, authorizations, waitForWrites, isolated, readaheadThreshold, batchTimeOut);
return recv_startScan();
}
- public void send_startScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, long readaheadThreshold) throws org.apache.thrift.TException
+ public void send_startScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, long readaheadThreshold, long batchTimeOut) throws org.apache.thrift.TException
{
startScan_args args = new startScan_args();
args.setTinfo(tinfo);
@@ -223,6 +223,7 @@ import org.slf4j.LoggerFactory;
args.setWaitForWrites(waitForWrites);
args.setIsolated(isolated);
args.setReadaheadThreshold(readaheadThreshold);
+ args.setBatchTimeOut(batchTimeOut);
sendBase("startScan", args);
}
@@ -291,13 +292,13 @@ import org.slf4j.LoggerFactory;
sendBase("closeScan", args);
}
- public org.apache.accumulo.core.data.thrift.InitialMultiScan startMultiScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, Map<org.apache.accumulo.core.data.thrift.TKeyExtent,List<org.apache.accumulo.core.data.thrift.TRange>> batch, List<org.apache.accumulo.core.data.thrift.TColumn> columns, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites) throws org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException, org.apache.thrift.TException
+ public org.apache.accumulo.core.data.thrift.InitialMultiScan startMultiScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, Map<org.apache.accumulo.core.data.thrift.TKeyExtent,List<org.apache.accumulo.core.data.thrift.TRange>> batch, List<org.apache.accumulo.core.data.thrift.TColumn> columns, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, long batchTimeOut) throws org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException, org.apache.thrift.TException
{
- send_startMultiScan(tinfo, credentials, batch, columns, ssiList, ssio, authorizations, waitForWrites);
+ send_startMultiScan(tinfo, credentials, batch, columns, ssiList, ssio, authorizations, waitForWrites, batchTimeOut);
return recv_startMultiScan();
}
- public void send_startMultiScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, Map<org.apache.accumulo.core.data.thrift.TKeyExtent,List<org.apache.accumulo.core.data.thrift.TRange>> batch, List<org.apache.accumulo.core.data.thrift.TColumn> columns, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites) throws org.apache.thrift.TException
+ public void send_startMultiScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, Map<org.apache.accumulo.core.data.thrift.TKeyExtent,List<org.apache.accumulo.core.data.thrift.TRange>> batch, List<org.apache.accumulo.core.data.thrift.TColumn> columns, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, long batchTimeOut) throws org.apache.thrift.TException
{
startMultiScan_args args = new startMultiScan_args();
args.setTinfo(tinfo);
@@ -308,6 +309,7 @@ import org.slf4j.LoggerFactory;
args.setSsio(ssio);
args.setAuthorizations(authorizations);
args.setWaitForWrites(waitForWrites);
+ args.setBatchTimeOut(batchTimeOut);
sendBase("startMultiScan", args);
}
@@ -956,9 +958,9 @@ import org.slf4j.LoggerFactory;
super(protocolFactory, clientManager, transport);
}
- public void startScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, long readaheadThreshold, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {
+ public void startScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, long readaheadThreshold, long batchTimeOut, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {
checkReady();
- startScan_call method_call = new startScan_call(tinfo, credentials, extent, range, columns, batchSize, ssiList, ssio, authorizations, waitForWrites, isolated, readaheadThreshold, resultHandler, this, ___protocolFactory, ___transport);
+ startScan_call method_call = new startScan_call(tinfo, credentials, extent, range, columns, batchSize, ssiList, ssio, authorizations, waitForWrites, isolated, readaheadThreshold, batchTimeOut, resultHandler, this, ___protocolFactory, ___transport);
this.___currentMethod = method_call;
___manager.call(method_call);
}
@@ -976,7 +978,8 @@ import org.slf4j.LoggerFactory;
private boolean waitForWrites;
private boolean isolated;
private long readaheadThreshold;
- public startScan_call(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, long readaheadThreshold, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
+ private long batchTimeOut;
+ public startScan_call(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, long readaheadThreshold, long batchTimeOut, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
super(client, protocolFactory, transport, resultHandler, false);
this.tinfo = tinfo;
this.credentials = credentials;
@@ -990,6 +993,7 @@ import org.slf4j.LoggerFactory;
this.waitForWrites = waitForWrites;
this.isolated = isolated;
this.readaheadThreshold = readaheadThreshold;
+ this.batchTimeOut = batchTimeOut;
}
public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
@@ -1007,6 +1011,7 @@ import org.slf4j.LoggerFactory;
args.setWaitForWrites(waitForWrites);
args.setIsolated(isolated);
args.setReadaheadThreshold(readaheadThreshold);
+ args.setBatchTimeOut(batchTimeOut);
args.write(prot);
prot.writeMessageEnd();
}
@@ -1090,9 +1095,9 @@ import org.slf4j.LoggerFactory;
}
}
- public void startMultiScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, Map<org.apache.accumulo.core.data.thrift.TKeyExtent,List<org.apache.accumulo.core.data.thrift.TRange>> batch, List<org.apache.accumulo.core.data.thrift.TColumn> columns, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {
+ public void startMultiScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, Map<org.apache.accumulo.core.data.thrift.TKeyExtent,List<org.apache.accumulo.core.data.thrift.TRange>> batch, List<org.apache.accumulo.core.data.thrift.TColumn> columns, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, long batchTimeOut, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {
checkReady();
- startMultiScan_call method_call = new startMultiScan_call(tinfo, credentials, batch, columns, ssiList, ssio, authorizations, waitForWrites, resultHandler, this, ___protocolFactory, ___transport);
+ startMultiScan_call method_call = new startMultiScan_call(tinfo, credentials, batch, columns, ssiList, ssio, authorizations, waitForWrites, batchTimeOut, resultHandler, this, ___protocolFactory, ___transport);
this.___currentMethod = method_call;
___manager.call(method_call);
}
@@ -1106,7 +1111,8 @@ import org.slf4j.LoggerFactory;
private Map<String,Map<String,String>> ssio;
private List<ByteBuffer> authorizations;
private boolean waitForWrites;
- public startMultiScan_call(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, Map<org.apache.accumulo.core.data.thrift.TKeyExtent,List<org.apache.accumulo.core.data.thrift.TRange>> batch, List<org.apache.accumulo.core.data.thrift.TColumn> columns, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
+ private long batchTimeOut;
+ public startMultiScan_call(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, Map<org.apache.accumulo.core.data.thrift.TKeyExtent,List<org.apache.accumulo.core.data.thrift.TRange>> batch, List<org.apache.accumulo.core.data.thrift.TColumn> columns, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, long batchTimeOut, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
super(client, protocolFactory, transport, resultHandler, false);
this.tinfo = tinfo;
this.credentials = credentials;
@@ -1116,6 +1122,7 @@ import org.slf4j.LoggerFactory;
this.ssio = ssio;
this.authorizations = authorizations;
this.waitForWrites = waitForWrites;
+ this.batchTimeOut = batchTimeOut;
}
public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
@@ -1129,6 +1136,7 @@ import org.slf4j.LoggerFactory;
args.setSsio(ssio);
args.setAuthorizations(authorizations);
args.setWaitForWrites(waitForWrites);
+ args.setBatchTimeOut(batchTimeOut);
args.write(prot);
prot.writeMessageEnd();
}
@@ -2252,7 +2260,7 @@ import org.slf4j.LoggerFactory;
public startScan_result getResult(I iface, startScan_args args) throws org.apache.thrift.TException {
startScan_result result = new startScan_result();
try {
- result.success = iface.startScan(args.tinfo, args.credentials, args.extent, args.range, args.columns, args.batchSize, args.ssiList, args.ssio, args.authorizations, args.waitForWrites, args.isolated, args.readaheadThreshold);
+ result.success = iface.startScan(args.tinfo, args.credentials, args.extent, args.range, args.columns, args.batchSize, args.ssiList, args.ssio, args.authorizations, args.waitForWrites, args.isolated, args.readaheadThreshold, args.batchTimeOut);
} catch (org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException sec) {
result.sec = sec;
} catch (NotServingTabletException nste) {
@@ -2327,7 +2335,7 @@ import org.slf4j.LoggerFactory;
public startMultiScan_result getResult(I iface, startMultiScan_args args) throws org.apache.thrift.TException {
startMultiScan_result result = new startMultiScan_result();
try {
- result.success = iface.startMultiScan(args.tinfo, args.credentials, args.batch, args.columns, args.ssiList, args.ssio, args.authorizations, args.waitForWrites);
+ result.success = iface.startMultiScan(args.tinfo, args.credentials, args.batch, args.columns, args.ssiList, args.ssio, args.authorizations, args.waitForWrites, args.batchTimeOut);
} catch (org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException sec) {
result.sec = sec;
}
@@ -3042,7 +3050,7 @@ import org.slf4j.LoggerFactory;
}
public void start(I iface, startScan_args args, org.apache.thrift.async.AsyncMethodCallback<org.apache.accumulo.core.data.thrift.InitialScan> resultHandler) throws TException {
- iface.startScan(args.tinfo, args.credentials, args.extent, args.range, args.columns, args.batchSize, args.ssiList, args.ssio, args.authorizations, args.waitForWrites, args.isolated, args.readaheadThreshold,resultHandler);
+ iface.startScan(args.tinfo, args.credentials, args.extent, args.range, args.columns, args.batchSize, args.ssiList, args.ssio, args.authorizations, args.waitForWrites, args.isolated, args.readaheadThreshold, args.batchTimeOut,resultHandler);
}
}
@@ -3194,7 +3202,7 @@ import org.slf4j.LoggerFactory;
}
public void start(I iface, startMultiScan_args args, org.apache.thrift.async.AsyncMethodCallback<org.apache.accumulo.core.data.thrift.InitialMultiScan> resultHandler) throws TException {
- iface.startMultiScan(args.tinfo, args.credentials, args.batch, args.columns, args.ssiList, args.ssio, args.authorizations, args.waitForWrites,resultHandler);
+ iface.startMultiScan(args.tinfo, args.credentials, args.batch, args.columns, args.ssiList, args.ssio, args.authorizations, args.waitForWrites, args.batchTimeOut,resultHandler);
}
}
@@ -4463,6 +4471,7 @@ import org.slf4j.LoggerFactory;
private static final org.apache.thrift.protocol.TField WAIT_FOR_WRITES_FIELD_DESC = new org.apache.thrift.protocol.TField("waitForWrites", org.apache.thrift.protocol.TType.BOOL, (short)9);
private static final org.apache.thrift.protocol.TField ISOLATED_FIELD_DESC = new org.apache.thrift.protocol.TField("isolated", org.apache.thrift.protocol.TType.BOOL, (short)10);
private static final org.apache.thrift.protocol.TField READAHEAD_THRESHOLD_FIELD_DESC = new org.apache.thrift.protocol.TField("readaheadThreshold", org.apache.thrift.protocol.TType.I64, (short)12);
+ private static final org.apache.thrift.protocol.TField BATCH_TIME_OUT_FIELD_DESC = new org.apache.thrift.protocol.TField("batchTimeOut", org.apache.thrift.protocol.TType.I64, (short)13);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
@@ -4482,6 +4491,7 @@ import org.slf4j.LoggerFactory;
public boolean waitForWrites; // required
public boolean isolated; // required
public long readaheadThreshold; // required
+ public long batchTimeOut; // required
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -4496,7 +4506,8 @@ import org.slf4j.LoggerFactory;
AUTHORIZATIONS((short)8, "authorizations"),
WAIT_FOR_WRITES((short)9, "waitForWrites"),
ISOLATED((short)10, "isolated"),
- READAHEAD_THRESHOLD((short)12, "readaheadThreshold");
+ READAHEAD_THRESHOLD((short)12, "readaheadThreshold"),
+ BATCH_TIME_OUT((short)13, "batchTimeOut");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -4535,6 +4546,8 @@ import org.slf4j.LoggerFactory;
return ISOLATED;
case 12: // READAHEAD_THRESHOLD
return READAHEAD_THRESHOLD;
+ case 13: // BATCH_TIME_OUT
+ return BATCH_TIME_OUT;
default:
return null;
}
@@ -4579,6 +4592,7 @@ import org.slf4j.LoggerFactory;
private static final int __WAITFORWRITES_ISSET_ID = 1;
private static final int __ISOLATED_ISSET_ID = 2;
private static final int __READAHEADTHRESHOLD_ISSET_ID = 3;
+ private static final int __BATCHTIMEOUT_ISSET_ID = 4;
private byte __isset_bitfield = 0;
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
@@ -4614,6 +4628,8 @@ import org.slf4j.LoggerFactory;
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
tmpMap.put(_Fields.READAHEAD_THRESHOLD, new org.apache.thrift.meta_data.FieldMetaData("readaheadThreshold", org.apache.thrift.TFieldRequirementType.DEFAULT,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+ tmpMap.put(_Fields.BATCH_TIME_OUT, new org.apache.thrift.meta_data.FieldMetaData("batchTimeOut", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(startScan_args.class, metaDataMap);
}
@@ -4633,7 +4649,8 @@ import org.slf4j.LoggerFactory;
List<ByteBuffer> authorizations,
boolean waitForWrites,
boolean isolated,
- long readaheadThreshold)
+ long readaheadThreshold,
+ long batchTimeOut)
{
this();
this.tinfo = tinfo;
@@ -4652,6 +4669,8 @@ import org.slf4j.LoggerFactory;
setIsolatedIsSet(true);
this.readaheadThreshold = readaheadThreshold;
setReadaheadThresholdIsSet(true);
+ this.batchTimeOut = batchTimeOut;
+ setBatchTimeOutIsSet(true);
}
/**
@@ -4708,6 +4727,7 @@ import org.slf4j.LoggerFactory;
this.waitForWrites = other.waitForWrites;
this.isolated = other.isolated;
this.readaheadThreshold = other.readaheadThreshold;
+ this.batchTimeOut = other.batchTimeOut;
}
public startScan_args deepCopy() {
@@ -4732,6 +4752,8 @@ import org.slf4j.LoggerFactory;
this.isolated = false;
setReadaheadThresholdIsSet(false);
this.readaheadThreshold = 0;
+ setBatchTimeOutIsSet(false);
+ this.batchTimeOut = 0;
}
public org.apache.accumulo.core.trace.thrift.TInfo getTinfo() {
@@ -5074,6 +5096,29 @@ import org.slf4j.LoggerFactory;
__isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __READAHEADTHRESHOLD_ISSET_ID, value);
}
+ public long getBatchTimeOut() {
+ return this.batchTimeOut;
+ }
+
+ public startScan_args setBatchTimeOut(long batchTimeOut) {
+ this.batchTimeOut = batchTimeOut;
+ setBatchTimeOutIsSet(true);
+ return this;
+ }
+
+ public void unsetBatchTimeOut() {
+ __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __BATCHTIMEOUT_ISSET_ID);
+ }
+
+ /** Returns true if field batchTimeOut is set (has been assigned a value) and false otherwise */
+ public boolean isSetBatchTimeOut() {
+ return EncodingUtils.testBit(__isset_bitfield, __BATCHTIMEOUT_ISSET_ID);
+ }
+
+ public void setBatchTimeOutIsSet(boolean value) {
+ __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __BATCHTIMEOUT_ISSET_ID, value);
+ }
+
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case TINFO:
@@ -5172,6 +5217,14 @@ import org.slf4j.LoggerFactory;
}
break;
+ case BATCH_TIME_OUT:
+ if (value == null) {
+ unsetBatchTimeOut();
+ } else {
+ setBatchTimeOut((Long)value);
+ }
+ break;
+
}
}
@@ -5213,6 +5266,9 @@ import org.slf4j.LoggerFactory;
case READAHEAD_THRESHOLD:
return Long.valueOf(getReadaheadThreshold());
+ case BATCH_TIME_OUT:
+ return Long.valueOf(getBatchTimeOut());
+
}
throw new IllegalStateException();
}
@@ -5248,6 +5304,8 @@ import org.slf4j.LoggerFactory;
return isSetIsolated();
case READAHEAD_THRESHOLD:
return isSetReadaheadThreshold();
+ case BATCH_TIME_OUT:
+ return isSetBatchTimeOut();
}
throw new IllegalStateException();
}
@@ -5373,6 +5431,15 @@ import org.slf4j.LoggerFactory;
return false;
}
+ boolean this_present_batchTimeOut = true;
+ boolean that_present_batchTimeOut = true;
+ if (this_present_batchTimeOut || that_present_batchTimeOut) {
+ if (!(this_present_batchTimeOut && that_present_batchTimeOut))
+ return false;
+ if (this.batchTimeOut != that.batchTimeOut)
+ return false;
+ }
+
return true;
}
@@ -5509,6 +5576,16 @@ import org.slf4j.LoggerFactory;
return lastComparison;
}
}
+ lastComparison = Boolean.valueOf(isSetBatchTimeOut()).compareTo(other.isSetBatchTimeOut());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetBatchTimeOut()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.batchTimeOut, other.batchTimeOut);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -5608,6 +5685,10 @@ import org.slf4j.LoggerFactory;
sb.append("readaheadThreshold:");
sb.append(this.readaheadThreshold);
first = false;
+ if (!first) sb.append(", ");
+ sb.append("batchTimeOut:");
+ sb.append(this.batchTimeOut);
+ first = false;
sb.append(")");
return sb.toString();
}
@@ -5821,6 +5902,14 @@ import org.slf4j.LoggerFactory;
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
+ case 13: // BATCH_TIME_OUT
+ if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+ struct.batchTimeOut = iprot.readI64();
+ struct.setBatchTimeOutIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
@@ -5925,6 +6014,9 @@ import org.slf4j.LoggerFactory;
oprot.writeFieldBegin(READAHEAD_THRESHOLD_FIELD_DESC);
oprot.writeI64(struct.readaheadThreshold);
oprot.writeFieldEnd();
+ oprot.writeFieldBegin(BATCH_TIME_OUT_FIELD_DESC);
+ oprot.writeI64(struct.batchTimeOut);
+ oprot.writeFieldEnd();
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -5979,7 +6071,10 @@ import org.slf4j.LoggerFactory;
if (struct.isSetReadaheadThreshold()) {
optionals.set(11);
}
- oprot.writeBitSet(optionals, 12);
+ if (struct.isSetBatchTimeOut()) {
+ optionals.set(12);
+ }
+ oprot.writeBitSet(optionals, 13);
if (struct.isSetTinfo()) {
struct.tinfo.write(oprot);
}
@@ -6048,12 +6143,15 @@ import org.slf4j.LoggerFactory;
if (struct.isSetReadaheadThreshold()) {
oprot.writeI64(struct.readaheadThreshold);
}
+ if (struct.isSetBatchTimeOut()) {
+ oprot.writeI64(struct.batchTimeOut);
+ }
}
@Override
public void read(org.apache.thrift.protocol.TProtocol prot, startScan_args struct) throws org.apache.thrift.TException {
TTupleProtocol iprot = (TTupleProtocol) prot;
- BitSet incoming = iprot.readBitSet(12);
+ BitSet incoming = iprot.readBitSet(13);
if (incoming.get(0)) {
struct.tinfo = new org.apache.accumulo.core.trace.thrift.TInfo();
struct.tinfo.read(iprot);
@@ -6157,6 +6255,10 @@ import org.slf4j.LoggerFactory;
struct.readaheadThreshold = iprot.readI64();
struct.setReadaheadThresholdIsSet(true);
}
+ if (incoming.get(12)) {
+ struct.batchTimeOut = iprot.readI64();
+ struct.setBatchTimeOutIsSet(true);
+ }
}
}
@@ -8417,6 +8519,7 @@ import org.slf4j.LoggerFactory;
private static final org.apache.thrift.protocol.TField SSIO_FIELD_DESC = new org.apache.thrift.protocol.TField("ssio", org.apache.thrift.protocol.TType.MAP, (short)5);
private static final org.apache.thrift.protocol.TField AUTHORIZATIONS_FIELD_DESC = new org.apache.thrift.protocol.TField("authorizations", org.apache.thrift.protocol.TType.LIST, (short)6);
private static final org.apache.thrift.protocol.TField WAIT_FOR_WRITES_FIELD_DESC = new org.apache.thrift.protocol.TField("waitForWrites", org.apache.thrift.protocol.TType.BOOL, (short)7);
+ private static final org.apache.thrift.protocol.TField BATCH_TIME_OUT_FIELD_DESC = new org.apache.thrift.protocol.TField("batchTimeOut", org.apache.thrift.protocol.TType.I64, (short)9);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
@@ -8432,6 +8535,7 @@ import org.slf4j.LoggerFactory;
public Map<String,Map<String,String>> ssio; // required
public List<ByteBuffer> authorizations; // required
public boolean waitForWrites; // required
+ public long batchTimeOut; // required
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -8442,7 +8546,8 @@ import org.slf4j.LoggerFactory;
SSI_LIST((short)4, "ssiList"),
SSIO((short)5, "ssio"),
AUTHORIZATIONS((short)6, "authorizations"),
- WAIT_FOR_WRITES((short)7, "waitForWrites");
+ WAIT_FOR_WRITES((short)7, "waitForWrites"),
+ BATCH_TIME_OUT((short)9, "batchTimeOut");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -8473,6 +8578,8 @@ import org.slf4j.LoggerFactory;
return AUTHORIZATIONS;
case 7: // WAIT_FOR_WRITES
return WAIT_FOR_WRITES;
+ case 9: // BATCH_TIME_OUT
+ return BATCH_TIME_OUT;
default:
return null;
}
@@ -8514,6 +8621,7 @@ import org.slf4j.LoggerFactory;
// isset id assignments
private static final int __WAITFORWRITES_ISSET_ID = 0;
+ private static final int __BATCHTIMEOUT_ISSET_ID = 1;
private byte __isset_bitfield = 0;
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
@@ -8541,6 +8649,8 @@ import org.slf4j.LoggerFactory;
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING , true))));
tmpMap.put(_Fields.WAIT_FOR_WRITES, new org.apache.thrift.meta_data.FieldMetaData("waitForWrites", org.apache.thrift.TFieldRequirementType.DEFAULT,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
+ tmpMap.put(_Fields.BATCH_TIME_OUT, new org.apache.thrift.meta_data.FieldMetaData("batchTimeOut", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(startMultiScan_args.class, metaDataMap);
}
@@ -8556,7 +8666,8 @@ import org.slf4j.LoggerFactory;
List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList,
Map<String,Map<String,String>> ssio,
List<ByteBuffer> authorizations,
- boolean waitForWrites)
+ boolean waitForWrites,
+ long batchTimeOut)
{
this();
this.tinfo = tinfo;
@@ -8568,6 +8679,8 @@ import org.slf4j.LoggerFactory;
this.authorizations = authorizations;
this.waitForWrites = waitForWrites;
setWaitForWritesIsSet(true);
+ this.batchTimeOut = batchTimeOut;
+ setBatchTimeOutIsSet(true);
}
/**
@@ -8618,6 +8731,7 @@ import org.slf4j.LoggerFactory;
this.authorizations = __this__authorizations;
}
this.waitForWrites = other.waitForWrites;
+ this.batchTimeOut = other.batchTimeOut;
}
public startMultiScan_args deepCopy() {
@@ -8635,6 +8749,8 @@ import org.slf4j.LoggerFactory;
this.authorizations = null;
setWaitForWritesIsSet(false);
this.waitForWrites = false;
+ setBatchTimeOutIsSet(false);
+ this.batchTimeOut = 0;
}
public org.apache.accumulo.core.trace.thrift.TInfo getTinfo() {
@@ -8895,6 +9011,29 @@ import org.slf4j.LoggerFactory;
__isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __WAITFORWRITES_ISSET_ID, value);
}
+ public long getBatchTimeOut() {
+ return this.batchTimeOut;
+ }
+
+ public startMultiScan_args setBatchTimeOut(long batchTimeOut) {
+ this.batchTimeOut = batchTimeOut;
+ setBatchTimeOutIsSet(true);
+ return this;
+ }
+
+ public void unsetBatchTimeOut() {
+ __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __BATCHTIMEOUT_ISSET_ID);
+ }
+
+ /** Returns true if field batchTimeOut is set (has been assigned a value) and false otherwise */
+ public boolean isSetBatchTimeOut() {
+ return EncodingUtils.testBit(__isset_bitfield, __BATCHTIMEOUT_ISSET_ID);
+ }
+
+ public void setBatchTimeOutIsSet(boolean value) {
+ __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __BATCHTIMEOUT_ISSET_ID, value);
+ }
+
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case TINFO:
@@ -8961,6 +9100,14 @@ import org.slf4j.LoggerFactory;
}
break;
+ case BATCH_TIME_OUT:
+ if (value == null) {
+ unsetBatchTimeOut();
+ } else {
+ setBatchTimeOut((Long)value);
+ }
+ break;
+
}
}
@@ -8990,6 +9137,9 @@ import org.slf4j.LoggerFactory;
case WAIT_FOR_WRITES:
return Boolean.valueOf(isWaitForWrites());
+ case BATCH_TIME_OUT:
+ return Long.valueOf(getBatchTimeOut());
+
}
throw new IllegalStateException();
}
@@ -9017,6 +9167,8 @@ import org.slf4j.LoggerFactory;
return isSetAuthorizations();
case WAIT_FOR_WRITES:
return isSetWaitForWrites();
+ case BATCH_TIME_OUT:
+ return isSetBatchTimeOut();
}
throw new IllegalStateException();
}
@@ -9106,6 +9258,15 @@ import org.slf4j.LoggerFactory;
return false;
}
+ boolean this_present_batchTimeOut = true;
+ boolean that_present_batchTimeOut = true;
+ if (this_present_batchTimeOut || that_present_batchTimeOut) {
+ if (!(this_present_batchTimeOut && that_present_batchTimeOut))
+ return false;
+ if (this.batchTimeOut != that.batchTimeOut)
+ return false;
+ }
+
return true;
}
@@ -9202,6 +9363,16 @@ import org.slf4j.LoggerFactory;
return lastComparison;
}
}
+ lastComparison = Boolean.valueOf(isSetBatchTimeOut()).compareTo(other.isSetBatchTimeOut());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetBatchTimeOut()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.batchTimeOut, other.batchTimeOut);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -9281,6 +9452,10 @@ import org.slf4j.LoggerFactory;
sb.append("waitForWrites:");
sb.append(this.waitForWrites);
first = false;
+ if (!first) sb.append(", ");
+ sb.append("batchTimeOut:");
+ sb.append(this.batchTimeOut);
+ first = false;
sb.append(")");
return sb.toString();
}
@@ -9478,6 +9653,14 @@ import org.slf4j.LoggerFactory;
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
+ case 9: // BATCH_TIME_OUT
+ if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+ struct.batchTimeOut = iprot.readI64();
+ struct.setBatchTimeOutIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
@@ -9583,6 +9766,9 @@ import org.slf4j.LoggerFactory;
struct.tinfo.write(oprot);
oprot.writeFieldEnd();
}
+ oprot.writeFieldBegin(BATCH_TIME_OUT_FIELD_DESC);
+ oprot.writeI64(struct.batchTimeOut);
+ oprot.writeFieldEnd();
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -9625,7 +9811,10 @@ import org.slf4j.LoggerFactory;
if (struct.isSetWaitForWrites()) {
optionals.set(7);
}
- oprot.writeBitSet(optionals, 8);
+ if (struct.isSetBatchTimeOut()) {
+ optionals.set(8);
+ }
+ oprot.writeBitSet(optionals, 9);
if (struct.isSetTinfo()) {
struct.tinfo.write(oprot);
}
@@ -9695,12 +9884,15 @@ import org.slf4j.LoggerFactory;
if (struct.isSetWaitForWrites()) {
oprot.writeBool(struct.waitForWrites);
}
+ if (struct.isSetBatchTimeOut()) {
+ oprot.writeI64(struct.batchTimeOut);
+ }
}
@Override
public void read(org.apache.thrift.protocol.TProtocol prot, startMultiScan_args struct) throws org.apache.thrift.TException {
TTupleProtocol iprot = (TTupleProtocol) prot;
- BitSet incoming = iprot.readBitSet(8);
+ BitSet incoming = iprot.readBitSet(9);
if (incoming.get(0)) {
struct.tinfo = new org.apache.accumulo.core.trace.thrift.TInfo();
struct.tinfo.read(iprot);
@@ -9808,6 +10000,10 @@ import org.slf4j.LoggerFactory;
struct.waitForWrites = iprot.readBool();
struct.setWaitForWritesIsSet(true);
}
+ if (incoming.get(8)) {
+ struct.batchTimeOut = iprot.readI64();
+ struct.setBatchTimeOutIsSet(true);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/81994647/core/src/main/thrift/tabletserver.thrift
----------------------------------------------------------------------
diff --git a/core/src/main/thrift/tabletserver.thrift b/core/src/main/thrift/tabletserver.thrift
index 4a31036..051daee 100644
--- a/core/src/main/thrift/tabletserver.thrift
+++ b/core/src/main/thrift/tabletserver.thrift
@@ -149,7 +149,8 @@ service TabletClientService extends client.ClientService {
8:list<binary> authorizations
9:bool waitForWrites,
10:bool isolated,
- 12:i64 readaheadThreshold) throws (1:client.ThriftSecurityException sec, 2:NotServingTabletException nste, 3:TooManyFilesException tmfe),
+ 12:i64 readaheadThreshold,
+ 13:i64 batchTimeOut) throws (1:client.ThriftSecurityException sec, 2:NotServingTabletException nste, 3:TooManyFilesException tmfe),
data.ScanResult continueScan(2:trace.TInfo tinfo, 1:data.ScanID scanID) throws (1:NoSuchScanIDException nssi, 2:NotServingTabletException nste, 3:TooManyFilesException tmfe),
oneway void closeScan(2:trace.TInfo tinfo, 1:data.ScanID scanID),
@@ -161,8 +162,9 @@ service TabletClientService extends client.ClientService {
3:list<data.TColumn> columns,
4:list<data.IterInfo> ssiList,
5:map<string, map<string, string>> ssio,
- 6:list<binary> authorizations
- 7:bool waitForWrites) throws (1:client.ThriftSecurityException sec),
+ 6:list<binary> authorizations,
+ 7:bool waitForWrites,
+ 9:i64 batchTimeOut) throws (1:client.ThriftSecurityException sec),
data.MultiScanResult continueMultiScan(2:trace.TInfo tinfo, 1:data.ScanID scanID) throws (1:NoSuchScanIDException nssi),
void closeMultiScan(2:trace.TInfo tinfo, 1:data.ScanID scanID) throws (1:NoSuchScanIDException nssi),
http://git-wip-us.apache.org/repos/asf/accumulo/blob/81994647/core/src/test/java/org/apache/accumulo/core/client/impl/TableOperationsImplTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/impl/TableOperationsImplTest.java b/core/src/test/java/org/apache/accumulo/core/client/impl/TableOperationsImplTest.java
index 523d157..7351ede 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/impl/TableOperationsImplTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/impl/TableOperationsImplTest.java
@@ -74,6 +74,7 @@ public class TableOperationsImplTest {
// IsolatedScanner -- make the verification pass, not really relevant
EasyMock.expect(scanner.getRange()).andReturn(range).anyTimes();
EasyMock.expect(scanner.getTimeout(TimeUnit.MILLISECONDS)).andReturn(Long.MAX_VALUE);
+ EasyMock.expect(scanner.getBatchTimeout(TimeUnit.MILLISECONDS)).andReturn(Long.MAX_VALUE);
EasyMock.expect(scanner.getBatchSize()).andReturn(1000);
EasyMock.expect(scanner.getReadaheadThreshold()).andReturn(100l);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/81994647/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java b/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java
index c0c979b..0d7ade8 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java
@@ -189,7 +189,7 @@ public class VerifyTabletAssignments {
List<IterInfo> emptyListIterInfo = Collections.emptyList();
List<TColumn> emptyListColumn = Collections.emptyList();
InitialMultiScan is = client.startMultiScan(tinfo, context.rpcCreds(), batch, emptyListColumn, emptyListIterInfo, emptyMapSMapSS,
- Authorizations.EMPTY.getAuthorizationsBB(), false);
+ Authorizations.EMPTY.getAuthorizationsBB(), false, 0L);
if (is.result.more) {
MultiScanResult result = client.continueMultiScan(tinfo, is.scanID);
checkFailures(entry.getKey(), failures, result);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/81994647/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/NullScanner.java
----------------------------------------------------------------------
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/NullScanner.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/NullScanner.java
index 0ba13c7..750ad8e 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/NullScanner.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/NullScanner.java
@@ -123,4 +123,15 @@ public class NullScanner implements Scanner {
public void setReadaheadThreshold(long batches) {
}
+
+ @Override
+ public void setBatchTimeout(long timeout, TimeUnit timeUnit) {
+ // Intentionally a no-op: this stub ignores the batch timeout (getBatchTimeout always returns 0).
+ }
+
+ @Override
+ public long getBatchTimeout(TimeUnit timeUnit) {
+ return 0;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/81994647/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 4d0b9f6..dc382f2 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -442,7 +442,8 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
@Override
public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent textent, TRange range, List<TColumn> columns, int batchSize,
List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated,
- long readaheadThreshold) throws NotServingTabletException, ThriftSecurityException, org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException {
+ long readaheadThreshold, long batchTimeOut) throws NotServingTabletException, ThriftSecurityException,
+ org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException {
String tableId = new String(textent.getTable(), UTF_8);
if (!security.canScan(credentials, tableId, Tables.getNamespaceId(getInstance(), tableId), range, columns, ssiList, ssio, authorizations))
@@ -474,9 +475,10 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
for (TColumn tcolumn : columns) {
columnSet.add(new Column(tcolumn));
}
- final ScanSession scanSession = new ScanSession(credentials, extent, columnSet, ssiList, ssio, new Authorizations(authorizations), readaheadThreshold);
+ final ScanSession scanSession = new ScanSession(credentials, extent, columnSet, ssiList, ssio, new Authorizations(authorizations), readaheadThreshold,
+ batchTimeOut);
scanSession.scanner = tablet.createScanner(new Range(range), batchSize, scanSession.columnSet, scanSession.auths, ssiList, ssio, isolated,
- scanSession.interruptFlag);
+ scanSession.interruptFlag, scanSession.batchTimeOut);
long sid = sessionManager.createSession(scanSession, true);
@@ -588,7 +590,8 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
@Override
public InitialMultiScan startMultiScan(TInfo tinfo, TCredentials credentials, Map<TKeyExtent,List<TRange>> tbatch, List<TColumn> tcolumns,
- List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites) throws ThriftSecurityException {
+ List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, long batchTimeOut)
+ throws ThriftSecurityException {
// find all of the tables that need to be scanned
final HashSet<String> tables = new HashSet<String>();
for (TKeyExtent keyExtent : tbatch.keySet()) {
@@ -619,7 +622,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
if (waitForWrites)
writeTracker.waitForWrites(TabletType.type(batch.keySet()));
- final MultiScanSession mss = new MultiScanSession(credentials, threadPoolExtent, batch, ssiList, ssio, new Authorizations(authorizations));
+ final MultiScanSession mss = new MultiScanSession(credentials, threadPoolExtent, batch, ssiList, ssio, new Authorizations(authorizations), batchTimeOut);
mss.numTablets = batch.size();
for (List<Range> ranges : batch.values()) {
@@ -1108,7 +1111,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
IterConfig ic = compressedIters.decompress(tc.iterators);
- Scanner scanner = tablet.createScanner(range, 1, EMPTY_COLUMNS, cs.auths, ic.ssiList, ic.ssio, false, cs.interruptFlag);
+ Scanner scanner = tablet.createScanner(range, 1, EMPTY_COLUMNS, cs.auths, ic.ssiList, ic.ssio, false, cs.interruptFlag, 0);
try {
ScanBatch batch = scanner.read();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/81994647/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java
index 08597f4..57a09ce 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java
@@ -111,7 +111,7 @@ public class LookupTask extends ScanTask<MultiScanResult> {
interruptFlag.set(true);
lookupResult = tablet.lookup(entry.getValue(), session.columnSet, session.auths, results, maxResultsSize - bytesAdded, session.ssiList, session.ssio,
- interruptFlag);
+ interruptFlag, session.batchTimeOut);
// if the tablet was closed it is possible that the
// interrupt flag was set.... do not want it set for
http://git-wip-us.apache.org/repos/asf/accumulo/blob/81994647/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
index b326e10..fccac47 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
@@ -36,6 +36,7 @@ public class MultiScanSession extends Session {
public final List<IterInfo> ssiList;
public final Map<String,Map<String,String>> ssio;
public final Authorizations auths;
+ public final long batchTimeOut;
// stats
public int numRanges;
@@ -46,13 +47,14 @@ public class MultiScanSession extends Session {
public volatile ScanTask<MultiScanResult> lookupTask;
public MultiScanSession(TCredentials credentials, KeyExtent threadPoolExtent, Map<KeyExtent,List<Range>> queries, List<IterInfo> ssiList,
- Map<String,Map<String,String>> ssio, Authorizations authorizations) {
+ Map<String,Map<String,String>> ssio, Authorizations authorizations, long batchTimeOut) {
super(credentials);
this.queries = queries;
this.ssiList = ssiList;
this.ssio = ssio;
this.auths = authorizations;
this.threadPoolExtent = threadPoolExtent;
+ this.batchTimeOut = batchTimeOut;
}
@Override
http://git-wip-us.apache.org/repos/asf/accumulo/blob/81994647/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
index d5b0027..7a1d400 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
@@ -44,9 +44,10 @@ public class ScanSession extends Session {
public volatile ScanTask<ScanBatch> nextBatchTask;
public Scanner scanner;
public final long readaheadThreshold;
+ public final long batchTimeOut;
public ScanSession(TCredentials credentials, KeyExtent extent, Set<Column> columnSet, List<IterInfo> ssiList, Map<String,Map<String,String>> ssio,
- Authorizations authorizations, long readaheadThreshold) {
+ Authorizations authorizations, long readaheadThreshold, long batchTimeOut) {
super(credentials);
this.extent = extent;
this.columnSet = columnSet;
@@ -54,6 +55,7 @@ public class ScanSession extends Session {
this.ssio = ssio;
this.auths = authorizations;
this.readaheadThreshold = readaheadThreshold;
+ this.batchTimeOut = batchTimeOut;
}
@Override
http://git-wip-us.apache.org/repos/asf/accumulo/blob/81994647/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
index 33277bd..853714a 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
@@ -65,10 +65,10 @@ class ScanDataSource implements DataSource {
private final ScanOptions options;
ScanDataSource(Tablet tablet, Authorizations authorizations, byte[] defaultLabels, HashSet<Column> columnSet, List<IterInfo> ssiList,
- Map<String,Map<String,String>> ssio, AtomicBoolean interruptFlag) {
+ Map<String,Map<String,String>> ssio, AtomicBoolean interruptFlag, long batchTimeOut) {
this.tablet = tablet;
expectedDeletionCount = tablet.getDataSourceDeletions();
- this.options = new ScanOptions(-1, authorizations, defaultLabels, columnSet, ssiList, ssio, interruptFlag, false);
+ this.options = new ScanOptions(-1, authorizations, defaultLabels, columnSet, ssiList, ssio, interruptFlag, false, batchTimeOut);
this.interruptFlag = interruptFlag;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/81994647/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanOptions.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanOptions.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanOptions.java
index 93e8eee..2a38fbd 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanOptions.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanOptions.java
@@ -35,9 +35,10 @@ final class ScanOptions {
private final AtomicBoolean interruptFlag;
private final int num;
private final boolean isolated;
+ private final long batchTimeOut;
ScanOptions(int num, Authorizations authorizations, byte[] defaultLabels, Set<Column> columnSet, List<IterInfo> ssiList, Map<String,Map<String,String>> ssio,
- AtomicBoolean interruptFlag, boolean isolated) {
+ AtomicBoolean interruptFlag, boolean isolated, long batchTimeOut) {
this.num = num;
this.authorizations = authorizations;
this.defaultLabels = defaultLabels;
@@ -46,6 +47,7 @@ final class ScanOptions {
this.ssio = ssio;
this.interruptFlag = interruptFlag;
this.isolated = isolated;
+ this.batchTimeOut = batchTimeOut;
}
public Authorizations getAuthorizations() {
@@ -79,4 +81,8 @@ final class ScanOptions {
public boolean isIsolated() {
return isolated;
}
+
+ public long getBatchTimeOut() {
+ return batchTimeOut;
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/81994647/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java
index 790a352..3ce10d1 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java
@@ -79,7 +79,7 @@ public class Scanner {
iter = new SourceSwitchingIterator(dataSource, false);
}
- results = tablet.nextBatch(iter, range, options.getNum(), options.getColumnSet());
+ results = tablet.nextBatch(iter, range, options.getNum(), options.getColumnSet(), options.getBatchTimeOut());
if (results.getResults() == null) {
range = null;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/81994647/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index f131372..4930219 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -42,6 +42,7 @@ import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -718,7 +719,7 @@ public class Tablet implements TabletCommitter {
}
}
- private LookupResult lookup(SortedKeyValueIterator<Key,Value> mmfi, List<Range> ranges, HashSet<Column> columnSet, List<KVEntry> results, long maxResultsSize)
+ private LookupResult lookup(SortedKeyValueIterator<Key,Value> mmfi, List<Range> ranges, HashSet<Column> columnSet, List<KVEntry> results, long maxResultsSize, long batchTimeOut)
throws IOException {
LookupResult lookupResult = new LookupResult();
@@ -730,9 +731,16 @@ public class Tablet implements TabletCommitter {
if (columnSet.size() > 0)
cfset = LocalityGroupUtil.families(columnSet);
+ long returnTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(batchTimeOut);
+ if (batchTimeOut <= 0 || batchTimeOut == Long.MAX_VALUE) {
+ batchTimeOut = 0;
+ }
+
for (Range range : ranges) {
- if (exceededMemoryUsage || tabletClosed) {
+ boolean timesUp = batchTimeOut > 0 && System.nanoTime() > returnTime;
+
+ if (exceededMemoryUsage || tabletClosed || timesUp) {
lookupResult.unfinishedRanges.add(range);
continue;
}
@@ -756,7 +764,9 @@ public class Tablet implements TabletCommitter {
exceededMemoryUsage = lookupResult.bytesAdded > maxResultsSize;
- if (exceededMemoryUsage) {
+ timesUp = batchTimeOut > 0 && System.nanoTime() > returnTime;
+
+ if (exceededMemoryUsage || timesUp) {
addUnfinishedRange(lookupResult, range, key, false);
break;
}
@@ -815,7 +825,7 @@ public class Tablet implements TabletCommitter {
}
public LookupResult lookup(List<Range> ranges, HashSet<Column> columns, Authorizations authorizations, List<KVEntry> results, long maxResultSize,
- List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, AtomicBoolean interruptFlag) throws IOException {
+ List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, AtomicBoolean interruptFlag, long batchTimeOut) throws IOException {
if (ranges.size() == 0) {
return new LookupResult();
@@ -833,13 +843,13 @@ public class Tablet implements TabletCommitter {
tabletRange.clip(range);
}
- ScanDataSource dataSource = new ScanDataSource(this, authorizations, this.defaultSecurityLabel, columns, ssiList, ssio, interruptFlag);
+ ScanDataSource dataSource = new ScanDataSource(this, authorizations, this.defaultSecurityLabel, columns, ssiList, ssio, interruptFlag, batchTimeOut);
LookupResult result = null;
try {
SortedKeyValueIterator<Key,Value> iter = new SourceSwitchingIterator(dataSource);
- result = lookup(iter, ranges, columns, results, maxResultSize);
+ result = lookup(iter, ranges, columns, results, maxResultSize, batchTimeOut);
return result;
} catch (IOException ioe) {
dataSource.close(true);
@@ -857,10 +867,14 @@ public class Tablet implements TabletCommitter {
}
}
- Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range, int num, Set<Column> columns) throws IOException {
+ Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range, int num, Set<Column> columns, long batchTimeOut) throws IOException {
// log.info("In nextBatch..");
+ long stopTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(batchTimeOut);
+ if (batchTimeOut == Long.MAX_VALUE || batchTimeOut <= 0) {
+ batchTimeOut = 0;
+ }
List<KVEntry> results = new ArrayList<KVEntry>();
Key key = null;
@@ -890,7 +904,9 @@ public class Tablet implements TabletCommitter {
resultSize += kvEntry.estimateMemoryUsed();
resultBytes += kvEntry.numBytes();
- if (resultSize >= maxResultsSize || results.size() >= num) {
+ boolean timesUp = batchTimeOut > 0 && System.nanoTime() >= stopTime;
+
+ if (resultSize >= maxResultsSize || results.size() >= num || timesUp) {
continueKey = new Key(key);
skipContinueKey = true;
break;
@@ -931,12 +947,12 @@ public class Tablet implements TabletCommitter {
}
public Scanner createScanner(Range range, int num, Set<Column> columns, Authorizations authorizations, List<IterInfo> ssiList,
- Map<String,Map<String,String>> ssio, boolean isolated, AtomicBoolean interruptFlag) {
+ Map<String,Map<String,String>> ssio, boolean isolated, AtomicBoolean interruptFlag, long batchTimeOut) {
// do a test to see if this range falls within the tablet, if it does not
// then clip will throw an exception
extent.toDataRange().clip(range);
- ScanOptions opts = new ScanOptions(num, authorizations, this.defaultSecurityLabel, columns, ssiList, ssio, interruptFlag, isolated);
+ ScanOptions opts = new ScanOptions(num, authorizations, this.defaultSecurityLabel, columns, ssiList, ssio, interruptFlag, isolated, batchTimeOut);
return new Scanner(this, range, opts);
}