Posted to commits@accumulo.apache.org by bh...@apache.org on 2014/06/18 23:32:55 UTC
[1/3] git commit: ACCUMULO-2854 Added additional check to prevent NPE
Repository: accumulo
Updated Branches:
refs/heads/1.6.1-SNAPSHOT 5c409b0ad -> 079ef51c7
refs/heads/master ffd2626a8 -> 8efcbd8f8
ACCUMULO-2854 Added additional check to prevent NPE
Signed-off-by: Bill Havanki <bh...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/079ef51c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/079ef51c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/079ef51c
Branch: refs/heads/1.6.1-SNAPSHOT
Commit: 079ef51c7c254f1f7bd7bd4d83ea405ae635b433
Parents: 5c409b0
Author: Jeffrey S. Schwartz <je...@schwartech.com>
Authored: Thu Jun 12 20:07:54 2014 -0400
Committer: Bill Havanki <bh...@cloudera.com>
Committed: Wed Jun 18 17:07:38 2014 -0400
----------------------------------------------------------------------
.../accumulo/master/tableOps/BulkImport.java | 150 ++++++++++---------
1 file changed, 76 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
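For context on the fix itself: VolumeManager.matchingFileSystem() returns null when the given path is not on any of the configured table-directory volumes, and the old code chained .toString() directly onto its result. A minimal sketch of the guard the patch introduces (fs, sourceDir, and the called methods are taken from the diff below; the surrounding setup is assumed):

  // fs is a VolumeManager and sourceDir a String, as in BulkImport.createNewBulkDir().
  Path match = fs.matchingFileSystem(new Path(sourceDir), ServerConstants.getTablesDirs());
  if (match == null) // previously match.toString() ran unconditionally -> NPE
    throw new IllegalStateException(sourceDir + " is not in a known namespace");
  String tableDir = match.toString(); // safe: match is known non-null here

Note that the patched method also keeps the old "if (tableDir == null)" check after the toString() call; with the new guard in place that branch can no longer be taken, since toString() on a non-null Path does not return null.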
http://git-wip-us.apache.org/repos/asf/accumulo/blob/079ef51c/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
index bdc89dd..e42fee6 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
@@ -106,28 +106,28 @@ import org.apache.thrift.TException;
public class BulkImport extends MasterRepo {
public static final String FAILURES_TXT = "failures.txt";
-
+
private static final long serialVersionUID = 1L;
-
+
private static final Logger log = Logger.getLogger(BulkImport.class);
-
+
private String tableId;
private String sourceDir;
private String errorDir;
private boolean setTime;
-
+
public BulkImport(String tableId, String sourceDir, String errorDir, boolean setTime) {
this.tableId = tableId;
this.sourceDir = sourceDir;
this.errorDir = errorDir;
this.setTime = setTime;
}
-
+
@Override
public long isReady(long tid, Master master) throws Exception {
if (!Utils.getReadLock(tableId, tid).tryLock())
return 100;
-
+
Instance instance = HdfsZooInstance.getInstance();
Tables.clearCache(instance);
if (Tables.getTableState(instance, tableId) == TableState.ONLINE) {
@@ -140,18 +140,18 @@ public class BulkImport extends MasterRepo {
throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.OFFLINE, null);
}
}
-
+
@Override
//TODO Remove deprecation warning suppression when Hadoop1 support is dropped
@SuppressWarnings("deprecation")
public Repo<Master> call(long tid, Master master) throws Exception {
log.debug(" tid " + tid + " sourceDir " + sourceDir);
-
+
Utils.getReadLock(tableId, tid).lock();
-
+
// check that the error directory exists and is empty
VolumeManager fs = master.getFileSystem();
-
+
Path errorPath = new Path(errorDir);
FileStatus errorStatus = null;
try {
@@ -168,9 +168,9 @@ public class BulkImport extends MasterRepo {
if (fs.listStatus(errorPath).length != 0)
throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY, errorDir
+ " is not empty");
-
+
ZooArbitrator.start(Constants.BULK_ARBITRATOR_TYPE, tid);
-
+
// move the files into the directory
try {
String bulkDir = prepareBulkImport(fs, sourceDir, tableId);
@@ -182,24 +182,26 @@ public class BulkImport extends MasterRepo {
+ ex);
}
}
-
+
private Path createNewBulkDir(VolumeManager fs, String tableId) throws IOException {
-
- String tableDir = fs.matchingFileSystem(new Path(sourceDir), ServerConstants.getTablesDirs()).toString();
-
+ Path tempPath = fs.matchingFileSystem(new Path(sourceDir), ServerConstants.getTablesDirs());
+ if (tempPath == null)
+ throw new IllegalStateException(sourceDir + " is not in a known namespace");
+
+ String tableDir = tempPath.toString();
if (tableDir == null)
throw new IllegalStateException(sourceDir + " is not in a known namespace");
Path directory = new Path(tableDir + "/" + tableId);
fs.mkdirs(directory);
-
+
// only one should be able to create the lock file
// the purpose of the lock file is to avoid a race
// condition between the call to fs.exists() and
// fs.mkdirs()... if only hadoop had a mkdir() function
// that failed when the dir existed
-
+
UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
-
+
while (true) {
Path newBulkDir = new Path(directory, Constants.BULK_PREFIX + namer.getNextName());
if (fs.exists(newBulkDir)) // sanity check
@@ -207,7 +209,7 @@ public class BulkImport extends MasterRepo {
if (fs.mkdirs(newBulkDir))
return newBulkDir;
log.warn("Failed to create " + newBulkDir + " for unknown reason");
-
+
UtilWaitThread.sleep(3000);
}
}
@@ -216,20 +218,20 @@ public class BulkImport extends MasterRepo {
@SuppressWarnings("deprecation")
private String prepareBulkImport(VolumeManager fs, String dir, String tableId) throws IOException {
Path bulkDir = createNewBulkDir(fs, tableId);
-
+
MetadataTableUtil.addBulkLoadInProgressFlag("/" + bulkDir.getParent().getName() + "/" + bulkDir.getName());
-
+
Path dirPath = new Path(dir);
FileStatus[] mapFiles = fs.listStatus(dirPath);
-
+
UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
-
+
for (FileStatus fileStatus : mapFiles) {
String sa[] = fileStatus.getPath().getName().split("\\.");
String extension = "";
if (sa.length > 1) {
extension = sa[sa.length - 1];
-
+
if (!FileOperations.getValidExtensions().contains(extension)) {
log.warn(fileStatus.getPath() + " does not have a valid extension, ignoring");
continue;
@@ -238,13 +240,13 @@ public class BulkImport extends MasterRepo {
// assume it is a map file
extension = Constants.MAPFILE_EXTENSION;
}
-
+
if (extension.equals(Constants.MAPFILE_EXTENSION)) {
if (!fileStatus.isDir()) {
log.warn(fileStatus.getPath() + " is not a map file, ignoring");
continue;
}
-
+
if (fileStatus.getPath().getName().equals("_logs")) {
log.info(fileStatus.getPath() + " is probably a log directory from a map/reduce task, skipping");
continue;
@@ -260,7 +262,7 @@ public class BulkImport extends MasterRepo {
continue;
}
}
-
+
String newName = "I" + namer.getNextName() + "." + extension;
Path newPath = new Path(bulkDir, newName);
try {
@@ -272,7 +274,7 @@ public class BulkImport extends MasterRepo {
}
return bulkDir.toString();
}
-
+
@Override
public void undo(long tid, Master environment) throws Exception {
// unreserve source/error directories
@@ -283,23 +285,23 @@ public class BulkImport extends MasterRepo {
}
class CleanUpBulkImport extends MasterRepo {
-
+
private static final long serialVersionUID = 1L;
-
+
private static final Logger log = Logger.getLogger(CleanUpBulkImport.class);
-
+
private String tableId;
private String source;
private String bulk;
private String error;
-
+
public CleanUpBulkImport(String tableId, String source, String bulk, String error) {
this.tableId = tableId;
this.source = source;
this.bulk = bulk;
this.error = error;
}
-
+
@Override
public Repo<Master> call(long tid, Master master) throws Exception {
log.debug("removing the bulk processing flag file in " + bulk);
@@ -320,21 +322,21 @@ class CleanUpBulkImport extends MasterRepo {
}
class CompleteBulkImport extends MasterRepo {
-
+
private static final long serialVersionUID = 1L;
-
+
private String tableId;
private String source;
private String bulk;
private String error;
-
+
public CompleteBulkImport(String tableId, String source, String bulk, String error) {
this.tableId = tableId;
this.source = source;
this.bulk = bulk;
this.error = error;
}
-
+
@Override
public Repo<Master> call(long tid, Master master) throws Exception {
ZooArbitrator.stop(Constants.BULK_ARBITRATOR_TYPE, tid);
@@ -343,21 +345,21 @@ class CompleteBulkImport extends MasterRepo {
}
class CopyFailed extends MasterRepo {
-
+
private static final long serialVersionUID = 1L;
-
+
private String tableId;
private String source;
private String bulk;
private String error;
-
+
public CopyFailed(String tableId, String source, String bulk, String error) {
this.tableId = tableId;
this.source = source;
this.bulk = bulk;
this.error = error;
}
-
+
@Override
public long isReady(long tid, Master master) throws Exception {
Set<TServerInstance> finished = new HashSet<TServerInstance>();
@@ -375,19 +377,19 @@ class CopyFailed extends MasterRepo {
return 0;
return 500;
}
-
+
@Override
public Repo<Master> call(long tid, Master master) throws Exception {
// This needs to execute after the arbiter is stopped
-
+
VolumeManager fs = master.getFileSystem();
-
+
if (!fs.exists(new Path(error, BulkImport.FAILURES_TXT)))
return new CleanUpBulkImport(tableId, source, bulk, error);
-
+
HashMap<String,String> failures = new HashMap<String,String>();
HashMap<String,String> loadedFailures = new HashMap<String,String>();
-
+
FSDataInputStream failFile = fs.open(new Path(error, BulkImport.FAILURES_TXT));
BufferedReader in = new BufferedReader(new InputStreamReader(failFile, Constants.UTF8));
try {
@@ -400,18 +402,18 @@ class CopyFailed extends MasterRepo {
} finally {
failFile.close();
}
-
+
/*
* I thought I could move files that have no file references in the table. However its possible a clone references a file. Therefore only move files that
* have no loaded markers.
*/
-
+
// determine which failed files were loaded
Connector conn = master.getConnector();
Scanner mscanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
mscanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange());
mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
-
+
for (Entry<Key,Value> entry : mscanner) {
if (Long.parseLong(entry.getValue().toString()) == tid) {
String loadedFile = entry.getKey().getColumnQualifier().toString();
@@ -421,7 +423,7 @@ class CopyFailed extends MasterRepo {
}
}
}
-
+
// move failed files that were not loaded
for (String failure : failures.values()) {
Path orig = new Path(failure);
@@ -429,47 +431,47 @@ class CopyFailed extends MasterRepo {
fs.rename(orig, dest);
log.debug("tid " + tid + " renamed " + orig + " to " + dest + ": import failed");
}
-
+
if (loadedFailures.size() > 0) {
DistributedWorkQueue bifCopyQueue = new DistributedWorkQueue(Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID()
+ Constants.ZBULK_FAILED_COPYQ);
-
+
HashSet<String> workIds = new HashSet<String>();
-
+
for (String failure : loadedFailures.values()) {
Path orig = new Path(failure);
Path dest = new Path(error, orig.getName());
-
+
if (fs.exists(dest))
continue;
-
+
bifCopyQueue.addWork(orig.getName(), (failure + "," + dest).getBytes(Constants.UTF8));
workIds.add(orig.getName());
log.debug("tid " + tid + " added to copyq: " + orig + " to " + dest + ": failed");
}
-
+
bifCopyQueue.waitUntilDone(workIds);
}
-
+
fs.deleteRecursively(new Path(error, BulkImport.FAILURES_TXT));
return new CleanUpBulkImport(tableId, source, bulk, error);
}
-
+
}
class LoadFiles extends MasterRepo {
-
+
private static final long serialVersionUID = 1L;
-
+
private static ExecutorService threadPool = null;
private static final Logger log = Logger.getLogger(BulkImport.class);
-
+
private String tableId;
private String source;
private String bulk;
private String errorDir;
private boolean setTime;
-
+
public LoadFiles(String tableId, String source, String bulk, String errorDir, boolean setTime) {
this.tableId = tableId;
this.source = source;
@@ -477,7 +479,7 @@ class LoadFiles extends MasterRepo {
this.errorDir = errorDir;
this.setTime = setTime;
}
-
+
@Override
public long isReady(long tid, Master master) throws Exception {
if (master.onlineTabletServers().size() == 0)
@@ -505,7 +507,7 @@ class LoadFiles extends MasterRepo {
files.add(entry);
}
log.debug("tid " + tid + " importing " + files.size() + " files");
-
+
Path writable = new Path(this.errorDir, ".iswritable");
if (!fs.createNewFile(writable)) {
// Maybe this is a re-try... clear the flag and try again
@@ -515,22 +517,22 @@ class LoadFiles extends MasterRepo {
"Unable to write to " + this.errorDir);
}
fs.delete(writable);
-
+
final Set<String> filesToLoad = Collections.synchronizedSet(new HashSet<String>());
for (FileStatus f : files)
filesToLoad.add(f.getPath().toString());
-
+
final int RETRIES = Math.max(1, conf.getCount(Property.MASTER_BULK_RETRIES));
for (int attempt = 0; attempt < RETRIES && filesToLoad.size() > 0; attempt++) {
List<Future<List<String>>> results = new ArrayList<Future<List<String>>>();
-
+
if (master.onlineTabletServers().size() == 0)
log.warn("There are no tablet server to process bulk import, waiting (tid = " + tid + ")");
-
+
while (master.onlineTabletServers().size() == 0) {
UtilWaitThread.sleep(500);
}
-
+
// Use the threadpool to assign files one-at-a-time to the server
final List<String> loaded = Collections.synchronizedList(new ArrayList<String>());
for (final String file : filesToLoad) {
@@ -575,7 +577,7 @@ class LoadFiles extends MasterRepo {
UtilWaitThread.sleep(100);
}
}
-
+
FSDataOutputStream failFile = fs.create(new Path(errorDir, BulkImport.FAILURES_TXT), true);
BufferedWriter out = new BufferedWriter(new OutputStreamWriter(failFile, Constants.UTF8));
try {
@@ -586,11 +588,11 @@ class LoadFiles extends MasterRepo {
} finally {
out.close();
}
-
+
// return the next step, which will perform cleanup
return new CompleteBulkImport(tableId, source, bulk, errorDir);
}
-
+
static String sampleList(Collection<?> potentiallyLongList, int max) {
StringBuffer result = new StringBuffer();
result.append("[");
@@ -610,5 +612,5 @@ class LoadFiles extends MasterRepo {
result.append("]");
return result.toString();
}
-
+
}
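One detail worth spelling out from the context above: the in-code comment notes that Hadoop lacks a mkdir() that fails when the directory already exists; FileSystem.mkdirs() behaves like mkdir -p and reports success for a pre-existing directory. Uniqueness therefore comes from UniqueNameAllocator, with exists() as a sanity check. A condensed sketch of that loop, under a hypothetical wrapper name (the body is taken from the diff; the handling of a pre-existing directory falls at a hunk boundary above, so the retry shown for that case is an assumption):

  // Hypothetical wrapper name for illustration; body condensed from the diff above.
  private static Path createUniqueBulkDir(VolumeManager fs, Path directory) throws IOException {
    UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
    while (true) {
      // each attempt draws a fresh, globally unique name
      Path newBulkDir = new Path(directory, Constants.BULK_PREFIX + namer.getNextName());
      if (fs.exists(newBulkDir)) // sanity check (original handling elided at the hunk
        continue;                // boundary; retrying with a fresh name assumed here)
      if (fs.mkdirs(newBulkDir)) // mkdirs() is like mkdir -p: true even for an existing dir
        return newBulkDir;
      log.warn("Failed to create " + newBulkDir + " for unknown reason");
      UtilWaitThread.sleep(3000); // transient failure: back off, then retry with a new name
    }
  }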
[2/3] git commit: ACCUMULO-2854 Added additional check to prevent NPE
Posted by bh...@apache.org.
ACCUMULO-2854 Added additional check to prevent NPE
Signed-off-by: Bill Havanki <bh...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/079ef51c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/079ef51c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/079ef51c
Branch: refs/heads/master
Commit: 079ef51c7c254f1f7bd7bd4d83ea405ae635b433
Parents: 5c409b0
Author: Jeffrey S. Schwartz <je...@schwartech.com>
Authored: Thu Jun 12 20:07:54 2014 -0400
Committer: Bill Havanki <bh...@cloudera.com>
Committed: Wed Jun 18 17:07:38 2014 -0400
----------------------------------------------------------------------
.../accumulo/master/tableOps/BulkImport.java | 150 ++++++++++---------
1 file changed, 76 insertions(+), 74 deletions(-)
[3/3] git commit: Merge branch '1.6.1-SNAPSHOT'
Posted by bh...@apache.org.
Merge branch '1.6.1-SNAPSHOT'
Conflicts:
server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/8efcbd8f
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/8efcbd8f
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/8efcbd8f
Branch: refs/heads/master
Commit: 8efcbd8f829d49cfc9e37b147d94354e37f134f8
Parents: ffd2626 079ef51
Author: Bill Havanki <bh...@cloudera.com>
Authored: Wed Jun 18 17:27:52 2014 -0400
Committer: Bill Havanki <bh...@cloudera.com>
Committed: Wed Jun 18 17:27:52 2014 -0400
----------------------------------------------------------------------
.../accumulo/master/tableOps/BulkImport.java | 150 ++++++++++---------
1 file changed, 76 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
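The conflict resolution in this merge is mechanical: master had already replaced the project-local Constants.UTF8 with the JDK's java.nio.charset.StandardCharsets.UTF_8 and added a configuration argument to the DistributedWorkQueue constructor, so the 1.6.1 hunks are re-expressed in those terms below. A small self-contained illustration of the charset change (plain Java with hypothetical path values, not Accumulo code):

  import java.nio.charset.StandardCharsets;

  public class CharsetExample {
    public static void main(String[] args) {
      String failure = "/bulk/f1.rf", dest = "/errors/f1.rf"; // hypothetical paths
      // Identical bytes either way; the Charset overload also avoids the checked
      // UnsupportedEncodingException thrown by getBytes(String charsetName).
      byte[] payload = (failure + "," + dest).getBytes(StandardCharsets.UTF_8);
      System.out.println(payload.length + " bytes");
    }
  }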
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8efcbd8f/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
index 06ae7aa,e42fee6..73e0e49
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
@@@ -380,17 -381,17 +382,17 @@@ class CopyFailed extends MasterRepo
@Override
public Repo<Master> call(long tid, Master master) throws Exception {
// This needs to execute after the arbiter is stopped
-
+
VolumeManager fs = master.getFileSystem();
-
+
if (!fs.exists(new Path(error, BulkImport.FAILURES_TXT)))
return new CleanUpBulkImport(tableId, source, bulk, error);
-
+
HashMap<String,String> failures = new HashMap<String,String>();
HashMap<String,String> loadedFailures = new HashMap<String,String>();
-
+
FSDataInputStream failFile = fs.open(new Path(error, BulkImport.FAILURES_TXT));
- BufferedReader in = new BufferedReader(new InputStreamReader(failFile, Constants.UTF8));
+ BufferedReader in = new BufferedReader(new InputStreamReader(failFile, StandardCharsets.UTF_8));
try {
String line = null;
while ((line = in.readLine()) != null) {
@@@ -430,21 -431,21 +432,21 @@@
fs.rename(orig, dest);
log.debug("tid " + tid + " renamed " + orig + " to " + dest + ": import failed");
}
-
+
if (loadedFailures.size() > 0) {
DistributedWorkQueue bifCopyQueue = new DistributedWorkQueue(Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID()
- + Constants.ZBULK_FAILED_COPYQ);
+ + Constants.ZBULK_FAILED_COPYQ, master.getConfiguration().getConfiguration());
-
+
HashSet<String> workIds = new HashSet<String>();
-
+
for (String failure : loadedFailures.values()) {
Path orig = new Path(failure);
Path dest = new Path(error, orig.getName());
-
+
if (fs.exists(dest))
continue;
-
+
- bifCopyQueue.addWork(orig.getName(), (failure + "," + dest).getBytes(Constants.UTF8));
+ bifCopyQueue.addWork(orig.getName(), (failure + "," + dest).getBytes(StandardCharsets.UTF_8));
workIds.add(orig.getName());
log.debug("tid " + tid + " added to copyq: " + orig + " to " + dest + ": failed");
}
@@@ -576,9 -577,9 +578,9 @@@ class LoadFiles extends MasterRepo
UtilWaitThread.sleep(100);
}
}
-
+
FSDataOutputStream failFile = fs.create(new Path(errorDir, BulkImport.FAILURES_TXT), true);
- BufferedWriter out = new BufferedWriter(new OutputStreamWriter(failFile, Constants.UTF8));
+ BufferedWriter out = new BufferedWriter(new OutputStreamWriter(failFile, StandardCharsets.UTF_8));
try {
for (String f : filesToLoad) {
out.write(f);