You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2015/10/14 19:59:03 UTC
[1/3] accumulo git commit: ACCUMULO-4028 use known information to pick a random server
Repository: accumulo
Updated Branches:
refs/heads/master c30e558ae -> 10cafac94
ACCUMULO-4028 use known information to pick a random server
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/0212f2ff
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/0212f2ff
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/0212f2ff
Branch: refs/heads/master
Commit: 0212f2ff2f304fb7d7261113882e28c1270968f3
Parents: 4deaf73
Author: Eric C. Newton <er...@gmail.com>
Authored: Wed Oct 14 12:54:24 2015 -0400
Committer: Eric C. Newton <er...@gmail.com>
Committed: Wed Oct 14 12:54:24 2015 -0400
----------------------------------------------------------------------
.../accumulo/master/tableOps/BulkImport.java | 22 +++++++++++---------
1 file changed, 12 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0212f2ff/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
index e661968..5320aae 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
@@ -31,6 +31,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
+import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
@@ -42,10 +43,7 @@ import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.IsolatedScanner;
import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.impl.ServerClient;
import org.apache.accumulo.core.client.impl.Tables;
-import org.apache.accumulo.core.client.impl.thrift.ClientService;
-import org.apache.accumulo.core.client.impl.thrift.ClientService.Client;
import org.apache.accumulo.core.client.impl.thrift.TableOperation;
import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
@@ -59,8 +57,9 @@ import org.apache.accumulo.core.master.state.tables.TableState;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
import org.apache.accumulo.core.util.SimpleThreadPool;
+import org.apache.accumulo.core.util.ThriftUtil;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.fate.Repo;
import org.apache.accumulo.master.Master;
@@ -539,23 +538,24 @@ class LoadFiles extends MasterRepo {
// Use the threadpool to assign files one-at-a-time to the server
final List<String> loaded = Collections.synchronizedList(new ArrayList<String>());
+ final Random random = new Random();
for (final String file : filesToLoad) {
results.add(executor.submit(new Callable<List<String>>() {
@Override
public List<String> call() {
List<String> failures = new ArrayList<String>();
- ClientService.Client client = null;
+ Client client = null;
String server = null;
try {
// get a connection to a random tablet server, do not prefer cached connections because
// this is running on the master and there are lots of connections to tablet servers
// serving the metadata tablets
long timeInMillis = master.getConfiguration().getConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT);
- Pair<String,Client> pair = ServerClient.getConnection(master.getInstance(), false, timeInMillis);
- client = pair.getSecond();
- server = pair.getFirst();
+ TServerInstance servers[] = master.onlineTabletServers().toArray(new TServerInstance[0]);
+ server = servers[random.nextInt(servers.length)].getLocation().toString();
+ client = ThriftUtil.getTServerClient(server, master.getConfiguration().getConfiguration(), timeInMillis);
List<String> attempt = Collections.singletonList(file);
- log.debug("Asking " + pair.getFirst() + " to bulk import " + file);
+ log.debug("Asking " + server + " to bulk import " + file);
List<String> fail = client.bulkImportFiles(Tracer.traceInfo(), SystemCredentials.get().toThrift(master.getInstance()), tid, tableId, attempt,
errorDir, setTime);
if (fail.isEmpty()) {
@@ -566,7 +566,9 @@ class LoadFiles extends MasterRepo {
} catch (Exception ex) {
log.error("rpc failed server:" + server + ", tid:" + tid + " " + ex);
} finally {
- ServerClient.close(client);
+ if (client != null) {
+ ThriftUtil.returnClient(client);
+ }
}
return failures;
}
[3/3] accumulo git commit: ACCUMULO-4028 use known information to pick a random server, merge to master
Posted by ec...@apache.org.
ACCUMULO-4028 use known information to pick a random server, merge to master
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/10cafac9
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/10cafac9
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/10cafac9
Branch: refs/heads/master
Commit: 10cafac9476eb8890df585da7901d02592b26640
Parents: c30e558 78e2c65
Author: Eric C. Newton <er...@gmail.com>
Authored: Wed Oct 14 13:58:26 2015 -0400
Committer: Eric C. Newton <er...@gmail.com>
Committed: Wed Oct 14 13:58:26 2015 -0400
----------------------------------------------------------------------
.../accumulo/master/tableOps/BulkImport.java | 1 +
.../accumulo/master/tableOps/LoadFiles.java | 22 ++++++++++++--------
2 files changed, 14 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/10cafac9/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/10cafac9/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
index 48cbaa5,a80e1ff..75b66da
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
@@@ -31,20 -31,17 +32,18 @@@ import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.accumulo.core.client.impl.AcceptableThriftTableOperationException;
- import org.apache.accumulo.core.client.impl.ServerClient;
import org.apache.accumulo.core.client.impl.thrift.ClientService;
- import org.apache.accumulo.core.client.impl.thrift.ClientService.Client;
import org.apache.accumulo.core.client.impl.thrift.TableOperation;
import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
-import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.master.thrift.BulkImportState;
+ import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.trace.Tracer;
- import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.SimpleThreadPool;
-import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.fate.Repo;
import org.apache.accumulo.master.Master;
import org.apache.accumulo.server.fs.VolumeManager;
[2/3] accumulo git commit: ACCUMULO-4028 use known information to pick a random server, merge to 1.7
Posted by ec...@apache.org.
ACCUMULO-4028 use known information to pick a random server, merge to 1.7
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/78e2c65e
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/78e2c65e
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/78e2c65e
Branch: refs/heads/master
Commit: 78e2c65e282abbf36fdd502688a8eabf96f6a9a2
Parents: 91b161a 0212f2f
Author: Eric C. Newton <er...@gmail.com>
Authored: Wed Oct 14 13:57:12 2015 -0400
Committer: Eric C. Newton <er...@gmail.com>
Committed: Wed Oct 14 13:57:12 2015 -0400
----------------------------------------------------------------------
.../accumulo/master/tableOps/BulkImport.java | 1 +
.../accumulo/master/tableOps/LoadFiles.java | 24 ++++++++++++--------
2 files changed, 15 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/78e2c65e/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
index ad20473,5320aae..7001fdd
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
@@@ -266,3 -286,337 +266,4 @@@ public class BulkImport extends MasterR
ZooArbitrator.cleanup(Constants.BULK_ARBITRATOR_TYPE, tid);
}
}
+
-class CleanUpBulkImport extends MasterRepo {
-
- private static final long serialVersionUID = 1L;
-
- private static final Logger log = Logger.getLogger(CleanUpBulkImport.class);
-
- private String tableId;
- private String source;
- private String bulk;
- private String error;
-
- public CleanUpBulkImport(String tableId, String source, String bulk, String error) {
- this.tableId = tableId;
- this.source = source;
- this.bulk = bulk;
- this.error = error;
- }
-
- @Override
- public Repo<Master> call(long tid, Master master) throws Exception {
- log.debug("removing the bulk processing flag file in " + bulk);
- Path bulkDir = new Path(bulk);
- MetadataTableUtil.removeBulkLoadInProgressFlag("/" + bulkDir.getParent().getName() + "/" + bulkDir.getName());
- MetadataTableUtil.addDeleteEntry(tableId, bulkDir.toString());
- log.debug("removing the metadata table markers for loaded files");
- Connector conn = master.getConnector();
- MetadataTableUtil.removeBulkLoadEntries(conn, tableId, tid);
- log.debug("releasing HDFS reservations for " + source + " and " + error);
- Utils.unreserveHdfsDirectory(source, tid);
- Utils.unreserveHdfsDirectory(error, tid);
- Utils.getReadLock(tableId, tid).unlock();
- log.debug("completing bulk import transaction " + tid);
- ZooArbitrator.cleanup(Constants.BULK_ARBITRATOR_TYPE, tid);
- return null;
- }
-}
-
-class CompleteBulkImport extends MasterRepo {
-
- private static final long serialVersionUID = 1L;
-
- private String tableId;
- private String source;
- private String bulk;
- private String error;
-
- public CompleteBulkImport(String tableId, String source, String bulk, String error) {
- this.tableId = tableId;
- this.source = source;
- this.bulk = bulk;
- this.error = error;
- }
-
- @Override
- public Repo<Master> call(long tid, Master master) throws Exception {
- ZooArbitrator.stop(Constants.BULK_ARBITRATOR_TYPE, tid);
- return new CopyFailed(tableId, source, bulk, error);
- }
-}
-
-class CopyFailed extends MasterRepo {
-
- private static final long serialVersionUID = 1L;
-
- private String tableId;
- private String source;
- private String bulk;
- private String error;
-
- public CopyFailed(String tableId, String source, String bulk, String error) {
- this.tableId = tableId;
- this.source = source;
- this.bulk = bulk;
- this.error = error;
- }
-
- @Override
- public long isReady(long tid, Master master) throws Exception {
- Set<TServerInstance> finished = new HashSet<TServerInstance>();
- Set<TServerInstance> running = master.onlineTabletServers();
- for (TServerInstance server : running) {
- try {
- TServerConnection client = master.getConnection(server);
- if (client != null && !client.isActive(tid))
- finished.add(server);
- } catch (TException ex) {
- log.info("Ignoring error trying to check on tid " + tid + " from server " + server + ": " + ex);
- }
- }
- if (finished.containsAll(running))
- return 0;
- return 500;
- }
-
- @Override
- public Repo<Master> call(long tid, Master master) throws Exception {
- // This needs to execute after the arbiter is stopped
-
- VolumeManager fs = master.getFileSystem();
-
- if (!fs.exists(new Path(error, BulkImport.FAILURES_TXT)))
- return new CleanUpBulkImport(tableId, source, bulk, error);
-
- HashMap<FileRef,String> failures = new HashMap<FileRef,String>();
- HashMap<FileRef,String> loadedFailures = new HashMap<FileRef,String>();
-
- FSDataInputStream failFile = fs.open(new Path(error, BulkImport.FAILURES_TXT));
- BufferedReader in = new BufferedReader(new InputStreamReader(failFile, UTF_8));
- try {
- String line = null;
- while ((line = in.readLine()) != null) {
- Path path = new Path(line);
- if (!fs.exists(new Path(error, path.getName())))
- failures.put(new FileRef(line, path), line);
- }
- } finally {
- failFile.close();
- }
-
- /*
- * I thought I could move files that have no file references in the table. However its possible a clone references a file. Therefore only move files that
- * have no loaded markers.
- */
-
- // determine which failed files were loaded
- Connector conn = master.getConnector();
- Scanner mscanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
- mscanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange());
- mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
-
- for (Entry<Key,Value> entry : mscanner) {
- if (Long.parseLong(entry.getValue().toString()) == tid) {
- FileRef loadedFile = new FileRef(fs, entry.getKey());
- String absPath = failures.remove(loadedFile);
- if (absPath != null) {
- loadedFailures.put(loadedFile, absPath);
- }
- }
- }
-
- // move failed files that were not loaded
- for (String failure : failures.values()) {
- Path orig = new Path(failure);
- Path dest = new Path(error, orig.getName());
- fs.rename(orig, dest);
- log.debug("tid " + tid + " renamed " + orig + " to " + dest + ": import failed");
- }
-
- if (loadedFailures.size() > 0) {
- DistributedWorkQueue bifCopyQueue = new DistributedWorkQueue(Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID()
- + Constants.ZBULK_FAILED_COPYQ);
-
- HashSet<String> workIds = new HashSet<String>();
-
- for (String failure : loadedFailures.values()) {
- Path orig = new Path(failure);
- Path dest = new Path(error, orig.getName());
-
- if (fs.exists(dest))
- continue;
-
- bifCopyQueue.addWork(orig.getName(), (failure + "," + dest).getBytes(UTF_8));
- workIds.add(orig.getName());
- log.debug("tid " + tid + " added to copyq: " + orig + " to " + dest + ": failed");
- }
-
- bifCopyQueue.waitUntilDone(workIds);
- }
-
- fs.deleteRecursively(new Path(error, BulkImport.FAILURES_TXT));
- return new CleanUpBulkImport(tableId, source, bulk, error);
- }
-
-}
-
-class LoadFiles extends MasterRepo {
-
- private static final long serialVersionUID = 1L;
-
- private static ExecutorService threadPool = null;
- private static final Logger log = Logger.getLogger(BulkImport.class);
-
- private String tableId;
- private String source;
- private String bulk;
- private String errorDir;
- private boolean setTime;
-
- public LoadFiles(String tableId, String source, String bulk, String errorDir, boolean setTime) {
- this.tableId = tableId;
- this.source = source;
- this.bulk = bulk;
- this.errorDir = errorDir;
- this.setTime = setTime;
- }
-
- @Override
- public long isReady(long tid, Master master) throws Exception {
- if (master.onlineTabletServers().size() == 0)
- return 500;
- return 0;
- }
-
- private static synchronized ExecutorService getThreadPool(Master master) {
- if (threadPool == null) {
- int threadPoolSize = master.getSystemConfiguration().getCount(Property.MASTER_BULK_THREADPOOL_SIZE);
- ThreadPoolExecutor pool = new SimpleThreadPool(threadPoolSize, "bulk import");
- pool.allowCoreThreadTimeOut(true);
- threadPool = new TraceExecutorService(pool);
- }
- return threadPool;
- }
-
- @Override
- public Repo<Master> call(final long tid, final Master master) throws Exception {
- ExecutorService executor = getThreadPool(master);
- final SiteConfiguration conf = ServerConfiguration.getSiteConfiguration();
- VolumeManager fs = master.getFileSystem();
- List<FileStatus> files = new ArrayList<FileStatus>();
- for (FileStatus entry : fs.listStatus(new Path(bulk))) {
- files.add(entry);
- }
- log.debug("tid " + tid + " importing " + files.size() + " files");
-
- Path writable = new Path(this.errorDir, ".iswritable");
- if (!fs.createNewFile(writable)) {
- // Maybe this is a re-try... clear the flag and try again
- fs.delete(writable);
- if (!fs.createNewFile(writable))
- throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY,
- "Unable to write to " + this.errorDir);
- }
- fs.delete(writable);
-
- final Set<String> filesToLoad = Collections.synchronizedSet(new HashSet<String>());
- for (FileStatus f : files)
- filesToLoad.add(f.getPath().toString());
-
- final int RETRIES = Math.max(1, conf.getCount(Property.MASTER_BULK_RETRIES));
- for (int attempt = 0; attempt < RETRIES && filesToLoad.size() > 0; attempt++) {
- List<Future<List<String>>> results = new ArrayList<Future<List<String>>>();
-
- if (master.onlineTabletServers().size() == 0)
- log.warn("There are no tablet server to process bulk import, waiting (tid = " + tid + ")");
-
- while (master.onlineTabletServers().size() == 0) {
- UtilWaitThread.sleep(500);
- }
-
- // Use the threadpool to assign files one-at-a-time to the server
- final List<String> loaded = Collections.synchronizedList(new ArrayList<String>());
- final Random random = new Random();
- for (final String file : filesToLoad) {
- results.add(executor.submit(new Callable<List<String>>() {
- @Override
- public List<String> call() {
- List<String> failures = new ArrayList<String>();
- Client client = null;
- String server = null;
- try {
- // get a connection to a random tablet server, do not prefer cached connections because
- // this is running on the master and there are lots of connections to tablet servers
- // serving the metadata tablets
- long timeInMillis = master.getConfiguration().getConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT);
- TServerInstance servers[] = master.onlineTabletServers().toArray(new TServerInstance[0]);
- server = servers[random.nextInt(servers.length)].getLocation().toString();
- client = ThriftUtil.getTServerClient(server, master.getConfiguration().getConfiguration(), timeInMillis);
- List<String> attempt = Collections.singletonList(file);
- log.debug("Asking " + server + " to bulk import " + file);
- List<String> fail = client.bulkImportFiles(Tracer.traceInfo(), SystemCredentials.get().toThrift(master.getInstance()), tid, tableId, attempt,
- errorDir, setTime);
- if (fail.isEmpty()) {
- loaded.add(file);
- } else {
- failures.addAll(fail);
- }
- } catch (Exception ex) {
- log.error("rpc failed server:" + server + ", tid:" + tid + " " + ex);
- } finally {
- if (client != null) {
- ThriftUtil.returnClient(client);
- }
- }
- return failures;
- }
- }));
- }
- Set<String> failures = new HashSet<String>();
- for (Future<List<String>> f : results)
- failures.addAll(f.get());
- filesToLoad.removeAll(loaded);
- if (filesToLoad.size() > 0) {
- log.debug("tid " + tid + " attempt " + (attempt + 1) + " " + sampleList(filesToLoad, 10) + " failed");
- UtilWaitThread.sleep(100);
- }
- }
-
- FSDataOutputStream failFile = fs.create(new Path(errorDir, BulkImport.FAILURES_TXT), true);
- BufferedWriter out = new BufferedWriter(new OutputStreamWriter(failFile, UTF_8));
- try {
- for (String f : filesToLoad) {
- out.write(f);
- out.write("\n");
- }
- } finally {
- out.close();
- }
-
- // return the next step, which will perform cleanup
- return new CompleteBulkImport(tableId, source, bulk, errorDir);
- }
-
- static String sampleList(Collection<?> potentiallyLongList, int max) {
- StringBuffer result = new StringBuffer();
- result.append("[");
- int i = 0;
- for (Object obj : potentiallyLongList) {
- result.append(obj);
- if (i >= max) {
- result.append("...");
- break;
- } else {
- result.append(", ");
- }
- i++;
- }
- if (i < max)
- result.delete(result.length() - 2, result.length());
- result.append("]");
- return result.toString();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/78e2c65e/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
index 4a56c6f,0000000..a80e1ff
mode 100644,000000..100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
@@@ -1,209 -1,0 +1,213 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.tableOps;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.BufferedWriter;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
++import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+
- import org.apache.accumulo.core.client.impl.ServerClient;
+import org.apache.accumulo.core.client.impl.thrift.ClientService;
- import org.apache.accumulo.core.client.impl.thrift.ClientService.Client;
+import org.apache.accumulo.core.client.impl.thrift.TableOperation;
+import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
+import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
++import org.apache.accumulo.core.rpc.ThriftUtil;
+import org.apache.accumulo.core.trace.Tracer;
- import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.SimpleThreadPool;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.fate.Repo;
+import org.apache.accumulo.master.Master;
+import org.apache.accumulo.server.fs.VolumeManager;
++import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.htrace.wrappers.TraceExecutorService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
++import com.google.common.net.HostAndPort;
++
+class LoadFiles extends MasterRepo {
+
+ private static final long serialVersionUID = 1L;
+
+ private static ExecutorService threadPool = null;
+ private static final Logger log = LoggerFactory.getLogger(LoadFiles.class);
+
+ private String tableId;
+ private String source;
+ private String bulk;
+ private String errorDir;
+ private boolean setTime;
+
+ public LoadFiles(String tableId, String source, String bulk, String errorDir, boolean setTime) {
+ this.tableId = tableId;
+ this.source = source;
+ this.bulk = bulk;
+ this.errorDir = errorDir;
+ this.setTime = setTime;
+ }
+
+ @Override
+ public long isReady(long tid, Master master) throws Exception {
+ if (master.onlineTabletServers().size() == 0)
+ return 500;
+ return 0;
+ }
+
+ private static synchronized ExecutorService getThreadPool(Master master) {
+ if (threadPool == null) {
+ int threadPoolSize = master.getConfiguration().getCount(Property.MASTER_BULK_THREADPOOL_SIZE);
+ ThreadPoolExecutor pool = new SimpleThreadPool(threadPoolSize, "bulk import");
+ pool.allowCoreThreadTimeOut(true);
+ threadPool = new TraceExecutorService(pool);
+ }
+ return threadPool;
+ }
+
+ @Override
+ public Repo<Master> call(final long tid, final Master master) throws Exception {
+ ExecutorService executor = getThreadPool(master);
+ final AccumuloConfiguration conf = master.getConfiguration();
+ VolumeManager fs = master.getFileSystem();
+ List<FileStatus> files = new ArrayList<FileStatus>();
+ for (FileStatus entry : fs.listStatus(new Path(bulk))) {
+ files.add(entry);
+ }
+ log.debug("tid " + tid + " importing " + files.size() + " files");
+
+ Path writable = new Path(this.errorDir, ".iswritable");
+ if (!fs.createNewFile(writable)) {
+ // Maybe this is a re-try... clear the flag and try again
+ fs.delete(writable);
+ if (!fs.createNewFile(writable))
+ throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY,
+ "Unable to write to " + this.errorDir);
+ }
+ fs.delete(writable);
+
+ final Set<String> filesToLoad = Collections.synchronizedSet(new HashSet<String>());
+ for (FileStatus f : files)
+ filesToLoad.add(f.getPath().toString());
+
+ final int RETRIES = Math.max(1, conf.getCount(Property.MASTER_BULK_RETRIES));
+ for (int attempt = 0; attempt < RETRIES && filesToLoad.size() > 0; attempt++) {
+ List<Future<List<String>>> results = new ArrayList<Future<List<String>>>();
+
+ if (master.onlineTabletServers().size() == 0)
+ log.warn("There are no tablet server to process bulk import, waiting (tid = " + tid + ")");
+
+ while (master.onlineTabletServers().size() == 0) {
+ UtilWaitThread.sleep(500);
+ }
+
+ // Use the threadpool to assign files one-at-a-time to the server
+ final List<String> loaded = Collections.synchronizedList(new ArrayList<String>());
++ final Random random = new Random();
+ for (final String file : filesToLoad) {
+ results.add(executor.submit(new Callable<List<String>>() {
+ @Override
+ public List<String> call() {
+ List<String> failures = new ArrayList<String>();
+ ClientService.Client client = null;
- String server = null;
++ HostAndPort server = null;
+ try {
+ // get a connection to a random tablet server, do not prefer cached connections because
+ // this is running on the master and there are lots of connections to tablet servers
+ // serving the metadata tablets
+ long timeInMillis = master.getConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT);
- Pair<String,Client> pair = ServerClient.getConnection(master, false, timeInMillis);
- client = pair.getSecond();
- server = pair.getFirst();
++ // Pair<String,Client> pair = ServerClient.getConnection(master, false, timeInMillis);
++ TServerInstance[] servers = master.onlineTabletServers().toArray(new TServerInstance[0]);
++ server = servers[random.nextInt(servers.length)].getLocation();
++ client = ThriftUtil.getTServerClient(server, master, timeInMillis);
+ List<String> attempt = Collections.singletonList(file);
- log.debug("Asking " + pair.getFirst() + " to bulk import " + file);
++ log.debug("Asking " + server + " to bulk import " + file);
+ List<String> fail = client.bulkImportFiles(Tracer.traceInfo(), master.rpcCreds(), tid, tableId, attempt, errorDir, setTime);
+ if (fail.isEmpty()) {
+ loaded.add(file);
+ } else {
+ failures.addAll(fail);
+ }
+ } catch (Exception ex) {
+ log.error("rpc failed server:" + server + ", tid:" + tid + " " + ex);
+ } finally {
- ServerClient.close(client);
++ ThriftUtil.returnClient(client);
+ }
+ return failures;
+ }
+ }));
+ }
+ Set<String> failures = new HashSet<String>();
+ for (Future<List<String>> f : results)
+ failures.addAll(f.get());
+ filesToLoad.removeAll(loaded);
+ if (filesToLoad.size() > 0) {
+ log.debug("tid " + tid + " attempt " + (attempt + 1) + " " + sampleList(filesToLoad, 10) + " failed");
+ UtilWaitThread.sleep(100);
+ }
+ }
+
+ FSDataOutputStream failFile = fs.create(new Path(errorDir, BulkImport.FAILURES_TXT), true);
+ BufferedWriter out = new BufferedWriter(new OutputStreamWriter(failFile, UTF_8));
+ try {
+ for (String f : filesToLoad) {
+ out.write(f);
+ out.write("\n");
+ }
+ } finally {
+ out.close();
+ }
+
+ // return the next step, which will perform cleanup
+ return new CompleteBulkImport(tableId, source, bulk, errorDir);
+ }
+
+ static String sampleList(Collection<?> potentiallyLongList, int max) {
+ StringBuffer result = new StringBuffer();
+ result.append("[");
+ int i = 0;
+ for (Object obj : potentiallyLongList) {
+ result.append(obj);
+ if (i >= max) {
+ result.append("...");
+ break;
+ } else {
+ result.append(", ");
+ }
+ i++;
+ }
+ if (i < max)
+ result.delete(result.length() - 2, result.length());
+ result.append("]");
+ return result.toString();
+ }
+
- }
++}