You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2012/12/19 19:14:09 UTC

svn commit: r1423994 - in /accumulo/trunk: core/src/main/java/org/apache/accumulo/core/client/impl/ core/src/main/java/org/apache/accumulo/core/conf/ server/src/main/java/org/apache/accumulo/server/client/ server/src/main/java/org/apache/accumulo/serve...

Author: ecn
Date: Wed Dec 19 18:14:08 2012
New Revision: 1423994

URL: http://svn.apache.org/viewvc?rev=1423994&view=rev
Log:
ACCUMULO-408 track threads which are assigning files to tservers, make connection timeout configurable for bulk requests

Modified:
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java?rev=1423994&r1=1423993&r2=1423994&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java Wed Dec 19 18:14:08 2012
@@ -124,6 +124,11 @@ public class ServerClient {
   }
   
   public static Pair<String,ClientService.Client> getConnection(Instance instance, boolean preferCachedConnections) throws TTransportException {
+    AccumuloConfiguration conf = instance.getConfiguration();
+    return getConnection(instance, false, conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
+  }
+  
+  public static Pair<String,ClientService.Client> getConnection(Instance instance, boolean preferCachedConnections, long rpcTimeout) throws TTransportException {
     ArgumentChecker.notNull(instance);
     // create list of servers
     ArrayList<ThriftTransportKey> servers = new ArrayList<ThriftTransportKey>();
@@ -139,7 +144,7 @@ public class ServerClient {
         servers.add(new ThriftTransportKey(
             new ServerServices(new String(data)).getAddressString(Service.TSERV_CLIENT), 
             conf.getPort(Property.TSERV_CLIENTPORT), 
-            conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT)));
+            rpcTimeout));
     }
     
     boolean opened = false;

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java?rev=1423994&r1=1423993&r2=1423994&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java Wed Dec 19 18:14:08 2012
@@ -65,7 +65,8 @@ public enum Property {
   MASTER_RECOVERY_MAXTIME("master.recovery.time.max", "30m", PropertyType.TIMEDURATION, "The maximum time to attempt recovery before giving up"),
   MASTER_BULK_RETRIES("master.bulk.retries", "3", PropertyType.COUNT, "The number of attempts to bulk-load a file before giving up."),
   MASTER_BULK_THREADPOOL_SIZE("master.bulk.threadpool.size", "5", PropertyType.COUNT, "The number of threads to use when coordinating a bulk-import."),
-  MASTER_MINTHREADS("master.server.threads.minimum", "2", PropertyType.COUNT, "The minimum number of threads to use to handle incoming requests."),
+  MASTER_BULK_TIMEOUT("master.bulk.timeout", "5m", PropertyType.TIMEDURATION, "The time to wait for a tablet server to process a bulk import request"),
+  MASTER_MINTHREADS("master.server.threads.minimum", "20", PropertyType.COUNT, "The minimum number of threads to use to handle incoming requests."),
   MASTER_THREADCHECK("master.server.threadcheck.time", "1s", PropertyType.TIMEDURATION, "The time between adjustments of the server thread pool."),
   MASTER_RECOVERY_DELAY("master.recovery.delay", "10s", PropertyType.TIMEDURATION,
       "When a tablet server's lock is deleted, it takes time for it to completely quit. This delay gives it time before log recoveries begin."),
@@ -143,7 +144,8 @@ public enum Property {
           + " the file to the appropriate tablets on all servers.  This property controls the number of threads used to communicate to the other servers."),
   TSERV_BULK_RETRY("tserver.bulk.retry.max", "3", PropertyType.COUNT,
       "The number of times the tablet server will attempt to assign a file to a tablet as it migrates and splits."),
-  TSERV_MINTHREADS("tserver.server.threads.minimum", "2", PropertyType.COUNT, "The minimum number of threads to use to handle incoming requests."),
+  TSERV_BULK_TIMEOUT("tserver.bulk.timeout", "5m", PropertyType.TIMEDURATION, "The time to wait for a tablet server to process a bulk import request."),
+  TSERV_MINTHREADS("tserver.server.threads.minimum", "20", PropertyType.COUNT, "The minimum number of threads to use to handle incoming requests."),
   TSERV_THREADCHECK("tserver.server.threadcheck.time", "1s", PropertyType.TIMEDURATION, "The time between adjustments of the server thread pool."),
   TSERV_HOLD_TIME_SUICIDE("tserver.hold.time.max", "5m", PropertyType.TIMEDURATION,
       "The maximum time for a tablet server to be in the \"memory full\" state.  If the tablet server cannot write out memory"

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java?rev=1423994&r1=1423993&r2=1423994&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java Wed Dec 19 18:14:08 2012
@@ -578,7 +578,8 @@ public class BulkImporter {
   private List<KeyExtent> assignMapFiles(AuthInfo credentials, String location, Map<KeyExtent,List<PathSize>> assignmentsPerTablet) throws AccumuloException,
       AccumuloSecurityException {
     try {
-      TabletClientService.Iface client = ThriftUtil.getTServerClient(location, instance.getConfiguration());
+      long timeInMillis = instance.getConfiguration().getMemoryInBytes(Property.TSERV_BULK_TIMEOUT);
+      TabletClientService.Iface client = ThriftUtil.getTServerClient(location, instance.getConfiguration(), timeInMillis);
       try {
         HashMap<KeyExtent,Map<String,org.apache.accumulo.core.data.thrift.MapFileInfo>> files = new HashMap<KeyExtent,Map<String,org.apache.accumulo.core.data.thrift.MapFileInfo>>();
         for (Entry<KeyExtent,List<PathSize>> entry : assignmentsPerTablet.entrySet()) {

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java?rev=1423994&r1=1423993&r2=1423994&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java Wed Dec 19 18:14:08 2012
@@ -55,7 +55,7 @@ import org.apache.thrift.TException;
 public class ClientServiceHandler implements ClientService.Iface {
   private static final Logger log = Logger.getLogger(ClientServiceHandler.class);
   private static Authenticator authenticator = ZKAuthenticator.getInstance();
-  private final TransactionWatcher transactionWatcher;
+  protected final TransactionWatcher transactionWatcher;
   private final Instance instance;
   
   public ClientServiceHandler(Instance instance, TransactionWatcher transactionWatcher) {

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java?rev=1423994&r1=1423993&r2=1423994&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java Wed Dec 19 18:14:08 2012
@@ -529,7 +529,8 @@ class LoadFiles extends MasterRepo {
               // get a connection to a random tablet server, do not prefer cached connections because
               // this is running on the master and there are lots of connections to tablet servers
               // serving the !METADATA tablets
-              Pair<String,Client> pair = ServerClient.getConnection(master.getInstance(), false);
+              long timeInMillis = master.getConfiguration().getConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT);
+              Pair<String,Client> pair = ServerClient.getConnection(master.getInstance(), false, timeInMillis);
               client = pair.getSecond();
               server = pair.getFirst();
               List<String> attempt = Collections.singletonList(file);

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1423994&r1=1423993&r2=1423994&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java Wed Dec 19 18:14:08 2012
@@ -49,6 +49,7 @@ import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.LinkedBlockingDeque;
@@ -868,38 +869,40 @@ public class TabletServer extends Abstra
     }
     
     @Override
-    public List<TKeyExtent> bulkImport(TInfo tinfo, AuthInfo credentials, long tid, Map<TKeyExtent,Map<String,MapFileInfo>> files, boolean setTime)
-        throws ThriftSecurityException {
+    public List<TKeyExtent> bulkImport(TInfo tinfo, AuthInfo credentials, final long tid, final Map<TKeyExtent,Map<String,MapFileInfo>> files, final boolean setTime)
+        throws TException {
       
       try {
         if (!authenticator.hasSystemPermission(credentials, credentials.user, SystemPermission.SYSTEM))
           throw new ThriftSecurityException(credentials.user, SecurityErrorCode.PERMISSION_DENIED);
+        return transactionWatcher.run(Constants.BULK_ARBITRATOR_TYPE, tid, new Callable<List<TKeyExtent>>() {
+          public List<TKeyExtent> call() throws Exception {
+            List<TKeyExtent> failures = new ArrayList<TKeyExtent>();
+            for (Entry<TKeyExtent,Map<String,MapFileInfo>> entry : files.entrySet()) {
+              TKeyExtent tke = entry.getKey();
+              Map<String,MapFileInfo> fileMap = entry.getValue();
+              
+              Tablet importTablet = onlineTablets.get(new KeyExtent(tke));
+              
+              if (importTablet == null) {
+                failures.add(tke);
+              } else {
+                try {
+                  importTablet.importMapFiles(tid, fileMap, setTime);
+                } catch (IOException ioe) {
+                  log.info("files " + fileMap.keySet() + " not imported to " + new KeyExtent(tke) + ": " + ioe.getMessage());
+                  failures.add(tke);
+                }
+              }
+            }
+            return failures;
+          }
+        });
       } catch (AccumuloSecurityException e) {
         throw e.asThriftException();
+      } catch (Exception ex) {
+        throw new TException(ex);
       }
-      
-      ArrayList<TKeyExtent> failures = new ArrayList<TKeyExtent>();
-      
-      for (Entry<TKeyExtent,Map<String,MapFileInfo>> entry : files.entrySet()) {
-        TKeyExtent tke = entry.getKey();
-        Map<String,MapFileInfo> fileMap = entry.getValue();
-        
-        Tablet importTablet = onlineTablets.get(new KeyExtent(tke));
-        
-        if (importTablet == null) {
-          failures.add(tke);
-        } else {
-          try {
-            importTablet.importMapFiles(tid, fileMap, setTime);
-          } catch (IOException ioe) {
-            log.info("files " + fileMap.keySet() + " not imported to " + new KeyExtent(tke) + ": " + ioe.getMessage());
-            failures.add(tke);
-          }
-        }
-        
-      }
-      
-      return failures;
     }
     
     private class NextBatchTask extends ScanTask<ScanBatch> {



Re: svn commit: r1423994 - in /accumulo/trunk: core/src/main/java/org/apache/accumulo/core/client/impl/ core/src/main/java/org/apache/accumulo/core/conf/ server/src/main/java/org/apache/accumulo/server/client/ server/src/main/java/org/apache/accumulo/serve...

Posted by Keith Turner <ke...@deenlo.com>.
On Wed, Dec 19, 2012 at 1:14 PM,  <ec...@apache.org> wrote:
> Author: ecn
> Date: Wed Dec 19 18:14:08 2012
> New Revision: 1423994
>
> URL: http://svn.apache.org/viewvc?rev=1423994&view=rev
> Log:
> ACCUMULO-408 track threads which are assigning files to tservers, make connection timeout configurable for bulk requests
>
> Modified:
>     accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
>     accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java
>     accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
>     accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
>     accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
>     accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
>
> Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
> URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java?rev=1423994&r1=1423993&r2=1423994&view=diff
> ==============================================================================
> --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java (original)
> +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java Wed Dec 19 18:14:08 2012
> @@ -124,6 +124,11 @@ public class ServerClient {
>    }
>

In the method below, the preferCachedConnections parameter is ignored

>    public static Pair<String,ClientService.Client> getConnection(Instance instance, boolean preferCachedConnections) throws TTransportException {
> +    AccumuloConfiguration conf = instance.getConfiguration();
> +    return getConnection(instance, false, conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
> +  }
> +
> +  public static Pair<String,ClientService.Client> getConnection(Instance instance, boolean preferCachedConnections, long rpcTimeout) throws TTransportException {
>      ArgumentChecker.notNull(instance);
>      // create list of servers
>      ArrayList<ThriftTransportKey> servers = new ArrayList<ThriftTransportKey>();
> @@ -139,7 +144,7 @@ public class ServerClient {
>          servers.add(new ThriftTransportKey(
>              new ServerServices(new String(data)).getAddressString(Service.TSERV_CLIENT),
>              conf.getPort(Property.TSERV_CLIENTPORT),
> -            conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT)));
> +            rpcTimeout));
>      }
>
>      boolean opened = false;
>
> Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java
> URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java?rev=1423994&r1=1423993&r2=1423994&view=diff
> ==============================================================================
> --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java (original)
> +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java Wed Dec 19 18:14:08 2012
> @@ -65,7 +65,8 @@ public enum Property {
>    MASTER_RECOVERY_MAXTIME("master.recovery.time.max", "30m", PropertyType.TIMEDURATION, "The maximum time to attempt recovery before giving up"),
>    MASTER_BULK_RETRIES("master.bulk.retries", "3", PropertyType.COUNT, "The number of attempts to bulk-load a file before giving up."),
>    MASTER_BULK_THREADPOOL_SIZE("master.bulk.threadpool.size", "5", PropertyType.COUNT, "The number of threads to use when coordinating a bulk-import."),
> -  MASTER_MINTHREADS("master.server.threads.minimum", "2", PropertyType.COUNT, "The minimum number of threads to use to handle incoming requests."),
> +  MASTER_BULK_TIMEOUT("master.bulk.timeout", "5m", PropertyType.TIMEDURATION, "The time to wait for a tablet server to process a bulk import request"),
> +  MASTER_MINTHREADS("master.server.threads.minimum", "20", PropertyType.COUNT, "The minimum number of threads to use to handle incoming requests."),
>    MASTER_THREADCHECK("master.server.threadcheck.time", "1s", PropertyType.TIMEDURATION, "The time between adjustments of the server thread pool."),
>    MASTER_RECOVERY_DELAY("master.recovery.delay", "10s", PropertyType.TIMEDURATION,
>        "When a tablet server's lock is deleted, it takes time for it to completely quit. This delay gives it time before log recoveries begin."),
> @@ -143,7 +144,8 @@ public enum Property {
>            + " the file to the appropriate tablets on all servers.  This property controls the number of threads used to communicate to the other servers."),
>    TSERV_BULK_RETRY("tserver.bulk.retry.max", "3", PropertyType.COUNT,
>        "The number of times the tablet server will attempt to assign a file to a tablet as it migrates and splits."),
> -  TSERV_MINTHREADS("tserver.server.threads.minimum", "2", PropertyType.COUNT, "The minimum number of threads to use to handle incoming requests."),
> +  TSERV_BULK_TIMEOUT("tserver.bulk.timeout", "5m", PropertyType.TIMEDURATION, "The time to wait for a tablet server to process a bulk import request."),
> +  TSERV_MINTHREADS("tserver.server.threads.minimum", "20", PropertyType.COUNT, "The minimum number of threads to use to handle incoming requests."),
>    TSERV_THREADCHECK("tserver.server.threadcheck.time", "1s", PropertyType.TIMEDURATION, "The time between adjustments of the server thread pool."),
>    TSERV_HOLD_TIME_SUICIDE("tserver.hold.time.max", "5m", PropertyType.TIMEDURATION,
>        "The maximum time for a tablet server to be in the \"memory full\" state.  If the tablet server cannot write out memory"
>
> Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
> URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java?rev=1423994&r1=1423993&r2=1423994&view=diff
> ==============================================================================
> --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java (original)
> +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java Wed Dec 19 18:14:08 2012
> @@ -578,7 +578,8 @@ public class BulkImporter {
>    private List<KeyExtent> assignMapFiles(AuthInfo credentials, String location, Map<KeyExtent,List<PathSize>> assignmentsPerTablet) throws AccumuloException,
>        AccumuloSecurityException {
>      try {
> -      TabletClientService.Iface client = ThriftUtil.getTServerClient(location, instance.getConfiguration());

calling getMemoryInBytes() to get a time?

> +      long timeInMillis = instance.getConfiguration().getMemoryInBytes(Property.TSERV_BULK_TIMEOUT);
> +      TabletClientService.Iface client = ThriftUtil.getTServerClient(location, instance.getConfiguration(), timeInMillis);
>        try {
>          HashMap<KeyExtent,Map<String,org.apache.accumulo.core.data.thrift.MapFileInfo>> files = new HashMap<KeyExtent,Map<String,org.apache.accumulo.core.data.thrift.MapFileInfo>>();
>          for (Entry<KeyExtent,List<PathSize>> entry : assignmentsPerTablet.entrySet()) {
>
> Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
> URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java?rev=1423994&r1=1423993&r2=1423994&view=diff
> ==============================================================================
> --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java (original)
> +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java Wed Dec 19 18:14:08 2012
> @@ -55,7 +55,7 @@ import org.apache.thrift.TException;
>  public class ClientServiceHandler implements ClientService.Iface {
>    private static final Logger log = Logger.getLogger(ClientServiceHandler.class);
>    private static Authenticator authenticator = ZKAuthenticator.getInstance();
> -  private final TransactionWatcher transactionWatcher;
> +  protected final TransactionWatcher transactionWatcher;
>    private final Instance instance;
>
>    public ClientServiceHandler(Instance instance, TransactionWatcher transactionWatcher) {
>
> Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
> URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java?rev=1423994&r1=1423993&r2=1423994&view=diff
> ==============================================================================
> --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java (original)
> +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java Wed Dec 19 18:14:08 2012
> @@ -529,7 +529,8 @@ class LoadFiles extends MasterRepo {
>                // get a connection to a random tablet server, do not prefer cached connections because
>                // this is running on the master and there are lots of connections to tablet servers
>                // serving the !METADATA tablets
> -              Pair<String,Client> pair = ServerClient.getConnection(master.getInstance(), false);
> +              long timeInMillis = master.getConfiguration().getConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT);
> +              Pair<String,Client> pair = ServerClient.getConnection(master.getInstance(), false, timeInMillis);
>                client = pair.getSecond();
>                server = pair.getFirst();
>                List<String> attempt = Collections.singletonList(file);
>
> Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
> URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1423994&r1=1423993&r2=1423994&view=diff
> ==============================================================================
> --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java (original)
> +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java Wed Dec 19 18:14:08 2012
> @@ -49,6 +49,7 @@ import java.util.TreeSet;
>  import java.util.UUID;
>  import java.util.concurrent.ArrayBlockingQueue;
>  import java.util.concurrent.BlockingDeque;
> +import java.util.concurrent.Callable;
>  import java.util.concurrent.CancellationException;
>  import java.util.concurrent.ExecutionException;
>  import java.util.concurrent.LinkedBlockingDeque;
> @@ -868,38 +869,40 @@ public class TabletServer extends Abstra
>      }
>
>      @Override
> -    public List<TKeyExtent> bulkImport(TInfo tinfo, AuthInfo credentials, long tid, Map<TKeyExtent,Map<String,MapFileInfo>> files, boolean setTime)
> -        throws ThriftSecurityException {
> +    public List<TKeyExtent> bulkImport(TInfo tinfo, AuthInfo credentials, final long tid, final Map<TKeyExtent,Map<String,MapFileInfo>> files, final boolean setTime)
> +        throws TException {
>
>        try {
>          if (!authenticator.hasSystemPermission(credentials, credentials.user, SystemPermission.SYSTEM))
>            throw new ThriftSecurityException(credentials.user, SecurityErrorCode.PERMISSION_DENIED);
> +        return transactionWatcher.run(Constants.BULK_ARBITRATOR_TYPE, tid, new Callable<List<TKeyExtent>>() {
> +          public List<TKeyExtent> call() throws Exception {
> +            List<TKeyExtent> failures = new ArrayList<TKeyExtent>();
> +            for (Entry<TKeyExtent,Map<String,MapFileInfo>> entry : files.entrySet()) {
> +              TKeyExtent tke = entry.getKey();
> +              Map<String,MapFileInfo> fileMap = entry.getValue();
> +
> +              Tablet importTablet = onlineTablets.get(new KeyExtent(tke));
> +
> +              if (importTablet == null) {
> +                failures.add(tke);
> +              } else {
> +                try {
> +                  importTablet.importMapFiles(tid, fileMap, setTime);
> +                } catch (IOException ioe) {
> +                  log.info("files " + fileMap.keySet() + " not imported to " + new KeyExtent(tke) + ": " + ioe.getMessage());
> +                  failures.add(tke);
> +                }
> +              }
> +            }
> +            return failures;
> +          }
> +        });
>        } catch (AccumuloSecurityException e) {
>          throw e.asThriftException();
> +      } catch (Exception ex) {
> +        throw new TException(ex);
>        }
> -
> -      ArrayList<TKeyExtent> failures = new ArrayList<TKeyExtent>();
> -
> -      for (Entry<TKeyExtent,Map<String,MapFileInfo>> entry : files.entrySet()) {
> -        TKeyExtent tke = entry.getKey();
> -        Map<String,MapFileInfo> fileMap = entry.getValue();
> -
> -        Tablet importTablet = onlineTablets.get(new KeyExtent(tke));
> -
> -        if (importTablet == null) {
> -          failures.add(tke);
> -        } else {
> -          try {
> -            importTablet.importMapFiles(tid, fileMap, setTime);
> -          } catch (IOException ioe) {
> -            log.info("files " + fileMap.keySet() + " not imported to " + new KeyExtent(tke) + ": " + ioe.getMessage());
> -            failures.add(tke);
> -          }
> -        }
> -
> -      }
> -
> -      return failures;
>      }
>
>      private class NextBatchTask extends ScanTask<ScanBatch> {
>
>