You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2013/10/31 04:00:08 UTC
git commit: TAJO-294: Removing dead workers from the live worker
list. (Keuntae Park via jihoon)
Updated Branches:
refs/heads/master 99959420a -> 8e6fec0fc
TAJO-294: Removing dead workers from the live worker list. (Keuntae Park via jihoon)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/8e6fec0f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/8e6fec0f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/8e6fec0f
Branch: refs/heads/master
Commit: 8e6fec0fcb95c24017ee082df44853a222eb9db5
Parents: 9995942
Author: Jihoon Son <ji...@apache.org>
Authored: Thu Oct 31 11:50:08 2013 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Thu Oct 31 11:50:08 2013 +0900
----------------------------------------------------------------------
.../java/org/apache/tajo/conf/TajoConf.java | 2 +
.../master/rm/TajoWorkerResourceManager.java | 67 ++++++++++++++++++++
.../apache/tajo/master/rm/WorkerResource.java | 11 +++-
.../main/java/org/apache/tajo/util/JSPUtil.java | 3 +
.../resources/webapps/admin/catalogview.jsp | 4 +-
.../main/resources/webapps/admin/cluster.jsp | 38 +++++------
.../src/main/resources/webapps/worker/index.jsp | 4 +-
7 files changed, 105 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e6fec0f/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 5b779ad..eba6eaf 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -104,6 +104,8 @@ public class TajoConf extends YarnConfiguration {
// Tajo Worker History
WORKER_HISTORY_EXPIRE_PERIOD("tajo.worker.history.expire-interval-minutes", 12 * 60), // 12 hours
+ WORKER_HEARTBEAT_TIMEOUT("tajo.worker.heartbeat.timeout", 120 * 1000), //120 sec
+
// Resource Manager
RESOURCE_MANAGER_CLASS("tajo.resource.manager", "org.apache.tajo.master.rm.TajoWorkerResourceManager"),
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e6fec0f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
index 136f6ea..7ffc563 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryId;
import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.master.querymaster.QueryInProgress;
@@ -62,6 +63,8 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
private WorkerResourceAllocationThread workerResourceAllocator;
+ private WorkerMonitorThread workerMonitor;
+
private final BlockingQueue<WorkerResourceRequest> requestQueue;
private final List<WorkerResourceRequest> reAllocationList;
@@ -77,6 +80,9 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
workerResourceAllocator = new WorkerResourceAllocationThread();
workerResourceAllocator.start();
+
+ workerMonitor = new WorkerMonitorThread();
+ workerMonitor.start();
}
public Map<String, WorkerResource> getWorkers() {
@@ -107,6 +113,9 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
if(workerResourceAllocator != null) {
workerResourceAllocator.interrupt();
}
+ if(workerMonitor != null) {
+ workerMonitor.interrupt();
+ }
}
@Override
@@ -191,6 +200,62 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
}
}
+ class WorkerMonitorThread extends Thread { // background thread that moves workers whose heartbeat has timed out from the live sets to the dead set
+ int heartbeatTimeout; // milliseconds; compared against System.currentTimeMillis() differences below
+
+ @Override
+ public void run() { // loops until the enclosing manager's stopped flag is set
+ heartbeatTimeout = masterContext.getConf().getIntVar(TajoConf.ConfVars.WORKER_HEARTBEAT_TIMEOUT); // read once at thread start
+ LOG.info("WorkerMonitor start");
+ while(!stopped.get()) {
+ try {
+ Thread.sleep(10 * 1000); // wait 10 seconds between scans
+ } catch (InterruptedException e) {
+ if(stopped.get()) { // interrupted while stopping: leave the loop
+ break;
+ }
+ } // interrupted while not stopped: fall through and scan immediately
+ synchronized(workerResourceLock) { // the whole scan runs while holding workerResourceLock
+ Set<String> workerHolders = new HashSet<String>(); // snapshot to iterate over, so the live sets can be modified inside the loops
+ workerHolders.addAll(liveWorkerResources);
+ for(String eachLiveWorker: workerHolders) { // pass 1: live workers
+ WorkerResource worker = allWorkerResourceMap.get(eachLiveWorker);
+ if(worker == null) { // key is listed as live but has no resource entry: warn and skip
+ LOG.warn(eachLiveWorker + " not in WorkerResourceMap");
+ continue;
+ }
+
+ if(System.currentTimeMillis() - worker.getLastHeartbeat() >= heartbeatTimeout) { // no heartbeat within the timeout
+ liveWorkerResources.remove(eachLiveWorker);
+ deadWorkerResources.add(eachLiveWorker);
+ worker.setWorkerStatus(WorkerStatus.DEAD);
+ LOG.warn("Worker [" + eachLiveWorker + "] is dead.");
+ }
+ }
+
+ //QueryMaster
+ workerHolders.clear(); // reuse the snapshot set for the second pass
+
+ workerHolders.addAll(liveQueryMasterWorkerResources);
+ for(String eachLiveWorker: workerHolders) { // pass 2: live query masters, same check as pass 1
+ WorkerResource worker = allWorkerResourceMap.get(eachLiveWorker);
+ if(worker == null) { // key is listed as live but has no resource entry: warn and skip
+ LOG.warn(eachLiveWorker + " not in WorkerResourceMap");
+ continue;
+ }
+
+ if(System.currentTimeMillis() - worker.getLastHeartbeat() >= heartbeatTimeout) { // no heartbeat within the timeout
+ liveQueryMasterWorkerResources.remove(eachLiveWorker);
+ deadWorkerResources.add(eachLiveWorker); // query masters go into the same dead set as workers
+ worker.setWorkerStatus(WorkerStatus.DEAD);
+ LOG.warn("QueryMaster [" + eachLiveWorker + "] is dead.");
+ }
+ }
+ }
+ }
+ }
+ }
+
class WorkerResourceRequest {
boolean queryMasterRequest;
QueryId queryId;
@@ -368,9 +433,11 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
if(queryMasterMode) {
liveQueryMasterWorkerResources.add(workerKey);
workerResource.setNumRunningTasks(0);
+ LOG.info("Heartbeat received from QueryMaster [" + workerKey + "] again.");
}
if(taskRunnerMode) {
liveWorkerResources.add(workerKey);
+ LOG.info("Heartbeat received from Worker [" + workerKey + "] again.");
}
}
workerResource.setLastHeartbeat(System.currentTimeMillis());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e6fec0f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
index bd7db6d..b702063 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
@@ -20,13 +20,12 @@ package org.apache.tajo.master.rm;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.worker.TajoWorker;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-public class WorkerResource {
+public class WorkerResource implements Comparable<WorkerResource> {
private static final Log LOG = LogFactory.getLog(WorkerResource.class);
private String allocatedHost;
@@ -328,4 +327,12 @@ public class WorkerResource {
public void releaseQueryMasterTask() {
numQueryMasterTasks.getAndDecrement();
}
+
+ @Override
+ public int compareTo(WorkerResource workerResource) { // orders WorkerResource instances by the value returned from getId()
+ if(workerResource == null) { // a null argument makes this instance compare as greater, rather than throwing
+ return 1;
+ }
+ return getId().compareTo(workerResource.getId()); // NOTE(review): assumes getId() is non-null on both sides - confirm; equal ids compare as 0
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e6fec0f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/JSPUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/JSPUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/JSPUtil.java
index 28598d2..439393b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/JSPUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/JSPUtil.java
@@ -52,6 +52,9 @@ public class JSPUtil {
}
public static String getElapsedTime(long startTime, long finishTime) { // returns the span between two millisecond timestamps as "<n> sec", or "-"
+ if(startTime == 0) { // a startTime of 0 yields "-" instead of an elapsed time (presumably 0 means "never set" - confirm with callers)
+ return "-";
+ }
return finishTime == 0 ? decimalF.format((System.currentTimeMillis() - startTime) / 1000) + " sec" // finishTime of 0: measure up to the current time
: decimalF.format((finishTime - startTime) / 1000) + " sec"; // NOTE(review): long division by 1000 truncates to whole seconds before decimalF formats it
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e6fec0f/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/catalogview.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/catalogview.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/catalogview.jsp
index 7c69a29..f36752b 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/catalogview.jsp
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/catalogview.jsp
@@ -123,8 +123,8 @@
<table border="1" class='border_table'>
<tr><td width='100'>Table path</td><td width='410'><%=tableDesc.getPath()%></td></tr>
<tr><td>Store type</td><td><%=tableDesc.getMeta().getStoreType()%></td></tr>
- <tr><td># rows</td><td><%=tableDesc.getMeta().getStat().getNumRows()%></td></tr>
- <tr><td>Volume</td><td><%=FileUtil.humanReadableByteCount(tableDesc.getMeta().getStat().getNumBytes(),true)%></td></tr>
+ <tr><td># rows</td><td><%=(tableDesc.hasStats() ? ("" + tableDesc.getStats().getNumRows()) : "-")%></td></tr>
+ <tr><td>Volume</td><td><%=(tableDesc.hasStats() ? FileUtil.humanReadableByteCount(tableDesc.getStats().getNumBytes(),true) : "-")%></td></tr>
<tr><td>Options</td><td><%=optionStr%></td></tr>
</table>
</div>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e6fec0f/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/cluster.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/cluster.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/cluster.jsp
index bb35ec6..d0bf887 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/cluster.jsp
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/cluster.jsp
@@ -23,6 +23,7 @@
<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
<%@ page import="org.apache.tajo.master.*" %>
<%@ page import="org.apache.tajo.master.rm.*" %>
+<%@ page import="org.apache.tajo.util.JSPUtil" %>
<%
TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
@@ -36,12 +37,12 @@
int runningQueryMasterTasks = 0;
- List<WorkerResource> liveWorkers = new ArrayList<WorkerResource>();
- List<WorkerResource> deadWorkers = new ArrayList<WorkerResource>();
- List<WorkerResource> decommissionWorkers = new ArrayList<WorkerResource>();
+ Set<WorkerResource> liveWorkers = new TreeSet<WorkerResource>();
+ Set<WorkerResource> deadWorkers = new TreeSet<WorkerResource>();
+ Set<WorkerResource> decommissionWorkers = new TreeSet<WorkerResource>();
- List<WorkerResource> liveQueryMasters = new ArrayList<WorkerResource>();
- List<WorkerResource> deadQueryMasters = new ArrayList<WorkerResource>();
+ Set<WorkerResource> liveQueryMasters = new TreeSet<WorkerResource>();
+ Set<WorkerResource> deadQueryMasters = new TreeSet<WorkerResource>();
for(WorkerResource eachWorker: workers.values()) {
if(eachWorker.isQueryMasterMode()) {
@@ -93,7 +94,7 @@
} else {
%>
<table width="100%" class="border_table" border="1">
- <tr><th>No</th><th>QueryMaster</th><th>Client Port</th><th>Running QueryMaster Tasks</th><th>Heap(free/max)</th><th>Status</th></tr>
+ <tr><th>No</th><th>QueryMaster</th><th>Client Port</th><th>Running QueryMaster Tasks</th><th>Heap(free/max)</th><th>Heartbeat</th><th>Status</th></tr>
<%
int no = 1;
@@ -103,9 +104,10 @@
<tr>
<td width='30' align='right'><%=no++%></td>
<td><a href='<%=queryMasterHttp%>'><%=queryMaster.getAllocatedHost() + ":" + queryMaster.getQueryMasterPort()%></a></td>
- <td width='100'><%=queryMaster.getClientPort()%></td>
+ <td width='100' align='center'><%=queryMaster.getClientPort()%></td>
<td width='200' align='right'><%=queryMaster.getNumQueryMasterTasks()%></td>
<td width='200' align='left'><%=queryMaster.getFreeHeap()/1024/1024%>/<%=queryMaster.getMaxHeap()/1024/1024%> MB</td>
+ <td width='100' align='right'><%=JSPUtil.getElapsedTime(queryMaster.getLastHeartbeat(), System.currentTimeMillis())%></td>
<td width='100' align='center'><%=queryMaster.getWorkerStatus()%></td>
</tr>
<%
@@ -133,7 +135,7 @@
<td width='30' align='right'><%=no++%></td>
<td><%=queryMaster.getAllocatedHost() + ":" + queryMaster.getQueryMasterPort()%></td>
<td><%=queryMaster.getClientPort()%></td>
- <td><%=queryMaster.getWorkerStatus()%></td>
+ <td align='center'><%=queryMaster.getWorkerStatus()%></td>
</tr>
<%
} //end fo for
@@ -155,7 +157,7 @@
} else {
%>
<table width="100%" class="border_table" border="1">
- <tr><th>No</th><th>Worker</th><th>PullServer Port</th><th>Running Tasks</th><th>Slot</th></th><th>Heap(free/max)</th><th>Disk</th><th>Cpu</th><th>Status</th></tr>
+ <tr><th>No</th><th>Worker</th><th>PullServer Port</th><th>Running Tasks</th><th>Slot</th></th><th>Heap(free/max)</th><th>Disk</th><th>Cpu</th><th>Heartbeat</th><th>Status</th></tr>
<%
int no = 1;
for(WorkerResource worker: liveWorkers) {
@@ -164,12 +166,13 @@
<tr>
<td width='30' align='right'><%=no++%></td>
<td><a href='<%=workerHttp%>'><%=worker.getAllocatedHost() + ":" + worker.getPeerRpcPort()%></a></td>
- <td width='150'><%=worker.getPullServerPort()%></td>
+ <td width='150' align='center'><%=worker.getPullServerPort()%></td>
<td width='100' align='right'><%=worker.getNumRunningTasks()%></td>
<td width='100' align='right'><%=worker.getUsedSlots()%>/<%=worker.getSlots()%></td>
<td width='100' align='left'><%=worker.getFreeHeap()/1024/1024%>/<%=worker.getMaxHeap()/1024/1024%> MB</td>
<td width='100' align='right'><%=worker.getUsedDiskSlots()%>/<%=worker.getDiskSlots()%></td>
<td width='100' align='right'><%=worker.getUsedCpuCoreSlots()%>/<%=worker.getCpuCoreSlots()%></td>
+ <td width='100' align='right'><%=JSPUtil.getElapsedTime(worker.getLastHeartbeat(), System.currentTimeMillis())%></td>
<td width='100' align='center'><%=worker.getWorkerStatus()%></td>
</tr>
<%
@@ -193,7 +196,7 @@
} else {
%>
<table width="100%" class="border_table" border="1">
- <tr><th>No</th><th>Worker</th><th>Ports</th><th>Running Tasks</th><th>Slot</th></th><th>Heap(free/max)</th><th>Disk</th><th>Cpu</th><th>Status</th></tr>
+ <tr><th>No</th><th>Worker</th><th>PullServer Port</th><th>Slot</th></th><th>Heap(free/max)</th><th>Disk</th><th>Cpu</th><th>Status</th></tr>
<%
int no = 1;
for(WorkerResource worker: deadWorkers) {
@@ -201,13 +204,12 @@
<tr>
<td width='30' align='right'><%=no++%></td>
<td><%=worker.getAllocatedHost() + ":" + worker.getPeerRpcPort()%></td>
- <td><%=worker.portsToStr()%></td>
- <td><%=worker.getNumRunningTasks()%></td>
- <td><%=worker.getUsedSlots()%>/<%=worker.getSlots()%></td>
- <td><%=worker.getFreeHeap()/1024/1024%>/<%=worker.getMaxHeap()/1024/1024%> MB</td>
- <td><%=worker.getUsedDiskSlots()%>/<%=worker.getDiskSlots()%></td>
- <td><%=worker.getUsedCpuCoreSlots()%>/<%=worker.getCpuCoreSlots()%></td>
- <td><%=worker.getWorkerStatus()%></td>
+ <td width='150' align='center'><%=worker.getPullServerPort()%></td>
+ <td width='100' align='right'><%=worker.getUsedSlots()%>/<%=worker.getSlots()%></td>
+ <td width='100' align='left'><%=worker.getFreeHeap()/1024/1024%>/<%=worker.getMaxHeap()/1024/1024%> MB</td>
+ <td width='100' align='right'><%=worker.getUsedDiskSlots()%>/<%=worker.getDiskSlots()%></td>
+ <td width='100' align='right'><%=worker.getUsedCpuCoreSlots()%>/<%=worker.getCpuCoreSlots()%></td>
+ <td width='100' align='center'><%=worker.getWorkerStatus()%></td>
</tr>
<%
} //end fo for
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e6fec0f/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/index.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/index.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/index.jsp
index 6d862a1..eb8800c 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/index.jsp
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/index.jsp
@@ -63,7 +63,7 @@ if(tajoWorker.getWorkerContext().isQueryMasterMode()) {
List<QueryMasterTask> finishedQueryMasterTasks = JSPUtil.sortQueryMasterTask(tajoWorker.getWorkerContext()
.getQueryMasterManagerService().getQueryMaster().getFinishedQueryMasterTasks(), true);
%>
- <h3>Running QueryMaster</h3>
+ <h3>Running Query</h3>
<%
if(queryMasterTasks.isEmpty()) {
out.write("No running query master");
@@ -89,7 +89,7 @@ if(tajoWorker.getWorkerContext().isQueryMasterMode()) {
</table>
<p/>
<hr/>
- <h3>Finished QueryMaster</h3>
+ <h3>Finished Query</h3>
<%
if(finishedQueryMasterTasks.isEmpty()) {
out.write("No finished query master");