You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2012/08/31 20:30:42 UTC
svn commit: r1379543 -
/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
Author: mbautin
Date: Fri Aug 31 18:30:42 2012
New Revision: 1379543
URL: http://svn.apache.org/viewvc?rev=1379543&view=rev
Log:
[0.89-fb] [HBASE-6605] Allow bulk load to go past failures.
Author: aaiyer
Summary:
Bulk load currently fails immediately whenever any one file in the
bulk load directory is not successfully bulk loaded.
Allow this behavior to be controlled through a configuration change.
Test Plan: test out manually on dev cluster
Reviewers: madhuvaidya, kannan, kranganathan
Reviewed By: kannan
CC: ruifang, hbase-eng@
Differential Revision: https://phabricator.fb.com/D548896
Task ID: 660885
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1379543&r1=1379542&r2=1379543&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java Fri Aug 31 18:30:42 2012
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.mapreduc
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
import java.util.Map;
@@ -70,9 +71,13 @@ public class LoadIncrementalHFiles exten
private boolean assignSeqIds;
+ public static String EXIT_ON_FIRST_FAILURE = "hbase.mapreduce.bulkload.failure.exitOnFirst";
+ private boolean exitOnFirstFailure;
+
public LoadIncrementalHFiles(Configuration conf) {
super(conf);
assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
+ exitOnFirstFailure = conf.getBoolean(EXIT_ON_FIRST_FAILURE, true);
}
public LoadIncrementalHFiles() {
@@ -161,18 +166,36 @@ public class LoadIncrementalHFiles exten
}
Deque<LoadQueueItem> queue = null;
+ ArrayList<LoadQueueItem> failedItems = new ArrayList<LoadQueueItem>();
try {
queue = discoverLoadQueue(hfofDir);
while (!queue.isEmpty()) {
LoadQueueItem item = queue.remove();
- tryLoad(item, conn, table.getTableName(), queue);
+ try {
+ tryLoad(item, conn, table.getTableName(), queue);
+ } catch (IOException e) {
+ LOG.error("Caught exception while processing " + item.hfilePath, e);
+
+ if (exitOnFirstFailure) throw e;
+ // otherwise lets keep quiet and try the next item
+ failedItems.add(item);
+ }
}
} finally {
- if (queue != null && !queue.isEmpty()) {
+ if (!failedItems.isEmpty()
+ || (queue != null && !queue.isEmpty())) {
StringBuilder err = new StringBuilder();
err.append("-------------------------------------------------\n");
err.append("Bulk load aborted with some files not yet loaded:\n");
err.append("-------------------------------------------------\n");
+ err.append("Had errors on:\n");
+ err.append("-------------------------------------------------\n");
+ for (LoadQueueItem q : failedItems) {
+ err.append(" ").append(q.hfilePath).append('\n');
+ }
+ err.append("-------------------------------------------------\n");
+ err.append("Did not try:\n");
+ err.append("-------------------------------------------------\n");
for (LoadQueueItem q : queue) {
err.append(" ").append(q.hfilePath).append('\n');
}
@@ -341,4 +364,4 @@ public class LoadIncrementalHFiles exten
System.exit(ret);
}
-}
\ No newline at end of file
+}