Posted to commits@hbase.apache.org by je...@apache.org on 2014/06/15 20:43:56 UTC
[1/2] HBASE-11094: Distributed log replay is incompatible for rolling restarts
Repository: hbase
Updated Branches:
refs/heads/0.98 9ce175146 -> 34ae4a94d
http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java
index 4ac800e..9bfdeed 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.master.SplitLogManager;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor.Status;
import org.apache.hadoop.hbase.util.CancelableProgressable;
@@ -55,11 +56,12 @@ public class HLogSplitterHandler extends EventHandler {
private final AtomicInteger inProgressTasks;
private final MutableInt curTaskZKVersion;
private final TaskExecutor splitTaskExecutor;
+ private final RecoveryMode mode;
public HLogSplitterHandler(final Server server, String curTask,
final MutableInt curTaskZKVersion,
CancelableProgressable reporter,
- AtomicInteger inProgressTasks, TaskExecutor splitTaskExecutor) {
+ AtomicInteger inProgressTasks, TaskExecutor splitTaskExecutor, RecoveryMode mode) {
super(server, EventType.RS_LOG_REPLAY);
this.curTask = curTask;
this.wal = ZKSplitLog.getFileName(curTask);
@@ -70,16 +72,17 @@ public class HLogSplitterHandler extends EventHandler {
this.zkw = server.getZooKeeper();
this.curTaskZKVersion = curTaskZKVersion;
this.splitTaskExecutor = splitTaskExecutor;
+ this.mode = mode;
}
@Override
public void process() throws IOException {
long startTime = System.currentTimeMillis();
try {
- Status status = this.splitTaskExecutor.exec(wal, reporter);
+ Status status = this.splitTaskExecutor.exec(wal, mode, reporter);
switch (status) {
case DONE:
- endTask(zkw, new SplitLogTask.Done(this.serverName),
+ endTask(zkw, new SplitLogTask.Done(this.serverName, this.mode),
SplitLogCounters.tot_wkr_task_done, curTask, curTaskZKVersion.intValue());
break;
case PREEMPTED:
@@ -88,7 +91,7 @@ public class HLogSplitterHandler extends EventHandler {
break;
case ERR:
if (server != null && !server.isStopped()) {
- endTask(zkw, new SplitLogTask.Err(this.serverName),
+ endTask(zkw, new SplitLogTask.Err(this.serverName, this.mode),
SplitLogCounters.tot_wkr_task_err, curTask, curTaskZKVersion.intValue());
break;
}
@@ -99,7 +102,7 @@ public class HLogSplitterHandler extends EventHandler {
if (server != null && server.isStopped()) {
LOG.info("task execution interrupted because worker is exiting " + curTask);
}
- endTask(zkw, new SplitLogTask.Resigned(this.serverName),
+ endTask(zkw, new SplitLogTask.Resigned(this.serverName, this.mode),
SplitLogCounters.tot_wkr_task_resigned, curTask, curTaskZKVersion.intValue());
break;
}
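
The handler no longer reads the recovery mode from configuration; it receives one and forwards it to the task executor via exec(wal, mode, reporter). A minimal sketch of an executor under the widened TaskExecutor interface, in the spirit of the stub executor updated in TestSplitLogWorker later in this message; the class name is illustrative, not part of the patch:

import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor;
import org.apache.hadoop.hbase.util.CancelableProgressable;

public class NoopTaskExecutorSketch {
  // The mode now arrives with each task instead of being read from conf.
  static final TaskExecutor NOOP = new TaskExecutor() {
    @Override
    public Status exec(String walFilename, RecoveryMode mode, CancelableProgressable p) {
      // A real executor would hand walFilename and mode to
      // HLogSplitter.splitLogFile(...); this stub just reports success.
      return Status.DONE;
    }
  };
}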
http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
index d709b8b..545c253 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
@@ -89,6 +89,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.Mut
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.StoreSequenceId;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.LastSequenceId;
@@ -167,7 +168,7 @@ public class HLogSplitter {
private final int minBatchSize;
HLogSplitter(Configuration conf, Path rootDir,
- FileSystem fs, LastSequenceId idChecker, ZooKeeperWatcher zkw) {
+ FileSystem fs, LastSequenceId idChecker, ZooKeeperWatcher zkw, RecoveryMode mode) {
this.conf = HBaseConfiguration.create(conf);
String codecClassName = conf
.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
@@ -184,7 +185,7 @@ public class HLogSplitter {
// a larger minBatchSize may slow down recovery because replay writer has to wait for
// enough edits before replaying them
this.minBatchSize = this.conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 64);
- this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(this.conf);
+ this.distributedLogReplay = (RecoveryMode.LOG_REPLAY == mode);
this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
if (zkw != null && this.distributedLogReplay) {
@@ -218,8 +219,8 @@ public class HLogSplitter {
*/
public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
- ZooKeeperWatcher zkw) throws IOException {
- HLogSplitter s = new HLogSplitter(conf, rootDir, fs, idChecker, zkw);
+ ZooKeeperWatcher zkw, RecoveryMode mode) throws IOException {
+ HLogSplitter s = new HLogSplitter(conf, rootDir, fs, idChecker, zkw, mode);
return s.splitLogFile(logfile, reporter);
}
@@ -233,7 +234,8 @@ public class HLogSplitter {
List<Path> splits = new ArrayList<Path>();
if (logfiles != null && logfiles.length > 0) {
for (FileStatus logfile: logfiles) {
- HLogSplitter s = new HLogSplitter(conf, rootDir, fs, null, null);
+ HLogSplitter s = new HLogSplitter(conf, rootDir, fs, null, null,
+ RecoveryMode.LOG_SPLITTING);
if (s.splitLogFile(logfile, null)) {
finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
if (s.outputSink.splits != null) {
@@ -1972,14 +1974,4 @@ public class HLogSplitter {
return mutations;
}
-
- /**
- * Returns if distributed log replay is turned on or not
- * @param conf
- * @return true when distributed log replay is turned on
- */
- public static boolean isDistributedLogReplay(Configuration conf) {
- return conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
- HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
- }
}
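
With the static isDistributedLogReplay(conf) helper removed, every HLogSplitter is built with an explicit RecoveryMode and the static splitLogFile entry point takes the mode as its last argument. A hedged sketch of a caller under the new signature; the class name and paths are illustrative, and reporter, idChecker and zkw are left null as in the plain log-splitting path above:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;

public class SplitOneWalSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);
    Path rootDir = new Path(args[0]);                         // hbase root dir
    FileStatus logfile = fs.getFileStatus(new Path(args[1])); // one WAL file
    // reporter, idChecker and zkw are null, as when splitting without replay.
    boolean ok = HLogSplitter.splitLogFile(
        rootDir, logfile, fs, conf, null, null, null, RecoveryMode.LOG_SPLITTING);
    System.out.println("split finished cleanly: " + ok);
  }
}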
http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSerialization.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSerialization.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSerialization.java
index d7082ed..b46ec28 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSerialization.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSerialization.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.DataInputBuffer;
@@ -120,7 +121,8 @@ public class TestSerialization {
@Test
public void testSplitLogTask() throws DeserializationException {
- SplitLogTask slt = new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"));
+ SplitLogTask slt = new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"),
+ RecoveryMode.LOG_REPLAY);
byte [] bytes = slt.toByteArray();
SplitLogTask sltDeserialized = SplitLogTask.parseFrom(bytes);
assertTrue(slt.equals(sltDeserialized));
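
Every SplitLogTask state (Unassigned, Owned, Done, Err, Resigned) now carries the recovery mode in its serialized payload, which is how the mode chosen by the master travels through ZooKeeper to the workers. A minimal round-trip sketch in the spirit of the test above; the class name is illustrative:

import org.apache.hadoop.hbase.DeserializationException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;

public class SplitLogTaskRoundTripSketch {
  public static void main(String[] args) throws DeserializationException {
    // The manager writes the task, mode included, into the task znode...
    SplitLogTask written = new SplitLogTask.Unassigned(
        ServerName.valueOf("mgr,1,1"), RecoveryMode.LOG_SPLITTING);
    byte[] znodeData = written.toByteArray();
    // ...and a worker parses it back when it grabs the task.
    SplitLogTask read = SplitLogTask.parseFrom(znodeData);
    System.out.println(read.equals(written)); // true
  }
}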
http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
index 8cdaef6..6595333 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
@@ -69,7 +69,7 @@ public class TestMultiParallel {
private static final byte[] ONE_ROW = Bytes.toBytes("xxx");
private static final byte [][] KEYS = makeKeys();
- private static final int slaves = 2; // also used for testing HTable pool size
+ private static final int slaves = 5; // also used for testing HTable pool size
@BeforeClass public static void beforeClass() throws Exception {
((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL);
@@ -696,4 +696,4 @@ public class TestMultiParallel {
validateEmpty(result);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
index 8ba8012..1ba8eac 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
@@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Table;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -638,10 +639,14 @@ public class TestAssignmentManager {
DeadServer deadServers = new DeadServer();
deadServers.add(SERVERNAME_A);
// I need a services instance that will return the AM
+ MasterFileSystem fs = Mockito.mock(MasterFileSystem.class);
+ Mockito.doNothing().when(fs).setLogRecoveryMode();
+ Mockito.when(fs.getLogRecoveryMode()).thenReturn(RecoveryMode.LOG_REPLAY);
MasterServices services = Mockito.mock(MasterServices.class);
Mockito.when(services.getAssignmentManager()).thenReturn(am);
Mockito.when(services.getServerManager()).thenReturn(this.serverManager);
Mockito.when(services.getZooKeeper()).thenReturn(this.watcher);
+ Mockito.when(services.getMasterFileSystem()).thenReturn(fs);
ServerShutdownHandler handler = new ServerShutdownHandler(this.server,
services, deadServers, SERVERNAME_A, false);
am.failoverCleanupDone.set(true);
http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
index 72bd0d1..c5f869d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
@@ -159,6 +159,7 @@ public class TestDistributedLogSplitting {
conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1);
conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing
conf.setInt("hbase.regionserver.wal.max.splitters", 3);
+ conf.setInt("hfile.format.version", 3);
TEST_UTIL = new HBaseTestingUtility(conf);
TEST_UTIL.setDFSCluster(dfsCluster);
TEST_UTIL.setZkCluster(zkCluster);
@@ -1169,7 +1170,6 @@ public class TestDistributedLogSplitting {
LOG.info("testSameVersionUpdatesRecovery");
conf.setLong("hbase.regionserver.hlog.blocksize", 15 * 1024);
conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
- conf.setInt("hfile.format.version", 3);
startCluster(NUM_RS);
final AtomicLong sequenceId = new AtomicLong(100);
final int NUM_REGIONS_TO_CREATE = 40;
@@ -1261,7 +1261,6 @@ public class TestDistributedLogSplitting {
conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 30 * 1024);
conf.setInt("hbase.hstore.compactionThreshold", 3);
- conf.setInt("hfile.format.version", 3);
startCluster(NUM_RS);
final AtomicLong sequenceId = new AtomicLong(100);
final int NUM_REGIONS_TO_CREATE = 40;
http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFileSystem.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFileSystem.java
index 2fbb849..8f57ee4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFileSystem.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFileSystem.java
@@ -94,8 +94,8 @@ public class TestMasterFileSystem {
// Create a ZKW to use in the test
ZooKeeperWatcher zkw = HBaseTestingUtility.getZooKeeperWatcher(UTIL);
zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, walPath),
- new SplitLogTask.Owned(inRecoveryServerName).toByteArray(), Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
+ new SplitLogTask.Owned(inRecoveryServerName, fs.getLogRecoveryMode()).toByteArray(),
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
String staleRegionPath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, staleRegion);
ZKUtil.createWithParents(zkw, staleRegionPath);
String inRecoveringRegionPath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, failedRegion);
http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
index ed51484..4988742 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
@@ -48,16 +48,21 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.master.SplitLogManager.Task;
import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
import org.apache.hadoop.hbase.regionserver.TestMasterAddressTracker.NodeCreationListener;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -90,6 +95,7 @@ public class TestSplitLogManager {
private SplitLogManager slm;
private Configuration conf;
private int to;
+ private RecoveryMode mode;
private static HBaseTestingUtility TEST_UTIL;
@@ -133,7 +139,11 @@ public class TestSplitLogManager {
conf.setInt("hbase.splitlog.manager.timeout", to);
conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to);
conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
+ conf.setInt("hfile.format.version", 3);
to = to + 4 * 100;
+
+ this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
+ RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
}
@After
@@ -213,7 +223,7 @@ public class TestSplitLogManager {
LOG.info("TestOrphanTaskAcquisition");
String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
- SplitLogTask slt = new SplitLogTask.Owned(DUMMY_MASTER);
+ SplitLogTask slt = new SplitLogTask.Owned(DUMMY_MASTER, this.mode);
zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
@@ -238,7 +248,7 @@ public class TestSplitLogManager {
" startup");
String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
//create an unassigned orphan task
- SplitLogTask slt = new SplitLogTask.Unassigned(DUMMY_MASTER);
+ SplitLogTask slt = new SplitLogTask.Unassigned(DUMMY_MASTER, this.mode);
zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
int version = ZKUtil.checkExists(zkw, tasknode);
@@ -274,19 +284,19 @@ public class TestSplitLogManager {
final ServerName worker1 = ServerName.valueOf("worker1,1,1");
final ServerName worker2 = ServerName.valueOf("worker2,1,1");
final ServerName worker3 = ServerName.valueOf("worker3,1,1");
- SplitLogTask slt = new SplitLogTask.Owned(worker1);
+ SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
ZKUtil.setData(zkw, tasknode, slt.toByteArray());
waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
int version1 = ZKUtil.checkExists(zkw, tasknode);
assertTrue(version1 > version);
- slt = new SplitLogTask.Owned(worker2);
+ slt = new SplitLogTask.Owned(worker2, this.mode);
ZKUtil.setData(zkw, tasknode, slt.toByteArray());
waitForCounter(tot_mgr_heartbeat, 1, 2, to/2);
waitForCounter(tot_mgr_resubmit, 1, 2, to + to/2);
int version2 = ZKUtil.checkExists(zkw, tasknode);
assertTrue(version2 > version1);
- slt = new SplitLogTask.Owned(worker3);
+ slt = new SplitLogTask.Owned(worker3, this.mode);
ZKUtil.setData(zkw, tasknode, slt.toByteArray());
waitForCounter(tot_mgr_heartbeat, 2, 3, to/2);
waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + to/2);
@@ -304,7 +314,7 @@ public class TestSplitLogManager {
String tasknode = submitTaskAndWait(batch, "foo/1");
int version = ZKUtil.checkExists(zkw, tasknode);
final ServerName worker1 = ServerName.valueOf("worker1,1,1");
- SplitLogTask slt = new SplitLogTask.Owned(worker1);
+ SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
ZKUtil.setData(zkw, tasknode, slt.toByteArray());
waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
waitForCounter(new Expr() {
@@ -331,7 +341,7 @@ public class TestSplitLogManager {
TaskBatch batch = new TaskBatch();
String tasknode = submitTaskAndWait(batch, "foo/1");
final ServerName worker1 = ServerName.valueOf("worker1,1,1");
- SplitLogTask slt = new SplitLogTask.Done(worker1);
+ SplitLogTask slt = new SplitLogTask.Done(worker1, this.mode);
ZKUtil.setData(zkw, tasknode, slt.toByteArray());
synchronized (batch) {
while (batch.installed != batch.done) {
@@ -352,7 +362,7 @@ public class TestSplitLogManager {
String tasknode = submitTaskAndWait(batch, "foo/1");
final ServerName worker1 = ServerName.valueOf("worker1,1,1");
- SplitLogTask slt = new SplitLogTask.Err(worker1);
+ SplitLogTask slt = new SplitLogTask.Err(worker1, this.mode);
ZKUtil.setData(zkw, tasknode, slt.toByteArray());
synchronized (batch) {
@@ -376,7 +386,7 @@ public class TestSplitLogManager {
assertEquals(tot_mgr_resubmit.get(), 0);
final ServerName worker1 = ServerName.valueOf("worker1,1,1");
assertEquals(tot_mgr_resubmit.get(), 0);
- SplitLogTask slt = new SplitLogTask.Resigned(worker1);
+ SplitLogTask slt = new SplitLogTask.Resigned(worker1, this.mode);
assertEquals(tot_mgr_resubmit.get(), 0);
ZKUtil.setData(zkw, tasknode, slt.toByteArray());
int version = ZKUtil.checkExists(zkw, tasknode);
@@ -399,7 +409,7 @@ public class TestSplitLogManager {
// create an orphan task in OWNED state
String tasknode1 = ZKSplitLog.getEncodedNodeName(zkw, "orphan/1");
final ServerName worker1 = ServerName.valueOf("worker1,1,1");
- SplitLogTask slt = new SplitLogTask.Owned(worker1);
+ SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
@@ -414,7 +424,7 @@ public class TestSplitLogManager {
for (int i = 0; i < (3 * to)/100; i++) {
Thread.sleep(100);
final ServerName worker2 = ServerName.valueOf("worker1,1,1");
- slt = new SplitLogTask.Owned(worker2);
+ slt = new SplitLogTask.Owned(worker2, this.mode);
ZKUtil.setData(zkw, tasknode1, slt.toByteArray());
}
@@ -438,7 +448,7 @@ public class TestSplitLogManager {
String tasknode = submitTaskAndWait(batch, "foo/1");
int version = ZKUtil.checkExists(zkw, tasknode);
final ServerName worker1 = ServerName.valueOf("worker1,1,1");
- SplitLogTask slt = new SplitLogTask.Owned(worker1);
+ SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
ZKUtil.setData(zkw, tasknode, slt.toByteArray());
if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
slm.handleDeadWorker(worker1);
@@ -463,7 +473,7 @@ public class TestSplitLogManager {
String tasknode = submitTaskAndWait(batch, "foo/1");
final ServerName worker1 = ServerName.valueOf("worker1,1,1");
- SplitLogTask slt = new SplitLogTask.Owned(worker1);
+ SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
ZKUtil.setData(zkw, tasknode, slt.toByteArray());
if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
@@ -513,4 +523,25 @@ public class TestSplitLogManager {
assertTrue("Recovery regions isn't cleaned", recoveringRegions.isEmpty());
}
+
+ @Test(timeout=60000)
+ public void testGetPreviousRecoveryMode() throws Exception {
+ LOG.info("testGetPreviousRecoveryMode");
+ SplitLogCounters.resetCounters();
+ Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
+ testConf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
+
+ zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "testRecovery"),
+ new SplitLogTask.Unassigned(
+ ServerName.valueOf("mgr,1,1"), RecoveryMode.LOG_SPLITTING).toByteArray(),
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+ slm = new SplitLogManager(zkw, testConf, stopper, master, DUMMY_MASTER);
+ assertTrue(slm.getRecoveryMode() == RecoveryMode.LOG_SPLITTING);
+
+ zkw.getRecoverableZooKeeper().delete(ZKSplitLog.getEncodedNodeName(zkw, "testRecovery"), -1);
+ slm.setRecoveryMode(false);
+ assertTrue(slm.getRecoveryMode() == RecoveryMode.LOG_REPLAY);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
index 42e9e8f..8ffc719 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
@@ -108,7 +108,8 @@ public class TestRegionServerNoMaster {
// We reopen. We need a ZK node here, as a open is always triggered by a master.
ZKAssign.createNodeOffline(HTU.getZooKeeperWatcher(), hri, getRS().getServerName());
// first version is '0'
- AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(getRS().getServerName(), hri, 0, null);
+ AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(
+ getRS().getServerName(), hri, 0, null, null);
AdminProtos.OpenRegionResponse responseOpen = getRS().openRegion(null, orr);
Assert.assertTrue(responseOpen.getOpeningStateCount() == 1);
Assert.assertTrue(responseOpen.getOpeningState(0).
@@ -227,7 +228,8 @@ public class TestRegionServerNoMaster {
// We're sending multiple requests in a row. The region server must handle this nicely.
for (int i = 0; i < 10; i++) {
- AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(getRS().getServerName(), hri, 0, null);
+ AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(
+ getRS().getServerName(), hri, 0, null, null);
AdminProtos.OpenRegionResponse responseOpen = getRS().openRegion(null, orr);
Assert.assertTrue(responseOpen.getOpeningStateCount() == 1);
@@ -248,7 +250,8 @@ public class TestRegionServerNoMaster {
try {
// fake region to be closing now, need to clear state afterwards
getRS().regionsInTransitionInRS.put(hri.getEncodedNameAsBytes(), Boolean.FALSE);
- AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(getRS().getServerName(), hri, 0, null);
+ AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(
+ getRS().getServerName(), hri, 0, null, null);
getRS().openRegion(null, orr);
Assert.fail("The closing region should not be opened");
} catch (ServiceException se) {
@@ -403,7 +406,8 @@ public class TestRegionServerNoMaster {
//actual close
closeNoZK();
try {
- AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(earlierServerName, hri, 0, null);
+ AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(
+ earlierServerName, hri, 0, null, null);
getRS().openRegion(null, orr);
Assert.fail("The openRegion should have been rejected");
} catch (ServiceException se) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
index eaf5547..dcb1e88 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -31,6 +32,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
@@ -38,6 +40,8 @@ import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorType;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
+import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -64,6 +68,7 @@ public class TestSplitLogWorker {
private ZooKeeperWatcher zkw;
private SplitLogWorker slw;
private ExecutorService executorService;
+ private RecoveryMode mode;
private void waitForCounter(AtomicLong ctr, long oldval, long newval, long timems)
throws Exception {
@@ -98,6 +103,7 @@ public class TestSplitLogWorker {
@Before
public void setup() throws Exception {
TEST_UTIL.startMiniZKCluster();
+ Configuration conf = TEST_UTIL.getConfiguration();
zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
"split-log-worker-tests", null);
ZKUtil.deleteChildrenRecursively(zkw, zkw.baseZNode);
@@ -112,6 +118,8 @@ public class TestSplitLogWorker {
SplitLogCounters.resetCounters();
executorService = new ExecutorService("TestSplitLogWorker");
executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, 10);
+ this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
+ RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
}
@After
@@ -126,7 +134,7 @@ public class TestSplitLogWorker {
new SplitLogWorker.TaskExecutor() {
@Override
- public Status exec(String name, CancelableProgressable p) {
+ public Status exec(String name, RecoveryMode mode, CancelableProgressable p) {
while (true) {
try {
Thread.sleep(1000);
@@ -149,7 +157,8 @@ public class TestSplitLogWorker {
final ServerName RS = ServerName.valueOf("rs,1,1");
RegionServerServices mockedRS = getRegionServer(RS);
zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS),
- new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(), Ids.OPEN_ACL_UNSAFE,
+ new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(),
+ Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
SplitLogWorker slw =
@@ -184,8 +193,8 @@ public class TestSplitLogWorker {
final ServerName SVR1 = ServerName.valueOf("svr1,1,1");
final ServerName SVR2 = ServerName.valueOf("svr2,1,1");
zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TRFT),
- new SplitLogTask.Unassigned(MANAGER).toByteArray(), Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
+ new SplitLogTask.Unassigned(MANAGER, this.mode).toByteArray(),
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
RegionServerServices mockedRS1 = getRegionServer(SVR1);
RegionServerServices mockedRS2 = getRegionServer(SVR2);
SplitLogWorker slw1 =
@@ -227,15 +236,15 @@ public class TestSplitLogWorker {
// this time create a task node after starting the splitLogWorker
zkw.getRecoverableZooKeeper().create(PATH,
- new SplitLogTask.Unassigned(MANAGER).toByteArray(), Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
+ new SplitLogTask.Unassigned(MANAGER, this.mode).toByteArray(),
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
assertEquals(1, slw.taskReadySeq);
byte [] bytes = ZKUtil.getData(zkw, PATH);
SplitLogTask slt = SplitLogTask.parseFrom(bytes);
assertTrue(slt.isOwned(SRV));
- slt = new SplitLogTask.Owned(MANAGER);
+ slt = new SplitLogTask.Owned(MANAGER, this.mode);
ZKUtil.setData(zkw, PATH, slt.toByteArray());
waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
} finally {
@@ -258,7 +267,8 @@ public class TestSplitLogWorker {
Thread.sleep(100);
waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME);
- SplitLogTask unassignedManager = new SplitLogTask.Unassigned(MANAGER);
+ SplitLogTask unassignedManager =
+ new SplitLogTask.Unassigned(MANAGER, this.mode);
zkw.getRecoverableZooKeeper().create(PATH1, unassignedManager.toByteArray(),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
@@ -272,7 +282,7 @@ public class TestSplitLogWorker {
// preempt the first task, have it owned by another worker
final ServerName anotherWorker = ServerName.valueOf("another-worker,1,1");
- SplitLogTask slt = new SplitLogTask.Owned(anotherWorker);
+ SplitLogTask slt = new SplitLogTask.Owned(anotherWorker, this.mode);
ZKUtil.setData(zkw, PATH1, slt.toByteArray());
waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
@@ -298,7 +308,7 @@ public class TestSplitLogWorker {
Thread.sleep(100);
String task = ZKSplitLog.getEncodedNodeName(zkw, "task");
- SplitLogTask slt = new SplitLogTask.Unassigned(MANAGER);
+ SplitLogTask slt = new SplitLogTask.Unassigned(MANAGER, this.mode);
zkw.getRecoverableZooKeeper().create(task,slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
@@ -351,8 +361,8 @@ public class TestSplitLogWorker {
for (int i = 0; i < maxTasks; i++) {
zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
- new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(),
- Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(),
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
SplitLogWorker slw = new SplitLogWorker(zkw, testConf, mockedRS, neverEndingTask);
@@ -394,9 +404,8 @@ public class TestSplitLogWorker {
for (int i = 0; i < maxTasks; i++) {
zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
- new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(),
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
+ new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(),
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
SplitLogWorker slw = new SplitLogWorker(zkw, testConf, mockedRS, neverEndingTask);
http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
index 6e0995c..af15d78 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
@@ -390,6 +390,7 @@ public class TestSplitTransactionOnCluster {
AssignmentManager.TEST_SKIP_SPLIT_HANDLING = false;
admin.setBalancerRunning(true, false);
cluster.getMaster().setCatalogJanitorEnabled(true);
+ cluster.startRegionServer();
t.close();
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java
index 5b68f9f..5d304a3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.EntryBuffers;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.RegionEntryBuffer;
import org.apache.hadoop.hbase.util.Bytes;
@@ -111,8 +112,10 @@ public class TestHLogMethods {
@Test
public void testEntrySink() throws Exception {
Configuration conf = new Configuration();
+ RecoveryMode mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
+ RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
HLogSplitter splitter = new HLogSplitter(
- conf, mock(Path.class), mock(FileSystem.class), null, null);
+ conf, mock(Path.class), mock(FileSystem.class), null, null, mode);
EntryBuffers sink = splitter.new EntryBuffers(1*1024*1024);
for (int i = 0; i < 1000; i++) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
index 57a2549..8a6f544 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
@@ -127,6 +128,7 @@ public class TestHLogSplit {
private static String ROBBER;
private static String ZOMBIE;
private static String [] GROUP = new String [] {"supergroup"};
+ private RecoveryMode mode;
static enum Corruptions {
INSERT_GARBAGE_ON_FIRST_LINE,
@@ -177,6 +179,8 @@ public class TestHLogSplit {
REGIONS.clear();
Collections.addAll(REGIONS, "bbb", "ccc");
InstrumentedSequenceFileLogWriter.activateFailure = false;
+ this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
+ RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
}
@After
@@ -805,7 +809,7 @@ public class TestHLogSplit {
logfiles != null && logfiles.length > 0);
// Set up a splitter that will throw an IOE on the output side
HLogSplitter logSplitter = new HLogSplitter(
- conf, HBASEDIR, fs, null, null) {
+ conf, HBASEDIR, fs, null, null, this.mode) {
protected HLog.Writer createWriter(FileSystem fs,
Path logfile, Configuration conf) throws IOException {
HLog.Writer mockWriter = Mockito.mock(HLog.Writer.class);
@@ -938,7 +942,7 @@ public class TestHLogSplit {
try {
conf.setInt("hbase.splitlog.report.period", 1000);
boolean ret = HLogSplitter.splitLogFile(
- HBASEDIR, logfile, spiedFs, conf, localReporter, null, null);
+ HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, this.mode);
assertFalse("Log splitting should failed", ret);
assertTrue(count.get() > 0);
} catch (IOException e) {
@@ -997,7 +1001,7 @@ public class TestHLogSplit {
// Create a splitter that reads and writes the data without touching disk
HLogSplitter logSplitter = new HLogSplitter(
- localConf, HBASEDIR, fs, null, null) {
+ localConf, HBASEDIR, fs, null, null, this.mode) {
/* Produce a mock writer that doesn't write anywhere */
protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
@@ -1282,7 +1286,7 @@ public class TestHLogSplit {
logfiles != null && logfiles.length > 0);
HLogSplitter logSplitter = new HLogSplitter(
- conf, HBASEDIR, fs, null, null) {
+ conf, HBASEDIR, fs, null, null, this.mode) {
protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
throws IOException {
HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, logfile, conf);
http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
index c5f5a2f..5b51eca 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
@@ -98,6 +99,8 @@ public class TestWALReplay {
private Path logDir;
private FileSystem fs;
private Configuration conf;
+ private RecoveryMode mode;
+
@BeforeClass
public static void setUpBeforeClass() throws Exception {
@@ -128,6 +131,8 @@ public class TestWALReplay {
if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) {
TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
}
+ this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
+ RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
}
@After
@@ -875,7 +880,7 @@ public class TestWALReplay {
wal.close();
FileStatus[] listStatus = this.fs.listStatus(wal.getDir());
HLogSplitter.splitLogFile(hbaseRootDir, listStatus[0],
- this.fs, this.conf, null, null, null);
+ this.fs, this.conf, null, null, null, mode);
FileStatus[] listStatus1 = this.fs.listStatus(
new Path(FSUtils.getTableDir(hbaseRootDir, tableName),
new Path(hri.getEncodedName(), "recovered.edits")));
[2/2] git commit: HBASE-11094: Distributed log replay is incompatible for rolling restarts
Posted by je...@apache.org.
HBASE-11094: Distributed log replay is incompatible for rolling restarts
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/34ae4a94
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/34ae4a94
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/34ae4a94
Branch: refs/heads/0.98
Commit: 34ae4a94d0795850642915f117b7d1257b418ca5
Parents: 9ce1751
Author: Jeffrey Zhong <je...@apache.org>
Authored: Sun Jun 15 11:44:17 2014 -0700
Committer: Jeffrey Zhong <je...@apache.org>
Committed: Sun Jun 15 11:44:17 2014 -0700
----------------------------------------------------------------------
.../hadoop/hbase/protobuf/ProtobufUtil.java | 2 +-
.../hadoop/hbase/protobuf/RequestConverter.java | 21 +-
.../hbase/protobuf/generated/AdminProtos.java | 265 ++++++++++++++-----
.../protobuf/generated/ZooKeeperProtos.java | 239 +++++++++++++++--
hbase-protocol/src/main/protobuf/Admin.proto | 2 +
.../src/main/protobuf/ZooKeeper.proto | 6 +
.../org/apache/hadoop/hbase/SplitLogTask.java | 40 ++-
.../org/apache/hadoop/hbase/master/HMaster.java | 21 +-
.../hadoop/hbase/master/MasterFileSystem.java | 30 ++-
.../hadoop/hbase/master/ServerManager.java | 10 +-
.../hadoop/hbase/master/SplitLogManager.java | 156 +++++++++--
.../handler/MetaServerShutdownHandler.java | 8 +-
.../master/handler/ServerShutdownHandler.java | 13 +-
.../hbase/regionserver/HRegionServer.java | 26 +-
.../hbase/regionserver/SplitLogWorker.java | 36 ++-
.../handler/HLogSplitterHandler.java | 13 +-
.../hbase/regionserver/wal/HLogSplitter.java | 22 +-
.../apache/hadoop/hbase/TestSerialization.java | 4 +-
.../hadoop/hbase/client/TestMultiParallel.java | 4 +-
.../hbase/master/TestAssignmentManager.java | 5 +
.../master/TestDistributedLogSplitting.java | 3 +-
.../hbase/master/TestMasterFileSystem.java | 4 +-
.../hbase/master/TestSplitLogManager.java | 57 +++-
.../regionserver/TestRegionServerNoMaster.java | 12 +-
.../hbase/regionserver/TestSplitLogWorker.java | 39 +--
.../TestSplitTransactionOnCluster.java | 1 +
.../hbase/regionserver/wal/TestHLogMethods.java | 5 +-
.../hbase/regionserver/wal/TestHLogSplit.java | 12 +-
.../hbase/regionserver/wal/TestWALReplay.java | 7 +-
29 files changed, 801 insertions(+), 262 deletions(-)
----------------------------------------------------------------------
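
The heart of the fix is that the recovery mode is decided by the master and carried inside each split-log task and region-open request, so the mode in effect is visible to every worker handling a task rather than inferred from each server's local configuration; and, as testGetPreviousRecoveryMode earlier in this message shows, a restarting SplitLogManager keeps LOG_SPLITTING while task znodes created under the old mode still exist. The configuration flag only states a preference. A minimal sketch of that preference check, the same ternary the patch repeats in the TestSplitLogManager, TestSplitLogWorker, TestHLogSplit and TestWALReplay setups; the helper class name is illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;

public final class RecoveryModePreferenceSketch {
  private RecoveryModePreferenceSketch() {}

  // The flag asks for LOG_REPLAY; anything else (or unset) means LOG_SPLITTING.
  public static RecoveryMode fromConf(Configuration conf) {
    return conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false)
        ? RecoveryMode.LOG_REPLAY
        : RecoveryMode.LOG_SPLITTING;
  }
}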
http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index 0f393d9..2a0b912 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -1670,7 +1670,7 @@ public final class ProtobufUtil {
public static void openRegion(final AdminService.BlockingInterface admin,
ServerName server, final HRegionInfo region) throws IOException {
OpenRegionRequest request =
- RequestConverter.buildOpenRegionRequest(server, region, -1, null);
+ RequestConverter.buildOpenRegionRequest(server, region, -1, null, null);
try {
admin.openRegion(null, request);
} catch (ServiceException se) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
index 685fd47..1a6b42d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
@@ -21,6 +21,8 @@ import java.io.IOException;
import java.util.List;
import com.google.protobuf.HBaseZeroCopyByteString;
+
+import org.apache.commons.configuration.Configuration;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -701,17 +703,18 @@ public final class RequestConverter {
* Create a protocol buffer OpenRegionRequest to open a list of regions
*
* @param regionOpenInfos info of a list of regions to open
+ * @param openForReplay
* @return a protocol buffer OpenRegionRequest
*/
public static OpenRegionRequest
buildOpenRegionRequest(final List<Triple<HRegionInfo, Integer,
- List<ServerName>>> regionOpenInfos) {
+ List<ServerName>>> regionOpenInfos, Boolean openForReplay) {
OpenRegionRequest.Builder builder = OpenRegionRequest.newBuilder();
for (Triple<HRegionInfo, Integer, List<ServerName>> regionOpenInfo: regionOpenInfos) {
Integer second = regionOpenInfo.getSecond();
int versionOfOfflineNode = second == null ? -1 : second.intValue();
- builder.addOpenInfo(buildRegionOpenInfo(
- regionOpenInfo.getFirst(), versionOfOfflineNode, regionOpenInfo.getThird()));
+ builder.addOpenInfo(buildRegionOpenInfo(regionOpenInfo.getFirst(), versionOfOfflineNode,
+ regionOpenInfo.getThird(), openForReplay));
}
return builder.build();
}
@@ -723,12 +726,15 @@ public final class RequestConverter {
* @param region the region to open
* @param versionOfOfflineNode that needs to be present in the offline node
* @param favoredNodes
+ * @param openForReplay
* @return a protocol buffer OpenRegionRequest
*/
public static OpenRegionRequest buildOpenRegionRequest(ServerName server,
- final HRegionInfo region, final int versionOfOfflineNode, List<ServerName> favoredNodes) {
+ final HRegionInfo region, final int versionOfOfflineNode, List<ServerName> favoredNodes,
+ Boolean openForReplay) {
OpenRegionRequest.Builder builder = OpenRegionRequest.newBuilder();
- builder.addOpenInfo(buildRegionOpenInfo(region, versionOfOfflineNode, favoredNodes));
+ builder.addOpenInfo(buildRegionOpenInfo(region, versionOfOfflineNode, favoredNodes,
+ openForReplay));
if (server != null) {
builder.setServerStartCode(server.getStartcode());
}
@@ -1477,7 +1483,7 @@ public final class RequestConverter {
*/
private static RegionOpenInfo buildRegionOpenInfo(
final HRegionInfo region, final int versionOfOfflineNode,
- final List<ServerName> favoredNodes) {
+ final List<ServerName> favoredNodes, Boolean openForReplay) {
RegionOpenInfo.Builder builder = RegionOpenInfo.newBuilder();
builder.setRegion(HRegionInfo.convert(region));
if (versionOfOfflineNode >= 0) {
@@ -1488,6 +1494,9 @@ public final class RequestConverter {
builder.addFavoredNodes(ProtobufUtil.toServerName(server));
}
}
+ if(openForReplay != null) {
+ builder.setOpenForDistributedLogReplay(openForReplay);
+ }
return builder.build();
}
}
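
buildOpenRegionRequest and buildRegionOpenInfo now take a trailing Boolean openForReplay; passing null leaves the new optional protobuf field unset (the pre-patch behaviour), while Boolean.TRUE marks the open as part of distributed log replay. A hedged sketch of both call shapes; the table, server name and region are illustrative placeholders:

import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;

public class OpenForReplaySketch {
  public static void main(String[] args) {
    HRegionInfo hri = new HRegionInfo(TableName.valueOf("t1"));
    ServerName server = ServerName.valueOf("rs,60020,1");
    // Ordinary open: the optional field stays unset.
    OpenRegionRequest normalOpen =
        RequestConverter.buildOpenRegionRequest(server, hri, -1, null, null);
    // Open issued while replaying WAL edits for a recovering region.
    OpenRegionRequest replayOpen =
        RequestConverter.buildOpenRegionRequest(server, hri, -1, null, Boolean.TRUE);
    System.out.println(normalOpen.getOpenInfo(0).hasOpenForDistributedLogReplay()); // false
    System.out.println(replayOpen.getOpenInfo(0).getOpenForDistributedLogReplay()); // true
  }
}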
http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
index 0ad10ad..636e51f 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
@@ -4032,6 +4032,24 @@ public final class AdminProtos {
*/
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getFavoredNodesOrBuilder(
int index);
+
+ // optional bool openForDistributedLogReplay = 4;
+ /**
+ * <code>optional bool openForDistributedLogReplay = 4;</code>
+ *
+ * <pre>
+ * open region for distributedLogReplay
+ * </pre>
+ */
+ boolean hasOpenForDistributedLogReplay();
+ /**
+ * <code>optional bool openForDistributedLogReplay = 4;</code>
+ *
+ * <pre>
+ * open region for distributedLogReplay
+ * </pre>
+ */
+ boolean getOpenForDistributedLogReplay();
}
/**
* Protobuf type {@code OpenRegionRequest.RegionOpenInfo}
@@ -4110,6 +4128,11 @@ public final class AdminProtos {
favoredNodes_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.PARSER, extensionRegistry));
break;
}
+ case 32: {
+ bitField0_ |= 0x00000004;
+ openForDistributedLogReplay_ = input.readBool();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -4227,10 +4250,35 @@ public final class AdminProtos {
return favoredNodes_.get(index);
}
+ // optional bool openForDistributedLogReplay = 4;
+ public static final int OPENFORDISTRIBUTEDLOGREPLAY_FIELD_NUMBER = 4;
+ private boolean openForDistributedLogReplay_;
+ /**
+ * <code>optional bool openForDistributedLogReplay = 4;</code>
+ *
+ * <pre>
+ * open region for distributedLogReplay
+ * </pre>
+ */
+ public boolean hasOpenForDistributedLogReplay() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ /**
+ * <code>optional bool openForDistributedLogReplay = 4;</code>
+ *
+ * <pre>
+ * open region for distributedLogReplay
+ * </pre>
+ */
+ public boolean getOpenForDistributedLogReplay() {
+ return openForDistributedLogReplay_;
+ }
+
private void initFields() {
region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo.getDefaultInstance();
versionOfOfflineNode_ = 0;
favoredNodes_ = java.util.Collections.emptyList();
+ openForDistributedLogReplay_ = false;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -4267,6 +4315,9 @@ public final class AdminProtos {
for (int i = 0; i < favoredNodes_.size(); i++) {
output.writeMessage(3, favoredNodes_.get(i));
}
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ output.writeBool(4, openForDistributedLogReplay_);
+ }
getUnknownFields().writeTo(output);
}
@@ -4288,6 +4339,10 @@ public final class AdminProtos {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(3, favoredNodes_.get(i));
}
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBoolSize(4, openForDistributedLogReplay_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -4323,6 +4378,11 @@ public final class AdminProtos {
}
result = result && getFavoredNodesList()
.equals(other.getFavoredNodesList());
+ result = result && (hasOpenForDistributedLogReplay() == other.hasOpenForDistributedLogReplay());
+ if (hasOpenForDistributedLogReplay()) {
+ result = result && (getOpenForDistributedLogReplay()
+ == other.getOpenForDistributedLogReplay());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -4348,6 +4408,10 @@ public final class AdminProtos {
hash = (37 * hash) + FAVORED_NODES_FIELD_NUMBER;
hash = (53 * hash) + getFavoredNodesList().hashCode();
}
+ if (hasOpenForDistributedLogReplay()) {
+ hash = (37 * hash) + OPENFORDISTRIBUTEDLOGREPLAY_FIELD_NUMBER;
+ hash = (53 * hash) + hashBoolean(getOpenForDistributedLogReplay());
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -4473,6 +4537,8 @@ public final class AdminProtos {
} else {
favoredNodesBuilder_.clear();
}
+ openForDistributedLogReplay_ = false;
+ bitField0_ = (bitField0_ & ~0x00000008);
return this;
}
@@ -4522,6 +4588,10 @@ public final class AdminProtos {
} else {
result.favoredNodes_ = favoredNodesBuilder_.build();
}
+ if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+ to_bitField0_ |= 0x00000004;
+ }
+ result.openForDistributedLogReplay_ = openForDistributedLogReplay_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -4570,6 +4640,9 @@ public final class AdminProtos {
}
}
}
+ if (other.hasOpenForDistributedLogReplay()) {
+ setOpenForDistributedLogReplay(other.getOpenForDistributedLogReplay());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -5001,6 +5074,55 @@ public final class AdminProtos {
return favoredNodesBuilder_;
}
+ // optional bool openForDistributedLogReplay = 4;
+ private boolean openForDistributedLogReplay_ ;
+ /**
+ * <code>optional bool openForDistributedLogReplay = 4;</code>
+ *
+ * <pre>
+ * open region for distributedLogReplay
+ * </pre>
+ */
+ public boolean hasOpenForDistributedLogReplay() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ /**
+ * <code>optional bool openForDistributedLogReplay = 4;</code>
+ *
+ * <pre>
+ * open region for distributedLogReplay
+ * </pre>
+ */
+ public boolean getOpenForDistributedLogReplay() {
+ return openForDistributedLogReplay_;
+ }
+ /**
+ * <code>optional bool openForDistributedLogReplay = 4;</code>
+ *
+ * <pre>
+ * open region for distributedLogReplay
+ * </pre>
+ */
+ public Builder setOpenForDistributedLogReplay(boolean value) {
+ bitField0_ |= 0x00000008;
+ openForDistributedLogReplay_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional bool openForDistributedLogReplay = 4;</code>
+ *
+ * <pre>
+ * open region for distributedLogReplay
+ * </pre>
+ */
+ public Builder clearOpenForDistributedLogReplay() {
+ bitField0_ = (bitField0_ & ~0x00000008);
+ openForDistributedLogReplay_ = false;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:OpenRegionRequest.RegionOpenInfo)
}
@@ -21166,77 +21288,78 @@ public final class AdminProtos {
"FileResponse\022\022\n\nstore_file\030\001 \003(\t\"\030\n\026GetO" +
"nlineRegionRequest\";\n\027GetOnlineRegionRes" +
"ponse\022 \n\013region_info\030\001 \003(\0132\013.RegionInfo\"" +
- "\326\001\n\021OpenRegionRequest\0224\n\topen_info\030\001 \003(\013" +
+ "\374\001\n\021OpenRegionRequest\0224\n\topen_info\030\001 \003(\013" +
"2!.OpenRegionRequest.RegionOpenInfo\022\027\n\017s" +
- "erverStartCode\030\002 \001(\004\032r\n\016RegionOpenInfo\022\033" +
- "\n\006region\030\001 \002(\0132\013.RegionInfo\022\037\n\027version_o" +
- "f_offline_node\030\002 \001(\r\022\"\n\rfavored_nodes\030\003 " +
- "\003(\0132\013.ServerName\"\235\001\n\022OpenRegionResponse\022",
- "=\n\ropening_state\030\001 \003(\0162&.OpenRegionRespo" +
- "nse.RegionOpeningState\"H\n\022RegionOpeningS" +
- "tate\022\n\n\006OPENED\020\000\022\022\n\016ALREADY_OPENED\020\001\022\022\n\016" +
- "FAILED_OPENING\020\002\"\271\001\n\022CloseRegionRequest\022" +
- " \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\037\n\027ver" +
- "sion_of_closing_node\030\002 \001(\r\022\036\n\020transition" +
- "_in_ZK\030\003 \001(\010:\004true\022\'\n\022destination_server" +
- "\030\004 \001(\0132\013.ServerName\022\027\n\017serverStartCode\030\005" +
- " \001(\004\"%\n\023CloseRegionResponse\022\016\n\006closed\030\001 " +
- "\002(\010\"P\n\022FlushRegionRequest\022 \n\006region\030\001 \002(",
- "\0132\020.RegionSpecifier\022\030\n\020if_older_than_ts\030" +
- "\002 \001(\004\"?\n\023FlushRegionResponse\022\027\n\017last_flu" +
- "sh_time\030\001 \002(\004\022\017\n\007flushed\030\002 \001(\010\"K\n\022SplitR" +
- "egionRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpe" +
- "cifier\022\023\n\013split_point\030\002 \001(\014\"\025\n\023SplitRegi" +
- "onResponse\"W\n\024CompactRegionRequest\022 \n\006re" +
- "gion\030\001 \002(\0132\020.RegionSpecifier\022\r\n\005major\030\002 " +
- "\001(\010\022\016\n\006family\030\003 \001(\014\"\027\n\025CompactRegionResp" +
- "onse\"\262\001\n\031UpdateFavoredNodesRequest\022@\n\013up" +
- "date_info\030\001 \003(\0132+.UpdateFavoredNodesRequ",
- "est.RegionUpdateInfo\032S\n\020RegionUpdateInfo" +
- "\022\033\n\006region\030\001 \002(\0132\013.RegionInfo\022\"\n\rfavored" +
- "_nodes\030\002 \003(\0132\013.ServerName\".\n\032UpdateFavor" +
- "edNodesResponse\022\020\n\010response\030\001 \001(\r\"v\n\023Mer" +
- "geRegionsRequest\022\"\n\010region_a\030\001 \002(\0132\020.Reg" +
- "ionSpecifier\022\"\n\010region_b\030\002 \002(\0132\020.RegionS" +
- "pecifier\022\027\n\010forcible\030\003 \001(\010:\005false\"\026\n\024Mer" +
- "geRegionsResponse\"X\n\010WALEntry\022\024\n\003key\030\001 \002" +
- "(\0132\007.WALKey\022\027\n\017key_value_bytes\030\002 \003(\014\022\035\n\025" +
- "associated_cell_count\030\003 \001(\005\"4\n\030Replicate",
- "WALEntryRequest\022\030\n\005entry\030\001 \003(\0132\t.WALEntr" +
- "y\"\033\n\031ReplicateWALEntryResponse\"\026\n\024RollWA" +
- "LWriterRequest\"0\n\025RollWALWriterResponse\022" +
- "\027\n\017region_to_flush\030\001 \003(\014\"#\n\021StopServerRe" +
- "quest\022\016\n\006reason\030\001 \002(\t\"\024\n\022StopServerRespo" +
- "nse\"\026\n\024GetServerInfoRequest\"B\n\nServerInf" +
- "o\022 \n\013server_name\030\001 \002(\0132\013.ServerName\022\022\n\nw" +
- "ebui_port\030\002 \001(\r\"9\n\025GetServerInfoResponse" +
- "\022 \n\013server_info\030\001 \002(\0132\013.ServerInfo2\306\007\n\014A" +
- "dminService\022>\n\rGetRegionInfo\022\025.GetRegion",
- "InfoRequest\032\026.GetRegionInfoResponse\022;\n\014G" +
- "etStoreFile\022\024.GetStoreFileRequest\032\025.GetS" +
- "toreFileResponse\022D\n\017GetOnlineRegion\022\027.Ge" +
- "tOnlineRegionRequest\032\030.GetOnlineRegionRe" +
- "sponse\0225\n\nOpenRegion\022\022.OpenRegionRequest" +
- "\032\023.OpenRegionResponse\0228\n\013CloseRegion\022\023.C" +
- "loseRegionRequest\032\024.CloseRegionResponse\022" +
- "8\n\013FlushRegion\022\023.FlushRegionRequest\032\024.Fl" +
- "ushRegionResponse\0228\n\013SplitRegion\022\023.Split" +
- "RegionRequest\032\024.SplitRegionResponse\022>\n\rC",
- "ompactRegion\022\025.CompactRegionRequest\032\026.Co" +
- "mpactRegionResponse\022;\n\014MergeRegions\022\024.Me" +
- "rgeRegionsRequest\032\025.MergeRegionsResponse" +
- "\022J\n\021ReplicateWALEntry\022\031.ReplicateWALEntr" +
- "yRequest\032\032.ReplicateWALEntryResponse\022?\n\006" +
- "Replay\022\031.ReplicateWALEntryRequest\032\032.Repl" +
- "icateWALEntryResponse\022>\n\rRollWALWriter\022\025" +
- ".RollWALWriterRequest\032\026.RollWALWriterRes" +
- "ponse\022>\n\rGetServerInfo\022\025.GetServerInfoRe" +
- "quest\032\026.GetServerInfoResponse\0225\n\nStopSer",
- "ver\022\022.StopServerRequest\032\023.StopServerResp" +
- "onse\022M\n\022UpdateFavoredNodes\022\032.UpdateFavor" +
- "edNodesRequest\032\033.UpdateFavoredNodesRespo" +
- "nseBA\n*org.apache.hadoop.hbase.protobuf." +
- "generatedB\013AdminProtosH\001\210\001\001\240\001\001"
+ "erverStartCode\030\002 \001(\004\032\227\001\n\016RegionOpenInfo\022" +
+ "\033\n\006region\030\001 \002(\0132\013.RegionInfo\022\037\n\027version_" +
+ "of_offline_node\030\002 \001(\r\022\"\n\rfavored_nodes\030\003" +
+ " \003(\0132\013.ServerName\022#\n\033openForDistributedL",
+ "ogReplay\030\004 \001(\010\"\235\001\n\022OpenRegionResponse\022=\n" +
+ "\ropening_state\030\001 \003(\0162&.OpenRegionRespons" +
+ "e.RegionOpeningState\"H\n\022RegionOpeningSta" +
+ "te\022\n\n\006OPENED\020\000\022\022\n\016ALREADY_OPENED\020\001\022\022\n\016FA" +
+ "ILED_OPENING\020\002\"\271\001\n\022CloseRegionRequest\022 \n" +
+ "\006region\030\001 \002(\0132\020.RegionSpecifier\022\037\n\027versi" +
+ "on_of_closing_node\030\002 \001(\r\022\036\n\020transition_i" +
+ "n_ZK\030\003 \001(\010:\004true\022\'\n\022destination_server\030\004" +
+ " \001(\0132\013.ServerName\022\027\n\017serverStartCode\030\005 \001" +
+ "(\004\"%\n\023CloseRegionResponse\022\016\n\006closed\030\001 \002(",
+ "\010\"P\n\022FlushRegionRequest\022 \n\006region\030\001 \002(\0132" +
+ "\020.RegionSpecifier\022\030\n\020if_older_than_ts\030\002 " +
+ "\001(\004\"?\n\023FlushRegionResponse\022\027\n\017last_flush" +
+ "_time\030\001 \002(\004\022\017\n\007flushed\030\002 \001(\010\"K\n\022SplitReg" +
+ "ionRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpeci" +
+ "fier\022\023\n\013split_point\030\002 \001(\014\"\025\n\023SplitRegion" +
+ "Response\"W\n\024CompactRegionRequest\022 \n\006regi" +
+ "on\030\001 \002(\0132\020.RegionSpecifier\022\r\n\005major\030\002 \001(" +
+ "\010\022\016\n\006family\030\003 \001(\014\"\027\n\025CompactRegionRespon" +
+ "se\"\262\001\n\031UpdateFavoredNodesRequest\022@\n\013upda",
+ "te_info\030\001 \003(\0132+.UpdateFavoredNodesReques" +
+ "t.RegionUpdateInfo\032S\n\020RegionUpdateInfo\022\033" +
+ "\n\006region\030\001 \002(\0132\013.RegionInfo\022\"\n\rfavored_n" +
+ "odes\030\002 \003(\0132\013.ServerName\".\n\032UpdateFavored" +
+ "NodesResponse\022\020\n\010response\030\001 \001(\r\"v\n\023Merge" +
+ "RegionsRequest\022\"\n\010region_a\030\001 \002(\0132\020.Regio" +
+ "nSpecifier\022\"\n\010region_b\030\002 \002(\0132\020.RegionSpe" +
+ "cifier\022\027\n\010forcible\030\003 \001(\010:\005false\"\026\n\024Merge" +
+ "RegionsResponse\"X\n\010WALEntry\022\024\n\003key\030\001 \002(\013" +
+ "2\007.WALKey\022\027\n\017key_value_bytes\030\002 \003(\014\022\035\n\025as",
+ "sociated_cell_count\030\003 \001(\005\"4\n\030ReplicateWA" +
+ "LEntryRequest\022\030\n\005entry\030\001 \003(\0132\t.WALEntry\"" +
+ "\033\n\031ReplicateWALEntryResponse\"\026\n\024RollWALW" +
+ "riterRequest\"0\n\025RollWALWriterResponse\022\027\n" +
+ "\017region_to_flush\030\001 \003(\014\"#\n\021StopServerRequ" +
+ "est\022\016\n\006reason\030\001 \002(\t\"\024\n\022StopServerRespons" +
+ "e\"\026\n\024GetServerInfoRequest\"B\n\nServerInfo\022" +
+ " \n\013server_name\030\001 \002(\0132\013.ServerName\022\022\n\nweb" +
+ "ui_port\030\002 \001(\r\"9\n\025GetServerInfoResponse\022 " +
+ "\n\013server_info\030\001 \002(\0132\013.ServerInfo2\306\007\n\014Adm",
+ "inService\022>\n\rGetRegionInfo\022\025.GetRegionIn" +
+ "foRequest\032\026.GetRegionInfoResponse\022;\n\014Get" +
+ "StoreFile\022\024.GetStoreFileRequest\032\025.GetSto" +
+ "reFileResponse\022D\n\017GetOnlineRegion\022\027.GetO" +
+ "nlineRegionRequest\032\030.GetOnlineRegionResp" +
+ "onse\0225\n\nOpenRegion\022\022.OpenRegionRequest\032\023" +
+ ".OpenRegionResponse\0228\n\013CloseRegion\022\023.Clo" +
+ "seRegionRequest\032\024.CloseRegionResponse\0228\n" +
+ "\013FlushRegion\022\023.FlushRegionRequest\032\024.Flus" +
+ "hRegionResponse\0228\n\013SplitRegion\022\023.SplitRe",
+ "gionRequest\032\024.SplitRegionResponse\022>\n\rCom" +
+ "pactRegion\022\025.CompactRegionRequest\032\026.Comp" +
+ "actRegionResponse\022;\n\014MergeRegions\022\024.Merg" +
+ "eRegionsRequest\032\025.MergeRegionsResponse\022J" +
+ "\n\021ReplicateWALEntry\022\031.ReplicateWALEntryR" +
+ "equest\032\032.ReplicateWALEntryResponse\022?\n\006Re" +
+ "play\022\031.ReplicateWALEntryRequest\032\032.Replic" +
+ "ateWALEntryResponse\022>\n\rRollWALWriter\022\025.R" +
+ "ollWALWriterRequest\032\026.RollWALWriterRespo" +
+ "nse\022>\n\rGetServerInfo\022\025.GetServerInfoRequ",
+ "est\032\026.GetServerInfoResponse\0225\n\nStopServe" +
+ "r\022\022.StopServerRequest\032\023.StopServerRespon" +
+ "se\022M\n\022UpdateFavoredNodes\022\032.UpdateFavored" +
+ "NodesRequest\032\033.UpdateFavoredNodesRespons" +
+ "eBA\n*org.apache.hadoop.hbase.protobuf.ge" +
+ "neratedB\013AdminProtosH\001\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -21290,7 +21413,7 @@ public final class AdminProtos {
internal_static_OpenRegionRequest_RegionOpenInfo_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_OpenRegionRequest_RegionOpenInfo_descriptor,
- new java.lang.String[] { "Region", "VersionOfOfflineNode", "FavoredNodes", });
+ new java.lang.String[] { "Region", "VersionOfOfflineNode", "FavoredNodes", "OpenForDistributedLogReplay", });
internal_static_OpenRegionResponse_descriptor =
getDescriptor().getMessageTypes().get(7);
internal_static_OpenRegionResponse_fieldAccessorTable = new
http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
index 0af2a97..9d037f5 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
@@ -3230,6 +3230,16 @@ public final class ZooKeeperProtos {
* <code>required .ServerName server_name = 2;</code>
*/
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getServerNameOrBuilder();
+
+ // optional .SplitLogTask.RecoveryMode mode = 3 [default = UNKNOWN];
+ /**
+ * <code>optional .SplitLogTask.RecoveryMode mode = 3 [default = UNKNOWN];</code>
+ */
+ boolean hasMode();
+ /**
+ * <code>optional .SplitLogTask.RecoveryMode mode = 3 [default = UNKNOWN];</code>
+ */
+ org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode getMode();
}
/**
* Protobuf type {@code SplitLogTask}
@@ -3312,6 +3322,17 @@ public final class ZooKeeperProtos {
bitField0_ |= 0x00000002;
break;
}
+ case 24: {
+ int rawValue = input.readEnum();
+ org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode value = org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode.valueOf(rawValue);
+ if (value == null) {
+ unknownFields.mergeVarintField(3, rawValue);
+ } else {
+ bitField0_ |= 0x00000004;
+ mode_ = value;
+ }
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -3460,6 +3481,97 @@ public final class ZooKeeperProtos {
// @@protoc_insertion_point(enum_scope:SplitLogTask.State)
}
+ /**
+ * Protobuf enum {@code SplitLogTask.RecoveryMode}
+ */
+ public enum RecoveryMode
+ implements com.google.protobuf.ProtocolMessageEnum {
+ /**
+ * <code>UNKNOWN = 0;</code>
+ */
+ UNKNOWN(0, 0),
+ /**
+ * <code>LOG_SPLITTING = 1;</code>
+ */
+ LOG_SPLITTING(1, 1),
+ /**
+ * <code>LOG_REPLAY = 2;</code>
+ */
+ LOG_REPLAY(2, 2),
+ ;
+
+ /**
+ * <code>UNKNOWN = 0;</code>
+ */
+ public static final int UNKNOWN_VALUE = 0;
+ /**
+ * <code>LOG_SPLITTING = 1;</code>
+ */
+ public static final int LOG_SPLITTING_VALUE = 1;
+ /**
+ * <code>LOG_REPLAY = 2;</code>
+ */
+ public static final int LOG_REPLAY_VALUE = 2;
+
+
+ public final int getNumber() { return value; }
+
+ public static RecoveryMode valueOf(int value) {
+ switch (value) {
+ case 0: return UNKNOWN;
+ case 1: return LOG_SPLITTING;
+ case 2: return LOG_REPLAY;
+ default: return null;
+ }
+ }
+
+ public static com.google.protobuf.Internal.EnumLiteMap<RecoveryMode>
+ internalGetValueMap() {
+ return internalValueMap;
+ }
+ private static com.google.protobuf.Internal.EnumLiteMap<RecoveryMode>
+ internalValueMap =
+ new com.google.protobuf.Internal.EnumLiteMap<RecoveryMode>() {
+ public RecoveryMode findValueByNumber(int number) {
+ return RecoveryMode.valueOf(number);
+ }
+ };
+
+ public final com.google.protobuf.Descriptors.EnumValueDescriptor
+ getValueDescriptor() {
+ return getDescriptor().getValues().get(index);
+ }
+ public final com.google.protobuf.Descriptors.EnumDescriptor
+ getDescriptorForType() {
+ return getDescriptor();
+ }
+ public static final com.google.protobuf.Descriptors.EnumDescriptor
+ getDescriptor() {
+ return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.getDescriptor().getEnumTypes().get(1);
+ }
+
+ private static final RecoveryMode[] VALUES = values();
+
+ public static RecoveryMode valueOf(
+ com.google.protobuf.Descriptors.EnumValueDescriptor desc) {
+ if (desc.getType() != getDescriptor()) {
+ throw new java.lang.IllegalArgumentException(
+ "EnumValueDescriptor is not for this type.");
+ }
+ return VALUES[desc.getIndex()];
+ }
+
+ private final int index;
+ private final int value;
+
+ private RecoveryMode(int index, int value) {
+ this.index = index;
+ this.value = value;
+ }
+
+ // @@protoc_insertion_point(enum_scope:SplitLogTask.RecoveryMode)
+ }
+
private int bitField0_;
// required .SplitLogTask.State state = 1;
public static final int STATE_FIELD_NUMBER = 1;
@@ -3499,9 +3611,26 @@ public final class ZooKeeperProtos {
return serverName_;
}
+ // optional .SplitLogTask.RecoveryMode mode = 3 [default = UNKNOWN];
+ public static final int MODE_FIELD_NUMBER = 3;
+ private org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode mode_;
+ /**
+ * <code>optional .SplitLogTask.RecoveryMode mode = 3 [default = UNKNOWN];</code>
+ */
+ public boolean hasMode() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ /**
+ * <code>optional .SplitLogTask.RecoveryMode mode = 3 [default = UNKNOWN];</code>
+ */
+ public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode getMode() {
+ return mode_;
+ }
+
private void initFields() {
state_ = org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.State.UNASSIGNED;
serverName_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance();
+ mode_ = org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode.UNKNOWN;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -3533,6 +3662,9 @@ public final class ZooKeeperProtos {
if (((bitField0_ & 0x00000002) == 0x00000002)) {
output.writeMessage(2, serverName_);
}
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ output.writeEnum(3, mode_.getNumber());
+ }
getUnknownFields().writeTo(output);
}
@@ -3550,6 +3682,10 @@ public final class ZooKeeperProtos {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(2, serverName_);
}
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeEnumSize(3, mode_.getNumber());
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -3583,6 +3719,11 @@ public final class ZooKeeperProtos {
result = result && getServerName()
.equals(other.getServerName());
}
+ result = result && (hasMode() == other.hasMode());
+ if (hasMode()) {
+ result = result &&
+ (getMode() == other.getMode());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -3604,6 +3745,10 @@ public final class ZooKeeperProtos {
hash = (37 * hash) + SERVER_NAME_FIELD_NUMBER;
hash = (53 * hash) + getServerName().hashCode();
}
+ if (hasMode()) {
+ hash = (37 * hash) + MODE_FIELD_NUMBER;
+ hash = (53 * hash) + hashEnum(getMode());
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -3728,6 +3873,8 @@ public final class ZooKeeperProtos {
serverNameBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000002);
+ mode_ = org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode.UNKNOWN;
+ bitField0_ = (bitField0_ & ~0x00000004);
return this;
}
@@ -3768,6 +3915,10 @@ public final class ZooKeeperProtos {
} else {
result.serverName_ = serverNameBuilder_.build();
}
+ if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+ to_bitField0_ |= 0x00000004;
+ }
+ result.mode_ = mode_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -3790,6 +3941,9 @@ public final class ZooKeeperProtos {
if (other.hasServerName()) {
mergeServerName(other.getServerName());
}
+ if (other.hasMode()) {
+ setMode(other.getMode());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -3982,6 +4136,42 @@ public final class ZooKeeperProtos {
return serverNameBuilder_;
}
+ // optional .SplitLogTask.RecoveryMode mode = 3 [default = UNKNOWN];
+ private org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode mode_ = org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode.UNKNOWN;
+ /**
+ * <code>optional .SplitLogTask.RecoveryMode mode = 3 [default = UNKNOWN];</code>
+ */
+ public boolean hasMode() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ /**
+ * <code>optional .SplitLogTask.RecoveryMode mode = 3 [default = UNKNOWN];</code>
+ */
+ public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode getMode() {
+ return mode_;
+ }
+ /**
+ * <code>optional .SplitLogTask.RecoveryMode mode = 3 [default = UNKNOWN];</code>
+ */
+ public Builder setMode(org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000004;
+ mode_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional .SplitLogTask.RecoveryMode mode = 3 [default = UNKNOWN];</code>
+ */
+ public Builder clearMode() {
+ bitField0_ = (bitField0_ & ~0x00000004);
+ mode_ = org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode.UNKNOWN;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:SplitLogTask)
}
@@ -9399,29 +9589,32 @@ public final class ZooKeeperProtos {
"gionTransition\022\027\n\017event_type_code\030\001 \002(\r\022" +
"\023\n\013region_name\030\002 \002(\014\022\023\n\013create_time\030\003 \002(" +
"\004\022 \n\013server_name\030\004 \002(\0132\013.ServerName\022\017\n\007p" +
- "ayload\030\005 \001(\014\"\231\001\n\014SplitLogTask\022\"\n\005state\030\001" +
+ "ayload\030\005 \001(\014\"\214\002\n\014SplitLogTask\022\"\n\005state\030\001" +
" \002(\0162\023.SplitLogTask.State\022 \n\013server_name",
- "\030\002 \002(\0132\013.ServerName\"C\n\005State\022\016\n\nUNASSIGN" +
- "ED\020\000\022\t\n\005OWNED\020\001\022\014\n\010RESIGNED\020\002\022\010\n\004DONE\020\003\022" +
- "\007\n\003ERR\020\004\"n\n\005Table\022$\n\005state\030\001 \002(\0162\014.Table" +
- ".State:\007ENABLED\"?\n\005State\022\013\n\007ENABLED\020\000\022\014\n" +
- "\010DISABLED\020\001\022\r\n\tDISABLING\020\002\022\014\n\010ENABLING\020\003" +
- "\"%\n\017ReplicationPeer\022\022\n\nclusterkey\030\001 \002(\t\"" +
- "^\n\020ReplicationState\022&\n\005state\030\001 \002(\0162\027.Rep" +
- "licationState.State\"\"\n\005State\022\013\n\007ENABLED\020" +
- "\000\022\014\n\010DISABLED\020\001\"+\n\027ReplicationHLogPositi" +
- "on\022\020\n\010position\030\001 \002(\003\"%\n\017ReplicationLock\022",
- "\022\n\nlock_owner\030\001 \002(\t\"\230\001\n\tTableLock\022\036\n\ntab" +
- "le_name\030\001 \001(\0132\n.TableName\022\037\n\nlock_owner\030" +
- "\002 \001(\0132\013.ServerName\022\021\n\tthread_id\030\003 \001(\003\022\021\n" +
- "\tis_shared\030\004 \001(\010\022\017\n\007purpose\030\005 \001(\t\022\023\n\013cre" +
- "ate_time\030\006 \001(\003\";\n\017StoreSequenceId\022\023\n\013fam" +
- "ily_name\030\001 \002(\014\022\023\n\013sequence_id\030\002 \002(\004\"g\n\026R" +
- "egionStoreSequenceIds\022 \n\030last_flushed_se" +
- "quence_id\030\001 \002(\004\022+\n\021store_sequence_id\030\002 \003" +
- "(\0132\020.StoreSequenceIdBE\n*org.apache.hadoo" +
- "p.hbase.protobuf.generatedB\017ZooKeeperPro",
- "tosH\001\210\001\001\240\001\001"
+ "\030\002 \002(\0132\013.ServerName\0221\n\004mode\030\003 \001(\0162\032.Spli" +
+ "tLogTask.RecoveryMode:\007UNKNOWN\"C\n\005State\022" +
+ "\016\n\nUNASSIGNED\020\000\022\t\n\005OWNED\020\001\022\014\n\010RESIGNED\020\002" +
+ "\022\010\n\004DONE\020\003\022\007\n\003ERR\020\004\">\n\014RecoveryMode\022\013\n\007U" +
+ "NKNOWN\020\000\022\021\n\rLOG_SPLITTING\020\001\022\016\n\nLOG_REPLA" +
+ "Y\020\002\"n\n\005Table\022$\n\005state\030\001 \002(\0162\014.Table.Stat" +
+ "e:\007ENABLED\"?\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010DISA" +
+ "BLED\020\001\022\r\n\tDISABLING\020\002\022\014\n\010ENABLING\020\003\"%\n\017R" +
+ "eplicationPeer\022\022\n\nclusterkey\030\001 \002(\t\"^\n\020Re" +
+ "plicationState\022&\n\005state\030\001 \002(\0162\027.Replicat",
+ "ionState.State\"\"\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010" +
+ "DISABLED\020\001\"+\n\027ReplicationHLogPosition\022\020\n" +
+ "\010position\030\001 \002(\003\"%\n\017ReplicationLock\022\022\n\nlo" +
+ "ck_owner\030\001 \002(\t\"\230\001\n\tTableLock\022\036\n\ntable_na" +
+ "me\030\001 \001(\0132\n.TableName\022\037\n\nlock_owner\030\002 \001(\013" +
+ "2\013.ServerName\022\021\n\tthread_id\030\003 \001(\003\022\021\n\tis_s" +
+ "hared\030\004 \001(\010\022\017\n\007purpose\030\005 \001(\t\022\023\n\013create_t" +
+ "ime\030\006 \001(\003\";\n\017StoreSequenceId\022\023\n\013family_n" +
+ "ame\030\001 \002(\014\022\023\n\013sequence_id\030\002 \002(\004\"g\n\026Region" +
+ "StoreSequenceIds\022 \n\030last_flushed_sequenc",
+ "e_id\030\001 \002(\004\022+\n\021store_sequence_id\030\002 \003(\0132\020." +
+ "StoreSequenceIdBE\n*org.apache.hadoop.hba" +
+ "se.protobuf.generatedB\017ZooKeeperProtosH\001" +
+ "\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -9457,7 +9650,7 @@ public final class ZooKeeperProtos {
internal_static_SplitLogTask_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_SplitLogTask_descriptor,
- new java.lang.String[] { "State", "ServerName", });
+ new java.lang.String[] { "State", "ServerName", "Mode", });
internal_static_Table_descriptor =
getDescriptor().getMessageTypes().get(5);
internal_static_Table_fieldAccessorTable = new
http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-protocol/src/main/protobuf/Admin.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/Admin.proto b/hbase-protocol/src/main/protobuf/Admin.proto
index 5b889cd..ecf30f4 100644
--- a/hbase-protocol/src/main/protobuf/Admin.proto
+++ b/hbase-protocol/src/main/protobuf/Admin.proto
@@ -75,6 +75,8 @@ message OpenRegionRequest {
required RegionInfo region = 1;
optional uint32 version_of_offline_node = 2;
repeated ServerName favored_nodes = 3;
+ // open region for distributedLogReplay
+ optional bool openForDistributedLogReplay = 4;
}
}
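
For context, here is a minimal editorial sketch (not part of the patch) of how the new field flows through the generated AdminProtos classes; regionInfoPb stands in for an already-built RegionInfo protobuf and is an assumption of the example:

    import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;

    // Master side: flag the open as a distributed-log-replay open.
    RegionOpenInfo openInfo = RegionOpenInfo.newBuilder()
        .setRegion(regionInfoPb)                       // required field; assumed pre-built
        .setOpenForDistributedLogReplay(true)
        .build();

    // Region server side: a pre-upgrade master never sets field 4, so
    // hasOpenForDistributedLogReplay() lets the RS tell "flag absent" apart from an explicit false.
    boolean openForReplay =
        !openInfo.hasOpenForDistributedLogReplay() || openInfo.getOpenForDistributedLogReplay();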
http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-protocol/src/main/protobuf/ZooKeeper.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/ZooKeeper.proto b/hbase-protocol/src/main/protobuf/ZooKeeper.proto
index 082e1f7..37816da 100644
--- a/hbase-protocol/src/main/protobuf/ZooKeeper.proto
+++ b/hbase-protocol/src/main/protobuf/ZooKeeper.proto
@@ -85,8 +85,14 @@ message SplitLogTask {
DONE = 3;
ERR = 4;
}
+ enum RecoveryMode {
+ UNKNOWN = 0;
+ LOG_SPLITTING = 1;
+ LOG_REPLAY = 2;
+ }
required State state = 1;
required ServerName server_name = 2;
+ optional RecoveryMode mode = 3 [default = UNKNOWN];
}
/**
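
As a rough illustration of the wire compatibility (editorial sketch; the host name, port and start code are invented, and the ServerName message is assumed to keep its usual host_name/port/start_code fields):

    import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
    import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
    import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;

    HBaseProtos.ServerName snpb = HBaseProtos.ServerName.newBuilder()
        .setHostName("rs1.example.com").setPort(60020).setStartCode(1402861436000L).build();

    ZooKeeperProtos.SplitLogTask pbTask = ZooKeeperProtos.SplitLogTask.newBuilder()
        .setState(ZooKeeperProtos.SplitLogTask.State.OWNED)
        .setServerName(snpb)
        .setMode(RecoveryMode.LOG_REPLAY)   // new optional field 3
        .build();

    // A task znode written by a pre-upgrade server simply lacks field 3, so on parse
    // hasMode() is false and getMode() falls back to the UNKNOWN default.
    ZooKeeperProtos.SplitLogTask parsed =
        ZooKeeperProtos.SplitLogTask.parseFrom(pbTask.toByteArray());
    RecoveryMode mode = parsed.hasMode() ? parsed.getMode() : RecoveryMode.UNKNOWN;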
http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java
index 67a0994..1b5f8b7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java
@@ -18,10 +18,13 @@
package org.apache.hadoop.hbase;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
+import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.protobuf.InvalidProtocolBufferException;
@@ -36,49 +39,59 @@ import com.google.protobuf.InvalidProtocolBufferException;
public class SplitLogTask {
private final ServerName originServer;
private final ZooKeeperProtos.SplitLogTask.State state;
+ private final ZooKeeperProtos.SplitLogTask.RecoveryMode mode;
public static class Unassigned extends SplitLogTask {
- public Unassigned(final ServerName originServer) {
- super(originServer, ZooKeeperProtos.SplitLogTask.State.UNASSIGNED);
+ public Unassigned(final ServerName originServer, final RecoveryMode mode) {
+ super(originServer, ZooKeeperProtos.SplitLogTask.State.UNASSIGNED, mode);
}
}
public static class Owned extends SplitLogTask {
- public Owned(final ServerName originServer) {
- super(originServer, ZooKeeperProtos.SplitLogTask.State.OWNED);
+ public Owned(final ServerName originServer, final RecoveryMode mode) {
+ super(originServer, ZooKeeperProtos.SplitLogTask.State.OWNED, mode);
}
}
public static class Resigned extends SplitLogTask {
- public Resigned(final ServerName originServer) {
- super(originServer, ZooKeeperProtos.SplitLogTask.State.RESIGNED);
+ public Resigned(final ServerName originServer, final RecoveryMode mode) {
+ super(originServer, ZooKeeperProtos.SplitLogTask.State.RESIGNED, mode);
}
}
public static class Done extends SplitLogTask {
- public Done(final ServerName originServer) {
- super(originServer, ZooKeeperProtos.SplitLogTask.State.DONE);
+ public Done(final ServerName originServer, final RecoveryMode mode) {
+ super(originServer, ZooKeeperProtos.SplitLogTask.State.DONE, mode);
}
}
public static class Err extends SplitLogTask {
- public Err(final ServerName originServer) {
- super(originServer, ZooKeeperProtos.SplitLogTask.State.ERR);
+ public Err(final ServerName originServer, final RecoveryMode mode) {
+ super(originServer, ZooKeeperProtos.SplitLogTask.State.ERR, mode);
}
}
SplitLogTask(final ZooKeeperProtos.SplitLogTask slt) {
- this(ProtobufUtil.toServerName(slt.getServerName()), slt.getState());
+ this.originServer = ProtobufUtil.toServerName(slt.getServerName());
+ this.state = slt.getState();
+ this.mode = (slt.hasMode()) ? slt.getMode() :
+ ZooKeeperProtos.SplitLogTask.RecoveryMode.UNKNOWN;
}
- SplitLogTask(final ServerName originServer, final ZooKeeperProtos.SplitLogTask.State state) {
+ SplitLogTask(final ServerName originServer, final ZooKeeperProtos.SplitLogTask.State state,
+ final ZooKeeperProtos.SplitLogTask.RecoveryMode mode) {
this.originServer = originServer;
this.state = state;
+ this.mode = mode;
}
public ServerName getServerName() {
return this.originServer;
}
+
+ public ZooKeeperProtos.SplitLogTask.RecoveryMode getMode() {
+ return this.mode;
+ }
public boolean isUnassigned(final ServerName sn) {
return this.originServer.equals(sn) && isUnassigned();
@@ -167,7 +180,8 @@ public class SplitLogTask {
// pbs just created.
HBaseProtos.ServerName snpb = ProtobufUtil.toServerName(this.originServer);
ZooKeeperProtos.SplitLogTask slts =
- ZooKeeperProtos.SplitLogTask.newBuilder().setServerName(snpb).setState(this.state).build();
+ ZooKeeperProtos.SplitLogTask.newBuilder().setServerName(snpb).setState(this.state).
+ setMode(this.mode).build();
return ProtobufUtil.prependPBMagic(slts.toByteArray());
}
}
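
A minimal usage sketch of the updated wrapper (editorial; the server name is invented): every concrete task type now carries the recovery mode, and payloads written before the upgrade deserialize with RecoveryMode.UNKNOWN, which the parsing constructor above maps explicitly.

    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.SplitLogTask;
    import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;

    ServerName sn = ServerName.valueOf("rs1.example.com", 60020, 1402861436000L);

    // Writer side: the mode travels with the task state in the znode payload.
    SplitLogTask done = new SplitLogTask.Done(sn, RecoveryMode.LOG_REPLAY);
    byte[] znodeData = done.toByteArray();

    // Reader side (throws DeserializationException): getMode() reports the recorded mode,
    // or RecoveryMode.UNKNOWN for data written by a pre-upgrade server.
    SplitLogTask readBack = SplitLogTask.parseFrom(znodeData);
    RecoveryMode mode = readBack.getMode();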
http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 83a3ece..3d36915 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -111,15 +111,12 @@ import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.protobuf.generated.*;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionRequest;
@@ -200,7 +197,6 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureRequ
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
-import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
@@ -209,6 +205,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Regio
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
@@ -391,12 +388,6 @@ MasterServices, Server {
/** The health check chore. */
private HealthCheckChore healthCheckChore;
- /**
- * is in distributedLogReplay mode. When true, SplitLogWorker directly replays WAL edits to newly
- * assigned region servers instead of creating recovered.edits files.
- */
- private final boolean distributedLogReplay;
-
/** flag used in test cases in order to simulate RS failures during master initialization */
private volatile boolean initializationBeforeMetaAssignment = false;
@@ -516,9 +507,6 @@ MasterServices, Server {
Threads.setDaemonThreadRunning(clusterStatusPublisherChore.getThread());
}
}
-
- distributedLogReplay = this.conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
- HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
}
/**
@@ -1039,7 +1027,8 @@ MasterServices, Server {
enableMeta(TableName.META_TABLE_NAME);
- if (this.distributedLogReplay && (!previouslyFailedMetaRSs.isEmpty())) {
+ if ((RecoveryMode.LOG_REPLAY == this.getMasterFileSystem().getLogRecoveryMode())
+ && (!previouslyFailedMetaRSs.isEmpty())) {
// replay WAL edits mode need new hbase:meta RS is assigned firstly
status.setStatus("replaying log for Meta Region");
this.fileSystemManager.splitMetaLog(previouslyFailedMetaRSs);
@@ -1063,7 +1052,7 @@ MasterServices, Server {
}
private void splitMetaLogBeforeAssignment(ServerName currentMetaServer) throws IOException {
- if (this.distributedLogReplay) {
+ if (RecoveryMode.LOG_REPLAY == this.getMasterFileSystem().getLogRecoveryMode()) {
// In log replay mode, we mark hbase:meta region as recovering in ZK
Set<HRegionInfo> regions = new HashSet<HRegionInfo>();
regions.add(HRegionInfo.FIRST_META_REGIONINFO);
http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
index 687fc75..2b2a5f7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
@@ -122,14 +123,17 @@ public class MasterFileSystem {
FSUtils.setFsDefault(conf, new Path(this.fs.getUri()));
// make sure the fs has the same conf
fs.setConf(conf);
- this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(this.conf);
// setup the filesystem variable
// set up the archived logs path
this.oldLogDir = createInitialFileSystemLayout();
HFileSystem.addLocationsOrderInterceptor(conf);
- this.splitLogManager = new SplitLogManager(master.getZooKeeper(),
- master.getConfiguration(), master, services,
- master.getServerName(), masterRecovery);
+ try {
+ this.splitLogManager = new SplitLogManager(master.getZooKeeper(), master.getConfiguration(),
+ master, services, master.getServerName());
+ } catch (KeeperException e) {
+ throw new IOException(e);
+ }
+ this.distributedLogReplay = (this.splitLogManager.getRecoveryMode() == RecoveryMode.LOG_REPLAY);
}
/**
@@ -682,4 +686,22 @@ public class MasterFileSystem {
}
return null;
}
+
+ /**
+ * Used in SSH to set the recovery mode based on configuration after all outstanding
+ * log split tasks have drained.
+ * Used in SSH to set the recovery mode based on configuration after all outstanding
+ * log split tasks have drained.
+ * @throws KeeperException
+ * @throws InterruptedIOException
+ */
+ public void setLogRecoveryMode() throws IOException {
+ try {
+ this.splitLogManager.setRecoveryMode(false);
+ } catch (KeeperException e) {
+ throw new IOException(e);
+ }
+ }
+
+ public RecoveryMode getLogRecoveryMode() {
+ return this.splitLogManager.getRecoveryMode();
+ }
}
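
The shutdown handlers later in this patch consult these two accessors roughly as follows (a condensed editorial restatement; services stands for an assumed MasterServices reference):

    // Re-evaluate the mode once outstanding split log tasks have drained, then branch on it.
    MasterFileSystem mfs = services.getMasterFileSystem();
    mfs.setLogRecoveryMode();
    boolean distributedLogReplay = (mfs.getLogRecoveryMode() == RecoveryMode.LOG_REPLAY);

    if (distributedLogReplay) {
      // mark the dead server's regions as recovering in ZK, assign them, then replay edits
    } else {
      // classic log splitting: write recovered.edits files before regions are reassigned
    }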
http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
index e7b8532..4e4ad15 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Triple;
@@ -662,8 +663,9 @@ public class ServerManager {
" failed because no RPC connection found to this server");
return RegionOpeningState.FAILED_OPENING;
}
- OpenRegionRequest request =
- RequestConverter.buildOpenRegionRequest(server, region, versionOfOfflineNode, favoredNodes);
+ OpenRegionRequest request = RequestConverter.buildOpenRegionRequest(server,
+ region, versionOfOfflineNode, favoredNodes,
+ (RecoveryMode.LOG_REPLAY == this.services.getMasterFileSystem().getLogRecoveryMode()));
try {
OpenRegionResponse response = admin.openRegion(null, request);
return ResponseConverter.getRegionOpeningState(response);
@@ -691,8 +693,8 @@ public class ServerManager {
return null;
}
- OpenRegionRequest request =
- RequestConverter.buildOpenRegionRequest(regionOpenInfos);
+ OpenRegionRequest request = RequestConverter.buildOpenRegionRequest(regionOpenInfos,
+ (RecoveryMode.LOG_REPLAY == this.services.getMasterFileSystem().getLogRecoveryMode()));
try {
OpenRegionResponse response = admin.openRegion(null, request);
return ResponseConverter.getRegionOpeningStateList(response);
http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
index bdf60dd..2e98433 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
@@ -25,6 +25,7 @@ import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.I
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -45,16 +46,19 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Chore;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.master.SplitLogManager.TaskFinisher.Status;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
@@ -137,7 +141,8 @@ public class SplitLogManager extends ZooKeeperListener {
*/
protected final ReentrantLock recoveringRegionLock = new ReentrantLock();
- final boolean distributedLogReplay;
+ private volatile RecoveryMode recoveryMode;
+ private volatile boolean isDrainingDone = false;
private final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<String, Task>();
private TimeoutMonitor timeoutMonitor;
@@ -158,9 +163,12 @@ public class SplitLogManager extends ZooKeeperListener {
* @param stopper the stoppable in case anything is wrong
* @param master the master services
* @param serverName the master server name
+ * @throws KeeperException
+ * @throws InterruptedIOException
*/
- public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf,
- Stoppable stopper, MasterServices master, ServerName serverName) {
+ public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf,
+ Stoppable stopper, MasterServices master, ServerName serverName)
+ throws InterruptedIOException, KeeperException {
this(zkw, conf, stopper, master, serverName, false, null);
}
@@ -178,9 +186,12 @@ public class SplitLogManager extends ZooKeeperListener {
* @param master the master services
* @param serverName the master server name
* @param masterRecovery an indication if the master is in recovery
+ * @throws KeeperException
+ * @throws InterruptedIOException
*/
public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf,
- Stoppable stopper, MasterServices master, ServerName serverName, boolean masterRecovery) {
+ Stoppable stopper, MasterServices master, ServerName serverName, boolean masterRecovery)
+ throws InterruptedIOException, KeeperException {
this(zkw, conf, stopper, master, serverName, masterRecovery, new TaskFinisher() {
@Override
public Status finish(ServerName workerName, String logfile) {
@@ -207,10 +218,13 @@ public class SplitLogManager extends ZooKeeperListener {
* @param serverName the master server name
* @param masterRecovery an indication if the master is in recovery
* @param tf task finisher
+ * @throws KeeperException
+ * @throws InterruptedIOException
*/
public SplitLogManager(ZooKeeperWatcher zkw, Configuration conf,
Stoppable stopper, MasterServices master,
- ServerName serverName, boolean masterRecovery, TaskFinisher tf) {
+ ServerName serverName, boolean masterRecovery, TaskFinisher tf)
+ throws InterruptedIOException, KeeperException {
super(zkw);
this.taskFinisher = tf;
this.conf = conf;
@@ -221,9 +235,12 @@ public class SplitLogManager extends ZooKeeperListener {
this.timeout = conf.getInt("hbase.splitlog.manager.timeout", DEFAULT_TIMEOUT);
this.unassignedTimeout =
conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
- this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(conf);
+
+ // Determine recovery mode
+ setRecoveryMode(true);
+
LOG.info("Timeout=" + timeout + ", unassigned timeout=" + unassignedTimeout +
- ", distributedLogReplay=" + this.distributedLogReplay);
+ ", distributedLogReplay=" + (this.recoveryMode == RecoveryMode.LOG_REPLAY));
this.serverName = serverName;
this.timeoutMonitor = new TimeoutMonitor(
@@ -486,8 +503,7 @@ public class SplitLogManager extends ZooKeeperListener {
*/
private void
removeRecoveringRegionsFromZK(final Set<ServerName> serverNames, Boolean isMetaRecovery) {
-
- if (!this.distributedLogReplay) {
+ if (this.recoveryMode != RecoveryMode.LOG_REPLAY) {
// the function is only used in WALEdit direct replay mode
return;
}
@@ -515,7 +531,7 @@ public class SplitLogManager extends ZooKeeperListener {
if (count == 0 && this.master.isInitialized()
&& !this.master.getServerManager().areDeadServersInProgress()) {
// no splitting work items left
- deleteRecoveringRegionZNodes(null);
+ deleteRecoveringRegionZNodes(watcher, null);
// reset lastRecoveringNodeCreationTime because we cleared all recovering znodes at
// this point.
lastRecoveringNodeCreationTime = Long.MAX_VALUE;
@@ -571,14 +587,6 @@ public class SplitLogManager extends ZooKeeperListener {
void removeStaleRecoveringRegionsFromZK(final Set<ServerName> failedServers)
throws KeeperException {
- if (!this.distributedLogReplay) {
- // remove any regions in recovery from ZK which could happen when we turn the feature on
- // and later turn it off
- ZKUtil.deleteChildrenRecursively(watcher, watcher.recoveringRegionsZNode);
- // the function is only used in distributedLogReplay mode when master is in initialization
- return;
- }
-
Set<String> knownFailedServers = new HashSet<String>();
if (failedServers != null) {
for (ServerName tmpServerName : failedServers) {
@@ -641,7 +649,7 @@ public class SplitLogManager extends ZooKeeperListener {
}
}
- private void deleteRecoveringRegionZNodes(List<String> regions) {
+ public static void deleteRecoveringRegionZNodes(ZooKeeperWatcher watcher, List<String> regions) {
try {
if (regions == null) {
// remove all children under /home/recovering-regions
@@ -699,7 +707,7 @@ public class SplitLogManager extends ZooKeeperListener {
}
private void createNode(String path, Long retry_count) {
- SplitLogTask slt = new SplitLogTask.Unassigned(serverName);
+ SplitLogTask slt = new SplitLogTask.Unassigned(serverName, this.recoveryMode);
ZKUtil.asyncCreate(this.watcher, path, slt.toByteArray(), new CreateAsyncCallback(), retry_count);
SplitLogCounters.tot_mgr_node_create_queued.incrementAndGet();
return;
@@ -874,7 +882,7 @@ public class SplitLogManager extends ZooKeeperListener {
task.incarnation++;
try {
// blocking zk call but this is done from the timeout thread
- SplitLogTask slt = new SplitLogTask.Unassigned(this.serverName);
+ SplitLogTask slt = new SplitLogTask.Unassigned(this.serverName, this.recoveryMode);
if (ZKUtil.setData(this.watcher, path, slt.toByteArray(), version) == false) {
LOG.debug("failed to resubmit task " + path +
" version changed");
@@ -967,7 +975,7 @@ public class SplitLogManager extends ZooKeeperListener {
// Since the TimeoutMonitor will keep resubmitting UNASSIGNED tasks
// therefore this behavior is safe.
lastTaskCreateTime = EnvironmentEdgeManager.currentTimeMillis();
- SplitLogTask slt = new SplitLogTask.Done(this.serverName);
+ SplitLogTask slt = new SplitLogTask.Done(this.serverName, this.recoveryMode);
this.watcher.getRecoverableZooKeeper().getZooKeeper().
create(ZKSplitLog.getRescanNode(watcher), slt.toByteArray(),
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
@@ -1115,7 +1123,7 @@ public class SplitLogManager extends ZooKeeperListener {
*/
void markRegionsRecoveringInZK(final ServerName serverName, Set<HRegionInfo> userRegions)
throws KeeperException {
- if (userRegions == null || !this.distributedLogReplay) {
+ if (userRegions == null || (this.recoveryMode != RecoveryMode.LOG_REPLAY)) {
return;
}
@@ -1251,6 +1259,108 @@ public class SplitLogManager extends ZooKeeperListener {
}
return result;
}
+
+ /**
+ * Sets the recovery mode, either from outstanding split log tasks left over from before a
+ * restart or from the current configuration setting
+ * @param isForInitialization
+ * @throws KeeperException
+ * @throws InterruptedIOException
+ */
+ public void setRecoveryMode(boolean isForInitialization) throws KeeperException {
+ if(this.isDrainingDone) {
+ // when there are no outstanding split log tasks after master start up, the recovery mode
+ // is already up to date
+ return;
+ }
+ if(this.watcher == null) {
+ // when the watcher is null (testing code), the recovery mode can only be LOG_SPLITTING
+ this.isDrainingDone = true;
+ this.recoveryMode = RecoveryMode.LOG_SPLITTING;
+ return;
+ }
+ boolean hasSplitLogTask = false;
+ boolean hasRecoveringRegions = false;
+ RecoveryMode previousRecoveryMode = RecoveryMode.UNKNOWN;
+ RecoveryMode recoveryModeInConfig = (isDistributedLogReplay(conf)) ?
+ RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING;
+
+ // First, check if there are outstanding recovering regions
+ List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode);
+ if (regions != null && !regions.isEmpty()) {
+ hasRecoveringRegions = true;
+ previousRecoveryMode = RecoveryMode.LOG_REPLAY;
+ }
+ if (previousRecoveryMode == RecoveryMode.UNKNOWN) {
+ // Next, check if there are outstanding split log tasks
+ List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
+ if (tasks != null && !tasks.isEmpty()) {
+ hasSplitLogTask = true;
+ if (isForInitialization) {
+ // during initialization, try to get recovery mode from splitlogtask
+ for (String task : tasks) {
+ try {
+ byte[] data = ZKUtil.getData(this.watcher,
+ ZKUtil.joinZNode(watcher.splitLogZNode, task));
+ if (data == null) continue;
+ SplitLogTask slt = SplitLogTask.parseFrom(data);
+ previousRecoveryMode = slt.getMode();
+ if (previousRecoveryMode == RecoveryMode.UNKNOWN) {
+ // created by an older code base that does not set the recovery mode in the splitlogtask;
+ // we can safely set it to LOG_SPLITTING because we're in master initialization code
+ // before SSH is enabled and there are no outstanding recovering regions
+ previousRecoveryMode = RecoveryMode.LOG_SPLITTING;
+ }
+ break;
+ } catch (DeserializationException e) {
+ LOG.warn("Failed parse data for znode " + task, e);
+ }
+ }
+ }
+ }
+ }
+
+ synchronized(this) {
+ if(this.isDrainingDone) {
+ return;
+ }
+ if (!hasSplitLogTask && !hasRecoveringRegions) {
+ this.isDrainingDone = true;
+ this.recoveryMode = recoveryModeInConfig;
+ return;
+ } else if (!isForInitialization) {
+ // splitlogtask hasn't drained yet, keep existing recovery mode
+ return;
+ }
+
+ if (previousRecoveryMode != RecoveryMode.UNKNOWN) {
+ this.isDrainingDone = (previousRecoveryMode == recoveryModeInConfig);
+ this.recoveryMode = previousRecoveryMode;
+ } else {
+ this.recoveryMode = recoveryModeInConfig;
+ }
+ }
+ }
+
+ public RecoveryMode getRecoveryMode() {
+ return this.recoveryMode;
+ }
+
+ /**
+ * Returns whether distributed log replay is turned on
+ * @param conf
+ * @return true when distributed log replay is turned on
+ */
+ private boolean isDistributedLogReplay(Configuration conf) {
+ boolean dlr = conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
+ HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
+ int version = conf.getInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Distributed log replay=" + dlr + ", " + HFile.FORMAT_VERSION_KEY + "=" + version);
+ }
+ // For distributed log replay, hfile version must be 3 at least; we need tag support.
+ return dlr && (version >= 3);
+ }
/**
* Keeps track of the batch of tasks submitted together by a caller in splitLogDistributed().
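
Condensed, the mode picked on the initialization path of setRecoveryMode() can be restated as the sketch below (editorial; names are illustrative, not patch code). modeFromConfig resolves to LOG_REPLAY only when HConstants.DISTRIBUTED_LOG_REPLAY_KEY is enabled and HFile.FORMAT_VERSION_KEY is at least 3, per isDistributedLogReplay() above.

    RecoveryMode chooseMode(boolean hasRecoveringRegions, boolean hasSplitLogTask,
        RecoveryMode modeFromExistingTask, RecoveryMode modeFromConfig) {
      if (hasRecoveringRegions) {
        return RecoveryMode.LOG_REPLAY;         // a previous LOG_REPLAY run is still draining
      }
      if (hasSplitLogTask) {
        return modeFromExistingTask == RecoveryMode.UNKNOWN
            ? RecoveryMode.LOG_SPLITTING        // task written by an old code base
            : modeFromExistingTask;             // stay with the mode of the outstanding tasks
      }
      return modeFromConfig;                    // nothing outstanding: follow configuration
    }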
http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java
index 03acccf..ce58e90 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.DeadServer;
import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.zookeeper.KeeperException;
@@ -62,10 +63,13 @@ public class MetaServerShutdownHandler extends ServerShutdownHandler {
boolean gotException = true;
try {
AssignmentManager am = this.services.getAssignmentManager();
+ this.services.getMasterFileSystem().setLogRecoveryMode();
+ boolean distributedLogReplay =
+ (this.services.getMasterFileSystem().getLogRecoveryMode() == RecoveryMode.LOG_REPLAY);
try {
if (this.shouldSplitHlog) {
LOG.info("Splitting hbase:meta logs for " + serverName);
- if (this.distributedLogReplay) {
+ if (distributedLogReplay) {
Set<HRegionInfo> regions = new HashSet<HRegionInfo>();
regions.add(HRegionInfo.FIRST_META_REGIONINFO);
this.services.getMasterFileSystem().prepareLogReplay(serverName, regions);
@@ -97,7 +101,7 @@ public class MetaServerShutdownHandler extends ServerShutdownHandler {
}
try {
- if (this.shouldSplitHlog && this.distributedLogReplay) {
+ if (this.shouldSplitHlog && distributedLogReplay) {
if (!am.waitOnRegionToClearRegionsInTransition(HRegionInfo.FIRST_META_REGIONINFO,
regionAssignmentWaitTimeout)) {
// Wait here is to avoid log replay hits current dead server and incur a RPC timeout
http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
index a08dc3c..8a9e9c7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.zookeeper.KeeperException;
@@ -63,7 +64,6 @@ public class ServerShutdownHandler extends EventHandler {
protected final MasterServices services;
protected final DeadServer deadServers;
protected final boolean shouldSplitHlog; // whether to split HLog or not
- protected final boolean distributedLogReplay;
protected final int regionAssignmentWaitTimeout;
public ServerShutdownHandler(final Server server, final MasterServices services,
@@ -85,7 +85,6 @@ public class ServerShutdownHandler extends EventHandler {
LOG.warn(this.serverName + " is NOT in deadservers; it should be!");
}
this.shouldSplitHlog = shouldSplitHlog;
- this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(server.getConfiguration());
this.regionAssignmentWaitTimeout = server.getConfiguration().getInt(
HConstants.LOG_REPLAY_WAIT_REGION_TIMEOUT, 15000);
}
@@ -186,10 +185,16 @@ public class ServerShutdownHandler extends EventHandler {
throw new IOException("Server is stopped");
}
+ // delay switching the recovery mode to the configured value until all outstanding
+ // split log tasks have drained
+ this.services.getMasterFileSystem().setLogRecoveryMode();
+ boolean distributedLogReplay =
+ (this.services.getMasterFileSystem().getLogRecoveryMode() == RecoveryMode.LOG_REPLAY);
+
try {
if (this.shouldSplitHlog) {
LOG.info("Splitting logs for " + serverName + " before assignment.");
- if (this.distributedLogReplay) {
+ if (distributedLogReplay) {
LOG.info("Mark regions in recovery before assignment.");
Set<ServerName> serverNames = new HashSet<ServerName>();
serverNames.add(serverName);
@@ -288,7 +293,7 @@ public class ServerShutdownHandler extends EventHandler {
throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
}
- if (this.shouldSplitHlog && this.distributedLogReplay) {
+ if (this.shouldSplitHlog && distributedLogReplay) {
// wait for region assignment completes
for (HRegionInfo hri : toAssignRegions) {
try {
http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 569983e..1f3736d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -495,9 +495,6 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
private RegionServerProcedureManagerHost rspmHost;
- // configuration setting on if replay WAL edits directly to another RS
- private final boolean distributedLogReplay;
-
// Table level lock manager for locking for region operations
private TableLockManager tableLockManager;
@@ -630,7 +627,6 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
// Put up the webui. Webui may come up on port other than configured if
// that port is occupied. Adjust serverInfo if this is the case.
this.rsInfo.setInfoPort(putUpWebUI());
- this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(this.conf);
}
/**
@@ -764,9 +760,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
ServerName.valueOf(isa.getHostName(), isa.getPort(), startcode));
// register watcher for recovering regions
- if(this.distributedLogReplay) {
- this.recoveringRegionWatcher = new RecoveringRegionWatcher(this.zooKeeper, this);
- }
+ this.recoveringRegionWatcher = new RecoveringRegionWatcher(this.zooKeeper, this);
}
/**
@@ -3670,10 +3664,20 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
if (previous == null) {
// check if the region to be opened is marked in recovering state in ZK
- if (this.distributedLogReplay
- && SplitLogManager.isRegionMarkedRecoveringInZK(this.getZooKeeper(),
- region.getEncodedName())) {
- this.recoveringRegions.put(region.getEncodedName(), null);
+ if (SplitLogManager.isRegionMarkedRecoveringInZK(this.zooKeeper,
+ region.getEncodedName())) {
+ // Check whether the current region open is for distributedLogReplay. This check supports
+ // rolling restart/upgrade, where the Master and RS must see the same configuration
+ if (!regionOpenInfo.hasOpenForDistributedLogReplay()
+ || regionOpenInfo.getOpenForDistributedLogReplay()) {
+ this.recoveringRegions.put(region.getEncodedName(), null);
+ } else {
+ // Remove the stale recovering-region marker from ZK when the region is opened without
+ // recovery, which can happen when distributedLogReplay is switched from on to off.
+ List<String> tmpRegions = new ArrayList<String>();
+ tmpRegions.add(region.getEncodedName());
+ SplitLogManager.deleteRecoveringRegionZNodes(this.zooKeeper, tmpRegions);
+ }
}
// If there is no action in progress, we can submit a specific handler.
// Need to pass the expected version in the constructor.
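Editorial note (not part of the patch): the HRegionServer hunk above makes the region server honor the master's per-open decision instead of its own config. A minimal sketch of that decision follows; OpenRegionRecoverySketch, RecoveringRegionStore and OpenRegionRequestView are hypothetical simplifications of the ZK helpers and RegionOpenInfo, used only to illustrate the control flow.

import java.util.Collections;
import java.util.List;

public class OpenRegionRecoverySketch {

  interface RecoveringRegionStore {               // stand-in for the ZK recovering-region helpers
    boolean isMarkedRecovering(String encodedRegionName);
    void deleteRecoveringZNodes(List<String> encodedRegionNames);
  }

  interface OpenRegionRequestView {               // stand-in for RegionOpenInfo
    boolean hasOpenForDistributedLogReplay();
    boolean getOpenForDistributedLogReplay();
  }

  /** Returns true if the region should be tracked as recovering on this region server. */
  static boolean handleRecoveringState(String encodedRegionName,
      OpenRegionRequestView req, RecoveringRegionStore store) {
    if (!store.isMarkedRecovering(encodedRegionName)) {
      return false;
    }
    if (!req.hasOpenForDistributedLogReplay() || req.getOpenForDistributedLogReplay()) {
      // Older masters never set the flag; assume replay for compatibility.
      return true;
    }
    // The master explicitly opened the region without replay, e.g. after
    // distributed log replay was switched off: clean up the stale marker.
    store.deleteRecoveringZNodes(Collections.singletonList(encodedRegionName));
    return false;
  }
}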
http://git-wip-us.apache.org/repos/asf/hbase/blob/34ae4a94/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
index 4a83741..c1b2cb7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.master.SplitLogManager;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.handler.HLogSplitterHandler;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
@@ -125,7 +126,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
RegionServerServices server, final LastSequenceId sequenceIdChecker) {
this(watcher, conf, server, new TaskExecutor() {
@Override
- public Status exec(String filename, CancelableProgressable p) {
+ public Status exec(String filename, RecoveryMode mode, CancelableProgressable p) {
Path rootdir;
FileSystem fs;
try {
@@ -140,7 +141,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
// encountered a bad non-retry-able persistent error.
try {
if (!HLogSplitter.splitLogFile(rootdir, fs.getFileStatus(new Path(rootdir, filename)),
- fs, conf, p, sequenceIdChecker, watcher)) {
+ fs, conf, p, sequenceIdChecker, watcher, mode)) {
return Status.PREEMPTED;
}
} catch (InterruptedIOException iioe) {
@@ -174,11 +175,8 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
try {
LOG.info("SplitLogWorker " + this.serverName + " starting");
this.watcher.registerListener(this);
- boolean distributedLogReplay = HLogSplitter.isDistributedLogReplay(conf);
- if (distributedLogReplay) {
- // initialize a new connection for splitlogworker configuration
- HConnectionManager.getConnection(conf);
- }
+ // pre-initialize a new connection for splitlogworker configuration
+ HConnectionManager.getConnection(conf);
// wait for master to create the splitLogZnode
int res = -1;
@@ -313,7 +311,6 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
*/
private void grabTask(String path) {
Stat stat = new Stat();
- long t = -1;
byte[] data;
synchronized (grabTaskLock) {
currentTask = path;
@@ -346,14 +343,15 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
return;
}
- currentVersion = attemptToOwnTask(true, watcher, serverName, path, stat.getVersion());
+ currentVersion = attemptToOwnTask(true, watcher, serverName, path, slt.getMode(),
+ stat.getVersion());
if (currentVersion < 0) {
SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet();
return;
}
if (ZKSplitLog.isRescanNode(watcher, currentTask)) {
- HLogSplitterHandler.endTask(watcher, new SplitLogTask.Done(this.serverName),
+ HLogSplitterHandler.endTask(watcher, new SplitLogTask.Done(this.serverName, slt.getMode()),
SplitLogCounters.tot_wkr_task_acquired_rescan, currentTask, currentVersion);
return;
}
@@ -362,7 +360,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
SplitLogCounters.tot_wkr_task_acquired.incrementAndGet();
getDataSetWatchAsync();
- submitTask(path, currentVersion, this.report_period);
+ submitTask(path, slt.getMode(), currentVersion, this.report_period);
// after a successful submit, sleep a little bit to allow other RSs to grab the rest tasks
try {
@@ -397,10 +395,10 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
* @return non-negative integer value when task can be owned by current region server otherwise -1
*/
protected static int attemptToOwnTask(boolean isFirstTime, ZooKeeperWatcher zkw,
- ServerName server, String task, int taskZKVersion) {
+ ServerName server, String task, RecoveryMode mode, int taskZKVersion) {
int latestZKVersion = FAILED_TO_OWN_TASK;
try {
- SplitLogTask slt = new SplitLogTask.Owned(server);
+ SplitLogTask slt = new SplitLogTask.Owned(server, mode);
Stat stat = zkw.getRecoverableZooKeeper().setData(task, slt.toByteArray(), taskZKVersion);
if (stat == null) {
LOG.warn("zk.setData() returned null for path " + task);
@@ -458,7 +456,8 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
* @param curTask
* @param curTaskZKVersion
*/
- void submitTask(final String curTask, final int curTaskZKVersion, final int reportPeriod) {
+ void submitTask(final String curTask, final RecoveryMode mode, final int curTaskZKVersion,
+ final int reportPeriod) {
final MutableInt zkVersion = new MutableInt(curTaskZKVersion);
CancelableProgressable reporter = new CancelableProgressable() {
@@ -470,7 +469,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
if ((t - last_report_at) > reportPeriod) {
last_report_at = t;
int latestZKVersion =
- attemptToOwnTask(false, watcher, serverName, curTask, zkVersion.intValue());
+ attemptToOwnTask(false, watcher, serverName, curTask, mode, zkVersion.intValue());
if (latestZKVersion < 0) {
LOG.warn("Failed to heartbeat the task" + curTask);
return false;
@@ -481,9 +480,8 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
}
};
- HLogSplitterHandler hsh =
- new HLogSplitterHandler(this.server, curTask, zkVersion, reporter, this.tasksInProgress,
- this.splitTaskExecutor);
+ HLogSplitterHandler hsh = new HLogSplitterHandler(this.server, curTask, zkVersion, reporter,
+ this.tasksInProgress, this.splitTaskExecutor, mode);
this.executorService.submit(hsh);
}
@@ -658,6 +656,6 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
RESIGNED(),
PREEMPTED()
}
- Status exec(String name, CancelableProgressable p);
+ Status exec(String name, RecoveryMode mode, CancelableProgressable p);
}
}
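Editorial note (not part of the patch): the SplitLogWorker changes above thread a RecoveryMode through exec(), attemptToOwnTask() and submitTask() so the mode travels with each SplitLogTask instead of being read from the worker's local configuration. A minimal sketch of the reworked contract follows, with simplified stand-in types (TaskExecutorSketch is hypothetical, not the real SplitLogWorker API).

public class TaskExecutorSketch {

  enum RecoveryMode { LOG_SPLITTING, LOG_REPLAY }
  enum Status { DONE, ERR, RESIGNED, PREEMPTED }

  interface TaskExecutor {
    // The mode parameter is the key change: it comes from the grabbed task itself.
    Status exec(String walFileName, RecoveryMode mode);
  }

  static Status runTask(String walFileName, RecoveryMode modeFromTask, TaskExecutor executor) {
    // A worker forwards the mode it found in the task it grabbed from ZK,
    // regardless of what its own hbase-site.xml says, which is what keeps
    // mixed-version clusters consistent during a rolling restart.
    return executor.exec(walFileName, modeFromTask);
  }
}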