You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2012/06/23 02:54:12 UTC

svn commit: r1353057 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/coprocessor/ main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/coprocessor/

Author: apurtell
Date: Sat Jun 23 00:54:11 2012
New Revision: 1353057

URL: http://svn.apache.org/viewvc?rev=1353057&view=rev
Log:
HBASE-6224. Add pre and post coprocessor hooks for bulk load (Francis Liu)

Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java?rev=1353057&r1=1353056&r2=1353057&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java Sat Jun 23 00:54:11 2012
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Pair;
 
 import java.io.IOException;
 
@@ -279,4 +280,15 @@ public abstract class BaseRegionObserver
   public void postWALRestore(ObserverContext<RegionCoprocessorEnvironment> env,
       HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException {
   }
+
+  @Override
+  public void preBulkLoadHFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
+    List<Pair<byte[], String>> familyPaths) throws IOException { // no-op by default
+  }
+
+  @Override
+  public boolean postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
+    List<Pair<byte[], String>> familyPaths, boolean hasLoaded) throws IOException {
+    return hasLoaded; // default: hand the incoming result back unchanged
+  }
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java?rev=1353057&r1=1353056&r2=1353057&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java Sat Jun 23 00:54:11 2012
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 
 import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * Coprocessors implement this interface to observe and mediate client actions
@@ -655,4 +656,28 @@ public interface RegionObserver extends 
    */
   void postWALRestore(final ObserverContext<RegionCoprocessorEnvironment> ctx,
       HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException;
+
+  /**
+   * Called before bulkLoadHFile. Users can create a StoreFile instance to
+   * access the contents of an HFile.
+   *
+   * @param ctx observer context; if it reports bypass afterwards, the default load is skipped
+   * @param familyPaths pairs of { CF, HFile path } submitted for bulk load. Adding
+   * or removing from this list will add or remove HFiles to be bulk loaded.
+   * @throws IOException
+   */
+  void preBulkLoadHFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
+    List<Pair<byte[], String>> familyPaths) throws IOException;
+
+  /**
+   * Called after bulkLoadHFile, whether or not the default load was bypassed.
+   *
+   * @param ctx the observer context for this call
+   * @param familyPaths pairs of { CF, HFile path } submitted for bulk load
+   * @param hasLoaded whether the bulkLoad was successful
+   * @return the new value of hasLoaded, passed to later observers and then to the response
+   * @throws IOException
+   */
+  boolean postBulkLoadHFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
+    List<Pair<byte[], String>> familyPaths, boolean hasLoaded) throws IOException;
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1353057&r1=1353056&r2=1353057&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Sat Jun 23 00:54:11 2012
@@ -3122,10 +3122,20 @@ public class  HRegionServer implements C
       HRegion region = getRegion(request.getRegion());
       List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
       for (FamilyPath familyPath: request.getFamilyPathList()) {
-        familyPaths.add(new Pair<byte[], String>(
-          familyPath.getFamily().toByteArray(), familyPath.getPath()));
+        familyPaths.add(new Pair<byte[], String>(familyPath.getFamily().toByteArray(),
+          familyPath.getPath()));
+      }
+      boolean bypass = false;
+      if (region.getCoprocessorHost() != null) {
+        bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
+      }
+      boolean loaded = false;
+      if (!bypass) {
+        loaded = region.bulkLoadHFiles(familyPaths);
+      }
+      if (region.getCoprocessorHost() != null) {
+        loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
       }
-      boolean loaded = region.bulkLoadHFiles(familyPaths);
       BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
       builder.setLoaded(loaded);
       return builder.build();

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java?rev=1353057&r1=1353056&r2=1353057&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java Sat Jun 23 00:54:11 2012
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.ipc.Copro
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.util.StringUtils;
 
 import java.io.IOException;
@@ -1267,4 +1268,58 @@ public class RegionCoprocessorHost
       }
     }
   }
+
+  /**
+   * @param familyPaths pairs of { CF, file path } submitted for bulk load
+   * @return true if any invoked observer's context reported that the default should be bypassed
+   * @throws IOException
+   */
+  public boolean preBulkLoadHFile(List<Pair<byte[], String>> familyPaths) throws IOException {
+    boolean skipDefault = false;
+    ObserverContext<RegionCoprocessorEnvironment> ctx = null;
+    for (RegionEnvironment env: coprocessors) {
+      if (!(env.getInstance() instanceof RegionObserver)) {
+        continue;
+      }
+      ctx = ObserverContext.createAndPrepare(env, ctx);
+      try {
+        ((RegionObserver)env.getInstance()).preBulkLoadHFile(ctx, familyPaths);
+      } catch (Throwable e) {
+        handleCoprocessorThrowable(env, e);
+      }
+      skipDefault |= ctx.shouldBypass();
+      if (ctx.shouldComplete()) {
+        break;
+      }
+    }
+    return skipDefault;
+  }
+
+  /**
+   * @param familyPaths pairs of { CF, file path } submitted for bulk load
+   * @param hasLoaded whether load was successful or not
+   * @return hasLoaded after each invoked observer has had the chance to replace it
+   * @throws IOException
+   */
+  public boolean postBulkLoadHFile(List<Pair<byte[], String>> familyPaths, boolean hasLoaded)
+      throws IOException {
+    ObserverContext<RegionCoprocessorEnvironment> ctx = null;
+    for (RegionEnvironment env: coprocessors) {
+      if (!(env.getInstance() instanceof RegionObserver)) {
+        continue;
+      }
+      ctx = ObserverContext.createAndPrepare(env, ctx);
+      try {
+        RegionObserver observer = (RegionObserver)env.getInstance();
+        hasLoaded = observer.postBulkLoadHFile(ctx, familyPaths, hasLoaded);
+      } catch (Throwable e) {
+        handleCoprocessorThrowable(env, e);
+      }
+      if (ctx.shouldComplete()) {
+        break;
+      }
+    }
+    return hasLoaded;
+  }
+
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1353057&r1=1353056&r2=1353057&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Sat Jun 23 00:54:11 2012
@@ -88,7 +88,7 @@ import com.google.common.collect.Orderin
  * The reason for this weird pattern where you use a different instance for the
  * writer and a reader is that we write once but read a lot more.
  */
-@InterfaceAudience.Private
+@InterfaceAudience.LimitedPrivate("Coprocessor")
 public class StoreFile extends SchemaConfigured {
   static final Log LOG = LogFactory.getLog(StoreFile.class.getName());
 
@@ -233,7 +233,7 @@ public class StoreFile extends SchemaCon
    * @param dataBlockEncoder data block encoding algorithm.
    * @throws IOException When opening the reader fails.
    */
-  StoreFile(final FileSystem fs,
+  public StoreFile(final FileSystem fs,
             final Path p,
             final Configuration conf,
             final CacheConfig cacheConf,

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java?rev=1353057&r1=1353056&r2=1353057&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java Sat Jun 23 00:54:11 2012
@@ -39,8 +39,8 @@ import org.apache.hadoop.hbase.regionser
  * KeyValueScanner adaptor over the Reader.  It also provides hooks into
  * bloom filter things.
  */
-@InterfaceAudience.Private
-class StoreFileScanner implements KeyValueScanner {
+@InterfaceAudience.LimitedPrivate("Coprocessor")
+public class StoreFileScanner implements KeyValueScanner {
   static final Log LOG = LogFactory.getLog(Store.class);
 
   // the reader it comes from:

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java?rev=1353057&r1=1353056&r2=1353057&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java Sat Jun 23 00:54:11 2012
@@ -20,6 +20,8 @@
 
 package org.apache.hadoop.hbase.coprocessor;
 
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
@@ -45,6 +47,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * A sample region observer that tests the RegionObserver interface.
@@ -85,6 +88,8 @@ public class SimpleRegionObserver extend
   boolean hadPostScannerClose = false;
   boolean hadPreScannerOpen = false;
   boolean hadPostScannerOpen = false;
+  boolean hadPreBulkLoadHFile = false;
+  boolean hadPostBulkLoadHFile = false;
 
   @Override
   public void preOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
@@ -384,6 +389,43 @@ public class SimpleRegionObserver extend
     return result;
   }
 
+  /** Records the upcall; for the test table, expects exactly one { A, ".../A" } pair. */
+  @Override
+  public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
+                               List<Pair<byte[], String>> familyPaths) throws IOException {
+    RegionCoprocessorEnvironment e = ctx.getEnvironment();
+    assertNotNull(e);
+    assertNotNull(e.getRegion());
+    if (Arrays.equals(e.getRegion().getTableDesc().getName(),
+        TestRegionObserverInterface.TEST_TABLE)) {
+      assertNotNull(familyPaths);
+      assertEquals(1, familyPaths.size());
+      assertArrayEquals(TestRegionObserverInterface.A, familyPaths.get(0).getFirst());
+      String path = familyPaths.get(0).getSecond(); // must end in "/<family A>"; shown on failure
+      assertTrue(path, path.endsWith("/" + Bytes.toString(TestRegionObserverInterface.A)));
+    }
+    hadPreBulkLoadHFile = true;
+  }
+
+  /** Records the upcall and returns hasLoaded unchanged; same pair checks as the pre hook. */
+  @Override
+  public boolean postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
+                                   List<Pair<byte[], String>> familyPaths, boolean hasLoaded) throws IOException {
+    RegionCoprocessorEnvironment e = ctx.getEnvironment();
+    assertNotNull(e);
+    assertNotNull(e.getRegion());
+    if (Arrays.equals(e.getRegion().getTableDesc().getName(),
+        TestRegionObserverInterface.TEST_TABLE)) {
+      assertNotNull(familyPaths);
+      assertEquals(1, familyPaths.size());
+      assertArrayEquals(TestRegionObserverInterface.A, familyPaths.get(0).getFirst());
+      String path = familyPaths.get(0).getSecond(); // must end in "/<family A>"; shown on failure
+      assertTrue(path, path.endsWith("/" + Bytes.toString(TestRegionObserverInterface.A)));
+    }
+    hadPostBulkLoadHFile = true;
+    return hasLoaded;
+  }
+
   public boolean hadPreGet() {
     return hadPreGet;
   }
@@ -430,4 +472,12 @@ public class SimpleRegionObserver extend
   public boolean hadDeleted() {
     return hadPreDeleted && hadPostDeleted;
   }
+
+  public boolean hadPostBulkLoadHFile() { // true once postBulkLoadHFile has run to its end
+    return hadPostBulkLoadHFile;
+  }
+
+  public boolean hadPreBulkLoadHFile() { // true once preBulkLoadHFile has run to its end
+    return hadPreBulkLoadHFile;
+  }
 }

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java?rev=1353057&r1=1353056&r2=1353057&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java Sat Jun 23 00:54:11 2012
@@ -34,6 +34,8 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -52,6 +54,9 @@ import org.apache.hadoop.hbase.client.Re
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
@@ -447,6 +452,37 @@ public class TestRegionObserverInterface
     table.close();
   }
 
+  /** Verifies that the pre/post bulk load observer hooks fire when an HFile is bulk loaded. */
+  @Test
+  public void bulkLoadHFileTest() throws Exception {
+    String testName = TestRegionObserverInterface.class.getName()+".bulkLoadHFileTest";
+    byte[] tableName = TEST_TABLE;
+    Configuration conf = util.getConfiguration();
+    HTable table = util.createTable(tableName, new byte[][] {A, B, C});
+    String[] hooks = new String[] {"hadPreBulkLoadHFile", "hadPostBulkLoadHFile"};
+    try {
+      // Neither hook may have fired before anything is loaded.
+      verifyMethodResult(SimpleRegionObserver.class, hooks, tableName,
+          new Boolean[] {false, false});
+
+      FileSystem fs = util.getTestFileSystem();
+      final Path dir = util.getDataTestDir(testName).makeQualified(fs);
+      Path familyDir = new Path(dir, Bytes.toString(A));
+
+      createHFile(util.getConfiguration(), fs, new Path(familyDir,Bytes.toString(A)), A, A);
+
+      // Bulk load through the handle opened above; a second HTable created here
+      // would never be closed.
+      new LoadIncrementalHFiles(conf).doBulkLoad(dir, table);
+
+      verifyMethodResult(SimpleRegionObserver.class, hooks, tableName,
+          new Boolean[] {true, true});
+    } finally { // clean up even when a check above fails, so the table does not linger
+      util.deleteTable(tableName);
+      table.close();
+    }
+  }
+
   // check each region whether the coprocessor upcalls are called or not.
   private void verifyMethodResult(Class c, String methodName[], byte[] tableName,
                                   Object value[]) throws IOException {
@@ -475,6 +511,25 @@ public class TestRegionObserverInterface
     }
   }
 
+  /** Writes an HFile at path holding nine KeyValues built from the strings "1" through "9". */
+  private static void createHFile(
+      Configuration conf, FileSystem fs, Path path,
+      byte[] family, byte[] qualifier) throws IOException {
+    HFile.Writer out = HFile.getWriterFactory(conf, new CacheConfig(conf))
+        .withPath(fs, path)
+        .withComparator(KeyValue.KEY_COMPARATOR)
+        .create();
+    final long ts = System.currentTimeMillis();
+    try {
+      for (int n = 1; n < 10; n++) {
+        String text = String.valueOf(n);
+        out.append(new KeyValue(Bytes.toBytes(text), family, qualifier, ts, Bytes.toBytes(text)));
+      }
+    } finally {
+      out.close();
+    }
+  }
+
   private static byte [][] makeN(byte [] base, int n) {
     byte [][] ret = new byte[n][];
     for(int i=0;i<n;i++) {