You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2011/02/08 00:01:29 UTC
svn commit: r1068206 - in /hbase/trunk: ./
src/main/java/org/apache/hadoop/hbase/coprocessor/
src/main/java/org/apache/hadoop/hbase/regionserver/
src/main/java/org/apache/hadoop/hbase/regionserver/wal/
src/test/java/org/apache/hadoop/hbase/coprocessor/...
Author: apurtell
Date: Mon Feb 7 23:01:28 2011
New Revision: 1068206
URL: http://svn.apache.org/viewvc?rev=1068206&view=rev
Log:
HBASE-3257 Coprocessors: Extend server side API to include HLog operations
Added:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/WALCoprocessorEnvironment.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALObserver.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALCoprocessors.java
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserverCoprocessor.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1068206&r1=1068205&r2=1068206&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Mon Feb 7 23:01:28 2011
@@ -49,8 +49,6 @@ Release 0.91.0 - Unreleased
IMPROVEMENTS
- HBASE-2001 Coprocessors: Colocate user code with regions (Mingjie Lai via
- Andrew Purtell)
HBASE-3290 Max Compaction Size (Nicolas Spiegelberg via Stack)
HBASE-3292 Expose block cache hit/miss/evict counts into region server
metrics
@@ -60,9 +58,6 @@ Release 0.91.0 - Unreleased
HBASE-1861 Multi-Family support for bulk upload tools
HBASE-3308 SplitTransaction.splitStoreFiles slows splits a lot
HBASE-3328 Added Admin API to specify explicit split points
- HBASE-3345 Coprocessors: Allow observers to completely override base
- function
- HBASE-3260 Coprocessors: Add explicit lifecycle management
HBASE-3377 Upgrade Jetty to 6.1.26
HBASE-3387 Pair does not deep check arrays for equality
(Jesse Yates via Stack)
@@ -77,16 +72,24 @@ Release 0.91.0 - Unreleased
NEW FEATURES
+ HBASE-2001 Coprocessors: Colocate user code with regions (Mingjie Lai via
+ Andrew Purtell)
HBASE-3287 Add option to cache blocks on hfile write and evict blocks on
hfile close
HBASE-3335 Add BitComparator for filtering (Nathaniel Cook via Stack)
+ HBASE-3260 Coprocessors: Add explicit lifecycle management
HBASE-3256 Coprocessors: Coprocessor host and observer for HMaster
+ HBASE-3345 Coprocessors: Allow observers to completely override base
+ function
HBASE-3448 RegionSplitter, utility class to manually split tables
HBASE-2824 A filter that randomly includes rows based on a configured
chance (Ferdy via Andrew Purtell)
HBASE-3455 Add memstore-local allocation buffers to combat heap
fragmentation in the region server. Enabled by default as of
0.91
+ HBASE-3257 Coprocessors: Extend server side API to include HLog operations
+ (Mingjie Lai via Andrew Purtell)
+
Release 0.90.1 - Unreleased
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserverCoprocessor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserverCoprocessor.java?rev=1068206&r1=1068205&r2=1068206&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserverCoprocessor.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserverCoprocessor.java Mon Feb 7 23:01:28 2011
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.coproces
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
@@ -28,6 +29,8 @@ import org.apache.hadoop.hbase.client.Re
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import java.io.IOException;
@@ -222,4 +225,14 @@ public abstract class BaseRegionObserver
public void postScannerClose(final RegionCoprocessorEnvironment e,
final InternalScanner s) throws IOException {
}
+
+ @Override
+ public void preWALRestore(RegionCoprocessorEnvironment env, HRegionInfo info,
+ HLogKey logKey, WALEdit logEdit) throws IOException {
+ }
+
+ @Override
+ public void postWALRestore(RegionCoprocessorEnvironment env,
+ HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException {
+ }
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java?rev=1068206&r1=1068205&r2=1068206&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java Mon Feb 7 23:01:28 2011
@@ -51,6 +51,9 @@ public abstract class CoprocessorHost<E
"hbase.coprocessor.region.classes";
public static final String MASTER_COPROCESSOR_CONF_KEY =
"hbase.coprocessor.master.classes";
+ public static final String WAL_COPROCESSOR_CONF_KEY =
+ "hbase.coprocessor.wal.classes";
+
private static final Log LOG = LogFactory.getLog(CoprocessorHost.class);
/** Ordered set of loaded coprocessors with lock */
protected final ReentrantReadWriteLock coprocessorLock = new ReentrantReadWriteLock();
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java?rev=1068206&r1=1068205&r2=1068206&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java Mon Feb 7 23:01:28 2011
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.coproces
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
@@ -29,6 +30,8 @@ import org.apache.hadoop.hbase.client.In
import org.apache.hadoop.hbase.coprocessor.CoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import java.io.IOException;
@@ -529,4 +532,30 @@ public interface RegionObserver extends
public void postScannerClose(final RegionCoprocessorEnvironment e,
final InternalScanner s)
throws IOException;
+
+ /**
+ * Called before a {@link org.apache.hadoop.hbase.regionserver.wal.WALEdit}
+ * replayed for this region.
+ *
+ * @param env
+ * @param info
+ * @param logKey
+ * @param logEdit
+ * @throws IOException
+ */
+ void preWALRestore(final RegionCoprocessorEnvironment env,
+ HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException;
+
+ /**
+ * Called after a {@link org.apache.hadoop.hbase.regionserver.wal.WALEdit}
+ * replayed for this region.
+ *
+ * @param env
+ * @param info
+ * @param logKey
+ * @param logEdit
+ * @throws IOException
+ */
+ void postWALRestore(final RegionCoprocessorEnvironment env,
+ HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException;
}
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/WALCoprocessorEnvironment.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/WALCoprocessorEnvironment.java?rev=1068206&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/WALCoprocessorEnvironment.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/WALCoprocessorEnvironment.java Mon Feb 7 23:01:28 2011
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+
+public interface WALCoprocessorEnvironment extends CoprocessorEnvironment {
+ /** @return reference to the region server services */
+ public HLog getWAL();
+}
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java?rev=1068206&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java Mon Feb 7 23:01:28 2011
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+
+import java.io.IOException;
+
+/**
+ * It's provided to have a way for coprocessors to observe, rewrite,
+ * or skip WALEdits as they are being written to the WAL.
+ *
+ * {@link org.apache.hadoop.hbase.coprocessor.RegionObserver} provides
+ * hooks for adding logic for WALEdits in the region context during reconstruction,
+ *
+ * Defines coprocessor hooks for interacting with operations on the
+ * {@link org.apache.hadoop.hbase.regionserver.wal.HLog}.
+ */
+public interface WALObserver extends Coprocessor {
+
+ /**
+ * Called before a {@link org.apache.hadoop.hbase.regionserver.wal.WALEdit}
+ * is writen to WAL.
+ *
+ * @param env
+ * @param info
+ * @param logKey
+ * @param logEdit
+ * @return true if default behavior should be bypassed, false otherwise
+ * @throws IOException
+ */
+ boolean preWALWrite(CoprocessorEnvironment env,
+ HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException;
+
+ /**
+ * Called after a {@link org.apache.hadoop.hbase.regionserver.wal.WALEdit}
+ * is writen to WAL.
+ *
+ * @param env
+ * @param info
+ * @param logKey
+ * @param logEdit
+ * @throws IOException
+ */
+ void postWALWrite(CoprocessorEnvironment env,
+ HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException;
+}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1068206&r1=1068205&r2=1068206&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Mon Feb 7 23:01:28 2011
@@ -2009,6 +2009,16 @@ public class HRegion implements HeapSize
while ((entry = reader.next()) != null) {
HLogKey key = entry.getKey();
WALEdit val = entry.getEdit();
+
+ // Start coprocessor replay here. The coprocessor is for each WALEdit
+ // instead of a KeyValue.
+ if (coprocessorHost != null) {
+ if (coprocessorHost.preWALRestore(this.getRegionInfo(), key, val)) {
+ // if bypass this log entry, ignore it ...
+ continue;
+ }
+ }
+
if (firstSeqIdInLog == -1) {
firstSeqIdInLog = key.getLogSeqNum();
}
@@ -2046,6 +2056,10 @@ public class HRegion implements HeapSize
}
if (flush) internalFlushcache(null, currentEditSeqId);
+ if (coprocessorHost != null) {
+ coprocessorHost.postWALRestore(this.getRegionInfo(), key, val);
+ }
+
// Every 'interval' edits, tell the reporter we're making progress.
// Have seen 60k edits taking 3minutes to complete.
if (reporter != null && (editsCount % interval) == 0) {
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java?rev=1068206&r1=1068205&r2=1068206&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java Mon Feb 7 23:01:28 2011
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
@@ -36,6 +37,8 @@ import org.apache.hadoop.hbase.client.co
import org.apache.hadoop.hbase.coprocessor.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.util.StringUtils;
@@ -1004,4 +1007,56 @@ public class RegionCoprocessorHost
coprocessorLock.readLock().unlock();
}
}
+
+ /**
+ * @param info
+ * @param logKey
+ * @param logEdit
+ * @return true if default behavior should be bypassed, false otherwise
+ * @throws IOException
+ */
+ public boolean preWALRestore(HRegionInfo info, HLogKey logKey,
+ WALEdit logEdit) throws IOException {
+ try {
+ boolean bypass = false;
+ coprocessorLock.readLock().lock();
+ for (RegionEnvironment env: coprocessors) {
+ if (env.getInstance() instanceof RegionObserver) {
+ ((RegionObserver)env.getInstance()).preWALRestore(env, info, logKey,
+ logEdit);
+ }
+ bypass |= env.shouldBypass();
+ if (env.shouldComplete()) {
+ break;
+ }
+ }
+ return bypass;
+ } finally {
+ coprocessorLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * @param info
+ * @param logKey
+ * @param logEdit
+ * @throws IOException
+ */
+ public void postWALRestore(HRegionInfo info, HLogKey logKey,
+ WALEdit logEdit) throws IOException {
+ try {
+ coprocessorLock.readLock().lock();
+ for (RegionEnvironment env: coprocessors) {
+ if (env.getInstance() instanceof RegionObserver) {
+ ((RegionObserver)env.getInstance()).postWALRestore(env, info,
+ logKey, logEdit);
+ }
+ if (env.shouldComplete()) {
+ break;
+ }
+ }
+ } finally {
+ coprocessorLock.readLock().unlock();
+ }
+ }
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1068206&r1=1068205&r2=1068206&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Mon Feb 7 23:01:28 2011
@@ -135,6 +135,8 @@ public class HLog implements Syncable {
private static Class<? extends Writer> logWriterClass;
private static Class<? extends Reader> logReaderClass;
+ private WALCoprocessorHost coprocessorHost;
+
static void resetLogReaderClass() {
HLog.logReaderClass = null;
}
@@ -400,6 +402,7 @@ public class HLog implements Syncable {
logSyncerThread = new LogSyncer(this.optionalFlushInterval);
Threads.setDaemonThreadRunning(logSyncerThread,
Thread.currentThread().getName() + ".logSyncer");
+ coprocessorHost = new WALCoprocessorHost(this, conf);
}
public void registerWALActionsListener (final WALObserver listener) {
@@ -1074,8 +1077,13 @@ public class HLog implements Syncable {
}
try {
long now = System.currentTimeMillis();
- this.writer.append(new HLog.Entry(logKey, logEdit));
+ // coprocessor hook:
+ if (!coprocessorHost.preWALWrite(info, logKey, logEdit)) {
+ // if not bypassed:
+ this.writer.append(new HLog.Entry(logKey, logEdit));
+ }
long took = System.currentTimeMillis() - now;
+ coprocessorHost.postWALWrite(info, logKey, logEdit);
writeTime += took;
writeOps++;
if (took > 1000) {
@@ -1445,6 +1453,13 @@ public class HLog implements Syncable {
}
/**
+ * @return Coprocessor host.
+ */
+ public WALCoprocessorHost getCoprocessorHost() {
+ return coprocessorHost;
+ }
+
+ /**
* Pass one or more log file names and it will either dump out a text version
* on <code>stdout</code> or split the specified log files.
*
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java?rev=1068206&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java Mon Feb 7 23:01:28 2011
@@ -0,0 +1,142 @@
+
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.coprocessor.*;
+import org.apache.hadoop.hbase.coprocessor.Coprocessor.Priority;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Implements the coprocessor environment and runtime support for coprocessors
+ * loaded within a {@link HLog}.
+ */
+public class WALCoprocessorHost
+ extends CoprocessorHost<WALCoprocessorHost.WALEnvironment> {
+
+ private static final Log LOG = LogFactory.getLog(WALCoprocessorHost.class);
+
+ /**
+ * Encapsulation of the environment of each coprocessor
+ */
+ static class WALEnvironment extends CoprocessorHost.Environment
+ implements WALCoprocessorEnvironment {
+
+ private HLog wal;
+
+ @Override
+ public HLog getWAL() {
+ return wal;
+ }
+
+ /**
+ * Constructor
+ * @param impl the coprocessor instance
+ * @param priority chaining priority
+ */
+ public WALEnvironment(Class<?> implClass, final Coprocessor impl,
+ Coprocessor.Priority priority, final HLog hlog) {
+ super(impl, priority);
+ this.wal = hlog;
+ }
+ }
+
+ HLog wal;
+ /**
+ * Constructor
+ * @param region the region
+ * @param rsServices interface to available region server functionality
+ * @param conf the configuration
+ */
+ public WALCoprocessorHost(final HLog log, final Configuration conf) {
+ this.wal = log;
+ // load system default cp's from configuration.
+ loadSystemCoprocessors(conf, WAL_COPROCESSOR_CONF_KEY);
+ }
+
+ @Override
+ public WALEnvironment createEnvironment(Class<?> implClass,
+ Coprocessor instance, Priority priority) {
+ // TODO Auto-generated method stub
+ return new WALEnvironment(implClass, instance, priority, this.wal);
+ }
+
+ /**
+ * @param info
+ * @param logKey
+ * @param logEdit
+ * @return true if default behavior should be bypassed, false otherwise
+ * @throws IOException
+ */
+ public boolean preWALWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit)
+ throws IOException {
+ try {
+ boolean bypass = false;
+ coprocessorLock.readLock().lock();
+ for (WALEnvironment env: coprocessors) {
+ if (env.getInstance() instanceof
+ org.apache.hadoop.hbase.coprocessor.WALObserver) {
+ ((org.apache.hadoop.hbase.coprocessor.WALObserver)env.getInstance()).
+ preWALWrite(env, info, logKey, logEdit);
+ bypass |= env.shouldBypass();
+ if (env.shouldComplete()) {
+ break;
+ }
+ }
+ }
+ return bypass;
+ } finally {
+ coprocessorLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * @param info
+ * @param logKey
+ * @param logEdit
+ * @throws IOException
+ */
+ public void postWALWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit)
+ throws IOException {
+ try {
+ coprocessorLock.readLock().lock();
+ for (WALEnvironment env: coprocessors) {
+ if (env.getInstance() instanceof
+ org.apache.hadoop.hbase.coprocessor.WALObserver) {
+ ((org.apache.hadoop.hbase.coprocessor.WALObserver)env.getInstance()).
+ postWALWrite(env, info, logKey, logEdit);
+ if (env.shouldComplete()) {
+ break;
+ }
+ }
+ }
+ } finally {
+ coprocessorLock.readLock().unlock();
+ }
+ }
+}
Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALObserver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALObserver.java?rev=1068206&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALObserver.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALObserver.java Mon Feb 7 23:01:28 2011
@@ -0,0 +1,162 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+
+/**
+ * Class for testing WAL coprocessor extension. WAL write monitor is defined
+ * in LogObserver while WAL Restore is in RegionObserver.
+ *
+ * It will monitor a WAL writing and Restore, modify passed-in WALEdit, i.e,
+ * ignore specified columns when writing, and add a KeyValue. On the other
+ * hand, it checks whether the ignored column is still in WAL when Restoreed
+ * at region reconstruct.
+ */
+public class SampleRegionWALObserver extends BaseRegionObserverCoprocessor
+implements WALObserver {
+
+ private static final Log LOG = LogFactory.getLog(SampleRegionWALObserver.class);
+
+ private byte[] tableName;
+ private byte[] row;
+ private byte[] ignoredFamily;
+ private byte[] ignoredQualifier;
+ private byte[] addedFamily;
+ private byte[] addedQualifier;
+ private byte[] changedFamily;
+ private byte[] changedQualifier;
+
+ private boolean preWALWriteCalled = false;
+ private boolean postWALWriteCalled = false;
+ private boolean preWALRestoreCalled = false;
+ private boolean postWALRestoreCalled = false;
+
+ /**
+ * Set values: with a table name, a column name which will be ignored, and
+ * a column name which will be added to WAL.
+ */
+ public void setTestValues(byte[] tableName, byte[] row, byte[] igf, byte[] igq,
+ byte[] chf, byte[] chq, byte[] addf, byte[] addq) {
+ this.row = row;
+ this.tableName = tableName;
+ this.ignoredFamily = igf;
+ this.ignoredQualifier = igq;
+ this.addedFamily = addf;
+ this.addedQualifier = addq;
+ this.changedFamily = chf;
+ this.changedQualifier = chq;
+ }
+
+
+ @Override
+ public void postWALWrite(CoprocessorEnvironment env, HRegionInfo info,
+ HLogKey logKey, WALEdit logEdit) throws IOException {
+ postWALWriteCalled = true;
+ }
+
+ @Override
+ public boolean preWALWrite(CoprocessorEnvironment env, HRegionInfo info,
+ HLogKey logKey, WALEdit logEdit) throws IOException {
+ boolean bypass = false;
+ // check table name matches or not.
+ if (!Arrays.equals(HRegionInfo.getTableName(info.getRegionName()), this.tableName)) {
+ return bypass;
+ }
+ preWALWriteCalled = true;
+ // here we're going to remove one keyvalue from the WALEdit, and add
+ // another one to it.
+ List<KeyValue> kvs = logEdit.getKeyValues();
+ KeyValue deletedKV = null;
+ for (KeyValue kv : kvs) {
+ // assume only one kv from the WALEdit matches.
+ byte[] family = kv.getFamily();
+ byte[] qulifier = kv.getQualifier();
+
+ if (Arrays.equals(family, ignoredFamily) &&
+ Arrays.equals(qulifier, ignoredQualifier)) {
+ LOG.debug("Found the KeyValue from WALEdit which should be ignored.");
+ deletedKV = kv;
+ }
+ if (Arrays.equals(family, changedFamily) &&
+ Arrays.equals(qulifier, changedQualifier)) {
+ LOG.debug("Found the KeyValue from WALEdit which should be changed.");
+ kv.getBuffer()[kv.getValueOffset()] += 1;
+ }
+ }
+ kvs.add(new KeyValue(row, addedFamily, addedQualifier));
+ if (deletedKV != null) {
+ LOG.debug("About to delete a KeyValue from WALEdit.");
+ kvs.remove(deletedKV);
+ }
+ return bypass;
+ }
+
+ /**
+ * Triggered before {@link org.apache.hadoop.hbase.regionserver.HRegion} when WAL is
+ * Restoreed.
+ */
+ @Override
+ public void preWALRestore(RegionCoprocessorEnvironment env, HRegionInfo info,
+ HLogKey logKey, WALEdit logEdit) throws IOException {
+ preWALRestoreCalled = true;
+ }
+
+ /**
+ * Triggered after {@link org.apache.hadoop.hbase.regionserver.HRegion} when WAL is
+ * Restoreed.
+ */
+ @Override
+ public void postWALRestore(RegionCoprocessorEnvironment env,
+ HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException {
+ postWALRestoreCalled = true;
+ }
+
+ public boolean isPreWALWriteCalled() {
+ return preWALWriteCalled;
+ }
+
+ public boolean isPostWALWriteCalled() {
+ return postWALWriteCalled;
+ }
+
+ public boolean isPreWALRestoreCalled() {
+ LOG.debug(SampleRegionWALObserver.class.getName() +
+ ".isPreWALRestoreCalled is called.");
+ return preWALRestoreCalled;
+ }
+
+ public boolean isPostWALRestoreCalled() {
+ LOG.debug(SampleRegionWALObserver.class.getName() +
+ ".isPostWALRestoreCalled is called.");
+ return postWALRestoreCalled;
+ }
+}
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java?rev=1068206&r1=1068205&r2=1068206&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java Mon Feb 7 23:01:28 2011
@@ -67,6 +67,8 @@ public class SimpleRegionObserver extend
boolean hadPostGetClosestRowBefore = false;
boolean hadPreIncrement = false;
boolean hadPostIncrement = false;
+ boolean hadPreWALRestored = false;
+ boolean hadPostWALRestored = false;
@Override
public void preOpen(RegionCoprocessorEnvironment e) {
@@ -333,4 +335,12 @@ public class SimpleRegionObserver extend
boolean hadPostIncrement() {
return hadPostIncrement;
}
+
+ boolean hadPreWALRestored() {
+ return hadPreWALRestored;
+ }
+
+ boolean hadPostWALRestored() {
+ return hadPostWALRestored;
+ }
}
Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALCoprocessors.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALCoprocessors.java?rev=1068206&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALCoprocessors.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALCoprocessors.java Mon Feb 7 23:01:28 2011
@@ -0,0 +1,373 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
+import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdge;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests invocation of the {@link org.apache.hadoop.hbase.coprocessor.MasterObserver}
+ * interface hooks at all appropriate times during normal HMaster operations.
+ */
+public class TestWALCoprocessors {
+ private static final Log LOG = LogFactory.getLog(TestWALCoprocessors.class);
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ private static byte[] TEST_TABLE = Bytes.toBytes("observedTable");
+ private static byte[][] TEST_FAMILY = { Bytes.toBytes("fam1"),
+ Bytes.toBytes("fam2"),
+ Bytes.toBytes("fam3"),
+ };
+ private static byte[][] TEST_QUALIFIER = { Bytes.toBytes("q1"),
+ Bytes.toBytes("q2"),
+ Bytes.toBytes("q3"),
+ };
+ private static byte[][] TEST_VALUE = { Bytes.toBytes("v1"),
+ Bytes.toBytes("v2"),
+ Bytes.toBytes("v3"),
+ };
+ private static byte[] TEST_ROW = Bytes.toBytes("testRow");
+
+ private Configuration conf;
+ private FileSystem fs;
+ private Path dir;
+ private MiniDFSCluster cluster;
+ private Path hbaseRootDir;
+ private Path oldLogDir;
+ private Path logDir;
+
+ @BeforeClass
+ public static void setupBeforeClass() throws Exception {
+ Configuration conf = TEST_UTIL.getConfiguration();
+ conf.set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
+ SampleRegionWALObserver.class.getName());
+ conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+ SampleRegionWALObserver.class.getName());
+ conf.setBoolean("dfs.support.append", true);
+ conf.setInt("dfs.client.block.recovery.retries", 2);
+ conf.setInt("hbase.regionserver.flushlogentries", 1);
+
+ TEST_UTIL.startMiniCluster(1);
+ TEST_UTIL.setNameNodeNameSystemLeasePeriod(100, 10000);
+ Path hbaseRootDir =
+ TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
+ LOG.info("hbase.rootdir=" + hbaseRootDir);
+ conf.set(HConstants.HBASE_DIR, hbaseRootDir.toString());
+ }
+
+ @AfterClass
+ public static void teardownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
+ //this.cluster = TEST_UTIL.getDFSCluster();
+ this.fs = TEST_UTIL.getDFSCluster().getFileSystem();
+ this.hbaseRootDir = new Path(conf.get(HConstants.HBASE_DIR));
+ this.dir = new Path(this.hbaseRootDir, TestWALCoprocessors.class.getName());
+ this.oldLogDir = new Path(this.hbaseRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+ this.logDir = new Path(this.hbaseRootDir, HConstants.HREGION_LOGDIR_NAME);
+
+ if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) {
+ TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
+ }
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
+ }
+
+ /**
+ * Test WAL write behavior with WALObserver. The coprocessor monitors
+ * a WALEdit written to WAL, and ignore, modify, and add KeyValue's for the
+ * WALEdit.
+ */
+ @Test
+ public void testWWALCoprocessorWriteToWAL() throws Exception {
+ HRegionInfo hri = createBasic3FamilyHRegionInfo(Bytes.toString(TEST_TABLE));
+ Path basedir = new Path(this.hbaseRootDir, Bytes.toString(TEST_TABLE));
+ deleteDir(basedir);
+ fs.mkdirs(new Path(basedir, hri.getEncodedName()));
+
+ HLog log = new HLog(this.fs, this.dir, this.oldLogDir, this.conf);
+ SampleRegionWALObserver cp = getCoprocessor(log);
+
+ // TEST_FAMILY[0] shall be removed from WALEdit.
+ // TEST_FAMILY[1] value shall be changed.
+ // TEST_FAMILY[2] shall be added to WALEdit, although it's not in the put.
+ cp.setTestValues(TEST_TABLE, TEST_ROW, TEST_FAMILY[0], TEST_QUALIFIER[0],
+ TEST_FAMILY[1], TEST_QUALIFIER[1],
+ TEST_FAMILY[2], TEST_QUALIFIER[2]);
+
+ assertFalse(cp.isPreWALWriteCalled());
+ assertFalse(cp.isPostWALWriteCalled());
+
+ // TEST_FAMILY[2] is not in the put, however it shall be added by the tested
+ // coprocessor.
+ // Use a Put to create familyMap.
+ Put p = creatPutWith2Families(TEST_ROW);
+
+ Map<byte [], List<KeyValue>> familyMap = p.getFamilyMap();
+ WALEdit edit = new WALEdit();
+ addFamilyMapToWALEdit(familyMap, edit);
+
+ boolean foundFamily0 = false;
+ boolean foundFamily2 = false;
+ boolean modifiedFamily1 = false;
+
+ List<KeyValue> kvs = edit.getKeyValues();
+
+ for (KeyValue kv : kvs) {
+ if (Arrays.equals(kv.getFamily(), TEST_FAMILY[0])) {
+ foundFamily0 = true;
+ }
+ if (Arrays.equals(kv.getFamily(), TEST_FAMILY[2])) {
+ foundFamily2 = true;
+ }
+ if (Arrays.equals(kv.getFamily(), TEST_FAMILY[1])) {
+ if (!Arrays.equals(kv.getValue(), TEST_VALUE[1])) {
+ modifiedFamily1 = true;
+ }
+ }
+ }
+ assertTrue(foundFamily0);
+ assertFalse(foundFamily2);
+ assertFalse(modifiedFamily1);
+
+ // it's where WAL write cp should occur.
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+ log.append(hri, hri.getTableDesc().getName(), edit, now);
+
+ // the edit shall have been change now by the coprocessor.
+ foundFamily0 = false;
+ foundFamily2 = false;
+ modifiedFamily1 = false;
+ for (KeyValue kv : kvs) {
+ if (Arrays.equals(kv.getFamily(), TEST_FAMILY[0])) {
+ foundFamily0 = true;
+ }
+ if (Arrays.equals(kv.getFamily(), TEST_FAMILY[2])) {
+ foundFamily2 = true;
+ }
+ if (Arrays.equals(kv.getFamily(), TEST_FAMILY[1])) {
+ if (!Arrays.equals(kv.getValue(), TEST_VALUE[1])) {
+ modifiedFamily1 = true;
+ }
+ }
+ }
+ assertFalse(foundFamily0);
+ assertTrue(foundFamily2);
+ assertTrue(modifiedFamily1);
+
+ assertTrue(cp.isPreWALWriteCalled());
+ assertTrue(cp.isPostWALWriteCalled());
+ }
+
+ /**
+ * Test WAL replay behavior with WALObserver.
+ */
+ @Test
+ public void testWALCoprocessorReplay() throws Exception {
+ // WAL replay is handled at HRegion::replayRecoveredEdits(), which is
+ // ultimately called by HRegion::initialize()
+ byte[] tableName = Bytes.toBytes("testWALCoprocessorReplay");
+
+ final HRegionInfo hri = createBasic3FamilyHRegionInfo(Bytes.toString(tableName));
+ final Path basedir = new Path(this.hbaseRootDir, Bytes.toString(tableName));
+ deleteDir(basedir);
+ fs.mkdirs(new Path(basedir, hri.getEncodedName()));
+
+ //HLog wal = new HLog(this.fs, this.dir, this.oldLogDir, this.conf);
+ HLog wal = createWAL(this.conf);
+ //Put p = creatPutWith2Families(TEST_ROW);
+ WALEdit edit = new WALEdit();
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+ //addFamilyMapToWALEdit(p.getFamilyMap(), edit);
+ final int countPerFamily = 1000;
+ for (HColumnDescriptor hcd: hri.getTableDesc().getFamilies()) {
+ addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily,
+ EnvironmentEdgeManager.getDelegate(), wal);
+ }
+ wal.append(hri, tableName, edit, now);
+ // sync to fs.
+ wal.sync();
+
+ final Configuration newConf = HBaseConfiguration.create(this.conf);
+ User user = HBaseTestingUtility.getDifferentUser(newConf,
+ ".replay.wal.secondtime");
+ user.runAs(new PrivilegedExceptionAction() {
+ public Object run() throws Exception {
+ runWALSplit(newConf);
+ FileSystem newFS = FileSystem.get(newConf);
+ // Make a new wal for new region open.
+ HLog wal2 = createWAL(newConf);
+ HRegion region2 = new HRegion(basedir, wal2, FileSystem.get(newConf),
+ newConf, hri, TEST_UTIL.getHBaseCluster().getRegionServer(0));
+ long seqid2 = region2.initialize();
+
+ SampleRegionWALObserver cp2 =
+ (SampleRegionWALObserver)region2.getCoprocessorHost().findCoprocessor(
+ SampleRegionWALObserver.class.getName());
+ // TODO: asserting here is problematic.
+ assertNotNull(cp2);
+ assertTrue(cp2.isPreWALRestoreCalled());
+ assertTrue(cp2.isPostWALRestoreCalled());
+ region2.close();
+ wal2.closeAndDelete();
+ return null;
+ }
+ });
+ }
+ /**
+ * Test to see CP loaded successfully or not. There is a duplication
+ * at TestHLog, but the purpose of that one is to see whether the loaded
+ * CP will impact existing HLog tests or not.
+ */
+ @Test
+ public void testWALCoprocessorLoaded() throws Exception {
+ HLog log = new HLog(fs, dir, oldLogDir, conf);
+ assertNotNull(getCoprocessor(log));
+ }
+
+ private SampleRegionWALObserver getCoprocessor(HLog wal) throws Exception {
+ WALCoprocessorHost host = wal.getCoprocessorHost();
+ Coprocessor c = host.findCoprocessor(SampleRegionWALObserver.class.getName());
+ return (SampleRegionWALObserver)c;
+ }
+
+ /*
+ * Creates an HRI around an HTD that has <code>tableName</code> and three
+ * column families named.
+ * @param tableName Name of table to use when we create HTableDescriptor.
+ */
+ private HRegionInfo createBasic3FamilyHRegionInfo(final String tableName) {
+ HTableDescriptor htd = new HTableDescriptor(tableName);
+
+ for (int i = 0; i < TEST_FAMILY.length; i++ ) {
+ HColumnDescriptor a = new HColumnDescriptor(TEST_FAMILY[i]);
+ htd.addFamily(a);
+ }
+ return new HRegionInfo(htd, null, null, false);
+ }
+
+ /*
+ * @param p Directory to cleanup
+ */
+ private void deleteDir(final Path p) throws IOException {
+ if (this.fs.exists(p)) {
+ if (!this.fs.delete(p, true)) {
+ throw new IOException("Failed remove of " + p);
+ }
+ }
+ }
+
+ private Put creatPutWith2Families(byte[] row) throws IOException {
+ Put p = new Put(row);
+ for (int i = 0; i < TEST_FAMILY.length-1; i++ ) {
+ p.add(TEST_FAMILY[i], TEST_QUALIFIER[i],
+ TEST_VALUE[i]);
+ }
+ return p;
+ }
+
+ /**
+ * Copied from HRegion.
+ *
+ * @param familyMap map of family->edits
+ * @param walEdit the destination entry to append into
+ */
+ private void addFamilyMapToWALEdit(Map<byte[], List<KeyValue>> familyMap,
+ WALEdit walEdit) {
+ for (List<KeyValue> edits : familyMap.values()) {
+ for (KeyValue kv : edits) {
+ walEdit.add(kv);
+ }
+ }
+ }
+ private Path runWALSplit(final Configuration c) throws IOException {
+ FileSystem fs = FileSystem.get(c);
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(c,
+ this.hbaseRootDir, this.logDir, this.oldLogDir, fs);
+ List<Path> splits = logSplitter.splitLog();
+ // Split should generate only 1 file since there's only 1 region
+ assertEquals(1, splits.size());
+ // Make sure the file exists
+ assertTrue(fs.exists(splits.get(0)));
+ LOG.info("Split file=" + splits.get(0));
+ return splits.get(0);
+ }
+ private HLog createWAL(final Configuration c) throws IOException {
+ HLog wal = new HLog(FileSystem.get(c), logDir, oldLogDir, c);
+ return wal;
+ }
+ private void addWALEdits (final byte [] tableName, final HRegionInfo hri,
+ final byte [] rowName, final byte [] family,
+ final int count, EnvironmentEdge ee, final HLog wal)
+ throws IOException {
+ String familyStr = Bytes.toString(family);
+ for (int j = 0; j < count; j++) {
+ byte[] qualifierBytes = Bytes.toBytes(Integer.toString(j));
+ byte[] columnBytes = Bytes.toBytes(familyStr + ":" + Integer.toString(j));
+ WALEdit edit = new WALEdit();
+ edit.add(new KeyValue(rowName, family, qualifierBytes,
+ ee.currentTimeMillis(), columnBytes));
+ wal.append(hri, tableName, edit, ee.currentTimeMillis());
+ }
+ }
+}
+
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java?rev=1068206&r1=1068205&r2=1068206&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java Mon Feb 7 23:01:28 2011
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionse
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
import java.io.IOException;
import java.lang.reflect.Method;
@@ -45,6 +46,9 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.coprocessor.Coprocessor;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
@@ -107,6 +111,8 @@ public class TestHLog {
.setInt("ipc.client.connect.max.retries", 1);
TEST_UTIL.getConfiguration().setInt(
"dfs.client.block.recovery.retries", 1);
+ TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
+ SampleRegionWALObserver.class.getName());
TEST_UTIL.startMiniCluster(3);
conf = TEST_UTIL.getConfiguration();
@@ -640,6 +646,18 @@ public class TestHLog {
assertEquals(0, log.getNumLogFiles());
}
+ /**
+ * A loaded WAL coprocessor won't break existing HLog test cases.
+ */
+ @Test
+ public void testWALCoprocessorLoaded() throws Exception {
+ // test to see whether the coprocessor is loaded or not.
+ HLog log = new HLog(fs, dir, oldLogDir, conf);
+ WALCoprocessorHost host = log.getCoprocessorHost();
+ Coprocessor c = host.findCoprocessor(SampleRegionWALObserver.class.getName());
+ assertNotNull(c);
+ }
+
private void addEdits(HLog log, HRegionInfo hri, byte [] tableName,
int times) throws IOException {
final byte [] row = Bytes.toBytes("row");