You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2012/08/31 20:30:19 UTC
svn commit: r1379542 - in /hbase/branches/0.89-fb/src:
main/java/org/apache/hadoop/hbase/master/
main/java/org/apache/hadoop/hbase/regionserver/
main/java/org/apache/hadoop/hbase/zookeeper/
test/java/org/apache/hadoop/hbase/master/
Author: mbautin
Date: Fri Aug 31 18:30:18 2012
New Revision: 1379542
URL: http://svn.apache.org/viewvc?rev=1379542&view=rev
Log:
[0.89-fb] [HBASE-6680] Separate serverName and workerName for SplitLogWorkers
Author: aaiyer
Summary:
With regard to Prakash's comments on the original diff
(https://phabricator.fb.com/D560200):
1) Move the worker-id logic inside SplitLogWorker.
2) Change SplitLogManager to rely on the serverName instead of the workerName to
handle server failures.
Test Plan:
Additional testing:
- Deploy to the dev cluster.
- Initiate log splitting by killing a region server (RS).
- Kill another RS while log splitting is in progress and verify that the master handles the
event correctly.
Reviewers: kannan, pkhemani, kranganathan
Reviewed By: pkhemani
CC: hbase-eng@
Differential Revision: https://phabricator.fb.com/D561386
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1379542&r1=1379541&r2=1379542&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Fri Aug 31 18:30:18 2012
@@ -1158,7 +1158,7 @@ public class HMaster extends HasThread i
"splitting");
return;
}
- splitLogManager.handleDeadWorker(serverName);
+ splitLogManager.handleDeadServer(serverName);
}
splitLogManager.splitLogDistributed(logDirs);
} else {
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=1379542&r1=1379541&r2=1379542&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java Fri Aug 31 18:30:18 2012
@@ -1002,7 +1002,7 @@ public class ServerManager {
" to dead servers, added shutdown processing operation");
this.deadServers.add(serverName);
if (this.master.getSplitLogManager() != null) {
- this.master.getSplitLogManager().handleDeadWorker(serverName);
+ this.master.getSplitLogManager().handleDeadServer(serverName);
}
this.master.getRegionServerOperationQueue().
put(new ProcessServerShutdown(master, info));
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java?rev=1379542&r1=1379541&r2=1379542&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java Fri Aug 31 18:30:18 2012
@@ -110,8 +110,8 @@ public class SplitLogManager implements
new ConcurrentHashMap<String, Task>();
private TimeoutMonitor timeoutMonitor;
- private Set<String> deadWorkers = null;
- private Object deadWorkersLock = new Object();
+ private Set<String> deadServers = null;
+ private Object deadServersLock = new Object();
/**
* Wrapper around {@link #SplitLogManager(ZooKeeperWatcher, Configuration,
@@ -424,6 +424,7 @@ public class SplitLogManager implements
handleUnassignedTask(path);
} else if (TaskState.TASK_OWNED.equals(data)) {
heartbeat(path, version,
+ TaskState.TASK_OWNED.getServerName(data),
TaskState.TASK_OWNED.getWriterName(data));
} else if (TaskState.TASK_RESIGNED.equals(data)) {
LOG.info("task " + path + " entered state " + new String(data));
@@ -481,14 +482,14 @@ public class SplitLogManager implements
}
private void heartbeat(String path, int new_version,
- String workerName) {
+ String serverName, String workerName) {
Task task = findOrCreateOrphanTask(path);
if (new_version != task.last_version) {
if (task.isUnassigned()) {
LOG.info("task " + path + " acquired by " + workerName);
}
task.heartbeat(EnvironmentEdgeManager.currentTimeMillis(),
- new_version, workerName);
+ new_version, serverName, workerName);
tot_mgr_heartbeat.incrementAndGet();
} else {
// duplicate heartbeats - heartbeats w/o zk node version
@@ -804,6 +805,7 @@ public class SplitLogManager implements
volatile long last_update;
volatile int last_version;
volatile String cur_worker_name;
+ volatile String cur_server_name;
TaskBatch batch;
volatile TerminationStatus status;
volatile int incarnation;
@@ -815,6 +817,7 @@ public class SplitLogManager implements
return ("last_update = " + last_update +
" last_version = " + last_version +
" cur_worker_name = " + cur_worker_name +
+ " cur_server_name = " + cur_server_name +
" status = " + status +
" incarnation = " + incarnation +
" resubmits = " + unforcedResubmits +
@@ -840,28 +843,36 @@ public class SplitLogManager implements
last_update = time;
}
- public void heartbeat(long time, int version, String worker) {
+ public void heartbeat(long time, int version, String server, String worker) {
last_version = version;
last_update = time;
+ if ((cur_server_name != null && !cur_server_name.equals(server))
+ || (cur_worker_name != null && !cur_worker_name.equals(worker))) {
+ LOG.warn("heartBeat updating to a different worker/server " +
+ " old " + cur_worker_name + " on " + cur_server_name +
+ " updating to " + worker + " on " + server);
+ }
+ cur_server_name = server;
cur_worker_name = worker;
}
public void setUnassigned() {
+ cur_server_name = null;
cur_worker_name = null;
last_update = -1;
}
}
- public void handleDeadWorker(String worker_name) {
+ public void handleDeadServer(String server_name) {
// resubmit the tasks on the TimeoutMonitor thread. Makes it easier
// to reason about concurrency. Makes it easier to retry.
- synchronized (deadWorkersLock) {
- if (deadWorkers == null) {
- deadWorkers = new HashSet<String>(100);
+ synchronized (deadServersLock) {
+ if (deadServers == null) {
+ deadServers = new HashSet<String>(100);
}
- deadWorkers.add(worker_name);
+ deadServers.add(server_name);
}
- LOG.info("dead splitlog worker " + worker_name);
+ LOG.info("dead splitlog worker(s) on server: " + server_name);
}
/**
@@ -881,17 +892,17 @@ public class SplitLogManager implements
int unassigned = 0;
int tot = 0;
boolean found_assigned_task = false;
- Set<String> localDeadWorkers;
+ Set<String> localDeadServers;
- synchronized (deadWorkersLock) {
- localDeadWorkers = deadWorkers;
- deadWorkers = null;
+ synchronized (deadServersLock) {
+ localDeadServers = deadServers;
+ deadServers = null;
}
for (Map.Entry<String, Task> e : tasks.entrySet()) {
String path = e.getKey();
Task task = e.getValue();
- String cur_worker = task.cur_worker_name;
+ String cur_server = task.cur_server_name;
tot++;
// don't easily resubmit a task which hasn't been picked up yet. It
// might be a long while before a SplitLogWorker is free to pick up a
@@ -903,14 +914,14 @@ public class SplitLogManager implements
continue;
}
found_assigned_task = true;
- if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) {
+ if (localDeadServers != null && localDeadServers.contains(cur_server)) {
tot_mgr_resubmit_dead_server_task.incrementAndGet();
if (resubmit(path, task, FORCE)) {
resubmitted++;
} else {
- handleDeadWorker(cur_worker);
+ handleDeadServer(cur_server);
LOG.warn("Failed to resubmit task " + path + " owned by dead " +
- cur_worker + ", will retry.");
+ cur_server + ", will retry.");
}
} else if (resubmit(path, task, CHECK)) {
resubmitted++;
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1379542&r1=1379541&r2=1379542&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Fri Aug 31 18:30:18 2012
@@ -1490,7 +1490,7 @@ public class HRegionServer implements HR
this.splitLogWorkers = new ArrayList<SplitLogWorker>(numSplitLogWorkers);
for (int i = 0; i < numSplitLogWorkers; i++) {
SplitLogWorker splitLogWorker = new SplitLogWorker(this.zooKeeperWrapper,
- this.getConfiguration(), this.serverInfo.getServerName() + "_Worker_" + i,
+ this.getConfiguration(), this.serverInfo.getServerName(),
logCloseThreadPool, masterRef);
this.splitLogWorkers.add(splitLogWorker);
splitLogWorker.start();
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java?rev=1379542&r1=1379541&r2=1379542&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java Fri Aug 31 18:30:18 2012
@@ -77,7 +77,7 @@ public class SplitLogWorker implements R
private static final Log LOG = LogFactory.getLog(SplitLogWorker.class);
Thread worker;
- private final String serverName;
+ private final String workerName;
private final TaskExecutor executor;
private long zkretries;
@@ -89,11 +89,12 @@ public class SplitLogWorker implements R
private Object grabTaskLock = new Object();
private boolean workerInGrabTask = false;
protected ZooKeeperWrapper watcher;
+ private static int numWorkers = 0;
public SplitLogWorker(ZooKeeperWrapper watcher, Configuration conf,
String serverName, TaskExecutor executor) {
this.watcher = watcher;
- this.serverName = serverName;
+ this.workerName = serverName + " Worker-" + numWorkers++;
this.executor = executor;
this.zkretries = conf.getLong("hbase.splitlog.zk.retries", 3);
}
@@ -128,8 +129,8 @@ public class SplitLogWorker implements R
LOG.warn("file status is null for file " + filename);
return Status.ERR;
}
- String tmpname =
- ZKSplitLog.getSplitLogDirTmpComponent(serverName, filename);
+ String tmpname = ZKSplitLog.getSplitLogDirTmpComponent(
+ serverName + "-worker-" + numWorkers, filename);
if (HLogSplitter.splitLogFileToTemp(rootdir, tmpname,
st, fs, conf, p, logCloseThreadPool, masterRef.get()) == false) {
return Status.PREEMPTED;
@@ -272,7 +273,7 @@ public class SplitLogWorker implements R
tot_wkr_failed_to_grab_task_lost_race.incrementAndGet();
return;
}
- LOG.info("worker " + serverName + " acquired task " + path);
+ LOG.info("worker " + workerName + " acquired task " + path);
tot_wkr_task_acquired.incrementAndGet();
getDataSetWatchAsync();
@@ -321,7 +322,7 @@ public class SplitLogWorker implements R
}
} finally {
if (t > 0) {
- LOG.info("worker " + serverName + " done with task " + path +
+ LOG.info("worker " + workerName + " done with task " + path +
" in " + (System.currentTimeMillis() - t) + "ms");
}
synchronized (grabTaskLock) {
@@ -346,7 +347,7 @@ public class SplitLogWorker implements R
private boolean ownTask(boolean isFirstTime) {
try {
Stat stat = this.watcher.setDataGetStat(currentTask,
- TaskState.TASK_OWNED.get(serverName), currentVersion);
+ TaskState.TASK_OWNED.get(workerName), currentVersion);
if (stat == null) {
LOG.warn("zk.setData() returned null for path " + currentTask);
tot_wkr_task_heartbeat_failed.incrementAndGet();
@@ -385,7 +386,7 @@ public class SplitLogWorker implements R
String path = currentTask;
currentTask = null;
try {
- if (watcher.setData(path, ts.get(serverName), currentVersion)) {
+ if (watcher.setData(path, ts.get(workerName), currentVersion)) {
LOG.info("successfully transitioned task " + path +
" to final state " + ts);
ctr.incrementAndGet();
@@ -424,12 +425,12 @@ public class SplitLogWorker implements R
// UNASSIGNED because by the time this worker sets the data watch
// the node might have made two transitions - from owned by this
// worker to unassigned to owned by another worker
- if (! TaskState.TASK_OWNED.equals(data, serverName) &&
- ! TaskState.TASK_DONE.equals(data, serverName) &&
- ! TaskState.TASK_ERR.equals(data, serverName) &&
- ! TaskState.TASK_RESIGNED.equals(data, serverName)) {
- LOG.info("task " + taskpath + " preempted from server " +
- serverName + " ... current task state and owner - " +
+ if (! TaskState.TASK_OWNED.equals(data, workerName) &&
+ ! TaskState.TASK_DONE.equals(data, workerName) &&
+ ! TaskState.TASK_ERR.equals(data, workerName) &&
+ ! TaskState.TASK_RESIGNED.equals(data, workerName)) {
+ LOG.info("task " + taskpath + " preempted from worker " +
+ workerName + " ... current task state and owner - " +
new String(data));
stopTask();
}
@@ -533,7 +534,7 @@ public class SplitLogWorker implements R
* start the SplitLogWorker thread
*/
public void start() {
- worker = new Thread(null, this, "SplitLogWorker-" + serverName);
+ worker = new Thread(null, this, "SplitLogWorker-" + workerName);
exitWorker = false;
worker.start();
return;
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java?rev=1379542&r1=1379541&r2=1379542&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java Fri Aug 31 18:30:18 2012
@@ -115,8 +115,8 @@ public class ZKSplitLog {
state = s.getBytes();
}
- public byte[] get(String serverName) {
- return (Bytes.add(state, " ".getBytes(), serverName.getBytes()));
+ public byte[] get(String workerName) {
+ return (Bytes.add(state, " ".getBytes(), workerName.getBytes()));
}
public String getWriterName(byte[] data) {
@@ -124,6 +124,12 @@ public class ZKSplitLog {
return str.substring(str.indexOf(' ') + 1);
}
+ public String getServerName(byte[] data) {
+ String str = Bytes.toString(data);
+ String [] parts = str.split(" ");
+ return parts[1];
+ }
+
/**
* @param s
@@ -141,8 +147,8 @@ public class ZKSplitLog {
return (true);
}
- public boolean equals(byte[] s, String serverName) {
- return (Arrays.equals(s, get(serverName)));
+ public boolean equals(byte[] s, String workerName) {
+ return (Arrays.equals(s, get(workerName)));
}
@Override
public String toString() {
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java?rev=1379542&r1=1379541&r2=1379542&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java Fri Aug 31 18:30:18 2012
@@ -445,7 +445,7 @@ public class TestSplitLogManager {
zkw.setData(tasknode, TaskState.TASK_OWNED.get("worker1"));
waitForCounter(tot_mgr_heartbeat, 0, 1, 1000);
- slm.handleDeadWorker("worker1");
+ slm.handleDeadServer("worker1");
waitForCounter(tot_mgr_resubmit, 0, 1, 1000);
waitForCounter(tot_mgr_resubmit_dead_server_task, 0, 1, 1000);