You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2017/09/30 13:48:55 UTC

hbase git commit: HBASE-18910

Repository: hbase
Updated Branches:
  refs/heads/branch-1.3 53f1e2480 -> ca78cd500


HBASE-18910

Backport HBASE-17292 "Add observer notification before bulk loaded hfile is moved to region directory" to 1.3

Signed-off-by: tedyu <yu...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ca78cd50
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ca78cd50
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ca78cd50

Branch: refs/heads/branch-1.3
Commit: ca78cd500563e06149425f5931b67a8b6c12a849
Parents: 53f1e24
Author: Guangxu Cheng <gu...@gmail.com>
Authored: Sat Sep 30 11:59:33 2017 +0800
Committer: tedyu <yu...@gmail.com>
Committed: Sat Sep 30 06:48:44 2017 -0700

----------------------------------------------------------------------
 .../hbase/coprocessor/BaseRegionObserver.java   | 10 +++
 .../hbase/coprocessor/RegionObserver.java       | 23 ++++++
 .../hadoop/hbase/regionserver/HRegion.java      | 83 ++++++++++++++------
 .../hbase/regionserver/HRegionFileSystem.java   | 24 ++++--
 .../hadoop/hbase/regionserver/HStore.java       | 16 +++-
 .../hbase/regionserver/RSRpcServices.java       | 13 +--
 .../regionserver/RegionCoprocessorHost.java     | 21 +++++
 .../apache/hadoop/hbase/regionserver/Store.java |  5 +-
 8 files changed, 159 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ca78cd50/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
index 1bf7449..1c31169 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
@@ -483,6 +483,16 @@ public class BaseRegionObserver implements RegionObserver {
   }
 
   @Override
+  public void preCommitStoreFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
+      final byte[] family, final List<Pair<Path, Path>> pairs) throws IOException {
+  }
+
+  @Override
+  public void postCommitStoreFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
+      final byte[] family, Path srcPath, Path dstPath) throws IOException {
+  }
+
+  @Override
   public boolean postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
     List<Pair<byte[], String>> familyPaths, boolean hasLoaded) throws IOException {
     return hasLoaded;

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca78cd50/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
index 8c5c15a..0bea614 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
@@ -1187,6 +1187,29 @@ public interface RegionObserver extends Coprocessor {
     List<Pair<byte[], String>> familyPaths) throws IOException;
 
   /**
+   * Called before moving bulk loaded hfile to region directory.
+   *
+   * @param ctx
+   * @param family column family
+   * @param pairs List of pairs of { HFile location in staging dir, HFile path in region dir }
+   * Each pair is for the same hfile.
+   * @throws IOException
+   */
+  void preCommitStoreFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
+      final byte[] family, final List<Pair<Path, Path>> pairs) throws IOException;
+
+  /**
+   * Called after moving bulk loaded hfile to region directory.
+   *
+   * @param ctx
+   * @param family column family
+   * @param srcPath Path to file before the move
+   * @param dstPath Path to file after the move
+   */
+  void postCommitStoreFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
+      final byte[] family, Path srcPath, Path dstPath) throws IOException;
+
+  /**
    * Called after bulkLoadHFile.
    *
    * @param ctx

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca78cd50/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 906ea58..f1f20ab 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -5592,37 +5592,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         }
       }
 
+      Map<byte[], List<Pair<Path, Path>>> familyWithFinalPath =
+          new TreeMap<>(Bytes.BYTES_COMPARATOR);
       for (Pair<byte[], String> p : familyPaths) {
         byte[] familyName = p.getFirst();
         String path = p.getSecond();
         Store store = getStore(familyName);
+        if (!familyWithFinalPath.containsKey(familyName)) {
+          familyWithFinalPath.put(familyName, new ArrayList<Pair<Path, Path>>());
+        }
+        List<Pair<Path, Path>> lst = familyWithFinalPath.get(familyName);
         try {
           String finalPath = path;
           if (bulkLoadListener != null) {
             finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
           }
-          Path commitedStoreFile = store.bulkLoadHFile(finalPath, seqId);
-
-          // Note the size of the store file
-          try {
-            FileSystem fs = commitedStoreFile.getFileSystem(baseConf);
-            storeFilesSizes.put(commitedStoreFile.getName(), fs.getFileStatus(commitedStoreFile)
-                .getLen());
-          } catch (IOException e) {
-            LOG.warn("Failed to find the size of hfile " + commitedStoreFile);
-            storeFilesSizes.put(commitedStoreFile.getName(), 0L);
-          }
-
-          if(storeFiles.containsKey(familyName)) {
-            storeFiles.get(familyName).add(commitedStoreFile);
-          } else {
-            List<Path> storeFileNames = new ArrayList<Path>();
-            storeFileNames.add(commitedStoreFile);
-            storeFiles.put(familyName, storeFileNames);
-          }
-          if (bulkLoadListener != null) {
-            bulkLoadListener.doneBulkLoad(familyName, path);
-          }
+          Pair<Path, Path> pair = ((HStore)store).preBulkLoadHFile(finalPath, seqId);
+          lst.add(pair);
         } catch (IOException ioe) {
           // A failure here can cause an atomicity violation that we currently
           // cannot recover from since it is likely a failed HDFS operation.
@@ -5642,6 +5628,59 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         }
       }
 
+      if (this.getCoprocessorHost() != null) {
+        for (Map.Entry<byte[], List<Pair<Path, Path>>> entry : familyWithFinalPath.entrySet()) {
+          this.getCoprocessorHost().preCommitStoreFile(entry.getKey(), entry.getValue());
+        }
+      }
+      for (Map.Entry<byte[], List<Pair<Path, Path>>> entry : familyWithFinalPath.entrySet()) {
+        byte[] familyName = entry.getKey();
+        for (Pair<Path, Path> p : entry.getValue()) {
+          String path = p.getFirst().toString();
+          Path commitedStoreFile = p.getSecond();
+          Store store = getStore(familyName);
+          try {
+            store.bulkLoadHFile(familyName, path, commitedStoreFile);
+            // Note the size of the store file
+            try {
+              FileSystem fs = commitedStoreFile.getFileSystem(baseConf);
+              storeFilesSizes.put(commitedStoreFile.getName(), fs.getFileStatus(commitedStoreFile)
+                  .getLen());
+            } catch (IOException e) {
+              LOG.warn("Failed to find the size of hfile " + commitedStoreFile);
+              storeFilesSizes.put(commitedStoreFile.getName(), 0L);
+            }
+
+            if(storeFiles.containsKey(familyName)) {
+              storeFiles.get(familyName).add(commitedStoreFile);
+            } else {
+              List<Path> storeFileNames = new ArrayList<Path>();
+              storeFileNames.add(commitedStoreFile);
+              storeFiles.put(familyName, storeFileNames);
+            }
+            if (bulkLoadListener != null) {
+              bulkLoadListener.doneBulkLoad(familyName, path);
+            }
+          } catch (IOException ioe) {
+            // A failure here can cause an atomicity violation that we currently
+            // cannot recover from since it is likely a failed HDFS operation.
+
+            // TODO Need a better story for reverting partial failures due to HDFS.
+            LOG.error("There was a partial failure due to IO when attempting to" +
+                " load " + Bytes.toString(familyName) + " : " + p.getSecond(), ioe);
+            if (bulkLoadListener != null) {
+              try {
+                bulkLoadListener.failedBulkLoad(familyName, path);
+              } catch (Exception ex) {
+                LOG.error("Error while calling failedBulkLoad for family " +
+                    Bytes.toString(familyName) + " with path " + path, ex);
+              }
+            }
+            throw ioe;
+          }
+        }
+      }
+
       isSuccessful = true;
     } finally {
       if (wal != null && !storeFiles.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca78cd50/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
index 619358c..e9face1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSHDFSUtils;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 
 /**
@@ -361,11 +362,13 @@ public class HRegionFileSystem {
    * @throws IOException
    */
   public Path commitStoreFile(final String familyName, final Path buildPath) throws IOException {
-    return commitStoreFile(familyName, buildPath, -1, false);
+    Path dstPath = preCommitStoreFile(familyName, buildPath, -1, false);
+    return commitStoreFile(buildPath, dstPath);
   }
 
   /**
-   * Move the file from a build/temp location to the main family store directory.
+   * Generate the filename in the main family store directory for moving the file from a build/temp
+   * location.
    * @param familyName Family that will gain the file
    * @param buildPath {@link Path} to the file to commit.
    * @param seqNum Sequence Number to append to the file name (less then 0 if no sequence number)
@@ -373,7 +376,7 @@ public class HRegionFileSystem {
    * @return The new {@link Path} of the committed file
    * @throws IOException
    */
-  private Path commitStoreFile(final String familyName, final Path buildPath,
+  private Path preCommitStoreFile(final String familyName, final Path buildPath,
       final long seqNum, final boolean generateNewName) throws IOException {
     Path storeDir = getStoreDir(familyName);
     if(!fs.exists(storeDir) && !createDir(storeDir))
@@ -388,6 +391,17 @@ public class HRegionFileSystem {
       throw new FileNotFoundException(buildPath.toString());
     }
     LOG.debug("Committing store file " + buildPath + " as " + dstPath);
+    return dstPath;
+  }
+
+  /**
+   * Moves file from staging dir to region dir
+   * @param buildPath {@link Path} to the file to commit.
+   * @param dstPath {@link Path} to the file under region dir
+   * @return The {@link Path} of the committed file
+   * @throws IOException
+   */
+  Path commitStoreFile(final Path buildPath, Path dstPath) throws IOException {
     // buildPath exists, therefore not doing an exists() check.
     if (!rename(buildPath, dstPath)) {
       throw new IOException("Failed rename of " + buildPath + " to " + dstPath);
@@ -445,7 +459,7 @@ public class HRegionFileSystem {
    * @return The destination {@link Path} of the bulk loaded file
    * @throws IOException
    */
-  Path bulkLoadStoreFile(final String familyName, Path srcPath, long seqNum)
+  Pair<Path, Path> bulkLoadStoreFile(final String familyName, Path srcPath, long seqNum)
       throws IOException {
     // Copy the file if it's on another filesystem
     FileSystem srcFs = srcPath.getFileSystem(conf);
@@ -463,7 +477,7 @@ public class HRegionFileSystem {
       srcPath = tmpPath;
     }
 
-    return commitStoreFile(familyName, srcPath, seqNum, true);
+    return new Pair<>(srcPath, preCommitStoreFile(familyName, srcPath, seqNum, true));
   }
 
   // ===========================================================================

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca78cd50/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 957751c..409e309 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -91,6 +91,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
@@ -796,10 +797,21 @@ public class HStore implements Store {
     }
   }
 
+  public Pair<Path, Path> preBulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
+    Path srcPath = new Path(srcPathStr);
+    return fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
+  }
+
   @Override
-  public Path bulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
+  public Path bulkLoadHFile(byte[] family, String srcPathStr, Path dstPath) throws IOException {
     Path srcPath = new Path(srcPathStr);
-    Path dstPath = fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
+    try {
+      fs.commitStoreFile(srcPath, dstPath);
+    } finally {
+      if (this.getCoprocessorHost() != null) {
+        this.getCoprocessorHost().postCommitStoreFile(family, srcPath, dstPath);
+      }
+    }
 
     LOG.info("Loaded HFile " + srcPath + " into store '" + getColumnFamilyName() + "' as "
         + dstPath + " - updating store file list.");

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca78cd50/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index cda48fc..1be168a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -2074,11 +2074,14 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
       }
       boolean loaded = false;
-      if (!bypass) {
-        loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null);
-      }
-      if (region.getCoprocessorHost() != null) {
-        loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
+      try {
+        if (!bypass) {
+          loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null);
+        }
+      } finally {
+        if (region.getCoprocessorHost() != null) {
+          loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
+        }
       }
       BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
       builder.setLoaded(loaded);

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca78cd50/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
index 1ed866a..d28bd8b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -1505,6 +1505,27 @@ public class RegionCoprocessorHost
     postWALRestore(info, (WALKey)logKey, logEdit);
   }
 
+  public boolean preCommitStoreFile(final byte[] family, final List<Pair<Path, Path>> pairs)
+      throws IOException {
+    return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+      @Override
+      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+          throws IOException {
+        oserver.preCommitStoreFile(ctx, family, pairs);
+      }
+    });
+  }
+  public void postCommitStoreFile(final byte[] family, final Path srcPath, final Path dstPath)
+      throws IOException {
+    execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+      @Override
+      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+          throws IOException {
+        oserver.postCommitStoreFile(ctx, family, srcPath, dstPath);
+      }
+    });
+  }
+
   /**
    * @param familyPaths pairs of { CF, file path } submitted for bulk load
    * @return true if the default operation should be bypassed

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca78cd50/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index e7a4de5..77fef1f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -316,10 +316,11 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
    * This method should only be called from Region. It is assumed that the ranges of values in the
    * HFile fit within the stores assigned region. (assertBulkLoadHFileOk checks this)
    *
+   * @param family the column family
    * @param srcPathStr
-   * @param sequenceId sequence Id associated with the HFile
+   * @param dstPath
    */
-  Path bulkLoadHFile(String srcPathStr, long sequenceId) throws IOException;
+  Path bulkLoadHFile(byte[] family, String srcPathStr, Path dstPath) throws IOException;
 
   // General accessors into the state of the store
   // TODO abstract some of this out into a metrics class