You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2014/05/04 18:12:33 UTC
svn commit: r1592368 - in /hbase/trunk:
hbase-client/src/main/java/org/apache/hadoop/hbase/client/
hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/
hbase-server/src/main/java/org/apache/hadoop/hbase/master/
hbase-server/src/main/java/org...
Author: tedyu
Date: Sun May 4 16:12:32 2014
New Revision: 1592368
URL: http://svn.apache.org/r1592368
Log:
HBASE-10926 Use global procedure to flush table memstore cache (Jerry He)
Added:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java
Modified:
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterObserver.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Procedure.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManagerHost.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java?rev=1592368&r1=1592367&r2=1592368&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java Sun May 4 16:12:32 2014
@@ -24,6 +24,7 @@ import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -1467,21 +1468,12 @@ public class HBaseAdmin implements Admin
} else {
final TableName tableName = checkTableExists(
TableName.valueOf(tableNameOrRegionName), ct);
- List<Pair<HRegionInfo, ServerName>> pairs =
- MetaReader.getTableRegionsAndLocations(ct,
- tableName);
- for (Pair<HRegionInfo, ServerName> pair: pairs) {
- if (pair.getFirst().isOffline()) continue;
- if (pair.getSecond() == null) continue;
- try {
- flush(pair.getSecond(), pair.getFirst());
- } catch (NotServingRegionException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Trying to flush " + pair.getFirst() + ": " +
- StringUtils.stringifyException(e));
- }
- }
+ if (isTableDisabled(tableName)) {
+ LOG.info("Table is disabled: " + tableName.getNameAsString());
+ return;
}
+ execProcedure("flush-table-proc", tableName.getNameAsString(),
+ new HashMap<String, String>());
}
} finally {
cleanupCatalogTracker(ct);
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterObserver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterObserver.java?rev=1592368&r1=1592367&r2=1592368&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterObserver.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterObserver.java Sun May 4 16:12:32 2014
@@ -387,4 +387,14 @@ public class BaseMasterObserver implemen
List<HTableDescriptor> descriptors) throws IOException {
}
+ // Empty body: this base observer takes no action before a table flush.
+ @Override
+ public void preTableFlush(ObserverContext<MasterCoprocessorEnvironment> ctx,
+ TableName tableName) throws IOException {
+ }
+
+ // Empty body: this base observer takes no action after a table flush.
+ @Override
+ public void postTableFlush(ObserverContext<MasterCoprocessorEnvironment> ctx,
+ TableName tableName) throws IOException {
+ }
+
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java?rev=1592368&r1=1592367&r2=1592368&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java Sun May 4 16:12:32 2014
@@ -694,4 +694,22 @@ public interface MasterObserver extends
*/
void postModifyNamespace(final ObserverContext<MasterCoprocessorEnvironment> ctx,
NamespaceDescriptor ns) throws IOException;
+
+ /**
+ * Called before the table memstore is flushed to disk.
+ * @param ctx the environment to interact with the framework and master
+ * @param tableName the name of the table
+ * @throws IOException if the observer fails; the dispatching coprocessor host decides how
+ * the failure is handled
+ */
+ void preTableFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final TableName tableName) throws IOException;
+
+ /**
+ * Called after the table memstore is flushed to disk.
+ * @param ctx the environment to interact with the framework and master
+ * @param tableName the name of the table
+ * @throws IOException if the observer fails; the dispatching coprocessor host decides how
+ * the failure is handled
+ */
+ void postTableFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final TableName tableName) throws IOException;
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1592368&r1=1592367&r2=1592368&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Sun May 4 16:12:32 2014
@@ -96,6 +96,7 @@ import org.apache.hadoop.hbase.monitorin
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
+import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
@@ -428,6 +429,7 @@ public class HMaster extends HRegionServ
this.snapshotManager = new SnapshotManager();
this.mpmHost = new MasterProcedureManagerHost();
this.mpmHost.register(this.snapshotManager);
+ this.mpmHost.register(new MasterFlushTableProcedureManager());
this.mpmHost.loadProcedures(conf);
this.mpmHost.initialize(this, this.metricsMaster);
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java?rev=1592368&r1=1592367&r2=1592368&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java Sun May 4 16:12:32 2014
@@ -1568,4 +1568,38 @@ public class MasterCoprocessorHost
}
}
+ /**
+ * Runs the preTableFlush hook of every loaded coprocessor that is a MasterObserver.
+ * Anything thrown by a hook is handed to handleCoprocessorThrowable, and iteration stops
+ * early once the observer context reports shouldComplete().
+ * @param tableName the table name passed through to each observer
+ * @throws IOException declared by this method; presumably raised from
+ * handleCoprocessorThrowable -- TODO confirm
+ */
+ public void preTableFlush(final TableName tableName) throws IOException {
+ // The context is chained: each iteration passes the previous context to createAndPrepare.
+ ObserverContext<MasterCoprocessorEnvironment> ctx = null;
+ for (MasterEnvironment env: coprocessors) {
+ if (env.getInstance() instanceof MasterObserver) {
+ ctx = ObserverContext.createAndPrepare(env, ctx);
+ try {
+ ((MasterObserver)env.getInstance()).preTableFlush(ctx, tableName);
+ } catch (Throwable e) {
+ handleCoprocessorThrowable(env, e);
+ }
+ // An observer may ask to stop the chain; remaining coprocessors are then not called.
+ if (ctx.shouldComplete()) {
+ break;
+ }
+ }
+ }
+ }
+
+ /**
+ * Runs the postTableFlush hook of every loaded coprocessor that is a MasterObserver.
+ * Anything thrown by a hook is handed to handleCoprocessorThrowable, and iteration stops
+ * early once the observer context reports shouldComplete().
+ * @param tableName the table name passed through to each observer
+ * @throws IOException declared by this method; presumably raised from
+ * handleCoprocessorThrowable -- TODO confirm
+ */
+ public void postTableFlush(final TableName tableName) throws IOException {
+ // The context is chained: each iteration passes the previous context to createAndPrepare.
+ ObserverContext<MasterCoprocessorEnvironment> ctx = null;
+ for (MasterEnvironment env: coprocessors) {
+ if (env.getInstance() instanceof MasterObserver) {
+ ctx = ObserverContext.createAndPrepare(env, ctx);
+ try {
+ ((MasterObserver)env.getInstance()).postTableFlush(ctx, tableName);
+ } catch (Throwable e) {
+ handleCoprocessorThrowable(env, e);
+ }
+ // An observer may ask to stop the chain; remaining coprocessors are then not called.
+ if (ctx.shouldComplete()) {
+ break;
+ }
+ }
+ }
+ }
+
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Procedure.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Procedure.java?rev=1592368&r1=1592367&r2=1592368&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Procedure.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Procedure.java Sun May 4 16:12:32 2014
@@ -342,6 +342,16 @@ public class Procedure implements Callab
}
/**
+ * Check if the entire procedure has globally completed, or has been aborted.
+ * Any exception held by the monitor is rethrown before the completion latch is consulted.
+ * @return true when the count of completedLatch has reached zero, false otherwise
+ * @throws ForeignException if the monitor has an exception to rethrow
+ */
+ public boolean isCompleted() throws ForeignException {
+ // Rethrow exception if any
+ monitor.rethrowException();
+ return (completedLatch.getCount() == 0);
+ }
+
+ /**
* A callback that handles incoming ForeignExceptions.
*/
@Override
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java?rev=1592368&r1=1592367&r2=1592368&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java Sun May 4 16:12:32 2014
@@ -149,12 +149,21 @@ public class ProcedureCoordinator {
Procedure oldProc = procedures.get(procName);
if (oldProc != null) {
// procedures are always eventually completed on both successful and failed execution
- if (oldProc.completedLatch.getCount() != 0) {
- LOG.warn("Procedure " + procName + " currently running. Rejecting new request");
- return false;
+ try {
+ if (!oldProc.isCompleted()) {
+ LOG.warn("Procedure " + procName + " currently running. Rejecting new request");
+ return false;
+ }
+ else {
+ LOG.debug("Procedure " + procName
+ + " was in running list but was completed. Accepting new attempt.");
+ procedures.remove(procName);
+ }
+ } catch (ForeignException e) {
+ LOG.debug("Procedure " + procName
+ + " was in running list but has exception. Accepting new attempt.");
+ procedures.remove(procName);
}
- LOG.debug("Procedure " + procName + " was in running list but was completed. Accepting new attempt.");
- procedures.remove(procName);
}
}
@@ -233,14 +242,19 @@ public class ProcedureCoordinator {
/**
* Kick off the named procedure
+ * Currently only one procedure with the same type and name is allowed to run at a time.
* @param procName name of the procedure to start
* @param procArgs arguments for the procedure
* @param expectedMembers expected members to start
- * @return handle to the running procedure, if it was started correctly, <tt>null</tt> otherwise
- * @throws RejectedExecutionException if there are no more available threads to run the procedure
+ * @return handle to the running procedure, if it was started correctly,
+ * <tt>null</tt> otherwise.
+ * Null could be due to submitting a procedure multiple times
+ * (or one with the same name), or runtime exception.
+ * Check the procedure's monitor that holds a reference to the exception
+ * that caused the failure.
*/
public Procedure startProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs,
- List<String> expectedMembers) throws RejectedExecutionException {
+ List<String> expectedMembers) {
Procedure proc = createProcedure(fed, procName, procArgs, expectedMembers);
if (!this.submitProcedure(proc)) {
LOG.error("Failed to submit procedure '" + procName + "'");
@@ -303,4 +317,4 @@ public class ProcedureCoordinator {
public Set<String> getProcedureNames() {
return new HashSet<String>(procedures.keySet());
}
-}
\ No newline at end of file
+}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManagerHost.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManagerHost.java?rev=1592368&r1=1592367&r2=1592368&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManagerHost.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManagerHost.java Sun May 4 16:12:32 2014
@@ -22,6 +22,7 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.procedure.flush.RegionServerFlushTableProcedureManager;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager;
@@ -70,6 +71,8 @@ public class RegionServerProcedureManage
loadUserProcedures(conf, REGIONSERVER_PROCEDURE_CONF_KEY);
// load the default snapshot manager
procedures.add(new RegionServerSnapshotManager());
+ // load the default flush region procedure manager
+ procedures.add(new RegionServerFlushTableProcedureManager());
}
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java?rev=1592368&r1=1592367&r2=1592368&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java Sun May 4 16:12:32 2014
@@ -160,6 +160,7 @@ abstract public class Subprocedure imple
LOG.debug("Subprocedure '" + barrierName + "' starting 'acquire' stage");
acquireBarrier();
LOG.debug("Subprocedure '" + barrierName + "' locally acquired");
+ rethrowException();
// vote yes to coordinator about being prepared
rpcs.sendMemberAcquired(this);
@@ -180,6 +181,7 @@ abstract public class Subprocedure imple
LOG.debug("Subprocedure '" + barrierName + "' received 'reached' from coordinator.");
insideBarrier();
LOG.debug("Subprocedure '" + barrierName + "' locally completed");
+ rethrowException();
// Ack that the member has executed and released local barrier
rpcs.sendMemberCompleted(this);
@@ -327,4 +329,4 @@ abstract public class Subprocedure imple
@Override
public void cleanup(Exception e) {}
};
-}
\ No newline at end of file
+}
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java?rev=1592368&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java Sun May 4 16:12:32 2014
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure.flush;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.procedure.ProcedureMember;
+import org.apache.hadoop.hbase.procedure.Subprocedure;
+import org.apache.hadoop.hbase.procedure.flush.RegionServerFlushTableProcedureManager.FlushTableSubprocedurePool;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+
+/**
+ * This flush region implementation uses the distributed procedure framework to flush
+ * table regions.
+ * Its acquireBarrier stage flushes the regions. Its insideBarrier stage does nothing.
+ */
+@InterfaceAudience.Private
+public class FlushTableSubprocedure extends Subprocedure {
+ private static final Log LOG = LogFactory.getLog(FlushTableSubprocedure.class);
+
+ // Table name; also passed to the superclass constructor as the subprocedure name.
+ private final String table;
+ // Regions to flush; one task is submitted per entry.
+ private final List<HRegion> regions;
+ // Pool that runs the per-region flush tasks and is waited on / cancelled by this class.
+ private final FlushTableSubprocedurePool taskManager;
+
+ public FlushTableSubprocedure(ProcedureMember member,
+ ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout,
+ List<HRegion> regions, String table,
+ FlushTableSubprocedurePool taskManager) {
+ super(member, table, errorListener, wakeFrequency, timeout);
+ this.table = table;
+ this.regions = regions;
+ this.taskManager = taskManager;
+ }
+
+ /**
+ * Task that flushes a single region, bracketed by startRegionOperation() and
+ * closeRegionOperation().
+ */
+ private class RegionFlushTask implements Callable<Void> {
+ HRegion region;
+ RegionFlushTask(HRegion region) {
+ this.region = region;
+ }
+
+ @Override
+ public Void call() throws Exception {
+ LOG.debug("Starting region operation on " + region);
+ region.startRegionOperation();
+ try {
+ LOG.debug("Flush region " + region.toString() + " started...");
+ region.flushcache();
+ } finally {
+ // Always close the region operation, even when the flush throws.
+ LOG.debug("Closing region operation on " + region);
+ region.closeRegionOperation();
+ }
+ return null;
+ }
+ }
+
+ /**
+ * Submits one flush task per region to the task manager and waits for all of them.
+ * @throws ForeignException if the monitor already holds an exception, or if the wait for
+ * outstanding tasks is interrupted
+ */
+ private void flushRegions() throws ForeignException {
+ if (regions.isEmpty()) {
+ // No regions on this RS, we are basically done.
+ return;
+ }
+
+ monitor.rethrowException();
+
+ // assert that the taskManager is empty.
+ if (taskManager.hasTasks()) {
+ throw new IllegalStateException("Attempting to flush "
+ + table + " but we currently have outstanding tasks");
+ }
+
+ // Queue a flush task for every region.
+ for (HRegion region : regions) {
+ // submit one task per region to parallelize by region.
+ taskManager.submitTask(new RegionFlushTask(region));
+ monitor.rethrowException();
+ }
+
+ // wait for everything to complete.
+ LOG.debug("Flush region tasks submitted for " + regions.size() + " regions");
+ try {
+ taskManager.waitForOutstandingTasks();
+ } catch (InterruptedException e) {
+ // NOTE(review): the thread's interrupt flag is not restored before wrapping.
+ throw new ForeignException(getMemberName(), e);
+ }
+ }
+
+ /**
+ * Flush the online regions on this rs for the target table.
+ */
+ @Override
+ public void acquireBarrier() throws ForeignException {
+ flushRegions();
+ }
+
+ @Override
+ public void insideBarrier() throws ForeignException {
+ // No-Op
+ }
+
+ /**
+ * Cancel threads if they haven't finished.
+ */
+ @Override
+ public void cleanup(Exception e) {
+ LOG.info("Aborting all flush region subprocedure task threads for '"
+ + table + "' due to error", e);
+ try {
+ taskManager.cancelTasks();
+ } catch (InterruptedException e1) {
+ // Restore the interrupt flag instead of propagating from cleanup.
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ public void releaseBarrier() {
+ // NO OP
+ }
+
+}
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java?rev=1592368&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java Sun May 4 16:12:32 2014
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure.flush;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadPoolExecutor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.catalog.MetaReader;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.MetricsMaster;
+import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
+import org.apache.hadoop.hbase.procedure.Procedure;
+import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
+import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
+import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinatorRpcs;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.zookeeper.KeeperException;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Master-side procedure manager for the "flush-table-proc" signature. It starts a
+ * procedure through a ProcedureCoordinator across the region servers hosting a table's
+ * regions, and reports whether the procedure recorded for a table has completed.
+ */
+public class MasterFlushTableProcedureManager extends MasterProcedureManager {
+
+ public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
+
+ private static final String FLUSH_TIMEOUT_MILLIS_KEY = "hbase.flush.master.timeoutMillis";
+ private static final int FLUSH_TIMEOUT_MILLIS_DEFAULT = 60000;
+ private static final String FLUSH_WAKE_MILLIS_KEY = "hbase.flush.master.wakeMillis";
+ private static final int FLUSH_WAKE_MILLIS_DEFAULT = 500;
+
+ private static final String FLUSH_PROC_POOL_THREADS_KEY =
+ "hbase.flush.procedure.master.threads";
+ private static final int FLUSH_PROC_POOL_THREADS_DEFAULT = 1;
+
+ private static final Log LOG = LogFactory.getLog(MasterFlushTableProcedureManager.class);
+
+ private MasterServices master;
+ private ProcedureCoordinator coordinator;
+ // Most recently started procedure per table.
+ // NOTE(review): written in execProcedure() without the lock that isProcedureDone() holds,
+ // and backed by a plain HashMap -- confirm whether the two can run concurrently.
+ private Map<TableName, Procedure> procMap = new HashMap<TableName, Procedure>();
+ private boolean stopped;
+
+ public MasterFlushTableProcedureManager() {};
+
+ @Override
+ public void stop(String why) {
+ LOG.info("stop: " + why);
+ this.stopped = true;
+ }
+
+ @Override
+ public boolean isStopped() {
+ return this.stopped;
+ }
+
+ /**
+ * Reads the wake frequency, timeout and thread count from the master configuration and
+ * builds the ProcedureCoordinator on top of ZooKeeper-based coordinator RPCs.
+ */
+ @Override
+ public void initialize(MasterServices master, MetricsMaster metricsMaster)
+ throws KeeperException, IOException, UnsupportedOperationException {
+ this.master = master;
+
+ // get the configuration for the coordinator
+ Configuration conf = master.getConfiguration();
+ long wakeFrequency = conf.getInt(FLUSH_WAKE_MILLIS_KEY, FLUSH_WAKE_MILLIS_DEFAULT);
+ long timeoutMillis = conf.getLong(FLUSH_TIMEOUT_MILLIS_KEY, FLUSH_TIMEOUT_MILLIS_DEFAULT);
+ int threads = conf.getInt(FLUSH_PROC_POOL_THREADS_KEY, FLUSH_PROC_POOL_THREADS_DEFAULT);
+
+ // setup the procedure coordinator
+ String name = master.getServerName().toString();
+ ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, threads);
+ ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinatorRpcs(
+ master.getZooKeeper(), getProcedureSignature(), name);
+
+ this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);
+ }
+
+ @Override
+ public String getProcedureSignature() {
+ return FLUSH_TABLE_PROCEDURE_SIGNATURE;
+ }
+
+ /**
+ * Starts the flush procedure for the table named by the description's instance and blocks
+ * until it completes or fails.
+ * @param desc procedure description; its instance is used as both table name and procedure name
+ * @throws IOException if region lookup is interrupted, the procedure cannot be submitted,
+ * or the exception monitor holds an error
+ */
+ @Override
+ public void execProcedure(ProcedureDescription desc) throws IOException {
+
+ TableName tableName = TableName.valueOf(desc.getInstance());
+
+ // call pre coproc hook
+ // NOTE(review): no matching cpHost.postTableFlush(...) call is made anywhere in this method.
+ MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
+ if (cpHost != null) {
+ cpHost.preTableFlush(tableName);
+ }
+
+ // Get the list of region servers that host the online regions for table.
+ // We use the procedure instance name to carry the table name from the client.
+ // It is possible that regions may move after we get the region server list.
+ // Each region server will get its own online regions for the table.
+ // We may still miss regions that need to be flushed.
+ List<Pair<HRegionInfo, ServerName>> regionsAndLocations = null;
+ try {
+ regionsAndLocations =
+ MetaReader.getTableRegionsAndLocations(this.master.getCatalogTracker(),
+ TableName.valueOf(desc.getInstance()), false);
+ } catch (InterruptedException e1) {
+ String msg = "Failed to get regions for '" + desc.getInstance() + "'";
+ LOG.error(msg);
+ throw new IOException(msg, e1);
+
+ }
+ // Collect the distinct servers; entries without region info or location are ignored.
+ Set<String> regionServers = new HashSet<String>(regionsAndLocations.size());
+ for (Pair<HRegionInfo, ServerName> region : regionsAndLocations) {
+ if (region != null && region.getFirst() != null && region.getSecond() != null) {
+ HRegionInfo hri = region.getFirst();
+ // Skip offline regions that are split or split parents.
+ if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) continue;
+ regionServers.add(region.getSecond().toString());
+ }
+ }
+
+ ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(desc.getInstance());
+
+ // Kick off the global procedure from the master coordinator to the region servers.
+ // We rely on the existing Distributed Procedure framework to prevent any concurrent
+ // procedure with the same name.
+ Procedure proc = coordinator.startProcedure(monitor, desc.getInstance(),
+ new byte[0], Lists.newArrayList(regionServers));
+ monitor.rethrowException();
+ if (proc == null) {
+ String msg = "Failed to submit distributed procedure " + desc.getSignature() + " for '"
+ + desc.getInstance() + "'. " + "Another flush procedure is running?";
+ LOG.error(msg);
+ throw new IOException(msg);
+ }
+
+ // Record the procedure so isProcedureDone() can look it up by table name.
+ procMap.put(tableName, proc);
+
+ try {
+ // wait for the procedure to complete. A timer thread is kicked off that should cancel this
+ // if it takes too long.
+ proc.waitForCompleted();
+ LOG.info("Done waiting - exec procedure " + desc.getSignature() + " for '"
+ + desc.getInstance() + "'");
+ LOG.info("Master flush table procedure is successful!");
+ } catch (InterruptedException e) {
+ ForeignException ee =
+ new ForeignException("Interrupted while waiting for flush table procdure to finish", e);
+ monitor.receive(ee);
+ Thread.currentThread().interrupt();
+ } catch (ForeignException e) {
+ ForeignException ee =
+ new ForeignException("Exception while waiting for flush table procdure to finish", e);
+ monitor.receive(ee);
+ }
+ // Failures captured above are handed to the monitor; rethrow whatever it holds.
+ monitor.rethrowException();
+ }
+
+ /**
+ * Reports whether the procedure recorded for the description's table has completed.
+ * @param desc procedure description; its instance is interpreted as the table name
+ * @return false when no procedure is recorded for the table, otherwise proc.isCompleted()
+ * @throws IOException declared by this method; the only visible source is isCompleted()
+ */
+ @Override
+ public synchronized boolean isProcedureDone(ProcedureDescription desc) throws IOException {
+ // Procedure instance name is the table name.
+ TableName tableName = TableName.valueOf(desc.getInstance());
+ Procedure proc = procMap.get(tableName);
+ if (proc == null) {
+ // The procedure has not even been started yet.
+ // The client would request the procedure and call isProcedureDone().
+ // The HBaseAdmin.execProcedure() wraps both request and isProcedureDone().
+ return false;
+ }
+ // We rely on the existing Distributed Procedure framework to give us the status.
+ return proc.isCompleted();
+ }
+
+}
\ No newline at end of file
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java?rev=1592368&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java Sun May 4 16:12:32 2014
@@ -0,0 +1,332 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure.flush;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DaemonThreadFactory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.procedure.ProcedureMember;
+import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
+import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager;
+import org.apache.hadoop.hbase.procedure.Subprocedure;
+import org.apache.hadoop.hbase.procedure.SubprocedureFactory;
+import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * This manager class handles flushing of the regions for table on a {@link HRegionServer}.
+ */
+@InterfaceAudience.Private
+public class RegionServerFlushTableProcedureManager extends RegionServerProcedureManager {
+ private static final Log LOG = LogFactory.getLog(RegionServerFlushTableProcedureManager.class);
+
+ private static final String CONCURENT_FLUSH_TASKS_KEY =
+ "hbase.flush.procedure.region.concurrentTasks";
+ private static final int DEFAULT_CONCURRENT_FLUSH_TASKS = 3;
+
+ public static final String FLUSH_REQUEST_THREADS_KEY =
+ "hbase.flush.procedure.region.pool.threads";
+ public static final int FLUSH_REQUEST_THREADS_DEFAULT = 10;
+
+ public static final String FLUSH_TIMEOUT_MILLIS_KEY =
+ "hbase.flush.procedure.region.timeout";
+ public static final long FLUSH_TIMEOUT_MILLIS_DEFAULT = 60000;
+
+ public static final String FLUSH_REQUEST_WAKE_MILLIS_KEY =
+ "hbase.flush.procedure.region.wakefrequency";
+ private static final long FLUSH_REQUEST_WAKE_MILLIS_DEFAULT = 500;
+
+ private RegionServerServices rss;
+ private ProcedureMemberRpcs memberRpcs;
+ private ProcedureMember member;
+
+ /**
+ * Exposed for testing.
+ * @param conf HBase configuration; accepted but not read by this constructor.
+ * @param server region server, kept as the region server services used by this manager.
+ * @param memberRpc use specified memberRpc instance
+ * @param procMember use specified ProcedureMember
+ */
+ RegionServerFlushTableProcedureManager(Configuration conf, HRegionServer server,
+ ProcedureMemberRpcs memberRpc, ProcedureMember procMember) {
+ this.rss = server;
+ this.memberRpcs = memberRpc;
+ this.member = procMember;
+ }
+
+ public RegionServerFlushTableProcedureManager() {}
+
+  /** Start accepting flush table requests. */
+  @Override
+  public void start() {
+    // Resolve the server name once; it is logged and then handed to the member RPCs.
+    final String serverName = rss.getServerName().toString();
+    LOG.debug("Start region server flush procedure manager " + serverName);
+    this.memberRpcs.start(serverName, member);
+  }
+
+  /**
+   * Close <tt>this</tt>: the procedure member first and then, even if that fails, its RPCs.
+   * @param force only changes the log message; the same closing is done either way
+   * @throws IOException presumably raised while closing the member or the member RPCs
+   */
+  @Override
+  public void stop(boolean force) throws IOException {
+    LOG.info("Stopping region server flush procedure manager "
+        + (force ? "abruptly" : "gracefully") + ".");
+    try {
+      // Close the member first; the finally block guarantees the RPCs get closed as well.
+      this.member.close();
+    } finally {
+      this.memberRpcs.close();
+    }
+  }
+
+  /**
+   * Builds the subprocedure that flushes the regions of the given table hosted on this server.
+   *
+   * Because this gets the local list of regions to flush and not the set the master had,
+   * there is a possibility of a race where regions may be missed.
+   *
+   * @param table name of the table whose regions are to be flushed
+   * @return Subprocedure to submit to the ProcedureMember.
+   * @throws IllegalStateException if the server is stopping/stopped or listing regions fails
+   */
+  public Subprocedure buildSubprocedure(String table) {
+    // don't run the subprocedure if the parent is stop(ping)
+    boolean serverRunning = !rss.isStopping() && !rss.isStopped();
+    if (!serverRunning) {
+      throw new IllegalStateException("Can't start flush region subprocedure on RS: "
+          + rss.getServerName() + ", because stopping/stopped!");
+    }
+
+    // Look up the regions of the table hosted here. We go on even if there are none: the
+    // coordinator expects participation in the procedure, and without our message the master
+    // procedure will hang and fail.
+    final List<HRegion> regionsToFlush;
+    try {
+      regionsToFlush = getRegionsToFlush(table);
+    } catch (IOException ioe) {
+      throw new IllegalStateException("Failed to figure out if there is region to flush.", ioe);
+    }
+
+    LOG.debug("Launching subprocedure to flush regions for " + table);
+    ForeignExceptionDispatcher errorDispatcher = new ForeignExceptionDispatcher(table);
+
+    // Timeout and wake frequency of the subprocedure both come from the configuration.
+    Configuration conf = rss.getConfiguration();
+    long timeoutMillis = conf.getLong(FLUSH_TIMEOUT_MILLIS_KEY, FLUSH_TIMEOUT_MILLIS_DEFAULT);
+    long wakeMillis =
+        conf.getLong(FLUSH_REQUEST_WAKE_MILLIS_KEY, FLUSH_REQUEST_WAKE_MILLIS_DEFAULT);
+
+    String serverName = rss.getServerName().toString();
+    FlushTableSubprocedurePool flushPool = new FlushTableSubprocedurePool(serverName, conf);
+    return new FlushTableSubprocedure(member, errorDispatcher, wakeMillis, timeoutMillis,
+        regionsToFlush, table, flushPool);
+  }
+
+  /**
+   * Get the list of regions to flush for the table on this server
+   *
+   * It is possible that if a region moves somewhere between the calls we'll miss the region.
+   *
+   * @param table name of the table, converted with {@link TableName#valueOf(String)}
+   * @return the online regions of that table as reported by the region server services
+   * @throws IOException
+   */
+  private List<HRegion> getRegionsToFlush(String table) throws IOException {
+    TableName tableName = TableName.valueOf(table);
+    return rss.getOnlineRegions(tableName);
+  }
+
+ public class FlushTableSubprocedureBuilder implements SubprocedureFactory {
+
+ @Override
+ public Subprocedure buildSubprocedure(String name, byte[] data) {
+ // The procedure instance name sent by the master is the table name; data is not used.
+ return RegionServerFlushTableProcedureManager.this.buildSubprocedure(name);
+ }
+
+ }
+
+  /**
+   * We use the FlushTableSubprocedurePool, a class specific thread pool instead of
+   * {@link org.apache.hadoop.hbase.executor.ExecutorService}.
+   *
+   * It uses a {@link java.util.concurrent.ExecutorCompletionService} which provides queuing of
+   * completed tasks which lets us efficiently cancel pending tasks upon the earliest operation
+   * failures.
+   */
+  static class FlushTableSubprocedurePool {
+    private final ExecutorCompletionService<Void> taskPool;
+    private final ThreadPoolExecutor executor;
+    private volatile boolean stopped;
+    private final List<Future<Void>> futures = new ArrayList<Future<Void>>();
+    private final String name;
+
+    FlushTableSubprocedurePool(String name, Configuration conf) {
+      long keepAlive = conf.getLong(FLUSH_TIMEOUT_MILLIS_KEY, FLUSH_TIMEOUT_MILLIS_DEFAULT);
+      int threads = conf.getInt(CONCURENT_FLUSH_TASKS_KEY, DEFAULT_CONCURRENT_FLUSH_TASKS);
+      this.name = name;
+      // The unbounded queue keeps the pool at its core size, so core must be the wanted threads.
+      executor = new ThreadPoolExecutor(threads, threads, keepAlive, TimeUnit.MILLISECONDS,
+          new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory("rs("
+              + name + ")-flush-proc-pool"));
+      // Idle core threads may time out too; the JDK rejects this for a zero keep-alive.
+      if (keepAlive > 0) executor.allowCoreThreadTimeOut(true);
+      taskPool = new ExecutorCompletionService<Void>(executor);
+    }
+
+    boolean hasTasks() {
+      return !futures.isEmpty();
+    }
+
+    /**
+     * Submit a task to the pool.
+     *
+     * NOTE: all must be submitted before you can safely {@link #waitForOutstandingTasks()}.
+     */
+    void submitTask(final Callable<Void> task) {
+      Future<Void> f = this.taskPool.submit(task);
+      futures.add(f);
+    }
+
+    /**
+     * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}.
+     * This *must* be called after all tasks are submitted via submitTask.
+     * @return <tt>true</tt> on success, <tt>false</tt> if interrupted after the pool was stopped
+     * @throws ForeignException if a task failed or the wait was interrupted while not stopped
+     * @throws InterruptedException
+     */
+    boolean waitForOutstandingTasks() throws ForeignException, InterruptedException {
+      LOG.debug("Waiting for local region flush to finish.");
+
+      int sz = futures.size();
+      try {
+        // Using the completion service to process the futures.
+        for (int i = 0; i < sz; i++) {
+          Future<Void> f = taskPool.take();
+          f.get();
+          if (!futures.remove(f)) {
+            LOG.warn("unexpected future" + f);
+          }
+          LOG.debug("Completed " + (i+1) + "/" + sz + " local region flush tasks.");
+        }
+        LOG.debug("Completed " + sz + " local region flush tasks.");
+        return true;
+      } catch (InterruptedException e) {
+        LOG.warn("Got InterruptedException in FlushSubprocedurePool", e);
+        if (!stopped) {
+          Thread.currentThread().interrupt();
+          throw new ForeignException("FlushSubprocedurePool", e);
+        }
+        // we are stopped so we can just exit.
+      } catch (ExecutionException e) {
+        if (e.getCause() instanceof ForeignException) {
+          LOG.warn("Rethrowing ForeignException from FlushSubprocedurePool", e);
+          throw (ForeignException)e.getCause();
+        }
+        LOG.warn("Got Exception in FlushSubprocedurePool", e);
+        throw new ForeignException(name, e.getCause());
+      } finally {
+        cancelTasks();
+      }
+      return false;
+    }
+
+    /**
+     * Cancels every outstanding task and stops the pool; never blocks on unfinished tasks.
+     * @throws InterruptedException no longer thrown; still declared so existing callers compile
+     */
+    void cancelTasks() throws InterruptedException {
+      Collection<Future<Void>> tasks = futures;
+      LOG.debug("cancelling " + tasks.size() + " flush region tasks " + name);
+      for (Future<Void> f: tasks) {
+        f.cancel(false);
+      }
+      // Drop our bookkeeping and drain only what is already queued. Blocking in take() until
+      // 'futures' is empty can hang forever: a future whose get() failed was already taken from
+      // the completion queue but is still listed in 'futures', so it never shows up again.
+      futures.clear();
+      while (taskPool.poll() != null) {
+        // discard futures that have already completed or been cancelled
+      }
+      stop();
+    }
+
+    /**
+     * Abruptly shutdown the thread pool. Call when exiting a region server.
+     */
+    void stop() {
+      if (this.stopped) return;
+
+      this.stopped = true;
+      this.executor.shutdownNow();
+    }
+  }
+
+  /**
+   * Wires this manager to the region server: creates the ZooKeeper-based member RPCs and the
+   * procedure member that builds flush table subprocedures.
+   * @param rss region server
+   * @throws KeeperException presumably when the ZooKeeper-based member RPCs cannot be set up
+   */
+  @Override
+  public void initialize(RegionServerServices rss) throws KeeperException {
+    this.rss = rss;
+    this.memberRpcs = new ZKProcedureMemberRpcs(rss.getZooKeeper(),
+        MasterFlushTableProcedureManager.FLUSH_TABLE_PROCEDURE_SIGNATURE);
+
+    // Size the member's thread pool from the configuration.
+    Configuration conf = rss.getConfiguration();
+    long keepAlive = conf.getLong(FLUSH_TIMEOUT_MILLIS_KEY, FLUSH_TIMEOUT_MILLIS_DEFAULT);
+    int opThreads = conf.getInt(FLUSH_REQUEST_THREADS_KEY, FLUSH_REQUEST_THREADS_DEFAULT);
+    String serverName = rss.getServerName().toString();
+    ThreadPoolExecutor pool = ProcedureMember.defaultPool(serverName, opThreads, keepAlive);
+
+    // create the actual flush table procedure member
+    this.member = new ProcedureMember(memberRpcs, pool, new FlushTableSubprocedureBuilder());
+  }
+
+ @Override
+ public String getProcedureSignature() {
+ return MasterFlushTableProcedureManager.FLUSH_TABLE_PROCEDURE_SIGNATURE;
+ }
+
+}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java?rev=1592368&r1=1592367&r2=1592368&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java Sun May 4 16:12:32 2014
@@ -1167,6 +1167,17 @@ public class AccessController extends Ba
NamespaceDescriptor ns) throws IOException {
}
+ @Override
+ public void preTableFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final TableName tableName) throws IOException {
+ requirePermission("flushTable", tableName, null, null, Action.ADMIN, Action.CREATE);
+ }
+
+ @Override
+ public void postTableFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final TableName tableName) throws IOException {
+ }
+
/* ---- RegionObserver implementation ---- */
@Override
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java?rev=1592368&r1=1592367&r2=1592368&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java Sun May 4 16:12:32 2014
@@ -556,6 +556,16 @@ public class VisibilityController extend
}
@Override
+ public void preTableFlush(ObserverContext<MasterCoprocessorEnvironment> ctx,
+ TableName tableName) throws IOException {
+ }
+
+ @Override
+ public void postTableFlush(ObserverContext<MasterCoprocessorEnvironment> ctx,
+ TableName tableName) throws IOException {
+ }
+
+ @Override
public void preMasterInitialization(ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java?rev=1592368&r1=1592367&r2=1592368&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java Sun May 4 16:12:32 2014
@@ -941,6 +941,16 @@ public class TestMasterObserver {
public boolean wasGetTableDescriptorsCalled() {
return preGetTableDescriptorsCalled && postGetTableDescriptorsCalled;
}
+
+ @Override
+ public void preTableFlush(ObserverContext<MasterCoprocessorEnvironment> ctx,
+ TableName tableName) throws IOException {
+ }
+
+ @Override
+ public void postTableFlush(ObserverContext<MasterCoprocessorEnvironment> ctx,
+ TableName tableName) throws IOException {
+ }
}
private static HBaseTestingUtility UTIL = new HBaseTestingUtility();