You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@accumulo.apache.org by ec...@apache.org on 2015/10/13 22:55:38 UTC
[2/3] accumulo git commit: Merge branch '1.6' into 1.7
Merge branch '1.6' into 1.7
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/91b161a9
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/91b161a9
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/91b161a9
Branch: refs/heads/master
Commit: 91b161a932307e2d16845a0c0f6304f123a140b9
Parents: 62821a0 4deaf73
Author: Eric C. Newton <er...@gmail.com>
Authored: Tue Oct 13 16:50:18 2015 -0400
Committer: Eric C. Newton <er...@gmail.com>
Committed: Tue Oct 13 16:50:18 2015 -0400
----------------------------------------------------------------------
.../main/java/org/apache/accumulo/master/tableOps/BulkImport.java | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/91b161a9/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
index 031a80c,e661968..ad20473
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
@@@ -263,5 -284,337 +263,6 @@@ public class BulkImport extends MasterR
Utils.unreserveHdfsDirectory(sourceDir, tid);
Utils.unreserveHdfsDirectory(errorDir, tid);
Utils.getReadLock(tableId, tid).unlock();
+ ZooArbitrator.cleanup(Constants.BULK_ARBITRATOR_TYPE, tid);
}
}
-
-class CleanUpBulkImport extends MasterRepo {
-
- private static final long serialVersionUID = 1L;
-
- private static final Logger log = Logger.getLogger(CleanUpBulkImport.class);
-
- private String tableId;
- private String source;
- private String bulk;
- private String error;
-
- public CleanUpBulkImport(String tableId, String source, String bulk, String error) {
- this.tableId = tableId;
- this.source = source;
- this.bulk = bulk;
- this.error = error;
- }
-
- @Override
- public Repo<Master> call(long tid, Master master) throws Exception {
- log.debug("removing the bulk processing flag file in " + bulk);
- Path bulkDir = new Path(bulk);
- MetadataTableUtil.removeBulkLoadInProgressFlag("/" + bulkDir.getParent().getName() + "/" + bulkDir.getName());
- MetadataTableUtil.addDeleteEntry(tableId, bulkDir.toString());
- log.debug("removing the metadata table markers for loaded files");
- Connector conn = master.getConnector();
- MetadataTableUtil.removeBulkLoadEntries(conn, tableId, tid);
- log.debug("releasing HDFS reservations for " + source + " and " + error);
- Utils.unreserveHdfsDirectory(source, tid);
- Utils.unreserveHdfsDirectory(error, tid);
- Utils.getReadLock(tableId, tid).unlock();
- log.debug("completing bulk import transaction " + tid);
- ZooArbitrator.cleanup(Constants.BULK_ARBITRATOR_TYPE, tid);
- return null;
- }
-}
-
-class CompleteBulkImport extends MasterRepo {
-
- private static final long serialVersionUID = 1L;
-
- private String tableId;
- private String source;
- private String bulk;
- private String error;
-
- public CompleteBulkImport(String tableId, String source, String bulk, String error) {
- this.tableId = tableId;
- this.source = source;
- this.bulk = bulk;
- this.error = error;
- }
-
- @Override
- public Repo<Master> call(long tid, Master master) throws Exception {
- ZooArbitrator.stop(Constants.BULK_ARBITRATOR_TYPE, tid);
- return new CopyFailed(tableId, source, bulk, error);
- }
-}
-
-class CopyFailed extends MasterRepo {
-
- private static final long serialVersionUID = 1L;
-
- private String tableId;
- private String source;
- private String bulk;
- private String error;
-
- public CopyFailed(String tableId, String source, String bulk, String error) {
- this.tableId = tableId;
- this.source = source;
- this.bulk = bulk;
- this.error = error;
- }
-
- @Override
- public long isReady(long tid, Master master) throws Exception {
- Set<TServerInstance> finished = new HashSet<TServerInstance>();
- Set<TServerInstance> running = master.onlineTabletServers();
- for (TServerInstance server : running) {
- try {
- TServerConnection client = master.getConnection(server);
- if (client != null && !client.isActive(tid))
- finished.add(server);
- } catch (TException ex) {
- log.info("Ignoring error trying to check on tid " + tid + " from server " + server + ": " + ex);
- }
- }
- if (finished.containsAll(running))
- return 0;
- return 500;
- }
-
- @Override
- public Repo<Master> call(long tid, Master master) throws Exception {
- // This needs to execute after the arbiter is stopped
-
- VolumeManager fs = master.getFileSystem();
-
- if (!fs.exists(new Path(error, BulkImport.FAILURES_TXT)))
- return new CleanUpBulkImport(tableId, source, bulk, error);
-
- HashMap<FileRef,String> failures = new HashMap<FileRef,String>();
- HashMap<FileRef,String> loadedFailures = new HashMap<FileRef,String>();
-
- FSDataInputStream failFile = fs.open(new Path(error, BulkImport.FAILURES_TXT));
- BufferedReader in = new BufferedReader(new InputStreamReader(failFile, UTF_8));
- try {
- String line = null;
- while ((line = in.readLine()) != null) {
- Path path = new Path(line);
- if (!fs.exists(new Path(error, path.getName())))
- failures.put(new FileRef(line, path), line);
- }
- } finally {
- failFile.close();
- }
-
- /*
- * I thought I could move files that have no file references in the table. However its possible a clone references a file. Therefore only move files that
- * have no loaded markers.
- */
-
- // determine which failed files were loaded
- Connector conn = master.getConnector();
- Scanner mscanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
- mscanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange());
- mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
-
- for (Entry<Key,Value> entry : mscanner) {
- if (Long.parseLong(entry.getValue().toString()) == tid) {
- FileRef loadedFile = new FileRef(fs, entry.getKey());
- String absPath = failures.remove(loadedFile);
- if (absPath != null) {
- loadedFailures.put(loadedFile, absPath);
- }
- }
- }
-
- // move failed files that were not loaded
- for (String failure : failures.values()) {
- Path orig = new Path(failure);
- Path dest = new Path(error, orig.getName());
- fs.rename(orig, dest);
- log.debug("tid " + tid + " renamed " + orig + " to " + dest + ": import failed");
- }
-
- if (loadedFailures.size() > 0) {
- DistributedWorkQueue bifCopyQueue = new DistributedWorkQueue(Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID()
- + Constants.ZBULK_FAILED_COPYQ);
-
- HashSet<String> workIds = new HashSet<String>();
-
- for (String failure : loadedFailures.values()) {
- Path orig = new Path(failure);
- Path dest = new Path(error, orig.getName());
-
- if (fs.exists(dest))
- continue;
-
- bifCopyQueue.addWork(orig.getName(), (failure + "," + dest).getBytes(UTF_8));
- workIds.add(orig.getName());
- log.debug("tid " + tid + " added to copyq: " + orig + " to " + dest + ": failed");
- }
-
- bifCopyQueue.waitUntilDone(workIds);
- }
-
- fs.deleteRecursively(new Path(error, BulkImport.FAILURES_TXT));
- return new CleanUpBulkImport(tableId, source, bulk, error);
- }
-
-}
-
-class LoadFiles extends MasterRepo {
-
- private static final long serialVersionUID = 1L;
-
- private static ExecutorService threadPool = null;
- private static final Logger log = Logger.getLogger(BulkImport.class);
-
- private String tableId;
- private String source;
- private String bulk;
- private String errorDir;
- private boolean setTime;
-
- public LoadFiles(String tableId, String source, String bulk, String errorDir, boolean setTime) {
- this.tableId = tableId;
- this.source = source;
- this.bulk = bulk;
- this.errorDir = errorDir;
- this.setTime = setTime;
- }
-
- @Override
- public long isReady(long tid, Master master) throws Exception {
- if (master.onlineTabletServers().size() == 0)
- return 500;
- return 0;
- }
-
- private static synchronized ExecutorService getThreadPool(Master master) {
- if (threadPool == null) {
- int threadPoolSize = master.getSystemConfiguration().getCount(Property.MASTER_BULK_THREADPOOL_SIZE);
- ThreadPoolExecutor pool = new SimpleThreadPool(threadPoolSize, "bulk import");
- pool.allowCoreThreadTimeOut(true);
- threadPool = new TraceExecutorService(pool);
- }
- return threadPool;
- }
-
- @Override
- public Repo<Master> call(final long tid, final Master master) throws Exception {
- ExecutorService executor = getThreadPool(master);
- final SiteConfiguration conf = ServerConfiguration.getSiteConfiguration();
- VolumeManager fs = master.getFileSystem();
- List<FileStatus> files = new ArrayList<FileStatus>();
- for (FileStatus entry : fs.listStatus(new Path(bulk))) {
- files.add(entry);
- }
- log.debug("tid " + tid + " importing " + files.size() + " files");
-
- Path writable = new Path(this.errorDir, ".iswritable");
- if (!fs.createNewFile(writable)) {
- // Maybe this is a re-try... clear the flag and try again
- fs.delete(writable);
- if (!fs.createNewFile(writable))
- throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY,
- "Unable to write to " + this.errorDir);
- }
- fs.delete(writable);
-
- final Set<String> filesToLoad = Collections.synchronizedSet(new HashSet<String>());
- for (FileStatus f : files)
- filesToLoad.add(f.getPath().toString());
-
- final int RETRIES = Math.max(1, conf.getCount(Property.MASTER_BULK_RETRIES));
- for (int attempt = 0; attempt < RETRIES && filesToLoad.size() > 0; attempt++) {
- List<Future<List<String>>> results = new ArrayList<Future<List<String>>>();
-
- if (master.onlineTabletServers().size() == 0)
- log.warn("There are no tablet server to process bulk import, waiting (tid = " + tid + ")");
-
- while (master.onlineTabletServers().size() == 0) {
- UtilWaitThread.sleep(500);
- }
-
- // Use the threadpool to assign files one-at-a-time to the server
- final List<String> loaded = Collections.synchronizedList(new ArrayList<String>());
- for (final String file : filesToLoad) {
- results.add(executor.submit(new Callable<List<String>>() {
- @Override
- public List<String> call() {
- List<String> failures = new ArrayList<String>();
- ClientService.Client client = null;
- String server = null;
- try {
- // get a connection to a random tablet server, do not prefer cached connections because
- // this is running on the master and there are lots of connections to tablet servers
- // serving the metadata tablets
- long timeInMillis = master.getConfiguration().getConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT);
- Pair<String,Client> pair = ServerClient.getConnection(master.getInstance(), false, timeInMillis);
- client = pair.getSecond();
- server = pair.getFirst();
- List<String> attempt = Collections.singletonList(file);
- log.debug("Asking " + pair.getFirst() + " to bulk import " + file);
- List<String> fail = client.bulkImportFiles(Tracer.traceInfo(), SystemCredentials.get().toThrift(master.getInstance()), tid, tableId, attempt,
- errorDir, setTime);
- if (fail.isEmpty()) {
- loaded.add(file);
- } else {
- failures.addAll(fail);
- }
- } catch (Exception ex) {
- log.error("rpc failed server:" + server + ", tid:" + tid + " " + ex);
- } finally {
- ServerClient.close(client);
- }
- return failures;
- }
- }));
- }
- Set<String> failures = new HashSet<String>();
- for (Future<List<String>> f : results)
- failures.addAll(f.get());
- filesToLoad.removeAll(loaded);
- if (filesToLoad.size() > 0) {
- log.debug("tid " + tid + " attempt " + (attempt + 1) + " " + sampleList(filesToLoad, 10) + " failed");
- UtilWaitThread.sleep(100);
- }
- }
-
- FSDataOutputStream failFile = fs.create(new Path(errorDir, BulkImport.FAILURES_TXT), true);
- BufferedWriter out = new BufferedWriter(new OutputStreamWriter(failFile, UTF_8));
- try {
- for (String f : filesToLoad) {
- out.write(f);
- out.write("\n");
- }
- } finally {
- out.close();
- }
-
- // return the next step, which will perform cleanup
- return new CompleteBulkImport(tableId, source, bulk, errorDir);
- }
-
- static String sampleList(Collection<?> potentiallyLongList, int max) {
- StringBuffer result = new StringBuffer();
- result.append("[");
- int i = 0;
- for (Object obj : potentiallyLongList) {
- result.append(obj);
- if (i >= max) {
- result.append("...");
- break;
- } else {
- result.append(", ");
- }
- i++;
- }
- if (i < max)
- result.delete(result.length() - 2, result.length());
- result.append("]");
- return result.toString();
- }
-
-}