Posted to commits@accumulo.apache.org by ec...@apache.org on 2015/10/14 19:59:03 UTC

[1/3] accumulo git commit: ACCUMULO-4028 use known information to pick a random server

Repository: accumulo
Updated Branches:
  refs/heads/master c30e558ae -> 10cafac94


ACCUMULO-4028 use known information to pick a random server


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/0212f2ff
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/0212f2ff
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/0212f2ff

Branch: refs/heads/master
Commit: 0212f2ff2f304fb7d7261113882e28c1270968f3
Parents: 4deaf73
Author: Eric C. Newton <er...@gmail.com>
Authored: Wed Oct 14 12:54:24 2015 -0400
Committer: Eric C. Newton <er...@gmail.com>
Committed: Wed Oct 14 12:54:24 2015 -0400

----------------------------------------------------------------------
 .../accumulo/master/tableOps/BulkImport.java    | 22 +++++++++++---------
 1 file changed, 12 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/0212f2ff/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
index e661968..5320aae 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
@@ -31,6 +31,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map.Entry;
+import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
@@ -42,10 +43,7 @@ import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.IsolatedScanner;
 import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.impl.ServerClient;
 import org.apache.accumulo.core.client.impl.Tables;
-import org.apache.accumulo.core.client.impl.thrift.ClientService;
-import org.apache.accumulo.core.client.impl.thrift.ClientService.Client;
 import org.apache.accumulo.core.client.impl.thrift.TableOperation;
 import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
 import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
@@ -59,8 +57,9 @@ import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
 import org.apache.accumulo.core.util.SimpleThreadPool;
+import org.apache.accumulo.core.util.ThriftUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.fate.Repo;
 import org.apache.accumulo.master.Master;
@@ -539,23 +538,24 @@ class LoadFiles extends MasterRepo {
 
       // Use the threadpool to assign files one-at-a-time to the server
       final List<String> loaded = Collections.synchronizedList(new ArrayList<String>());
+      final Random random = new Random();
       for (final String file : filesToLoad) {
         results.add(executor.submit(new Callable<List<String>>() {
           @Override
           public List<String> call() {
             List<String> failures = new ArrayList<String>();
-            ClientService.Client client = null;
+            Client client = null;
             String server = null;
             try {
               // get a connection to a random tablet server, do not prefer cached connections because
               // this is running on the master and there are lots of connections to tablet servers
               // serving the metadata tablets
               long timeInMillis = master.getConfiguration().getConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT);
-              Pair<String,Client> pair = ServerClient.getConnection(master.getInstance(), false, timeInMillis);
-              client = pair.getSecond();
-              server = pair.getFirst();
+              TServerInstance servers[] = master.onlineTabletServers().toArray(new TServerInstance[0]);
+              server = servers[random.nextInt(servers.length)].getLocation().toString();
+              client = ThriftUtil.getTServerClient(server, master.getConfiguration().getConfiguration(), timeInMillis);
               List<String> attempt = Collections.singletonList(file);
-              log.debug("Asking " + pair.getFirst() + " to bulk import " + file);
+              log.debug("Asking " + server + " to bulk import " + file);
               List<String> fail = client.bulkImportFiles(Tracer.traceInfo(), SystemCredentials.get().toThrift(master.getInstance()), tid, tableId, attempt,
                   errorDir, setTime);
               if (fail.isEmpty()) {
@@ -566,7 +566,9 @@ class LoadFiles extends MasterRepo {
             } catch (Exception ex) {
               log.error("rpc failed server:" + server + ", tid:" + tid + " " + ex);
             } finally {
-              ServerClient.close(client);
+              if (client != null) {
+                ThriftUtil.returnClient(client);
+              }
             }
             return failures;
           }
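
In short: instead of asking ServerClient for a connection (which prefers cached connections to the tablet servers serving the metadata tablets), the loader now picks uniformly at random from the master's own list of live tablet servers. Below is a minimal standalone sketch of that selection pattern; RandomServerPicker and its List<String> parameter are illustrative stand-ins, not the real Master/TServerInstance API from the diff.

import java.util.List;
import java.util.Random;

class RandomServerPicker {
  private final Random random = new Random();

  // Pick a server uniformly at random from the master's current view of
  // live tablet servers, mirroring servers[random.nextInt(servers.length)]
  // in the diff above.
  String pickServer(List<String> onlineTabletServers) {
    if (onlineTabletServers.isEmpty()) {
      throw new IllegalStateException("no tablet servers online");
    }
    return onlineTabletServers.get(random.nextInt(onlineTabletServers.size()));
  }
}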


[3/3] accumulo git commit: ACCUMULO-4028 use known information to pick a random server, merge to master

Posted by ec...@apache.org.
ACCUMULO-4028 use known information to pick a random server, merge to master


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/10cafac9
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/10cafac9
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/10cafac9

Branch: refs/heads/master
Commit: 10cafac9476eb8890df585da7901d02592b26640
Parents: c30e558 78e2c65
Author: Eric C. Newton <er...@gmail.com>
Authored: Wed Oct 14 13:58:26 2015 -0400
Committer: Eric C. Newton <er...@gmail.com>
Committed: Wed Oct 14 13:58:26 2015 -0400

----------------------------------------------------------------------
 .../accumulo/master/tableOps/BulkImport.java    |  1 +
 .../accumulo/master/tableOps/LoadFiles.java     | 22 ++++++++++++--------
 2 files changed, 14 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/10cafac9/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/10cafac9/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
index 48cbaa5,a80e1ff..75b66da
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
@@@ -31,20 -31,17 +32,18 @@@ import java.util.concurrent.Callable
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Future;
  import java.util.concurrent.ThreadPoolExecutor;
 +import java.util.concurrent.TimeUnit;
  
 +import org.apache.accumulo.core.client.impl.AcceptableThriftTableOperationException;
- import org.apache.accumulo.core.client.impl.ServerClient;
  import org.apache.accumulo.core.client.impl.thrift.ClientService;
- import org.apache.accumulo.core.client.impl.thrift.ClientService.Client;
  import org.apache.accumulo.core.client.impl.thrift.TableOperation;
  import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
 -import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
  import org.apache.accumulo.core.conf.AccumuloConfiguration;
  import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.master.thrift.BulkImportState;
+ import org.apache.accumulo.core.rpc.ThriftUtil;
  import org.apache.accumulo.core.trace.Tracer;
- import org.apache.accumulo.core.util.Pair;
  import org.apache.accumulo.core.util.SimpleThreadPool;
 -import org.apache.accumulo.core.util.UtilWaitThread;
  import org.apache.accumulo.fate.Repo;
  import org.apache.accumulo.master.Master;
  import org.apache.accumulo.server.fs.VolumeManager;
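
The import hunk above shows the same substitution landing on master: ServerClient and Pair drop out, and ThriftUtil (now under org.apache.accumulo.core.rpc) supplies the tablet-server client directly. The sketch below illustrates the acquire/use/return lifecycle the merged code follows, under simplified types; ThriftClient, openClient, and importOne are hypothetical stand-ins for TabletClientService.Client, ThriftUtil.getTServerClient, and the per-file Callable.

import java.util.function.Function;

interface ThriftClient extends AutoCloseable {
  void bulkImport(String file) throws Exception;
  @Override void close();
}

class ClientLifecycle {
  void importOne(String server, String file, Function<String, ThriftClient> openClient) {
    ThriftClient client = null;
    try {
      client = openClient.apply(server);  // like ThriftUtil.getTServerClient(...)
      client.bulkImport(file);            // like client.bulkImportFiles(...)
    } catch (Exception ex) {
      System.err.println("rpc failed server:" + server + " " + ex);
    } finally {
      if (client != null) {
        client.close();                   // like ThriftUtil.returnClient(client)
      }
    }
  }
}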


[2/3] accumulo git commit: ACCUMULO-4028 use known information to pick a random server, merge to 1.7

Posted by ec...@apache.org.
ACCUMULO-4028 use known information to pick a random server, merge to 1.7


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/78e2c65e
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/78e2c65e
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/78e2c65e

Branch: refs/heads/master
Commit: 78e2c65e282abbf36fdd502688a8eabf96f6a9a2
Parents: 91b161a 0212f2f
Author: Eric C. Newton <er...@gmail.com>
Authored: Wed Oct 14 13:57:12 2015 -0400
Committer: Eric C. Newton <er...@gmail.com>
Committed: Wed Oct 14 13:57:12 2015 -0400

----------------------------------------------------------------------
 .../accumulo/master/tableOps/BulkImport.java    |  1 +
 .../accumulo/master/tableOps/LoadFiles.java     | 24 ++++++++++++--------
 2 files changed, 15 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/78e2c65e/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
index ad20473,5320aae..7001fdd
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
@@@ -266,3 -286,337 +266,4 @@@ public class BulkImport extends MasterR
      ZooArbitrator.cleanup(Constants.BULK_ARBITRATOR_TYPE, tid);
    }
  }
+ 
 -class CleanUpBulkImport extends MasterRepo {
 -
 -  private static final long serialVersionUID = 1L;
 -
 -  private static final Logger log = Logger.getLogger(CleanUpBulkImport.class);
 -
 -  private String tableId;
 -  private String source;
 -  private String bulk;
 -  private String error;
 -
 -  public CleanUpBulkImport(String tableId, String source, String bulk, String error) {
 -    this.tableId = tableId;
 -    this.source = source;
 -    this.bulk = bulk;
 -    this.error = error;
 -  }
 -
 -  @Override
 -  public Repo<Master> call(long tid, Master master) throws Exception {
 -    log.debug("removing the bulk processing flag file in " + bulk);
 -    Path bulkDir = new Path(bulk);
 -    MetadataTableUtil.removeBulkLoadInProgressFlag("/" + bulkDir.getParent().getName() + "/" + bulkDir.getName());
 -    MetadataTableUtil.addDeleteEntry(tableId, bulkDir.toString());
 -    log.debug("removing the metadata table markers for loaded files");
 -    Connector conn = master.getConnector();
 -    MetadataTableUtil.removeBulkLoadEntries(conn, tableId, tid);
 -    log.debug("releasing HDFS reservations for " + source + " and " + error);
 -    Utils.unreserveHdfsDirectory(source, tid);
 -    Utils.unreserveHdfsDirectory(error, tid);
 -    Utils.getReadLock(tableId, tid).unlock();
 -    log.debug("completing bulk import transaction " + tid);
 -    ZooArbitrator.cleanup(Constants.BULK_ARBITRATOR_TYPE, tid);
 -    return null;
 -  }
 -}
 -
 -class CompleteBulkImport extends MasterRepo {
 -
 -  private static final long serialVersionUID = 1L;
 -
 -  private String tableId;
 -  private String source;
 -  private String bulk;
 -  private String error;
 -
 -  public CompleteBulkImport(String tableId, String source, String bulk, String error) {
 -    this.tableId = tableId;
 -    this.source = source;
 -    this.bulk = bulk;
 -    this.error = error;
 -  }
 -
 -  @Override
 -  public Repo<Master> call(long tid, Master master) throws Exception {
 -    ZooArbitrator.stop(Constants.BULK_ARBITRATOR_TYPE, tid);
 -    return new CopyFailed(tableId, source, bulk, error);
 -  }
 -}
 -
 -class CopyFailed extends MasterRepo {
 -
 -  private static final long serialVersionUID = 1L;
 -
 -  private String tableId;
 -  private String source;
 -  private String bulk;
 -  private String error;
 -
 -  public CopyFailed(String tableId, String source, String bulk, String error) {
 -    this.tableId = tableId;
 -    this.source = source;
 -    this.bulk = bulk;
 -    this.error = error;
 -  }
 -
 -  @Override
 -  public long isReady(long tid, Master master) throws Exception {
 -    Set<TServerInstance> finished = new HashSet<TServerInstance>();
 -    Set<TServerInstance> running = master.onlineTabletServers();
 -    for (TServerInstance server : running) {
 -      try {
 -        TServerConnection client = master.getConnection(server);
 -        if (client != null && !client.isActive(tid))
 -          finished.add(server);
 -      } catch (TException ex) {
 -        log.info("Ignoring error trying to check on tid " + tid + " from server " + server + ": " + ex);
 -      }
 -    }
 -    if (finished.containsAll(running))
 -      return 0;
 -    return 500;
 -  }
 -
 -  @Override
 -  public Repo<Master> call(long tid, Master master) throws Exception {
 -    // This needs to execute after the arbiter is stopped
 -
 -    VolumeManager fs = master.getFileSystem();
 -
 -    if (!fs.exists(new Path(error, BulkImport.FAILURES_TXT)))
 -      return new CleanUpBulkImport(tableId, source, bulk, error);
 -
 -    HashMap<FileRef,String> failures = new HashMap<FileRef,String>();
 -    HashMap<FileRef,String> loadedFailures = new HashMap<FileRef,String>();
 -
 -    FSDataInputStream failFile = fs.open(new Path(error, BulkImport.FAILURES_TXT));
 -    BufferedReader in = new BufferedReader(new InputStreamReader(failFile, UTF_8));
 -    try {
 -      String line = null;
 -      while ((line = in.readLine()) != null) {
 -        Path path = new Path(line);
 -        if (!fs.exists(new Path(error, path.getName())))
 -          failures.put(new FileRef(line, path), line);
 -      }
 -    } finally {
 -      failFile.close();
 -    }
 -
 -    /*
 -     * I thought I could move files that have no file references in the table. However its possible a clone references a file. Therefore only move files that
 -     * have no loaded markers.
 -     */
 -
 -    // determine which failed files were loaded
 -    Connector conn = master.getConnector();
 -    Scanner mscanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
 -    mscanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange());
 -    mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
 -
 -    for (Entry<Key,Value> entry : mscanner) {
 -      if (Long.parseLong(entry.getValue().toString()) == tid) {
 -        FileRef loadedFile = new FileRef(fs, entry.getKey());
 -        String absPath = failures.remove(loadedFile);
 -        if (absPath != null) {
 -          loadedFailures.put(loadedFile, absPath);
 -        }
 -      }
 -    }
 -
 -    // move failed files that were not loaded
 -    for (String failure : failures.values()) {
 -      Path orig = new Path(failure);
 -      Path dest = new Path(error, orig.getName());
 -      fs.rename(orig, dest);
 -      log.debug("tid " + tid + " renamed " + orig + " to " + dest + ": import failed");
 -    }
 -
 -    if (loadedFailures.size() > 0) {
 -      DistributedWorkQueue bifCopyQueue = new DistributedWorkQueue(Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID()
 -          + Constants.ZBULK_FAILED_COPYQ);
 -
 -      HashSet<String> workIds = new HashSet<String>();
 -
 -      for (String failure : loadedFailures.values()) {
 -        Path orig = new Path(failure);
 -        Path dest = new Path(error, orig.getName());
 -
 -        if (fs.exists(dest))
 -          continue;
 -
 -        bifCopyQueue.addWork(orig.getName(), (failure + "," + dest).getBytes(UTF_8));
 -        workIds.add(orig.getName());
 -        log.debug("tid " + tid + " added to copyq: " + orig + " to " + dest + ": failed");
 -      }
 -
 -      bifCopyQueue.waitUntilDone(workIds);
 -    }
 -
 -    fs.deleteRecursively(new Path(error, BulkImport.FAILURES_TXT));
 -    return new CleanUpBulkImport(tableId, source, bulk, error);
 -  }
 -
 -}
 -
 -class LoadFiles extends MasterRepo {
 -
 -  private static final long serialVersionUID = 1L;
 -
 -  private static ExecutorService threadPool = null;
 -  private static final Logger log = Logger.getLogger(BulkImport.class);
 -
 -  private String tableId;
 -  private String source;
 -  private String bulk;
 -  private String errorDir;
 -  private boolean setTime;
 -
 -  public LoadFiles(String tableId, String source, String bulk, String errorDir, boolean setTime) {
 -    this.tableId = tableId;
 -    this.source = source;
 -    this.bulk = bulk;
 -    this.errorDir = errorDir;
 -    this.setTime = setTime;
 -  }
 -
 -  @Override
 -  public long isReady(long tid, Master master) throws Exception {
 -    if (master.onlineTabletServers().size() == 0)
 -      return 500;
 -    return 0;
 -  }
 -
 -  private static synchronized ExecutorService getThreadPool(Master master) {
 -    if (threadPool == null) {
 -      int threadPoolSize = master.getSystemConfiguration().getCount(Property.MASTER_BULK_THREADPOOL_SIZE);
 -      ThreadPoolExecutor pool = new SimpleThreadPool(threadPoolSize, "bulk import");
 -      pool.allowCoreThreadTimeOut(true);
 -      threadPool = new TraceExecutorService(pool);
 -    }
 -    return threadPool;
 -  }
 -
 -  @Override
 -  public Repo<Master> call(final long tid, final Master master) throws Exception {
 -    ExecutorService executor = getThreadPool(master);
 -    final SiteConfiguration conf = ServerConfiguration.getSiteConfiguration();
 -    VolumeManager fs = master.getFileSystem();
 -    List<FileStatus> files = new ArrayList<FileStatus>();
 -    for (FileStatus entry : fs.listStatus(new Path(bulk))) {
 -      files.add(entry);
 -    }
 -    log.debug("tid " + tid + " importing " + files.size() + " files");
 -
 -    Path writable = new Path(this.errorDir, ".iswritable");
 -    if (!fs.createNewFile(writable)) {
 -      // Maybe this is a re-try... clear the flag and try again
 -      fs.delete(writable);
 -      if (!fs.createNewFile(writable))
 -        throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY,
 -            "Unable to write to " + this.errorDir);
 -    }
 -    fs.delete(writable);
 -
 -    final Set<String> filesToLoad = Collections.synchronizedSet(new HashSet<String>());
 -    for (FileStatus f : files)
 -      filesToLoad.add(f.getPath().toString());
 -
 -    final int RETRIES = Math.max(1, conf.getCount(Property.MASTER_BULK_RETRIES));
 -    for (int attempt = 0; attempt < RETRIES && filesToLoad.size() > 0; attempt++) {
 -      List<Future<List<String>>> results = new ArrayList<Future<List<String>>>();
 -
 -      if (master.onlineTabletServers().size() == 0)
 -        log.warn("There are no tablet server to process bulk import, waiting (tid = " + tid + ")");
 -
 -      while (master.onlineTabletServers().size() == 0) {
 -        UtilWaitThread.sleep(500);
 -      }
 -
 -      // Use the threadpool to assign files one-at-a-time to the server
 -      final List<String> loaded = Collections.synchronizedList(new ArrayList<String>());
 -      final Random random = new Random();
 -      for (final String file : filesToLoad) {
 -        results.add(executor.submit(new Callable<List<String>>() {
 -          @Override
 -          public List<String> call() {
 -            List<String> failures = new ArrayList<String>();
 -            Client client = null;
 -            String server = null;
 -            try {
 -              // get a connection to a random tablet server, do not prefer cached connections because
 -              // this is running on the master and there are lots of connections to tablet servers
 -              // serving the metadata tablets
 -              long timeInMillis = master.getConfiguration().getConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT);
 -              TServerInstance servers[] = master.onlineTabletServers().toArray(new TServerInstance[0]);
 -              server = servers[random.nextInt(servers.length)].getLocation().toString();
 -              client = ThriftUtil.getTServerClient(server, master.getConfiguration().getConfiguration(), timeInMillis);
 -              List<String> attempt = Collections.singletonList(file);
 -              log.debug("Asking " + server + " to bulk import " + file);
 -              List<String> fail = client.bulkImportFiles(Tracer.traceInfo(), SystemCredentials.get().toThrift(master.getInstance()), tid, tableId, attempt,
 -                  errorDir, setTime);
 -              if (fail.isEmpty()) {
 -                loaded.add(file);
 -              } else {
 -                failures.addAll(fail);
 -              }
 -            } catch (Exception ex) {
 -              log.error("rpc failed server:" + server + ", tid:" + tid + " " + ex);
 -            } finally {
 -              if (client != null) {
 -                ThriftUtil.returnClient(client);
 -              }
 -            }
 -            return failures;
 -          }
 -        }));
 -      }
 -      Set<String> failures = new HashSet<String>();
 -      for (Future<List<String>> f : results)
 -        failures.addAll(f.get());
 -      filesToLoad.removeAll(loaded);
 -      if (filesToLoad.size() > 0) {
 -        log.debug("tid " + tid + " attempt " + (attempt + 1) + " " + sampleList(filesToLoad, 10) + " failed");
 -        UtilWaitThread.sleep(100);
 -      }
 -    }
 -
 -    FSDataOutputStream failFile = fs.create(new Path(errorDir, BulkImport.FAILURES_TXT), true);
 -    BufferedWriter out = new BufferedWriter(new OutputStreamWriter(failFile, UTF_8));
 -    try {
 -      for (String f : filesToLoad) {
 -        out.write(f);
 -        out.write("\n");
 -      }
 -    } finally {
 -      out.close();
 -    }
 -
 -    // return the next step, which will perform cleanup
 -    return new CompleteBulkImport(tableId, source, bulk, errorDir);
 -  }
 -
 -  static String sampleList(Collection<?> potentiallyLongList, int max) {
 -    StringBuffer result = new StringBuffer();
 -    result.append("[");
 -    int i = 0;
 -    for (Object obj : potentiallyLongList) {
 -      result.append(obj);
 -      if (i >= max) {
 -        result.append("...");
 -        break;
 -      } else {
 -        result.append(", ");
 -      }
 -      i++;
 -    }
 -    if (i < max)
 -      result.delete(result.length() - 2, result.length());
 -    result.append("]");
 -    return result.toString();
 -  }
 -
 -}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/78e2c65e/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
index 4a56c6f,0000000..a80e1ff
mode 100644,000000..100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
@@@ -1,209 -1,0 +1,213 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.master.tableOps;
 +
 +import static java.nio.charset.StandardCharsets.UTF_8;
 +
 +import java.io.BufferedWriter;
 +import java.io.OutputStreamWriter;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashSet;
 +import java.util.List;
++import java.util.Random;
 +import java.util.Set;
 +import java.util.concurrent.Callable;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.ThreadPoolExecutor;
 +
- import org.apache.accumulo.core.client.impl.ServerClient;
 +import org.apache.accumulo.core.client.impl.thrift.ClientService;
- import org.apache.accumulo.core.client.impl.thrift.ClientService.Client;
 +import org.apache.accumulo.core.client.impl.thrift.TableOperation;
 +import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
 +import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
++import org.apache.accumulo.core.rpc.ThriftUtil;
 +import org.apache.accumulo.core.trace.Tracer;
- import org.apache.accumulo.core.util.Pair;
 +import org.apache.accumulo.core.util.SimpleThreadPool;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.fate.Repo;
 +import org.apache.accumulo.master.Master;
 +import org.apache.accumulo.server.fs.VolumeManager;
++import org.apache.accumulo.server.master.state.TServerInstance;
 +import org.apache.hadoop.fs.FSDataOutputStream;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.htrace.wrappers.TraceExecutorService;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
++import com.google.common.net.HostAndPort;
++
 +class LoadFiles extends MasterRepo {
 +
 +  private static final long serialVersionUID = 1L;
 +
 +  private static ExecutorService threadPool = null;
 +  private static final Logger log = LoggerFactory.getLogger(LoadFiles.class);
 +
 +  private String tableId;
 +  private String source;
 +  private String bulk;
 +  private String errorDir;
 +  private boolean setTime;
 +
 +  public LoadFiles(String tableId, String source, String bulk, String errorDir, boolean setTime) {
 +    this.tableId = tableId;
 +    this.source = source;
 +    this.bulk = bulk;
 +    this.errorDir = errorDir;
 +    this.setTime = setTime;
 +  }
 +
 +  @Override
 +  public long isReady(long tid, Master master) throws Exception {
 +    if (master.onlineTabletServers().size() == 0)
 +      return 500;
 +    return 0;
 +  }
 +
 +  private static synchronized ExecutorService getThreadPool(Master master) {
 +    if (threadPool == null) {
 +      int threadPoolSize = master.getConfiguration().getCount(Property.MASTER_BULK_THREADPOOL_SIZE);
 +      ThreadPoolExecutor pool = new SimpleThreadPool(threadPoolSize, "bulk import");
 +      pool.allowCoreThreadTimeOut(true);
 +      threadPool = new TraceExecutorService(pool);
 +    }
 +    return threadPool;
 +  }
 +
 +  @Override
 +  public Repo<Master> call(final long tid, final Master master) throws Exception {
 +    ExecutorService executor = getThreadPool(master);
 +    final AccumuloConfiguration conf = master.getConfiguration();
 +    VolumeManager fs = master.getFileSystem();
 +    List<FileStatus> files = new ArrayList<FileStatus>();
 +    for (FileStatus entry : fs.listStatus(new Path(bulk))) {
 +      files.add(entry);
 +    }
 +    log.debug("tid " + tid + " importing " + files.size() + " files");
 +
 +    Path writable = new Path(this.errorDir, ".iswritable");
 +    if (!fs.createNewFile(writable)) {
 +      // Maybe this is a re-try... clear the flag and try again
 +      fs.delete(writable);
 +      if (!fs.createNewFile(writable))
 +        throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY,
 +            "Unable to write to " + this.errorDir);
 +    }
 +    fs.delete(writable);
 +
 +    final Set<String> filesToLoad = Collections.synchronizedSet(new HashSet<String>());
 +    for (FileStatus f : files)
 +      filesToLoad.add(f.getPath().toString());
 +
 +    final int RETRIES = Math.max(1, conf.getCount(Property.MASTER_BULK_RETRIES));
 +    for (int attempt = 0; attempt < RETRIES && filesToLoad.size() > 0; attempt++) {
 +      List<Future<List<String>>> results = new ArrayList<Future<List<String>>>();
 +
 +      if (master.onlineTabletServers().size() == 0)
 +        log.warn("There are no tablet server to process bulk import, waiting (tid = " + tid + ")");
 +
 +      while (master.onlineTabletServers().size() == 0) {
 +        UtilWaitThread.sleep(500);
 +      }
 +
 +      // Use the threadpool to assign files one-at-a-time to the server
 +      final List<String> loaded = Collections.synchronizedList(new ArrayList<String>());
++      final Random random = new Random();
 +      for (final String file : filesToLoad) {
 +        results.add(executor.submit(new Callable<List<String>>() {
 +          @Override
 +          public List<String> call() {
 +            List<String> failures = new ArrayList<String>();
 +            ClientService.Client client = null;
-             String server = null;
++            HostAndPort server = null;
 +            try {
 +              // get a connection to a random tablet server, do not prefer cached connections because
 +              // this is running on the master and there are lots of connections to tablet servers
 +              // serving the metadata tablets
 +              long timeInMillis = master.getConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT);
-               Pair<String,Client> pair = ServerClient.getConnection(master, false, timeInMillis);
-               client = pair.getSecond();
-               server = pair.getFirst();
++              // Pair<String,Client> pair = ServerClient.getConnection(master, false, timeInMillis);
++              TServerInstance[] servers = master.onlineTabletServers().toArray(new TServerInstance[0]);
++              server = servers[random.nextInt(servers.length)].getLocation();
++              client = ThriftUtil.getTServerClient(server, master, timeInMillis);
 +              List<String> attempt = Collections.singletonList(file);
-               log.debug("Asking " + pair.getFirst() + " to bulk import " + file);
++              log.debug("Asking " + server + " to bulk import " + file);
 +              List<String> fail = client.bulkImportFiles(Tracer.traceInfo(), master.rpcCreds(), tid, tableId, attempt, errorDir, setTime);
 +              if (fail.isEmpty()) {
 +                loaded.add(file);
 +              } else {
 +                failures.addAll(fail);
 +              }
 +            } catch (Exception ex) {
 +              log.error("rpc failed server:" + server + ", tid:" + tid + " " + ex);
 +            } finally {
-               ServerClient.close(client);
++              ThriftUtil.returnClient(client);
 +            }
 +            return failures;
 +          }
 +        }));
 +      }
 +      Set<String> failures = new HashSet<String>();
 +      for (Future<List<String>> f : results)
 +        failures.addAll(f.get());
 +      filesToLoad.removeAll(loaded);
 +      if (filesToLoad.size() > 0) {
 +        log.debug("tid " + tid + " attempt " + (attempt + 1) + " " + sampleList(filesToLoad, 10) + " failed");
 +        UtilWaitThread.sleep(100);
 +      }
 +    }
 +
 +    FSDataOutputStream failFile = fs.create(new Path(errorDir, BulkImport.FAILURES_TXT), true);
 +    BufferedWriter out = new BufferedWriter(new OutputStreamWriter(failFile, UTF_8));
 +    try {
 +      for (String f : filesToLoad) {
 +        out.write(f);
 +        out.write("\n");
 +      }
 +    } finally {
 +      out.close();
 +    }
 +
 +    // return the next step, which will perform cleanup
 +    return new CompleteBulkImport(tableId, source, bulk, errorDir);
 +  }
 +
 +  static String sampleList(Collection<?> potentiallyLongList, int max) {
 +    StringBuffer result = new StringBuffer();
 +    result.append("[");
 +    int i = 0;
 +    for (Object obj : potentiallyLongList) {
 +      result.append(obj);
 +      if (i >= max) {
 +        result.append("...");
 +        break;
 +      } else {
 +        result.append(", ");
 +      }
 +      i++;
 +    }
 +    if (i < max)
 +      result.delete(result.length() - 2, result.length());
 +    result.append("]");
 +    return result.toString();
 +  }
 +
- }
++}
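
For context, the retry structure in LoadFiles.call above is: submit every remaining file to the pool, collect what loaded, drop those from the working set, and go around again up to MASTER_BULK_RETRIES times; anything still unloaded is written to failures.txt. A condensed sketch of that loop, assuming a hypothetical loadOne predicate in place of the Thrift RPC:

import java.util.HashSet;
import java.util.Set;
import java.util.function.Predicate;

class BoundedRetryLoader {
  // Returns the files that still failed after all attempts; in the real
  // code these are the entries written to failures.txt.
  Set<String> load(Set<String> filesToLoad, Predicate<String> loadOne, int retries) {
    Set<String> remaining = new HashSet<>(filesToLoad);
    for (int attempt = 0; attempt < retries && !remaining.isEmpty(); attempt++) {
      Set<String> loaded = new HashSet<>();
      for (String file : remaining) {
        if (loadOne.test(file)) {  // like submitting the Callable and checking fail.isEmpty()
          loaded.add(file);
        }
      }
      remaining.removeAll(loaded); // like filesToLoad.removeAll(loaded)
    }
    return remaining;
  }
}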