You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2012/06/23 02:54:12 UTC
svn commit: r1353057 - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/coprocessor/
main/java/org/apache/hadoop/hbase/regionserver/
test/java/org/apache/hadoop/hbase/coprocessor/
Author: apurtell
Date: Sat Jun 23 00:54:11 2012
New Revision: 1353057
URL: http://svn.apache.org/viewvc?rev=1353057&view=rev
Log:
HBASE-6224. Add pre and post coprocessor hooks for bulk load (Francis Liu)
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java?rev=1353057&r1=1353056&r2=1353057&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java Sat Jun 23 00:54:11 2012
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Pair;
import java.io.IOException;
@@ -279,4 +280,15 @@ public abstract class BaseRegionObserver
public void postWALRestore(ObserverContext<RegionCoprocessorEnvironment> env,
HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException {
}
+
+ @Override
+ public void preBulkLoadHFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
+ List<Pair<byte[], String>> familyPaths) throws IOException {
+ }
+
+ @Override
+ public boolean postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
+ List<Pair<byte[], String>> familyPaths, boolean hasLoaded) throws IOException {
+ return hasLoaded;
+ }
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java?rev=1353057&r1=1353056&r2=1353057&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java Sat Jun 23 00:54:11 2012
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.hbase.util.Pair;
/**
* Coprocessors implement this interface to observe and mediate client actions
@@ -655,4 +656,28 @@ public interface RegionObserver extends
*/
void postWALRestore(final ObserverContext<RegionCoprocessorEnvironment> ctx,
HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException;
+
+ /**
+ * Called before bulkLoadHFile. Users can create a StoreFile instance to
+ * access the contents of an HFile.
+ *
+ * @param ctx
+ * @param familyPaths pairs of { CF, HFile path } submitted for bulk load. Adding
+ * or removing from this list will add or remove HFiles to be bulk loaded.
+ * @throws IOException
+ */
+ void preBulkLoadHFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
+ List<Pair<byte[], String>> familyPaths) throws IOException;
+
+ /**
+ * Called after bulkLoadHFile.
+ *
+ * @param ctx
+ * @param familyPaths pairs of { CF, HFile path } submitted for bulk load
+ * @param hasLoaded whether the bulkLoad was successful
+ * @return the new value of hasLoaded
+ * @throws IOException
+ */
+ boolean postBulkLoadHFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
+ List<Pair<byte[], String>> familyPaths, boolean hasLoaded) throws IOException;
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1353057&r1=1353056&r2=1353057&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Sat Jun 23 00:54:11 2012
@@ -3122,10 +3122,20 @@ public class HRegionServer implements C
HRegion region = getRegion(request.getRegion());
List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
for (FamilyPath familyPath: request.getFamilyPathList()) {
- familyPaths.add(new Pair<byte[], String>(
- familyPath.getFamily().toByteArray(), familyPath.getPath()));
+ familyPaths.add(new Pair<byte[], String>(familyPath.getFamily().toByteArray(),
+ familyPath.getPath()));
+ }
+ boolean bypass = false;
+ if (region.getCoprocessorHost() != null) {
+ bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
+ }
+ boolean loaded = false;
+ if (!bypass) {
+ loaded = region.bulkLoadHFiles(familyPaths);
+ }
+ if (region.getCoprocessorHost() != null) {
+ loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
}
- boolean loaded = region.bulkLoadHFiles(familyPaths);
BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
builder.setLoaded(loaded);
return builder.build();
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java?rev=1353057&r1=1353056&r2=1353057&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java Sat Jun 23 00:54:11 2012
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.ipc.Copro
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.StringUtils;
import java.io.IOException;
@@ -1267,4 +1268,58 @@ public class RegionCoprocessorHost
}
}
}
+
+ /**
+ * @param familyPaths pairs of { CF, file path } submitted for bulk load
+ * @return true if the default operation should be bypassed
+ * @throws IOException
+ */
+ public boolean preBulkLoadHFile(List<Pair<byte[], String>> familyPaths) throws IOException {
+ boolean bypass = false;
+ ObserverContext<RegionCoprocessorEnvironment> ctx = null;
+ for (RegionEnvironment env: coprocessors) {
+ if (env.getInstance() instanceof RegionObserver) {
+ ctx = ObserverContext.createAndPrepare(env, ctx);
+ try {
+ ((RegionObserver)env.getInstance()).preBulkLoadHFile(ctx, familyPaths);
+ } catch (Throwable e) {
+ handleCoprocessorThrowable(env, e);
+ }
+ bypass |= ctx.shouldBypass();
+ if (ctx.shouldComplete()) {
+ break;
+ }
+ }
+ }
+
+ return bypass;
+ }
+
+ /**
+ * @param familyPaths pairs of { CF, file path } submitted for bulk load
+ * @param hasLoaded whether load was successful or not
+ * @return the possibly modified value of hasLoaded
+ * @throws IOException
+ */
+ public boolean postBulkLoadHFile(List<Pair<byte[], String>> familyPaths, boolean hasLoaded)
+ throws IOException {
+ ObserverContext<RegionCoprocessorEnvironment> ctx = null;
+ for (RegionEnvironment env: coprocessors) {
+ if (env.getInstance() instanceof RegionObserver) {
+ ctx = ObserverContext.createAndPrepare(env, ctx);
+ try {
+ hasLoaded = ((RegionObserver)env.getInstance()).postBulkLoadHFile(ctx,
+ familyPaths, hasLoaded);
+ } catch (Throwable e) {
+ handleCoprocessorThrowable(env, e);
+ }
+ if (ctx.shouldComplete()) {
+ break;
+ }
+ }
+ }
+
+ return hasLoaded;
+ }
+
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1353057&r1=1353056&r2=1353057&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Sat Jun 23 00:54:11 2012
@@ -88,7 +88,7 @@ import com.google.common.collect.Orderin
* The reason for this weird pattern where you use a different instance for the
* writer and a reader is that we write once but read a lot more.
*/
-@InterfaceAudience.Private
+@InterfaceAudience.LimitedPrivate("Coprocessor")
public class StoreFile extends SchemaConfigured {
static final Log LOG = LogFactory.getLog(StoreFile.class.getName());
@@ -233,7 +233,7 @@ public class StoreFile extends SchemaCon
* @param dataBlockEncoder data block encoding algorithm.
* @throws IOException When opening the reader fails.
*/
- StoreFile(final FileSystem fs,
+ public StoreFile(final FileSystem fs,
final Path p,
final Configuration conf,
final CacheConfig cacheConf,
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java?rev=1353057&r1=1353056&r2=1353057&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java Sat Jun 23 00:54:11 2012
@@ -39,8 +39,8 @@ import org.apache.hadoop.hbase.regionser
* KeyValueScanner adaptor over the Reader. It also provides hooks into
* bloom filter things.
*/
-@InterfaceAudience.Private
-class StoreFileScanner implements KeyValueScanner {
+@InterfaceAudience.LimitedPrivate("Coprocessor")
+public class StoreFileScanner implements KeyValueScanner {
static final Log LOG = LogFactory.getLog(Store.class);
// the reader it comes from:
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java?rev=1353057&r1=1353056&r2=1353057&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java Sat Jun 23 00:54:11 2012
@@ -20,6 +20,8 @@
package org.apache.hadoop.hbase.coprocessor;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -45,6 +47,7 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
/**
* A sample region observer that tests the RegionObserver interface.
@@ -85,6 +88,8 @@ public class SimpleRegionObserver extend
boolean hadPostScannerClose = false;
boolean hadPreScannerOpen = false;
boolean hadPostScannerOpen = false;
+ boolean hadPreBulkLoadHFile = false;
+ boolean hadPostBulkLoadHFile = false;
@Override
public void preOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
@@ -384,6 +389,43 @@ public class SimpleRegionObserver extend
return result;
}
+ @Override
+ public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
+ List<Pair<byte[], String>> familyPaths) throws IOException {
+ RegionCoprocessorEnvironment e = ctx.getEnvironment();
+ assertNotNull(e);
+ assertNotNull(e.getRegion());
+ if (Arrays.equals(e.getRegion().getTableDesc().getName(),
+ TestRegionObserverInterface.TEST_TABLE)) {
+ assertNotNull(familyPaths);
+ assertEquals(1,familyPaths.size());
+ assertArrayEquals(familyPaths.get(0).getFirst(), TestRegionObserverInterface.A);
+ String familyPath = familyPaths.get(0).getSecond();
+ String familyName = Bytes.toString(TestRegionObserverInterface.A);
+ assertEquals(familyPath.substring(familyPath.length()-familyName.length()-1),"/"+familyName);
+ }
+ hadPreBulkLoadHFile = true;
+ }
+
+ @Override
+ public boolean postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
+ List<Pair<byte[], String>> familyPaths, boolean hasLoaded) throws IOException {
+ RegionCoprocessorEnvironment e = ctx.getEnvironment();
+ assertNotNull(e);
+ assertNotNull(e.getRegion());
+ if (Arrays.equals(e.getRegion().getTableDesc().getName(),
+ TestRegionObserverInterface.TEST_TABLE)) {
+ assertNotNull(familyPaths);
+ assertEquals(1,familyPaths.size());
+ assertArrayEquals(familyPaths.get(0).getFirst(), TestRegionObserverInterface.A);
+ String familyPath = familyPaths.get(0).getSecond();
+ String familyName = Bytes.toString(TestRegionObserverInterface.A);
+ assertEquals(familyPath.substring(familyPath.length()-familyName.length()-1),"/"+familyName);
+ }
+ hadPostBulkLoadHFile = true;
+ return hasLoaded;
+ }
+
public boolean hadPreGet() {
return hadPreGet;
}
@@ -430,4 +472,12 @@ public class SimpleRegionObserver extend
public boolean hadDeleted() {
return hadPreDeleted && hadPostDeleted;
}
+
+ public boolean hadPostBulkLoadHFile() {
+ return hadPostBulkLoadHFile;
+ }
+
+ public boolean hadPreBulkLoadHFile() {
+ return hadPreBulkLoadHFile;
+ }
}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java?rev=1353057&r1=1353056&r2=1353057&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java Sat Jun 23 00:54:11 2012
@@ -34,6 +34,8 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -52,6 +54,9 @@ import org.apache.hadoop.hbase.client.Re
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
@@ -447,6 +452,37 @@ public class TestRegionObserverInterface
table.close();
}
+ @Test
+ public void bulkLoadHFileTest() throws Exception {
+ String testName = TestRegionObserverInterface.class.getName()+".bulkLoadHFileTest";
+ byte[] tableName = TEST_TABLE;
+ Configuration conf = util.getConfiguration();
+ HTable table = util.createTable(tableName, new byte[][] {A, B, C});
+
+ verifyMethodResult(SimpleRegionObserver.class,
+ new String[] {"hadPreBulkLoadHFile", "hadPostBulkLoadHFile"},
+ tableName,
+ new Boolean[] {false, false}
+ );
+
+ FileSystem fs = util.getTestFileSystem();
+ final Path dir = util.getDataTestDir(testName).makeQualified(fs);
+ Path familyDir = new Path(dir, Bytes.toString(A));
+
+ createHFile(util.getConfiguration(), fs, new Path(familyDir,Bytes.toString(A)), A, A);
+
+ //Bulk load
+ new LoadIncrementalHFiles(conf).doBulkLoad(dir, new HTable(conf, tableName));
+
+ verifyMethodResult(SimpleRegionObserver.class,
+ new String[] {"hadPreBulkLoadHFile", "hadPostBulkLoadHFile"},
+ tableName,
+ new Boolean[] {true, true}
+ );
+ util.deleteTable(tableName);
+ table.close();
+ }
+
// check each region whether the coprocessor upcalls are called or not.
private void verifyMethodResult(Class c, String methodName[], byte[] tableName,
Object value[]) throws IOException {
@@ -475,6 +511,25 @@ public class TestRegionObserverInterface
}
}
+ private static void createHFile(
+ Configuration conf,
+ FileSystem fs, Path path,
+ byte[] family, byte[] qualifier) throws IOException {
+ HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf))
+ .withPath(fs, path)
+ .withComparator(KeyValue.KEY_COMPARATOR)
+ .create();
+ long now = System.currentTimeMillis();
+ try {
+ for (int i =1;i<=9;i++) {
+ KeyValue kv = new KeyValue(Bytes.toBytes(i+""), family, qualifier, now, Bytes.toBytes(i+""));
+ writer.append(kv);
+ }
+ } finally {
+ writer.close();
+ }
+ }
+
private static byte [][] makeN(byte [] base, int n) {
byte [][] ret = new byte[n][];
for(int i=0;i<n;i++) {