Posted to commits@hbase.apache.org by mb...@apache.org on 2015/02/13 20:16:16 UTC
[4/4] hbase git commit: HBASE-13037 LoadIncrementalHFile should try to verify the content of unmatched families
HBASE-13037 LoadIncrementalHFile should try to verify the content of unmatched families
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9b8f4805
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9b8f4805
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9b8f4805
Branch: refs/heads/branch-1.0
Commit: 9b8f4805d7d7cc682ac6f231c5f36bf49b20b67e
Parents: b1d4442
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Fri Feb 13 18:15:24 2015 +0100
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Fri Feb 13 20:15:30 2015 +0100
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/io/hfile/HFile.java | 41 ++++++++++++++
.../hbase/mapreduce/LoadIncrementalHFiles.java | 16 ++++--
.../mapreduce/TestLoadIncrementalHFiles.java | 58 +++++++++++++++++++-
3 files changed, 110 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/9b8f4805/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
index f938020..cf09b0d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
@@ -532,6 +532,47 @@ public class HFile {
}
/**
+ * Returns true if the specified file has a valid HFile Trailer.
+ * @param fs filesystem
+ * @param path Path to file to verify
+ * @return true if the file has a valid HFile Trailer, otherwise false
+ * @throws IOException if failed to read from the underlying stream
+ */
+ public static boolean isHFileFormat(final FileSystem fs, final Path path) throws IOException {
+ return isHFileFormat(fs, fs.getFileStatus(path));
+ }
+
+ /**
+ * Returns true if the specified file has a valid HFile Trailer.
+ * @param fs filesystem
+ * @param fileStatus the file to verify
+ * @return true if the file has a valid HFile Trailer, otherwise false
+ * @throws IOException if failed to read from the underlying stream
+ */
+ public static boolean isHFileFormat(final FileSystem fs, final FileStatus fileStatus)
+ throws IOException {
+ final Path path = fileStatus.getPath();
+ final long size = fileStatus.getLen();
+ FSDataInputStreamWrapper fsdis = new FSDataInputStreamWrapper(fs, path);
+ try {
+ boolean isHBaseChecksum = fsdis.shouldUseHBaseChecksum();
+ assert !isHBaseChecksum; // Initially we must read with FS checksum.
+ FixedFileTrailer.readFromStream(fsdis.getStream(isHBaseChecksum), size);
+ return true;
+ } catch (IllegalArgumentException e) {
+ return false;
+ } catch (IOException e) {
+ throw e;
+ } finally {
+ try {
+ fsdis.close();
+ } catch (Throwable t) {
+ LOG.warn("Error closing fsdis FSDataInputStreamWrapper: " + path, t);
+ }
+ }
+ }
+
+ /**
* Metadata for this file. Conjured by the writer. Read in by the reader.
*/
public static class FileInfo implements SortedMap<byte[], byte[]> {
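
(For context: the new HFile.isHFileFormat() helper reads only the fixed
trailer, so it is a cheap way to screen candidate files before a bulk load.
Below is a minimal sketch of calling it from client code; it is not part of
this commit, and the class and method names are made up for the example.)

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.HFile;

public class HFileFormatCheck {
  /**
   * Illustrative helper (not part of the patch): returns true when the file
   * at 'path' carries a valid HFile trailer. Only the trailer is read, so
   * the check stays cheap even for large files.
   */
  public static boolean isBulkLoadCandidate(Configuration conf, Path path)
      throws IOException {
    FileSystem fs = path.getFileSystem(conf);
    return HFile.isHFileFormat(fs, path);
  }
}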
http://git-wip-us.apache.org/repos/asf/hbase/blob/9b8f4805/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 18a295b..9beb6d5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -30,6 +30,7 @@ import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -270,15 +271,22 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
discoverLoadQueue(queue, hfofDir);
// check whether there is invalid family name in HFiles to be bulkloaded
Collection<HColumnDescriptor> families = table.getTableDescriptor().getFamilies();
- ArrayList<String> familyNames = new ArrayList<String>();
+ ArrayList<String> familyNames = new ArrayList<String>(families.size());
for (HColumnDescriptor family : families) {
familyNames.add(family.getNameAsString());
}
ArrayList<String> unmatchedFamilies = new ArrayList<String>();
- for (LoadQueueItem lqi : queue) {
+ Iterator<LoadQueueItem> queueIter = queue.iterator();
+ while (queueIter.hasNext()) {
+ LoadQueueItem lqi = queueIter.next();
String familyNameInHFile = Bytes.toString(lqi.family);
if (!familyNames.contains(familyNameInHFile)) {
- unmatchedFamilies.add(familyNameInHFile);
+ if (HFile.isHFileFormat(lqi.hfilePath.getFileSystem(getConf()), lqi.hfilePath)) {
+ unmatchedFamilies.add(familyNameInHFile);
+ } else {
+ LOG.warn("the file " + lqi + " doesn't seems to be an hfile. skipping");
+ queueIter.remove();
+ }
}
}
if (unmatchedFamilies.size() > 0) {
@@ -717,7 +725,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
throw e;
}
}
-
+
private boolean isSecureBulkLoadEndpointAvailable() {
String classes = getConf().get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
return classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
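
(The net effect in doBulkLoad(): a file under a directory whose name matches
no table family is only reported as an unmatched family if it really is an
HFile; anything else, e.g. a stray _logs directory left behind by a job, is
dropped from the load queue with a warning. Below is a minimal sketch of
driving the loader programmatically, mirroring what the new test does; the
output path and table name are placeholders.)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class BulkLoadDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Tool args: <hfileoutputformat output dir> <table name>. With this
    // patch, non-HFile entries under unmatched family directories are
    // skipped with a warning instead of aborting the whole load.
    int exit = new LoadIncrementalHFiles(conf)
        .run(new String[] { "/tmp/hfof-output", "mytable" });
    System.exit(exit);
  }
}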
http://git-wip-us.apache.org/repos/asf/hbase/blob/9b8f4805/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
index 49be5c8..abec268 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
@@ -27,6 +27,7 @@ import java.io.IOException;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -270,7 +271,7 @@ public class TestLoadIncrementalHFiles {
file.getPath().getName() != "DONOTERASE");
}
}
-
+
util.deleteTable(tableName);
}
@@ -306,6 +307,61 @@ public class TestLoadIncrementalHFiles {
}
}
+ /**
+ * Write a random data file in a dir with a valid family name that is not part of the table
+ * families; we should be able to bulk load without getting the unmatched family exception.
+ * HBASE-13037
+ */
+ @Test(timeout = 60000)
+ public void testNonHfileFolderWithUnmatchedFamilyName() throws Exception {
+ Path dir = util.getDataTestDirOnTestFS("testNonHfileFolderWithUnmatchedFamilyName");
+ FileSystem fs = util.getTestFileSystem();
+ dir = dir.makeQualified(fs);
+
+ Path familyDir = new Path(dir, Bytes.toString(FAMILY));
+ HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_0"),
+ FAMILY, QUALIFIER, Bytes.toBytes("begin"), Bytes.toBytes("end"), 500);
+
+ final String NON_FAMILY_FOLDER = "_logs";
+ Path nonFamilyDir = new Path(dir, NON_FAMILY_FOLDER);
+ fs.mkdirs(nonFamilyDir);
+ createRandomDataFile(fs, new Path(nonFamilyDir, "012356789"), 16 * 1024);
+
+ Table table = null;
+ try {
+ final String TABLE_NAME = "mytable_testNonHfileFolderWithUnmatchedFamilyName";
+ table = util.createTable(TableName.valueOf(TABLE_NAME), FAMILY);
+
+ final String[] args = {dir.toString(), TABLE_NAME};
+ new LoadIncrementalHFiles(util.getConfiguration()).run(args);
+ assertEquals(500, util.countRows(table));
+ } finally {
+ if (table != null) {
+ table.close();
+ }
+ fs.delete(dir, true);
+ }
+ }
+
+ private static void createRandomDataFile(FileSystem fs, Path path, int size)
+ throws IOException {
+ FSDataOutputStream stream = fs.create(path);
+ try {
+ byte[] data = new byte[1024];
+ for (int i = 0; i < data.length; ++i) {
+ data[i] = (byte)(i & 0xff);
+ }
+ while (size >= data.length) {
+ stream.write(data, 0, data.length);
+ size -= data.length;
+ }
+ if (size > 0) {
+ stream.write(data, 0, size);
+ }
+ } finally {
+ stream.close();
+ }
+ }
+
@Test
public void testSplitStoreFile() throws IOException {
Path dir = util.getDataTestDirOnTestFS("testSplitHFile");
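
(For completeness, the same loader is usually invoked from the shell; the
directory and table name below are placeholders:

  hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /tmp/hfof-output mytable

With this patch, files under unmatched family directories that lack a valid
HFile trailer are logged and skipped rather than failing the run.)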