You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2012/12/19 19:14:09 UTC
svn commit: r1423994 - in /accumulo/trunk:
core/src/main/java/org/apache/accumulo/core/client/impl/
core/src/main/java/org/apache/accumulo/core/conf/
server/src/main/java/org/apache/accumulo/server/client/
server/src/main/java/org/apache/accumulo/serve...
Author: ecn
Date: Wed Dec 19 18:14:08 2012
New Revision: 1423994
URL: http://svn.apache.org/viewvc?rev=1423994&view=rev
Log:
ACCUMULO-408 track threads which are assigning files to tservers, make connection timeout configurable for bulk requests
Modified:
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java?rev=1423994&r1=1423993&r2=1423994&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java Wed Dec 19 18:14:08 2012
@@ -124,6 +124,11 @@ public class ServerClient {
}
public static Pair<String,ClientService.Client> getConnection(Instance instance, boolean preferCachedConnections) throws TTransportException {
+ AccumuloConfiguration conf = instance.getConfiguration();
+ return getConnection(instance, false, conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
+ }
+
+ public static Pair<String,ClientService.Client> getConnection(Instance instance, boolean preferCachedConnections, long rpcTimeout) throws TTransportException {
ArgumentChecker.notNull(instance);
// create list of servers
ArrayList<ThriftTransportKey> servers = new ArrayList<ThriftTransportKey>();
@@ -139,7 +144,7 @@ public class ServerClient {
servers.add(new ThriftTransportKey(
new ServerServices(new String(data)).getAddressString(Service.TSERV_CLIENT),
conf.getPort(Property.TSERV_CLIENTPORT),
- conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT)));
+ rpcTimeout));
}
boolean opened = false;
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java?rev=1423994&r1=1423993&r2=1423994&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java Wed Dec 19 18:14:08 2012
@@ -65,7 +65,8 @@ public enum Property {
MASTER_RECOVERY_MAXTIME("master.recovery.time.max", "30m", PropertyType.TIMEDURATION, "The maximum time to attempt recovery before giving up"),
MASTER_BULK_RETRIES("master.bulk.retries", "3", PropertyType.COUNT, "The number of attempts to bulk-load a file before giving up."),
MASTER_BULK_THREADPOOL_SIZE("master.bulk.threadpool.size", "5", PropertyType.COUNT, "The number of threads to use when coordinating a bulk-import."),
- MASTER_MINTHREADS("master.server.threads.minimum", "2", PropertyType.COUNT, "The minimum number of threads to use to handle incoming requests."),
+ MASTER_BULK_TIMEOUT("master.bulk.timeout", "5m", PropertyType.TIMEDURATION, "The time to wait for a tablet server to process a bulk import request"),
+ MASTER_MINTHREADS("master.server.threads.minimum", "20", PropertyType.COUNT, "The minimum number of threads to use to handle incoming requests."),
MASTER_THREADCHECK("master.server.threadcheck.time", "1s", PropertyType.TIMEDURATION, "The time between adjustments of the server thread pool."),
MASTER_RECOVERY_DELAY("master.recovery.delay", "10s", PropertyType.TIMEDURATION,
"When a tablet server's lock is deleted, it takes time for it to completely quit. This delay gives it time before log recoveries begin."),
@@ -143,7 +144,8 @@ public enum Property {
+ " the file to the appropriate tablets on all servers. This property controls the number of threads used to communicate to the other servers."),
TSERV_BULK_RETRY("tserver.bulk.retry.max", "3", PropertyType.COUNT,
"The number of times the tablet server will attempt to assign a file to a tablet as it migrates and splits."),
- TSERV_MINTHREADS("tserver.server.threads.minimum", "2", PropertyType.COUNT, "The minimum number of threads to use to handle incoming requests."),
+ TSERV_BULK_TIMEOUT("tserver.bulk.timeout", "5m", PropertyType.TIMEDURATION, "The time to wait for a tablet server to process a bulk import request."),
+ TSERV_MINTHREADS("tserver.server.threads.minimum", "20", PropertyType.COUNT, "The minimum number of threads to use to handle incoming requests."),
TSERV_THREADCHECK("tserver.server.threadcheck.time", "1s", PropertyType.TIMEDURATION, "The time between adjustments of the server thread pool."),
TSERV_HOLD_TIME_SUICIDE("tserver.hold.time.max", "5m", PropertyType.TIMEDURATION,
"The maximum time for a tablet server to be in the \"memory full\" state. If the tablet server cannot write out memory"
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java?rev=1423994&r1=1423993&r2=1423994&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java Wed Dec 19 18:14:08 2012
@@ -578,7 +578,8 @@ public class BulkImporter {
private List<KeyExtent> assignMapFiles(AuthInfo credentials, String location, Map<KeyExtent,List<PathSize>> assignmentsPerTablet) throws AccumuloException,
AccumuloSecurityException {
try {
- TabletClientService.Iface client = ThriftUtil.getTServerClient(location, instance.getConfiguration());
+ long timeInMillis = instance.getConfiguration().getMemoryInBytes(Property.TSERV_BULK_TIMEOUT);
+ TabletClientService.Iface client = ThriftUtil.getTServerClient(location, instance.getConfiguration(), timeInMillis);
try {
HashMap<KeyExtent,Map<String,org.apache.accumulo.core.data.thrift.MapFileInfo>> files = new HashMap<KeyExtent,Map<String,org.apache.accumulo.core.data.thrift.MapFileInfo>>();
for (Entry<KeyExtent,List<PathSize>> entry : assignmentsPerTablet.entrySet()) {
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java?rev=1423994&r1=1423993&r2=1423994&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java Wed Dec 19 18:14:08 2012
@@ -55,7 +55,7 @@ import org.apache.thrift.TException;
public class ClientServiceHandler implements ClientService.Iface {
private static final Logger log = Logger.getLogger(ClientServiceHandler.class);
private static Authenticator authenticator = ZKAuthenticator.getInstance();
- private final TransactionWatcher transactionWatcher;
+ protected final TransactionWatcher transactionWatcher;
private final Instance instance;
public ClientServiceHandler(Instance instance, TransactionWatcher transactionWatcher) {
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java?rev=1423994&r1=1423993&r2=1423994&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java Wed Dec 19 18:14:08 2012
@@ -529,7 +529,8 @@ class LoadFiles extends MasterRepo {
// get a connection to a random tablet server, do not prefer cached connections because
// this is running on the master and there are lots of connections to tablet servers
// serving the !METADATA tablets
- Pair<String,Client> pair = ServerClient.getConnection(master.getInstance(), false);
+ long timeInMillis = master.getConfiguration().getConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT);
+ Pair<String,Client> pair = ServerClient.getConnection(master.getInstance(), false, timeInMillis);
client = pair.getSecond();
server = pair.getFirst();
List<String> attempt = Collections.singletonList(file);
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1423994&r1=1423993&r2=1423994&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java Wed Dec 19 18:14:08 2012
@@ -49,6 +49,7 @@ import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingDeque;
@@ -868,38 +869,40 @@ public class TabletServer extends Abstra
}
@Override
- public List<TKeyExtent> bulkImport(TInfo tinfo, AuthInfo credentials, long tid, Map<TKeyExtent,Map<String,MapFileInfo>> files, boolean setTime)
- throws ThriftSecurityException {
+ public List<TKeyExtent> bulkImport(TInfo tinfo, AuthInfo credentials, final long tid, final Map<TKeyExtent,Map<String,MapFileInfo>> files, final boolean setTime)
+ throws TException {
try {
if (!authenticator.hasSystemPermission(credentials, credentials.user, SystemPermission.SYSTEM))
throw new ThriftSecurityException(credentials.user, SecurityErrorCode.PERMISSION_DENIED);
+ return transactionWatcher.run(Constants.BULK_ARBITRATOR_TYPE, tid, new Callable<List<TKeyExtent>>() {
+ public List<TKeyExtent> call() throws Exception {
+ List<TKeyExtent> failures = new ArrayList<TKeyExtent>();
+ for (Entry<TKeyExtent,Map<String,MapFileInfo>> entry : files.entrySet()) {
+ TKeyExtent tke = entry.getKey();
+ Map<String,MapFileInfo> fileMap = entry.getValue();
+
+ Tablet importTablet = onlineTablets.get(new KeyExtent(tke));
+
+ if (importTablet == null) {
+ failures.add(tke);
+ } else {
+ try {
+ importTablet.importMapFiles(tid, fileMap, setTime);
+ } catch (IOException ioe) {
+ log.info("files " + fileMap.keySet() + " not imported to " + new KeyExtent(tke) + ": " + ioe.getMessage());
+ failures.add(tke);
+ }
+ }
+ }
+ return failures;
+ }
+ });
} catch (AccumuloSecurityException e) {
throw e.asThriftException();
+ } catch (Exception ex) {
+ throw new TException(ex);
}
-
- ArrayList<TKeyExtent> failures = new ArrayList<TKeyExtent>();
-
- for (Entry<TKeyExtent,Map<String,MapFileInfo>> entry : files.entrySet()) {
- TKeyExtent tke = entry.getKey();
- Map<String,MapFileInfo> fileMap = entry.getValue();
-
- Tablet importTablet = onlineTablets.get(new KeyExtent(tke));
-
- if (importTablet == null) {
- failures.add(tke);
- } else {
- try {
- importTablet.importMapFiles(tid, fileMap, setTime);
- } catch (IOException ioe) {
- log.info("files " + fileMap.keySet() + " not imported to " + new KeyExtent(tke) + ": " + ioe.getMessage());
- failures.add(tke);
- }
- }
-
- }
-
- return failures;
}
private class NextBatchTask extends ScanTask<ScanBatch> {
Re: svn commit: r1423994 - in /accumulo/trunk: core/src/main/java/org/apache/accumulo/core/client/impl/
core/src/main/java/org/apache/accumulo/core/conf/ server/src/main/java/org/apache/accumulo/server/client/
server/src/main/java/org/apache/accumulo/serve...
Posted by Keith Turner <ke...@deenlo.com>.
On Wed, Dec 19, 2012 at 1:14 PM, <ec...@apache.org> wrote:
> Author: ecn
> Date: Wed Dec 19 18:14:08 2012
> New Revision: 1423994
>
> URL: http://svn.apache.org/viewvc?rev=1423994&view=rev
> Log:
> ACCUMULO-408 track threads which are assigning files to tservers, make connection timeout configurable for bulk requests
>
> Modified:
> accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
> accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java
> accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
> accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
> accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
> accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
>
> Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
> URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java?rev=1423994&r1=1423993&r2=1423994&view=diff
> ==============================================================================
> --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java (original)
> +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java Wed Dec 19 18:14:08 2012
> @@ -124,6 +124,11 @@ public class ServerClient {
> }
>
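In the method below, the preferCachedConnections parameter is ignored: the call that delegates to the new three-argument overload passes a hard-coded false instead of forwarding the caller's value.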
> public static Pair<String,ClientService.Client> getConnection(Instance instance, boolean preferCachedConnections) throws TTransportException {
> + AccumuloConfiguration conf = instance.getConfiguration();
> + return getConnection(instance, false, conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
> + }
> +
> + public static Pair<String,ClientService.Client> getConnection(Instance instance, boolean preferCachedConnections, long rpcTimeout) throws TTransportException {
> ArgumentChecker.notNull(instance);
> // create list of servers
> ArrayList<ThriftTransportKey> servers = new ArrayList<ThriftTransportKey>();
> @@ -139,7 +144,7 @@ public class ServerClient {
> servers.add(new ThriftTransportKey(
> new ServerServices(new String(data)).getAddressString(Service.TSERV_CLIENT),
> conf.getPort(Property.TSERV_CLIENTPORT),
> - conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT)));
> + rpcTimeout));
> }
>
> boolean opened = false;
>
> Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java
> URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java?rev=1423994&r1=1423993&r2=1423994&view=diff
> ==============================================================================
> --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java (original)
> +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java Wed Dec 19 18:14:08 2012
> @@ -65,7 +65,8 @@ public enum Property {
> MASTER_RECOVERY_MAXTIME("master.recovery.time.max", "30m", PropertyType.TIMEDURATION, "The maximum time to attempt recovery before giving up"),
> MASTER_BULK_RETRIES("master.bulk.retries", "3", PropertyType.COUNT, "The number of attempts to bulk-load a file before giving up."),
> MASTER_BULK_THREADPOOL_SIZE("master.bulk.threadpool.size", "5", PropertyType.COUNT, "The number of threads to use when coordinating a bulk-import."),
> - MASTER_MINTHREADS("master.server.threads.minimum", "2", PropertyType.COUNT, "The minimum number of threads to use to handle incoming requests."),
> + MASTER_BULK_TIMEOUT("master.bulk.timeout", "5m", PropertyType.TIMEDURATION, "The time to wait for a tablet server to process a bulk import request"),
> + MASTER_MINTHREADS("master.server.threads.minimum", "20", PropertyType.COUNT, "The minimum number of threads to use to handle incoming requests."),
> MASTER_THREADCHECK("master.server.threadcheck.time", "1s", PropertyType.TIMEDURATION, "The time between adjustments of the server thread pool."),
> MASTER_RECOVERY_DELAY("master.recovery.delay", "10s", PropertyType.TIMEDURATION,
> "When a tablet server's lock is deleted, it takes time for it to completely quit. This delay gives it time before log recoveries begin."),
> @@ -143,7 +144,8 @@ public enum Property {
> + " the file to the appropriate tablets on all servers. This property controls the number of threads used to communicate to the other servers."),
> TSERV_BULK_RETRY("tserver.bulk.retry.max", "3", PropertyType.COUNT,
> "The number of times the tablet server will attempt to assign a file to a tablet as it migrates and splits."),
> - TSERV_MINTHREADS("tserver.server.threads.minimum", "2", PropertyType.COUNT, "The minimum number of threads to use to handle incoming requests."),
> + TSERV_BULK_TIMEOUT("tserver.bulk.timeout", "5m", PropertyType.TIMEDURATION, "The time to wait for a tablet server to process a bulk import request."),
> + TSERV_MINTHREADS("tserver.server.threads.minimum", "20", PropertyType.COUNT, "The minimum number of threads to use to handle incoming requests."),
> TSERV_THREADCHECK("tserver.server.threadcheck.time", "1s", PropertyType.TIMEDURATION, "The time between adjustments of the server thread pool."),
> TSERV_HOLD_TIME_SUICIDE("tserver.hold.time.max", "5m", PropertyType.TIMEDURATION,
> "The maximum time for a tablet server to be in the \"memory full\" state. If the tablet server cannot write out memory"
>
> Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
> URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java?rev=1423994&r1=1423993&r2=1423994&view=diff
> ==============================================================================
> --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java (original)
> +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java Wed Dec 19 18:14:08 2012
> @@ -578,7 +578,8 @@ public class BulkImporter {
> private List<KeyExtent> assignMapFiles(AuthInfo credentials, String location, Map<KeyExtent,List<PathSize>> assignmentsPerTablet) throws AccumuloException,
> AccumuloSecurityException {
> try {
> - TabletClientService.Iface client = ThriftUtil.getTServerClient(location, instance.getConfiguration());
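The line below calls getMemoryInBytes() to read a time value. TSERV_BULK_TIMEOUT is declared as a TIMEDURATION property, so shouldn't this be getTimeInMillis(), as in the BulkImport.java change?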
> + long timeInMillis = instance.getConfiguration().getMemoryInBytes(Property.TSERV_BULK_TIMEOUT);
> + TabletClientService.Iface client = ThriftUtil.getTServerClient(location, instance.getConfiguration(), timeInMillis);
> try {
> HashMap<KeyExtent,Map<String,org.apache.accumulo.core.data.thrift.MapFileInfo>> files = new HashMap<KeyExtent,Map<String,org.apache.accumulo.core.data.thrift.MapFileInfo>>();
> for (Entry<KeyExtent,List<PathSize>> entry : assignmentsPerTablet.entrySet()) {
>
> Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
> URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java?rev=1423994&r1=1423993&r2=1423994&view=diff
> ==============================================================================
> --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java (original)
> +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java Wed Dec 19 18:14:08 2012
> @@ -55,7 +55,7 @@ import org.apache.thrift.TException;
> public class ClientServiceHandler implements ClientService.Iface {
> private static final Logger log = Logger.getLogger(ClientServiceHandler.class);
> private static Authenticator authenticator = ZKAuthenticator.getInstance();
> - private final TransactionWatcher transactionWatcher;
> + protected final TransactionWatcher transactionWatcher;
> private final Instance instance;
>
> public ClientServiceHandler(Instance instance, TransactionWatcher transactionWatcher) {
>
> Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
> URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java?rev=1423994&r1=1423993&r2=1423994&view=diff
> ==============================================================================
> --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java (original)
> +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java Wed Dec 19 18:14:08 2012
> @@ -529,7 +529,8 @@ class LoadFiles extends MasterRepo {
> // get a connection to a random tablet server, do not prefer cached connections because
> // this is running on the master and there are lots of connections to tablet servers
> // serving the !METADATA tablets
> - Pair<String,Client> pair = ServerClient.getConnection(master.getInstance(), false);
> + long timeInMillis = master.getConfiguration().getConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT);
> + Pair<String,Client> pair = ServerClient.getConnection(master.getInstance(), false, timeInMillis);
> client = pair.getSecond();
> server = pair.getFirst();
> List<String> attempt = Collections.singletonList(file);
>
> Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
> URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1423994&r1=1423993&r2=1423994&view=diff
> ==============================================================================
> --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java (original)
> +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java Wed Dec 19 18:14:08 2012
> @@ -49,6 +49,7 @@ import java.util.TreeSet;
> import java.util.UUID;
> import java.util.concurrent.ArrayBlockingQueue;
> import java.util.concurrent.BlockingDeque;
> +import java.util.concurrent.Callable;
> import java.util.concurrent.CancellationException;
> import java.util.concurrent.ExecutionException;
> import java.util.concurrent.LinkedBlockingDeque;
> @@ -868,38 +869,40 @@ public class TabletServer extends Abstra
> }
>
> @Override
> - public List<TKeyExtent> bulkImport(TInfo tinfo, AuthInfo credentials, long tid, Map<TKeyExtent,Map<String,MapFileInfo>> files, boolean setTime)
> - throws ThriftSecurityException {
> + public List<TKeyExtent> bulkImport(TInfo tinfo, AuthInfo credentials, final long tid, final Map<TKeyExtent,Map<String,MapFileInfo>> files, final boolean setTime)
> + throws TException {
>
> try {
> if (!authenticator.hasSystemPermission(credentials, credentials.user, SystemPermission.SYSTEM))
> throw new ThriftSecurityException(credentials.user, SecurityErrorCode.PERMISSION_DENIED);
> + return transactionWatcher.run(Constants.BULK_ARBITRATOR_TYPE, tid, new Callable<List<TKeyExtent>>() {
> + public List<TKeyExtent> call() throws Exception {
> + List<TKeyExtent> failures = new ArrayList<TKeyExtent>();
> + for (Entry<TKeyExtent,Map<String,MapFileInfo>> entry : files.entrySet()) {
> + TKeyExtent tke = entry.getKey();
> + Map<String,MapFileInfo> fileMap = entry.getValue();
> +
> + Tablet importTablet = onlineTablets.get(new KeyExtent(tke));
> +
> + if (importTablet == null) {
> + failures.add(tke);
> + } else {
> + try {
> + importTablet.importMapFiles(tid, fileMap, setTime);
> + } catch (IOException ioe) {
> + log.info("files " + fileMap.keySet() + " not imported to " + new KeyExtent(tke) + ": " + ioe.getMessage());
> + failures.add(tke);
> + }
> + }
> + }
> + return failures;
> + }
> + });
> } catch (AccumuloSecurityException e) {
> throw e.asThriftException();
> + } catch (Exception ex) {
> + throw new TException(ex);
> }
> -
> - ArrayList<TKeyExtent> failures = new ArrayList<TKeyExtent>();
> -
> - for (Entry<TKeyExtent,Map<String,MapFileInfo>> entry : files.entrySet()) {
> - TKeyExtent tke = entry.getKey();
> - Map<String,MapFileInfo> fileMap = entry.getValue();
> -
> - Tablet importTablet = onlineTablets.get(new KeyExtent(tke));
> -
> - if (importTablet == null) {
> - failures.add(tke);
> - } else {
> - try {
> - importTablet.importMapFiles(tid, fileMap, setTime);
> - } catch (IOException ioe) {
> - log.info("files " + fileMap.keySet() + " not imported to " + new KeyExtent(tke) + ": " + ioe.getMessage());
> - failures.add(tke);
> - }
> - }
> -
> - }
> -
> - return failures;
> }
>
> private class NextBatchTask extends ScanTask<ScanBatch> {
>
>