You are viewing a plain-text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this text.
Posted to commits@hbase.apache.org by te...@apache.org on 2017/09/30 13:48:55 UTC
hbase git commit: HBASE-18910
Repository: hbase
Updated Branches:
refs/heads/branch-1.3 53f1e2480 -> ca78cd500
HBASE-18910
Backport HBASE-17292 "Add observer notification before bulk loaded hfile is moved to region directory" to 1.3
Signed-off-by: tedyu <yu...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ca78cd50
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ca78cd50
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ca78cd50
Branch: refs/heads/branch-1.3
Commit: ca78cd500563e06149425f5931b67a8b6c12a849
Parents: 53f1e24
Author: Guangxu Cheng <gu...@gmail.com>
Authored: Sat Sep 30 11:59:33 2017 +0800
Committer: tedyu <yu...@gmail.com>
Committed: Sat Sep 30 06:48:44 2017 -0700
----------------------------------------------------------------------
.../hbase/coprocessor/BaseRegionObserver.java | 10 +++
.../hbase/coprocessor/RegionObserver.java | 23 ++++++
.../hadoop/hbase/regionserver/HRegion.java | 83 ++++++++++++++------
.../hbase/regionserver/HRegionFileSystem.java | 24 ++++--
.../hadoop/hbase/regionserver/HStore.java | 16 +++-
.../hbase/regionserver/RSRpcServices.java | 13 +--
.../regionserver/RegionCoprocessorHost.java | 21 +++++
.../apache/hadoop/hbase/regionserver/Store.java | 5 +-
8 files changed, 159 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/ca78cd50/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
index 1bf7449..1c31169 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
@@ -483,6 +483,16 @@ public class BaseRegionObserver implements RegionObserver {
}
@Override
+ public void preCommitStoreFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
+ final byte[] family, final List<Pair<Path, Path>> pairs) throws IOException {
+ }
+
+ @Override
+ public void postCommitStoreFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
+ final byte[] family, Path srcPath, Path dstPath) throws IOException {
+ }
+
+ @Override
public boolean postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
List<Pair<byte[], String>> familyPaths, boolean hasLoaded) throws IOException {
return hasLoaded;
http://git-wip-us.apache.org/repos/asf/hbase/blob/ca78cd50/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
index 8c5c15a..0bea614 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
@@ -1187,6 +1187,29 @@ public interface RegionObserver extends Coprocessor {
List<Pair<byte[], String>> familyPaths) throws IOException;
/**
+ * Called before moving bulk loaded hfile to region directory.
+ *
+ * @param ctx
+ * @param family column family
+ * @param pairs List of pairs of { HFile location in staging dir, HFile path in region dir }
+ * Each pair are for the same hfile.
+ * @throws IOException
+ */
+ void preCommitStoreFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
+ final byte[] family, final List<Pair<Path, Path>> pairs) throws IOException;
+
+ /**
+ * Called after moving bulk loaded hfile to region directory.
+ *
+ * @param ctx
+ * @param family column family
+ * @param srcPath Path to file before the move
+ * @param dstPath Path to file after the move
+ */
+ void postCommitStoreFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
+ final byte[] family, Path srcPath, Path dstPath) throws IOException;
+
+ /**
* Called after bulkLoadHFile.
*
* @param ctx
http://git-wip-us.apache.org/repos/asf/hbase/blob/ca78cd50/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 906ea58..f1f20ab 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -5592,37 +5592,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
+ Map<byte[], List<Pair<Path, Path>>> familyWithFinalPath =
+ new TreeMap<>(Bytes.BYTES_COMPARATOR);
for (Pair<byte[], String> p : familyPaths) {
byte[] familyName = p.getFirst();
String path = p.getSecond();
Store store = getStore(familyName);
+ if (!familyWithFinalPath.containsKey(familyName)) {
+ familyWithFinalPath.put(familyName, new ArrayList<Pair<Path, Path>>());
+ }
+ List<Pair<Path, Path>> lst = familyWithFinalPath.get(familyName);
try {
String finalPath = path;
if (bulkLoadListener != null) {
finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
}
- Path commitedStoreFile = store.bulkLoadHFile(finalPath, seqId);
-
- // Note the size of the store file
- try {
- FileSystem fs = commitedStoreFile.getFileSystem(baseConf);
- storeFilesSizes.put(commitedStoreFile.getName(), fs.getFileStatus(commitedStoreFile)
- .getLen());
- } catch (IOException e) {
- LOG.warn("Failed to find the size of hfile " + commitedStoreFile);
- storeFilesSizes.put(commitedStoreFile.getName(), 0L);
- }
-
- if(storeFiles.containsKey(familyName)) {
- storeFiles.get(familyName).add(commitedStoreFile);
- } else {
- List<Path> storeFileNames = new ArrayList<Path>();
- storeFileNames.add(commitedStoreFile);
- storeFiles.put(familyName, storeFileNames);
- }
- if (bulkLoadListener != null) {
- bulkLoadListener.doneBulkLoad(familyName, path);
- }
+ Pair<Path, Path> pair = ((HStore)store).preBulkLoadHFile(finalPath, seqId);
+ lst.add(pair);
} catch (IOException ioe) {
// A failure here can cause an atomicity violation that we currently
// cannot recover from since it is likely a failed HDFS operation.
@@ -5642,6 +5628,59 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
+ if (this.getCoprocessorHost() != null) {
+ for (Map.Entry<byte[], List<Pair<Path, Path>>> entry : familyWithFinalPath.entrySet()) {
+ this.getCoprocessorHost().preCommitStoreFile(entry.getKey(), entry.getValue());
+ }
+ }
+ for (Map.Entry<byte[], List<Pair<Path, Path>>> entry : familyWithFinalPath.entrySet()) {
+ byte[] familyName = entry.getKey();
+ for (Pair<Path, Path> p : entry.getValue()) {
+ String path = p.getFirst().toString();
+ Path commitedStoreFile = p.getSecond();
+ Store store = getStore(familyName);
+ try {
+ store.bulkLoadHFile(familyName, path, commitedStoreFile);
+ // Note the size of the store file
+ try {
+ FileSystem fs = commitedStoreFile.getFileSystem(baseConf);
+ storeFilesSizes.put(commitedStoreFile.getName(), fs.getFileStatus(commitedStoreFile)
+ .getLen());
+ } catch (IOException e) {
+ LOG.warn("Failed to find the size of hfile " + commitedStoreFile);
+ storeFilesSizes.put(commitedStoreFile.getName(), 0L);
+ }
+
+ if(storeFiles.containsKey(familyName)) {
+ storeFiles.get(familyName).add(commitedStoreFile);
+ } else {
+ List<Path> storeFileNames = new ArrayList<Path>();
+ storeFileNames.add(commitedStoreFile);
+ storeFiles.put(familyName, storeFileNames);
+ }
+ if (bulkLoadListener != null) {
+ bulkLoadListener.doneBulkLoad(familyName, path);
+ }
+ } catch (IOException ioe) {
+ // A failure here can cause an atomicity violation that we currently
+ // cannot recover from since it is likely a failed HDFS operation.
+
+ // TODO Need a better story for reverting partial failures due to HDFS.
+ LOG.error("There was a partial failure due to IO when attempting to" +
+ " load " + Bytes.toString(familyName) + " : " + p.getSecond(), ioe);
+ if (bulkLoadListener != null) {
+ try {
+ bulkLoadListener.failedBulkLoad(familyName, path);
+ } catch (Exception ex) {
+ LOG.error("Error while calling failedBulkLoad for family " +
+ Bytes.toString(familyName) + " with path " + path, ex);
+ }
+ }
+ throw ioe;
+ }
+ }
+ }
+
isSuccessful = true;
} finally {
if (wal != null && !storeFiles.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/ca78cd50/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
index 619358c..e9face1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSHDFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
/**
@@ -361,11 +362,13 @@ public class HRegionFileSystem {
* @throws IOException
*/
public Path commitStoreFile(final String familyName, final Path buildPath) throws IOException {
- return commitStoreFile(familyName, buildPath, -1, false);
+ Path dstPath = preCommitStoreFile(familyName, buildPath, -1, false);
+ return commitStoreFile(buildPath, dstPath);
}
/**
- * Move the file from a build/temp location to the main family store directory.
+ * Generate the filename in the main family store directory for moving the file from a build/temp
+ * location.
* @param familyName Family that will gain the file
* @param buildPath {@link Path} to the file to commit.
* @param seqNum Sequence Number to append to the file name (less then 0 if no sequence number)
@@ -373,7 +376,7 @@ public class HRegionFileSystem {
* @return The new {@link Path} of the committed file
* @throws IOException
*/
- private Path commitStoreFile(final String familyName, final Path buildPath,
+ private Path preCommitStoreFile(final String familyName, final Path buildPath,
final long seqNum, final boolean generateNewName) throws IOException {
Path storeDir = getStoreDir(familyName);
if(!fs.exists(storeDir) && !createDir(storeDir))
@@ -388,6 +391,17 @@ public class HRegionFileSystem {
throw new FileNotFoundException(buildPath.toString());
}
LOG.debug("Committing store file " + buildPath + " as " + dstPath);
+ return dstPath;
+ }
+
+ /*
+ * Moves file from staging dir to region dir
+ * @param buildPath {@link Path} to the file to commit.
+ * @param dstPath {@link Path} to the file under region dir
+ * @return The {@link Path} of the committed file
+ * @throws IOException
+ */
+ Path commitStoreFile(final Path buildPath, Path dstPath) throws IOException {
// buildPath exists, therefore not doing an exists() check.
if (!rename(buildPath, dstPath)) {
throw new IOException("Failed rename of " + buildPath + " to " + dstPath);
@@ -445,7 +459,7 @@ public class HRegionFileSystem {
* @return The destination {@link Path} of the bulk loaded file
* @throws IOException
*/
- Path bulkLoadStoreFile(final String familyName, Path srcPath, long seqNum)
+ Pair<Path, Path> bulkLoadStoreFile(final String familyName, Path srcPath, long seqNum)
throws IOException {
// Copy the file if it's on another filesystem
FileSystem srcFs = srcPath.getFileSystem(conf);
@@ -463,7 +477,7 @@ public class HRegionFileSystem {
srcPath = tmpPath;
}
- return commitStoreFile(familyName, srcPath, seqNum, true);
+ return new Pair<>(srcPath, preCommitStoreFile(familyName, srcPath, seqNum, true));
}
// ===========================================================================
http://git-wip-us.apache.org/repos/asf/hbase/blob/ca78cd50/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 957751c..409e309 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -91,6 +91,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
@@ -796,10 +797,21 @@ public class HStore implements Store {
}
}
+ public Pair<Path, Path> preBulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
+ Path srcPath = new Path(srcPathStr);
+ return fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
+ }
+
@Override
- public Path bulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
+ public Path bulkLoadHFile(byte[] family, String srcPathStr, Path dstPath) throws IOException {
Path srcPath = new Path(srcPathStr);
- Path dstPath = fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
+ try {
+ fs.commitStoreFile(srcPath, dstPath);
+ } finally {
+ if (this.getCoprocessorHost() != null) {
+ this.getCoprocessorHost().postCommitStoreFile(family, srcPath, dstPath);
+ }
+ }
LOG.info("Loaded HFile " + srcPath + " into store '" + getColumnFamilyName() + "' as "
+ dstPath + " - updating store file list.");
http://git-wip-us.apache.org/repos/asf/hbase/blob/ca78cd50/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index cda48fc..1be168a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -2074,11 +2074,14 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
}
boolean loaded = false;
- if (!bypass) {
- loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null);
- }
- if (region.getCoprocessorHost() != null) {
- loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
+ try {
+ if (!bypass) {
+ loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null);
+ }
+ } finally {
+ if (region.getCoprocessorHost() != null) {
+ loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
+ }
}
BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
builder.setLoaded(loaded);
http://git-wip-us.apache.org/repos/asf/hbase/blob/ca78cd50/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
index 1ed866a..d28bd8b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -1505,6 +1505,27 @@ public class RegionCoprocessorHost
postWALRestore(info, (WALKey)logKey, logEdit);
}
+ public boolean preCommitStoreFile(final byte[] family, final List<Pair<Path, Path>> pairs)
+ throws IOException {
+ return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.preCommitStoreFile(ctx, family, pairs);
+ }
+ });
+ }
+ public void postCommitStoreFile(final byte[] family, final Path srcPath, final Path dstPath)
+ throws IOException {
+ execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.postCommitStoreFile(ctx, family, srcPath, dstPath);
+ }
+ });
+ }
+
/**
* @param familyPaths pairs of { CF, file path } submitted for bulk load
* @return true if the default operation should be bypassed
http://git-wip-us.apache.org/repos/asf/hbase/blob/ca78cd50/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index e7a4de5..77fef1f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -316,10 +316,11 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
* This method should only be called from Region. It is assumed that the ranges of values in the
* HFile fit within the stores assigned region. (assertBulkLoadHFileOk checks this)
*
+ * @param family the column family
* @param srcPathStr
- * @param sequenceId sequence Id associated with the HFile
+ * @param dstPath
*/
- Path bulkLoadHFile(String srcPathStr, long sequenceId) throws IOException;
+ Path bulkLoadHFile(byte[] family, String srcPathStr, Path dstPath) throws IOException;
// General accessors into the state of the store
// TODO abstract some of this out into a metrics class