You are viewing a plain-text version of this content. (The canonical link, originally given as "here", was not preserved in this text version.)
Posted to commits@tajo.apache.org by jh...@apache.org on 2013/09/12 19:00:43 UTC
git commit: TAJO-168: infinite loop occurs when QueryMaster is stopping. (jinho)
Updated Branches:
refs/heads/master 3b595ddd3 -> 5d3966a8c
TAJO-168: infinite loop occurs when QueryMaster is stopping. (jinho)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/5d3966a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/5d3966a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/5d3966a8
Branch: refs/heads/master
Commit: 5d3966a8cd4c3256abf47d4a9084adc7778e8da3
Parents: 3b595dd
Author: jinossy <ji...@gmail.com>
Authored: Fri Sep 13 02:00:02 2013 +0900
Committer: jinossy <ji...@gmail.com>
Committed: Fri Sep 13 02:00:02 2013 +0900
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../apache/tajo/master/TaskSchedulerImpl.java | 3 +
.../tajo/master/querymaster/SubQuery.java | 3 +-
.../master/rm/TajoWorkerResourceManager.java | 6 +-
.../apache/tajo/master/rm/WorkerResource.java | 61 ++++++++++++++++----
.../tajo/master/rm/YarnTajoResourceManager.java | 2 +-
6 files changed, 60 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5d3966a8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 11ea618..22e2518 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -133,6 +133,8 @@ Release 0.2.0 - unreleased
BUG FIXES
+ TAJO-168: infinite loop occurs when QueryMaster is stopping. (jinho)
+
TAJO-180: Better error messages for
StorageManager.listStatus$InvalidInputException. (hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5d3966a8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
index 8bb17b1..71b0114 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
@@ -398,6 +398,9 @@ public class TaskSchedulerImpl extends AbstractService
LOG.info("assignToLeafTasks: " + taskRequest.getExecutionBlockId() + "," +
"containerId=" + taskRequest.getContainerId());
ContainerProxy container = context.getResourceAllocator().getContainer(taskRequest.getContainerId());
+
+ if(container == null) continue;
+
String host = container.getTaskHostName();
QueryUnitAttemptId attemptId = null;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5d3966a8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 43895d8..ac92386 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -22,7 +22,6 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -625,7 +624,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
final Resource resource = Records.newRecord(Resource.class);
- resource.setMemory(2048);
+ resource.setMemory(2000);
LOG.info("Request Container for " + subQuery.getId() + " containers=" + numRequest);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5d3966a8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
index eb710fa..7c4bf59 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -209,7 +209,7 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
resourceRequest.request.getDiskSlots(),
resourceRequest.request.getNumWorks());
- LOG.info("====> allocateWorkerResources: allocated:" + workerResources.size());
+ LOG.debug("====> allocateWorkerResources: allocated:" + workerResources.size());
if(workerResources.size() > 0) {
if(resourceRequest.queryMasterRequest) {
@@ -239,6 +239,8 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
}
LOG.debug("=========================================");
}
+ requestQueue.add(resourceRequest);
+ Thread.sleep(100);
}
} catch(InterruptedException ie) {
}
@@ -332,13 +334,13 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
queryMasterWorkerResource = queryMasterMap.remove(queryId);
}
}
- LOG.info("release QueryMaster resource:" + queryId + "," + queryMasterWorkerResource);
WorkerResource workerResource = new WorkerResource();
workerResource.copyId(queryMasterWorkerResource);
workerResource.setMemoryMBSlots(queryMasterMemoryMB);
workerResource.setDiskSlots(queryMasterDiskSlot);
workerResource.setCpuCoreSlots(0);
releaseWorkerResource(queryId, workerResource);
+ LOG.info("released QueryMaster resource:" + queryId + "," + queryMasterWorkerResource);
}
public void workerHeartbeat(TajoMasterProtocol.TajoHeartbeat request) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5d3966a8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
index b21570c..b958761 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
@@ -21,6 +21,9 @@ package org.apache.tajo.master.rm;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
public class WorkerResource {
private static final Log LOG = LogFactory.getLog(WorkerResource.class);
@@ -39,6 +42,10 @@ public class WorkerResource {
private boolean queryMasterAllocated;
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ private final Lock rlock = lock.readLock();
+ private final Lock wlock = lock.writeLock();
+
private WorkerStatus workerStatus;
private long lastHeartbeat;
@@ -65,7 +72,12 @@ public class WorkerResource {
}
public void addUsedMemoryMBSlots(int memoryMBSlots) {
- usedMemoryMBSlots += memoryMBSlots;
+ try {
+ wlock.lock();
+ usedMemoryMBSlots += memoryMBSlots;
+ } finally {
+ wlock.unlock();
+ }
}
public void addUsedCpuCoreSlots(int cpuCoreSlots) {
@@ -89,11 +101,21 @@ public class WorkerResource {
}
public int getMemoryMBSlots() {
- return memoryMBSlots;
+ try {
+ rlock.lock();
+ return memoryMBSlots;
+ } finally {
+ rlock.unlock();
+ }
}
public void setMemoryMBSlots(int memoryMBSlots) {
- this.memoryMBSlots = memoryMBSlots;
+ try {
+ wlock.lock();
+ this.memoryMBSlots = memoryMBSlots;
+ } finally {
+ wlock.unlock();
+ }
}
public int getAvailableDiskSlots() {
@@ -101,13 +123,13 @@ public class WorkerResource {
}
public int getAvailableMemoryMBSlots() {
- return memoryMBSlots - usedMemoryMBSlots;
+ return getMemoryMBSlots() - getUsedMemoryMBSlots();
}
@Override
public String toString() {
return "host:" + allocatedHost + ", port=" + portsToStr() + ", slots=" + memoryMBSlots + ":" + cpuCoreSlots + ":" + diskSlots +
- ", used=" + usedMemoryMBSlots + ":" + usedCpuCoreSlots + ":" + usedDiskSlots;
+ ", used=" + getUsedMemoryMBSlots() + ":" + usedCpuCoreSlots + ":" + usedDiskSlots;
}
public String portsToStr() {
@@ -119,11 +141,22 @@ public class WorkerResource {
}
public int getUsedMemoryMBSlots() {
- return usedMemoryMBSlots;
+ try {
+ rlock.lock();
+ return usedMemoryMBSlots;
+ } finally {
+ rlock.unlock();
+ }
}
public void setUsedMemoryMBSlots(int usedMemoryMBSlots) {
- this.usedMemoryMBSlots = usedMemoryMBSlots;
+ try {
+ wlock.lock();
+ this.usedMemoryMBSlots = usedMemoryMBSlots;
+ } finally {
+ wlock.unlock();
+ }
+
}
public int getUsedCpuCoreSlots() {
@@ -167,10 +200,14 @@ public class WorkerResource {
queryMasterAllocated = false;
}
- usedMemoryMBSlots = usedMemoryMBSlots - workerResource.memoryMBSlots;
- //usedDiskSlots = usedDiskSlots - workerResource.diskSlots;
+ try {
+ wlock.lock();
+ usedMemoryMBSlots = usedMemoryMBSlots - workerResource.getMemoryMBSlots();
+ } finally {
+ wlock.unlock();
+ }
- if(usedMemoryMBSlots < 0 || usedDiskSlots < 0 || usedCpuCoreSlots < 0) {
+ if(getUsedMemoryMBSlots() < 0 || usedDiskSlots < 0 || usedCpuCoreSlots < 0) {
LOG.warn("Used resources can't be a minus.");
LOG.warn(this + " ==> " + workerResource);
}
@@ -178,7 +215,7 @@ public class WorkerResource {
public int getSlots() {
//TODO what is slot? 512MB = 1slot?
- return memoryMBSlots/512;
+ return getMemoryMBSlots()/512;
}
public int getAvaliableSlots() {
@@ -188,7 +225,7 @@ public class WorkerResource {
public int getUsedSlots() {
//TODO what is slot? 512MB = 1slot?
- return usedMemoryMBSlots/512;
+ return getUsedMemoryMBSlots()/512;
}
public int getManagerPort() {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5d3966a8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
index da7daf0..0baa55c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
@@ -267,7 +267,7 @@ public class YarnTajoResourceManager implements WorkerResourceManager {
final Resource resource = Records.newRecord(Resource.class);
// TODO - get default value from conf
- resource.setMemory(2048);
+ resource.setMemory(2000);
resource.setVirtualCores(1);
Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();