You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2012/08/31 20:30:19 UTC

svn commit: r1379542 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/master/ main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/zookeeper/ test/java/org/apache/hadoop/hbase/master/

Author: mbautin
Date: Fri Aug 31 18:30:18 2012
New Revision: 1379542

URL: http://svn.apache.org/viewvc?rev=1379542&view=rev
Log:
[0.89-fb] [HBASE-6680] Separate serverName and workerName for SplitLogWorkers

Author: aaiyer

Summary:
 Addresses Prakash's comments on the original diff
 https://phabricator.fb.com/D560200

1) Move the worker-id logic inside SplitLogWorker.
2) Change SplitLogManager to rely on the serverName instead of the workerName to
handle server failures.

Test Plan:
Additional testing:
   - Deploy to a dev cluster.
   - Initiate log splitting by killing a region server (RS).
   - Kill another RS during log splitting and verify that the master handles the
   event correctly.

Reviewers: kannan, pkhemani, kranganathan

Reviewed By: pkhemani

CC: hbase-eng@

Differential Revision: https://phabricator.fb.com/D561386

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1379542&r1=1379541&r2=1379542&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Fri Aug 31 18:30:18 2012
@@ -1158,7 +1158,7 @@ public class HMaster extends HasThread i
               "splitting");
           return;
         }
-        splitLogManager.handleDeadWorker(serverName);
+        splitLogManager.handleDeadServer(serverName);
       }
       splitLogManager.splitLogDistributed(logDirs);
     } else {

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=1379542&r1=1379541&r2=1379542&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java Fri Aug 31 18:30:18 2012
@@ -1002,7 +1002,7 @@ public class ServerManager {
       " to dead servers, added shutdown processing operation");
     this.deadServers.add(serverName);
     if (this.master.getSplitLogManager() != null) {
-      this.master.getSplitLogManager().handleDeadWorker(serverName);
+      this.master.getSplitLogManager().handleDeadServer(serverName);
     }
     this.master.getRegionServerOperationQueue().
       put(new ProcessServerShutdown(master, info));

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java?rev=1379542&r1=1379541&r2=1379542&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java Fri Aug 31 18:30:18 2012
@@ -110,8 +110,8 @@ public class SplitLogManager implements 
     new ConcurrentHashMap<String, Task>();
   private TimeoutMonitor timeoutMonitor;
 
-  private Set<String> deadWorkers = null;
-  private Object deadWorkersLock = new Object();
+  private Set<String> deadServers = null;
+  private Object deadServersLock = new Object();
 
   /**
    * Wrapper around {@link #SplitLogManager(ZooKeeperWatcher, Configuration,
@@ -424,6 +424,7 @@ public class SplitLogManager implements 
       handleUnassignedTask(path);
     } else if (TaskState.TASK_OWNED.equals(data)) {
       heartbeat(path, version,
+          TaskState.TASK_OWNED.getServerName(data),
           TaskState.TASK_OWNED.getWriterName(data));
     } else if (TaskState.TASK_RESIGNED.equals(data)) {
       LOG.info("task " + path + " entered state " + new String(data));
@@ -481,14 +482,14 @@ public class SplitLogManager implements 
   }
 
   private void heartbeat(String path, int new_version,
-      String workerName) {
+      String serverName, String workerName) {
     Task task = findOrCreateOrphanTask(path);
     if (new_version != task.last_version) {
       if (task.isUnassigned()) {
         LOG.info("task " + path + " acquired by " + workerName);
       }
       task.heartbeat(EnvironmentEdgeManager.currentTimeMillis(),
-          new_version, workerName);
+          new_version, serverName, workerName);
       tot_mgr_heartbeat.incrementAndGet();
     } else {
       // duplicate heartbeats - heartbeats w/o zk node version
@@ -804,6 +805,7 @@ public class SplitLogManager implements 
     volatile long last_update;
     volatile int last_version;
     volatile String cur_worker_name;
+    volatile String cur_server_name;
     TaskBatch batch;
     volatile TerminationStatus status;
     volatile int incarnation;
@@ -815,6 +817,7 @@ public class SplitLogManager implements 
       return ("last_update = " + last_update +
           " last_version = " + last_version +
           " cur_worker_name = " + cur_worker_name +
+          " cur_server_name = " + cur_server_name +
           " status = " + status +
           " incarnation = " + incarnation +
           " resubmits = " + unforcedResubmits +
@@ -840,28 +843,36 @@ public class SplitLogManager implements 
       last_update = time;
     }
 
-    public void heartbeat(long time, int version, String worker) {
+    public void heartbeat(long time, int version, String server, String worker) {
       last_version = version;
       last_update = time;
+      if ((cur_server_name != null && !cur_server_name.equals(server))
+          || (cur_worker_name != null && !cur_worker_name.equals(worker))) {
+        LOG.warn("heartBeat updating to a different worker/server " +
+           " old " + cur_worker_name + " on " + cur_server_name +
+           " updating to " + worker + " on " + server);
+      }
+      cur_server_name = server;
       cur_worker_name = worker;
     }
 
     public void setUnassigned() {
+      cur_server_name = null;
       cur_worker_name = null;
       last_update = -1;
     }
   }
 
-  public void handleDeadWorker(String worker_name) {
+  public void handleDeadServer(String server_name) {
     // resubmit the tasks on the TimeoutMonitor thread. Makes it easier
     // to reason about concurrency. Makes it easier to retry.
-    synchronized (deadWorkersLock) {
-      if (deadWorkers == null) {
-        deadWorkers = new HashSet<String>(100);
+    synchronized (deadServersLock) {
+      if (deadServers == null) {
+        deadServers = new HashSet<String>(100);
       }
-      deadWorkers.add(worker_name);
+      deadServers.add(server_name);
     }
-    LOG.info("dead splitlog worker " + worker_name);
+    LOG.info("dead splitlog worker(s) on server: " + server_name);
   }
 
   /**
@@ -881,17 +892,17 @@ public class SplitLogManager implements 
       int unassigned = 0;
       int tot = 0;
       boolean found_assigned_task = false;
-      Set<String> localDeadWorkers;
+      Set<String> localDeadServers;
 
-      synchronized (deadWorkersLock) {
-        localDeadWorkers = deadWorkers;
-        deadWorkers = null;
+      synchronized (deadServersLock) {
+        localDeadServers = deadServers;
+        deadServers = null;
       }
 
       for (Map.Entry<String, Task> e : tasks.entrySet()) {
         String path = e.getKey();
         Task task = e.getValue();
-        String cur_worker = task.cur_worker_name;
+        String cur_server = task.cur_server_name;
         tot++;
         // don't easily resubmit a task which hasn't been picked up yet. It
         // might be a long while before a SplitLogWorker is free to pick up a
@@ -903,14 +914,14 @@ public class SplitLogManager implements 
           continue;
         }
         found_assigned_task = true;
-        if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) {
+        if (localDeadServers != null && localDeadServers.contains(cur_server)) {
           tot_mgr_resubmit_dead_server_task.incrementAndGet();
           if (resubmit(path, task, FORCE)) {
             resubmitted++;
           } else {
-            handleDeadWorker(cur_worker);
+            handleDeadServer(cur_server);
             LOG.warn("Failed to resubmit task " + path + " owned by dead " +
-                cur_worker + ", will retry.");
+                cur_server + ", will retry.");
           }
         } else if (resubmit(path, task, CHECK)) {
           resubmitted++;

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1379542&r1=1379541&r2=1379542&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Fri Aug 31 18:30:18 2012
@@ -1490,7 +1490,7 @@ public class HRegionServer implements HR
     this.splitLogWorkers = new ArrayList<SplitLogWorker>(numSplitLogWorkers);
     for (int i = 0; i < numSplitLogWorkers; i++) {
       SplitLogWorker splitLogWorker = new SplitLogWorker(this.zooKeeperWrapper,
-          this.getConfiguration(), this.serverInfo.getServerName() + "_Worker_" + i,
+          this.getConfiguration(), this.serverInfo.getServerName(),
           logCloseThreadPool, masterRef);
       this.splitLogWorkers.add(splitLogWorker);
       splitLogWorker.start();

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java?rev=1379542&r1=1379541&r2=1379542&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java Fri Aug 31 18:30:18 2012
@@ -77,7 +77,7 @@ public class SplitLogWorker implements R
   private static final Log LOG = LogFactory.getLog(SplitLogWorker.class);
 
   Thread worker;
-  private final String serverName;
+  private final String workerName;
   private final TaskExecutor executor;
   private long zkretries;
 
@@ -89,11 +89,12 @@ public class SplitLogWorker implements R
   private Object grabTaskLock = new Object();
   private boolean workerInGrabTask = false;
   protected ZooKeeperWrapper watcher;
+  private static int numWorkers = 0;
 
   public SplitLogWorker(ZooKeeperWrapper watcher, Configuration conf,
       String serverName, TaskExecutor executor) {
     this.watcher = watcher;
-    this.serverName = serverName;
+    this.workerName = serverName + " Worker-" + numWorkers++;
     this.executor = executor;
     this.zkretries = conf.getLong("hbase.splitlog.zk.retries", 3);
   }
@@ -128,8 +129,8 @@ public class SplitLogWorker implements R
             LOG.warn("file status is null for file " + filename);
             return Status.ERR;
           }
-          String tmpname =
-            ZKSplitLog.getSplitLogDirTmpComponent(serverName, filename);
+          String tmpname = ZKSplitLog.getSplitLogDirTmpComponent(
+              serverName + "-worker-" + numWorkers, filename);
           if (HLogSplitter.splitLogFileToTemp(rootdir, tmpname,
               st, fs, conf, p, logCloseThreadPool, masterRef.get()) == false) {
             return Status.PREEMPTED;
@@ -272,7 +273,7 @@ public class SplitLogWorker implements R
         tot_wkr_failed_to_grab_task_lost_race.incrementAndGet();
         return;
       }
-      LOG.info("worker " + serverName + " acquired task " + path);
+      LOG.info("worker " + workerName + " acquired task " + path);
       tot_wkr_task_acquired.incrementAndGet();
       getDataSetWatchAsync();
 
@@ -321,7 +322,7 @@ public class SplitLogWorker implements R
       }
     } finally {
       if (t > 0) {
-        LOG.info("worker " + serverName + " done with task " + path +
+        LOG.info("worker " + workerName + " done with task " + path +
             " in " + (System.currentTimeMillis() - t) + "ms");
       }
       synchronized (grabTaskLock) {
@@ -346,7 +347,7 @@ public class SplitLogWorker implements R
   private boolean ownTask(boolean isFirstTime) {
     try {
       Stat stat = this.watcher.setDataGetStat(currentTask,
-          TaskState.TASK_OWNED.get(serverName), currentVersion);
+          TaskState.TASK_OWNED.get(workerName), currentVersion);
       if (stat == null) {
         LOG.warn("zk.setData() returned null for path " + currentTask);
         tot_wkr_task_heartbeat_failed.incrementAndGet();
@@ -385,7 +386,7 @@ public class SplitLogWorker implements R
     String path = currentTask;
     currentTask = null;
     try {
-      if (watcher.setData(path, ts.get(serverName), currentVersion)) {
+      if (watcher.setData(path, ts.get(workerName), currentVersion)) {
         LOG.info("successfully transitioned task " + path +
             " to final state " + ts);
         ctr.incrementAndGet();
@@ -424,12 +425,12 @@ public class SplitLogWorker implements R
           // UNASSIGNED because by the time this worker sets the data watch
           // the node might have made two transitions - from owned by this
           // worker to unassigned to owned by another worker
-          if (! TaskState.TASK_OWNED.equals(data, serverName) &&
-              ! TaskState.TASK_DONE.equals(data, serverName) &&
-              ! TaskState.TASK_ERR.equals(data, serverName) &&
-              ! TaskState.TASK_RESIGNED.equals(data, serverName)) {
-            LOG.info("task " + taskpath + " preempted from server " +
-                serverName + " ... current task state and owner - " +
+          if (! TaskState.TASK_OWNED.equals(data, workerName) &&
+              ! TaskState.TASK_DONE.equals(data, workerName) &&
+              ! TaskState.TASK_ERR.equals(data, workerName) &&
+              ! TaskState.TASK_RESIGNED.equals(data, workerName)) {
+            LOG.info("task " + taskpath + " preempted from worker " +
+                workerName + " ... current task state and owner - " +
                 new String(data));
             stopTask();
           }
@@ -533,7 +534,7 @@ public class SplitLogWorker implements R
    * start the SplitLogWorker thread
    */
   public void start() {
-    worker = new Thread(null, this, "SplitLogWorker-" + serverName);
+    worker = new Thread(null, this, "SplitLogWorker-" + workerName);
     exitWorker = false;
     worker.start();
     return;

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java?rev=1379542&r1=1379541&r2=1379542&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java Fri Aug 31 18:30:18 2012
@@ -115,8 +115,8 @@ public class ZKSplitLog {
       state = s.getBytes();
     }
 
-    public byte[] get(String serverName) {
-      return (Bytes.add(state, " ".getBytes(), serverName.getBytes()));
+    public byte[] get(String workerName) {
+      return (Bytes.add(state, " ".getBytes(), workerName.getBytes()));
     }
 
     public String getWriterName(byte[] data) {
@@ -124,6 +124,12 @@ public class ZKSplitLog {
       return str.substring(str.indexOf(' ') + 1);
     }
 
+    public String getServerName(byte[] data) {
+      String str = Bytes.toString(data);
+      String [] parts = str.split(" ");
+      return parts[1];
+    }
+
 
     /**
      * @param s
@@ -141,8 +147,8 @@ public class ZKSplitLog {
       return (true);
     }
 
-    public boolean equals(byte[] s, String serverName) {
-      return (Arrays.equals(s, get(serverName)));
+    public boolean equals(byte[] s, String workerName) {
+      return (Arrays.equals(s, get(workerName)));
     }
     @Override
     public String toString() {

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java?rev=1379542&r1=1379541&r2=1379542&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java Fri Aug 31 18:30:18 2012
@@ -445,7 +445,7 @@ public class TestSplitLogManager {
 
     zkw.setData(tasknode, TaskState.TASK_OWNED.get("worker1"));
     waitForCounter(tot_mgr_heartbeat, 0, 1, 1000);
-    slm.handleDeadWorker("worker1");
+    slm.handleDeadServer("worker1");
     waitForCounter(tot_mgr_resubmit, 0, 1, 1000);
     waitForCounter(tot_mgr_resubmit_dead_server_task, 0, 1, 1000);