You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2013/04/24 06:49:47 UTC
svn commit: r1471250 - in /hbase/branches/0.95/hbase-server/src:
main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
Author: stack
Date: Wed Apr 24 04:49:47 2013
New Revision: 1471250
URL: http://svn.apache.org/r1471250
Log:
HBASE-5472 LoadIncrementalHFiles loops forever if the target table misses a CF -- REAPPLY
Modified:
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1471250&r1=1471249&r2=1471250&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java Wed Apr 24 04:49:47 2013
@@ -187,7 +187,7 @@ public class LoadIncrementalHFiles exten
/**
* Perform a bulk load of the given directory into the given
* pre-existing table. This method is not threadsafe.
- *
+ *
* @param hfofDir the directory that was provided as the output path
* of a job using HFileOutputFormat
* @param table the table to load into
@@ -220,6 +220,27 @@ public class LoadIncrementalHFiles exten
Deque<LoadQueueItem> queue = new LinkedList<LoadQueueItem>();
try {
discoverLoadQueue(queue, hfofDir);
+ // check whether there is invalid family name in HFiles to be bulkloaded
+ Collection<HColumnDescriptor> families = table.getTableDescriptor().getFamilies();
+ ArrayList<String> familyNames = new ArrayList<String>();
+ for (HColumnDescriptor family : families) {
+ familyNames.add(family.getNameAsString());
+ }
+ ArrayList<String> unmatchedFamilies = new ArrayList<String>();
+ for (LoadQueueItem lqi : queue) {
+ String familyNameInHFile = Bytes.toString(lqi.family);
+ if (!familyNames.contains(familyNameInHFile)) {
+ unmatchedFamilies.add(familyNameInHFile);
+ }
+ }
+ if (unmatchedFamilies.size() > 0) {
+ String msg =
+ "Unmatched family names found: unmatched family names in HFiles to be bulkloaded: "
+ + unmatchedFamilies + "; valid family names of table "
+ + Bytes.toString(table.getTableName()) + " are: " + familyNames;
+ LOG.error(msg);
+ throw new IOException(msg);
+ }
int count = 0;
if (queue.isEmpty()) {
@@ -358,7 +379,7 @@ public class LoadIncrementalHFiles exten
Set<Future<List<LoadQueueItem>>> splittingFutures = new HashSet<Future<List<LoadQueueItem>>>();
while (!queue.isEmpty()) {
final LoadQueueItem item = queue.remove();
-
+
final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
public List<LoadQueueItem> call() throws Exception {
List<LoadQueueItem> splits = groupOrSplit(regionGroups, item, table, startEndKeys);
@@ -492,12 +513,12 @@ public class LoadIncrementalHFiles exten
* Attempts to do an atomic load of many hfiles into a region. If it fails,
* it returns a list of hfiles that need to be retried. If it is successful
* it will return an empty list.
- *
+ *
* NOTE: To maintain row atomicity guarantees, region server callable should
* succeed atomically and fails atomically.
- *
+ *
* Protected for testing.
- *
+ *
* @return empty list if success, list of items to retry on recoverable
* failure
*/
@@ -650,7 +671,7 @@ public class LoadIncrementalHFiles exten
private boolean doesTableExist(String tableName) throws Exception {
return hbAdmin.tableExists(tableName);
}
-
+
/*
* Infers region boundaries for a new table.
* Parameter:
@@ -658,29 +679,29 @@ public class LoadIncrementalHFiles exten
* If a key is a start key of a file, then it maps to +1
* If a key is an end key of a file, then it maps to -1
* Algo:
- * 1) Poll on the keys in order:
- * a) Keep adding the mapped values to these keys (runningSum)
+ * 1) Poll on the keys in order:
+ * a) Keep adding the mapped values to these keys (runningSum)
* b) Each time runningSum reaches 0, add the start Key from when the runningSum had started to a boundary list.
- * 2) Return the boundary list.
+ * 2) Return the boundary list.
*/
public static byte[][] inferBoundaries(TreeMap<byte[], Integer> bdryMap) {
ArrayList<byte[]> keysArray = new ArrayList<byte[]>();
int runningValue = 0;
byte[] currStartKey = null;
boolean firstBoundary = true;
-
+
for (Map.Entry<byte[], Integer> item: bdryMap.entrySet()) {
if (runningValue == 0) currStartKey = item.getKey();
runningValue += item.getValue();
if (runningValue == 0) {
if (!firstBoundary) keysArray.add(currStartKey);
firstBoundary = false;
- }
+ }
}
-
+
return keysArray.toArray(new byte[0][0]);
}
-
+
/*
* If the table is created for the first time, then "completebulkload" reads the files twice.
* More modifications necessary if we want to avoid doing it.
@@ -706,7 +727,7 @@ public class LoadIncrementalHFiles exten
// Build a set of keys
byte[][] keys;
TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
-
+
for (FileStatus stat : familyDirStatuses) {
if (!stat.isDir()) {
LOG.warn("Skipping non-directory " + stat.getPath());
@@ -716,10 +737,10 @@ public class LoadIncrementalHFiles exten
// Skip _logs, etc
if (familyDir.getName().startsWith("_")) continue;
byte[] family = familyDir.getName().getBytes();
-
+
hcd = new HColumnDescriptor(family);
htd.addFamily(hcd);
-
+
Path[] hfiles = FileUtil.stat2Paths(fs.listStatus(familyDir));
for (Path hfile : hfiles) {
if (hfile.getName().startsWith("_")) continue;
@@ -739,7 +760,7 @@ public class LoadIncrementalHFiles exten
LOG.info("Trying to figure out region boundaries hfile=" + hfile +
" first=" + Bytes.toStringBinary(first) +
" last=" + Bytes.toStringBinary(last));
-
+
// To eventually infer start key-end key boundaries
Integer value = map.containsKey(first)? map.get(first):0;
map.put(first, value+1);
@@ -751,7 +772,7 @@ public class LoadIncrementalHFiles exten
}
}
}
-
+
keys = LoadIncrementalHFiles.inferBoundaries(map);
this.hbAdmin.createTable(htd,keys);
Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java?rev=1471250&r1=1471249&r2=1471250&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java Wed Apr 24 04:49:47 2013
@@ -53,6 +53,7 @@ import org.junit.experimental.categories
public class TestLoadIncrementalHFiles {
private static final byte[] QUALIFIER = Bytes.toBytes("myqual");
private static final byte[] FAMILY = Bytes.toBytes("myfam");
+ private static final String EXPECTED_MSG_FOR_NON_EXISTING_FAMILY = "Unmatched family names found";
private static final byte[][] SPLIT_KEYS = new byte[][] {
Bytes.toBytes("ddd"),
@@ -188,6 +189,11 @@ public class TestLoadIncrementalHFiles {
HBaseAdmin admin = new HBaseAdmin(util.getConfiguration());
HTableDescriptor htd = new HTableDescriptor(TABLE);
+ // set real family name to upper case in purpose to simulate the case that
+ // family name in HFiles is invalid
+ HColumnDescriptor family =
+ new HColumnDescriptor(Bytes.toBytes(new String(FAMILY).toUpperCase()));
+ htd.addFamily(family);
admin.createTable(htd, SPLIT_KEYS);
HTable table = new HTable(util.getConfiguration(), TABLE);
@@ -198,6 +204,11 @@ public class TestLoadIncrementalHFiles {
assertTrue("Loading into table with non-existent family should have failed", false);
} catch (Exception e) {
assertTrue("IOException expected", e instanceof IOException);
+ // further check whether the exception message is correct
+ String errMsg = e.getMessage();
+ assertTrue("Incorrect exception message, expected message: ["
+ + EXPECTED_MSG_FOR_NON_EXISTING_FAMILY + "], current message: [" + errMsg + "]",
+ errMsg.contains(EXPECTED_MSG_FOR_NON_EXISTING_FAMILY));
}
table.close();
admin.close();