You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2013/10/31 04:00:08 UTC

git commit: TAJO-294: Removing dead workers from the live worker list. (Keuntae Park via jihoon)

Updated Branches:
  refs/heads/master 99959420a -> 8e6fec0fc


TAJO-294: Removing dead workers from the live worker list. (Keuntae Park via jihoon)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/8e6fec0f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/8e6fec0f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/8e6fec0f

Branch: refs/heads/master
Commit: 8e6fec0fcb95c24017ee082df44853a222eb9db5
Parents: 9995942
Author: Jihoon Son <ji...@apache.org>
Authored: Thu Oct 31 11:50:08 2013 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Thu Oct 31 11:50:08 2013 +0900

----------------------------------------------------------------------
 .../java/org/apache/tajo/conf/TajoConf.java     |  2 +
 .../master/rm/TajoWorkerResourceManager.java    | 67 ++++++++++++++++++++
 .../apache/tajo/master/rm/WorkerResource.java   | 11 +++-
 .../main/java/org/apache/tajo/util/JSPUtil.java |  3 +
 .../resources/webapps/admin/catalogview.jsp     |  4 +-
 .../main/resources/webapps/admin/cluster.jsp    | 38 +++++------
 .../src/main/resources/webapps/worker/index.jsp |  4 +-
 7 files changed, 105 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e6fec0f/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 5b779ad..eba6eaf 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -104,6 +104,8 @@ public class TajoConf extends YarnConfiguration {
     // Tajo Worker History
     WORKER_HISTORY_EXPIRE_PERIOD("tajo.worker.history.expire-interval-minutes", 12 * 60), // 12 hours
 
+    WORKER_HEARTBEAT_TIMEOUT("tajo.worker.heartbeat.timeout", 120 * 1000),  //120 sec
+
     // Resource Manager
     RESOURCE_MANAGER_CLASS("tajo.resource.manager", "org.apache.tajo.master.rm.TajoWorkerResourceManager"),
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e6fec0f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
index 136f6ea..7ffc563 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.master.querymaster.QueryInProgress;
@@ -62,6 +63,8 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
 
   private WorkerResourceAllocationThread workerResourceAllocator;
 
+  private WorkerMonitorThread workerMonitor;
+
   private final BlockingQueue<WorkerResourceRequest> requestQueue;
 
   private final List<WorkerResourceRequest> reAllocationList;
@@ -77,6 +80,9 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
 
     workerResourceAllocator = new WorkerResourceAllocationThread();
     workerResourceAllocator.start();
+
+    workerMonitor = new WorkerMonitorThread();
+    workerMonitor.start();
   }
 
   public Map<String, WorkerResource> getWorkers() {
@@ -107,6 +113,9 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
     if(workerResourceAllocator != null) {
       workerResourceAllocator.interrupt();
     }
+    if(workerMonitor != null) {
+      workerMonitor.interrupt();
+    }
   }
 
   @Override
@@ -191,6 +200,62 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
     }
   }
 
+  /**
+   * Roughly every 10 seconds, marks live workers and QueryMasters as DEAD when their last
+   * heartbeat is older than tajo.worker.heartbeat.timeout (WORKER_HEARTBEAT_TIMEOUT, in ms).
+   */
+  class WorkerMonitorThread extends Thread {
+    /** Heartbeat timeout in milliseconds; read from the master conf when the thread starts. */
+    int heartbeatTimeout;
+
+    @Override
+    public void run() {
+      heartbeatTimeout = masterContext.getConf().getIntVar(TajoConf.ConfVars.WORKER_HEARTBEAT_TIMEOUT);
+      LOG.info("WorkerMonitor start");
+      while(!stopped.get()) {
+        try {
+          Thread.sleep(10 * 1000);
+        } catch (InterruptedException e) {
+          // interrupt() wakes this thread early; exit only once stopped has been set.
+          if(stopped.get()) {
+            break;
+          }
+        }
+        synchronized(workerResourceLock) {
+          expireDeadWorkers(liveWorkerResources, "Worker");
+          expireDeadWorkers(liveQueryMasterWorkerResources, "QueryMaster");
+        }
+      }
+    }
+
+    /**
+     * Moves every key of {@code liveSet} whose last heartbeat is at least
+     * {@link #heartbeatTimeout} ms old into deadWorkerResources and marks it DEAD.
+     * The caller must hold workerResourceLock.
+     *
+     * @param liveSet live worker keys to scan; expired keys are removed from it
+     * @param role label used in the log message ("Worker" or "QueryMaster")
+     */
+    private void expireDeadWorkers(java.util.Collection<String> liveSet, String role) {
+      // Iterate over a snapshot because expired keys are removed from liveSet below.
+      Set<String> workerHolders = new HashSet<String>(liveSet);
+      for(String eachLiveWorker: workerHolders) {
+        WorkerResource worker = allWorkerResourceMap.get(eachLiveWorker);
+        if(worker == null) {
+          LOG.warn(eachLiveWorker + " not in WorkerResourceMap");
+          continue;
+        }
+
+        if(System.currentTimeMillis() - worker.getLastHeartbeat() >= heartbeatTimeout) {
+          liveSet.remove(eachLiveWorker);
+          deadWorkerResources.add(eachLiveWorker);
+          worker.setWorkerStatus(WorkerStatus.DEAD);
+          LOG.warn(role + " [" + eachLiveWorker + "] is dead.");
+        }
+      }
+    }
+  }
+
   class WorkerResourceRequest {
     boolean queryMasterRequest;
     QueryId queryId;
@@ -368,9 +433,11 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
           if(queryMasterMode) {
             liveQueryMasterWorkerResources.add(workerKey);
             workerResource.setNumRunningTasks(0);
+            LOG.info("Heartbeat received from QueryMaster [" + workerKey + "] again.");
           }
           if(taskRunnerMode) {
             liveWorkerResources.add(workerKey);
+            LOG.info("Heartbeat received from Worker [" + workerKey + "] again.");
           }
         }
         workerResource.setLastHeartbeat(System.currentTimeMillis());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e6fec0f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
index bd7db6d..b702063 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
@@ -20,13 +20,12 @@ package org.apache.tajo.master.rm;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.worker.TajoWorker;
 
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-public class WorkerResource {
+public class WorkerResource implements Comparable<WorkerResource> {
   private static final Log LOG = LogFactory.getLog(WorkerResource.class);
 
   private String allocatedHost;
@@ -328,4 +327,12 @@ public class WorkerResource {
   public void releaseQueryMasterTask() {
     numQueryMasterTasks.getAndDecrement();
   }
+
+  @Override
+  public int compareTo(WorkerResource workerResource) {
+    // Order workers by id; a null argument sorts before any resource.
+    return workerResource == null
+        ? 1
+        : getId().compareTo(workerResource.getId());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e6fec0f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/JSPUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/JSPUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/JSPUtil.java
index 28598d2..439393b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/JSPUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/JSPUtil.java
@@ -52,6 +52,9 @@ public class JSPUtil {
   }
 
   public static String getElapsedTime(long startTime, long finishTime) {
+    if(startTime == 0) {  // startTime 0 is treated as unset; otherwise the result would be seconds since the epoch
+      return "-";         // placeholder shown in the web UI instead of an elapsed time
+    }
     return finishTime == 0 ? decimalF.format((System.currentTimeMillis() - startTime) / 1000) + " sec"
         : decimalF.format((finishTime - startTime) / 1000) + " sec";
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e6fec0f/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/catalogview.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/catalogview.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/catalogview.jsp
index 7c69a29..f36752b 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/catalogview.jsp
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/catalogview.jsp
@@ -123,8 +123,8 @@
             <table border="1" class='border_table'>
               <tr><td width='100'>Table path</td><td width='410'><%=tableDesc.getPath()%></td></tr>
               <tr><td>Store type</td><td><%=tableDesc.getMeta().getStoreType()%></td></tr>
-              <tr><td># rows</td><td><%=tableDesc.getMeta().getStat().getNumRows()%></td></tr>
-              <tr><td>Volume</td><td><%=FileUtil.humanReadableByteCount(tableDesc.getMeta().getStat().getNumBytes(),true)%></td></tr>
+              <tr><td># rows</td><td><%=(tableDesc.hasStats() ? ("" + tableDesc.getStats().getNumRows()) : "-")%></td></tr>
+              <tr><td>Volume</td><td><%=(tableDesc.hasStats() ? FileUtil.humanReadableByteCount(tableDesc.getStats().getNumBytes(),true) : "-")%></td></tr>
               <tr><td>Options</td><td><%=optionStr%></td></tr>
             </table>
           </div>

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e6fec0f/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/cluster.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/cluster.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/cluster.jsp
index bb35ec6..d0bf887 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/cluster.jsp
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/cluster.jsp
@@ -23,6 +23,7 @@
 <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
 <%@ page import="org.apache.tajo.master.*" %>
 <%@ page import="org.apache.tajo.master.rm.*" %>
+<%@ page import="org.apache.tajo.util.JSPUtil" %>
 
 <%
   TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
@@ -36,12 +37,12 @@
 
   int runningQueryMasterTasks = 0;
 
-  List<WorkerResource> liveWorkers = new ArrayList<WorkerResource>();
-  List<WorkerResource> deadWorkers = new ArrayList<WorkerResource>();
-  List<WorkerResource> decommissionWorkers = new ArrayList<WorkerResource>();
+  Set<WorkerResource> liveWorkers = new TreeSet<WorkerResource>();
+  Set<WorkerResource> deadWorkers = new TreeSet<WorkerResource>();
+  Set<WorkerResource> decommissionWorkers = new TreeSet<WorkerResource>();
 
-  List<WorkerResource> liveQueryMasters = new ArrayList<WorkerResource>();
-  List<WorkerResource> deadQueryMasters = new ArrayList<WorkerResource>();
+  Set<WorkerResource> liveQueryMasters = new TreeSet<WorkerResource>();
+  Set<WorkerResource> deadQueryMasters = new TreeSet<WorkerResource>();
 
   for(WorkerResource eachWorker: workers.values()) {
     if(eachWorker.isQueryMasterMode()) {
@@ -93,7 +94,7 @@
   } else {
 %>
   <table width="100%" class="border_table" border="1">
-    <tr><th>No</th><th>QueryMaster</th><th>Client Port</th><th>Running QueryMaster Tasks</th><th>Heap(free/max)</th><th>Status</th></tr>
+    <tr><th>No</th><th>QueryMaster</th><th>Client Port</th><th>Running QueryMaster Tasks</th><th>Heap(free/max)</th><th>Heartbeat</th><th>Status</th></tr>
 
 <%
     int no = 1;
@@ -103,9 +104,10 @@
     <tr>
       <td width='30' align='right'><%=no++%></td>
       <td><a href='<%=queryMasterHttp%>'><%=queryMaster.getAllocatedHost() + ":" + queryMaster.getQueryMasterPort()%></a></td>
-      <td width='100'><%=queryMaster.getClientPort()%></td>
+      <td width='100' align='center'><%=queryMaster.getClientPort()%></td>
       <td width='200' align='right'><%=queryMaster.getNumQueryMasterTasks()%></td>
       <td width='200' align='left'><%=queryMaster.getFreeHeap()/1024/1024%>/<%=queryMaster.getMaxHeap()/1024/1024%> MB</td>
+      <td width='100' align='right'><%=JSPUtil.getElapsedTime(queryMaster.getLastHeartbeat(), System.currentTimeMillis())%></td>
       <td width='100' align='center'><%=queryMaster.getWorkerStatus()%></td>
     </tr>
 <%
@@ -133,7 +135,7 @@
       <td width='30' align='right'><%=no++%></td>
       <td><%=queryMaster.getAllocatedHost() + ":" + queryMaster.getQueryMasterPort()%></td>
       <td><%=queryMaster.getClientPort()%></td>
-      <td><%=queryMaster.getWorkerStatus()%></td>
+      <td align='center'><%=queryMaster.getWorkerStatus()%></td>
     </tr>
 <%
       } //end fo for
@@ -155,7 +157,7 @@
   } else {
 %>
   <table width="100%" class="border_table" border="1">
-    <tr><th>No</th><th>Worker</th><th>PullServer Port</th><th>Running Tasks</th><th>Slot</th></th><th>Heap(free/max)</th><th>Disk</th><th>Cpu</th><th>Status</th></tr>
+    <tr><th>No</th><th>Worker</th><th>PullServer Port</th><th>Running Tasks</th><th>Slot</th><th>Heap(free/max)</th><th>Disk</th><th>Cpu</th><th>Heartbeat</th><th>Status</th></tr>
 <%
     int no = 1;
     for(WorkerResource worker: liveWorkers) {
@@ -164,12 +166,13 @@
     <tr>
       <td width='30' align='right'><%=no++%></td>
       <td><a href='<%=workerHttp%>'><%=worker.getAllocatedHost() + ":" + worker.getPeerRpcPort()%></a></td>
-      <td width='150'><%=worker.getPullServerPort()%></td>
+      <td width='150' align='center'><%=worker.getPullServerPort()%></td>
       <td width='100' align='right'><%=worker.getNumRunningTasks()%></td>
       <td width='100' align='right'><%=worker.getUsedSlots()%>/<%=worker.getSlots()%></td>
       <td width='100' align='left'><%=worker.getFreeHeap()/1024/1024%>/<%=worker.getMaxHeap()/1024/1024%> MB</td>
       <td width='100' align='right'><%=worker.getUsedDiskSlots()%>/<%=worker.getDiskSlots()%></td>
       <td width='100' align='right'><%=worker.getUsedCpuCoreSlots()%>/<%=worker.getCpuCoreSlots()%></td>
+      <td width='100' align='right'><%=JSPUtil.getElapsedTime(worker.getLastHeartbeat(), System.currentTimeMillis())%></td>
       <td width='100' align='center'><%=worker.getWorkerStatus()%></td>
     </tr>
 <%
@@ -193,7 +196,7 @@
   } else {
 %>
   <table width="100%" class="border_table" border="1">
-    <tr><th>No</th><th>Worker</th><th>Ports</th><th>Running Tasks</th><th>Slot</th></th><th>Heap(free/max)</th><th>Disk</th><th>Cpu</th><th>Status</th></tr>
+    <tr><th>No</th><th>Worker</th><th>PullServer Port</th><th>Slot</th><th>Heap(free/max)</th><th>Disk</th><th>Cpu</th><th>Status</th></tr>
 <%
       int no = 1;
       for(WorkerResource worker: deadWorkers) {
@@ -201,13 +204,12 @@
     <tr>
       <td width='30' align='right'><%=no++%></td>
       <td><%=worker.getAllocatedHost() + ":" + worker.getPeerRpcPort()%></td>
-      <td><%=worker.portsToStr()%></td>
-      <td><%=worker.getNumRunningTasks()%></td>
-      <td><%=worker.getUsedSlots()%>/<%=worker.getSlots()%></td>
-      <td><%=worker.getFreeHeap()/1024/1024%>/<%=worker.getMaxHeap()/1024/1024%> MB</td>
-      <td><%=worker.getUsedDiskSlots()%>/<%=worker.getDiskSlots()%></td>
-      <td><%=worker.getUsedCpuCoreSlots()%>/<%=worker.getCpuCoreSlots()%></td>
-      <td><%=worker.getWorkerStatus()%></td>
+      <td width='150' align='center'><%=worker.getPullServerPort()%></td>
+      <td width='100' align='right'><%=worker.getUsedSlots()%>/<%=worker.getSlots()%></td>
+      <td width='100' align='left'><%=worker.getFreeHeap()/1024/1024%>/<%=worker.getMaxHeap()/1024/1024%> MB</td>
+      <td width='100' align='right'><%=worker.getUsedDiskSlots()%>/<%=worker.getDiskSlots()%></td>
+      <td width='100' align='right'><%=worker.getUsedCpuCoreSlots()%>/<%=worker.getCpuCoreSlots()%></td>
+      <td width='100' align='center'><%=worker.getWorkerStatus()%></td>
     </tr>
 <%
       } //end fo for

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e6fec0f/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/index.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/index.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/index.jsp
index 6d862a1..eb8800c 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/index.jsp
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/index.jsp
@@ -63,7 +63,7 @@ if(tajoWorker.getWorkerContext().isQueryMasterMode()) {
   List<QueryMasterTask> finishedQueryMasterTasks = JSPUtil.sortQueryMasterTask(tajoWorker.getWorkerContext()
           .getQueryMasterManagerService().getQueryMaster().getFinishedQueryMasterTasks(), true);
 %>
-  <h3>Running QueryMaster</h3>
+  <h3>Running Query</h3>
   <%
     if(queryMasterTasks.isEmpty()) {
       out.write("No running query master");
@@ -89,7 +89,7 @@ if(tajoWorker.getWorkerContext().isQueryMasterMode()) {
   </table>
   <p/>
   <hr/>
-  <h3>Finished QueryMaster</h3>
+  <h3>Finished Query</h3>
   <%
     if(finishedQueryMasterTasks.isEmpty()) {
       out.write("No finished query master");