Posted to commits@tajo.apache.org by ji...@apache.org on 2013/12/03 12:51:02 UTC

[07/18] TAJO-317: Improve TajoResourceManager to support more elaborate resource management. (Keuntae Park via jihoon)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/528c914f/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestTajoResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestTajoResourceManager.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestTajoResourceManager.java
new file mode 100644
index 0000000..428bf46
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestTajoResourceManager.java
@@ -0,0 +1,390 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.protobuf.RpcCallback;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.TajoMasterProtocol.*;
+import org.apache.tajo.master.rm.TajoWorkerResourceManager;
+import org.apache.tajo.master.rm.WorkerResource;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestTajoResourceManager {
+  private final PrimitiveProtos.BoolProto BOOL_TRUE = PrimitiveProtos.BoolProto.newBuilder().setValue(true).build();
+  private final PrimitiveProtos.BoolProto BOOL_FALSE = PrimitiveProtos.BoolProto.newBuilder().setValue(false).build();
+
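+  // Simulated cluster shared by all tests: numWorkers workers, each reporting
+  // workerDiskSlots (5.0) disk slots and workerMemoryMB (5120MB) of memory.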
+  TajoConf tajoConf;
+  TajoWorkerResourceManager tajoWorkerResourceManager;
+  long queryIdTime = System.currentTimeMillis();
+  int numWorkers = 5;
+  float workerDiskSlots = 5.0f;
+  int workerMemoryMB = 512 * 10;
+  WorkerResourceAllocationResponse response;
+
+  private void initResourceManager(boolean queryMasterMode) throws Exception {
+    tajoConf = new TajoConf();
+
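+    // QueryMaster resource defaults used by testQueryMasterResource(): no disk slots and 512MB of memory.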
+    tajoConf.setFloatVar(TajoConf.ConfVars.TAJO_QUERYMASTER_DISK_SLOT, 0.0f);
+    tajoConf.setIntVar(TajoConf.ConfVars.TAJO_QUERYMASTER_MEMORY_MB, 512);
+
+    tajoWorkerResourceManager = new TajoWorkerResourceManager(tajoConf);
+
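+    // Register the simulated workers by sending heartbeats that report each worker's capacity.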
+    for(int i = 0; i < numWorkers; i++) {
+      ServerStatusProto.System system = ServerStatusProto.System.newBuilder()
+          .setAvailableProcessors(1)
+          .setFreeMemoryMB(workerMemoryMB)
+          .setMaxMemoryMB(workerMemoryMB)
+          .setTotalMemoryMB(workerMemoryMB)
+          .build();
+
+      ServerStatusProto.JvmHeap jvmHeap = ServerStatusProto.JvmHeap.newBuilder()
+          .setFreeHeap(workerMemoryMB)
+          .setMaxHeap(workerMemoryMB)
+          .setTotalHeap(workerMemoryMB)
+          .build();
+
+      ServerStatusProto.Disk disk = ServerStatusProto.Disk.newBuilder()
+          .setAbsolutePath("/")
+          .setFreeSpace(0)
+          .setTotalSpace(0)
+          .setUsableSpace(0)
+          .build();
+
+      List<ServerStatusProto.Disk> disks = new ArrayList<ServerStatusProto.Disk>();
+
+      disks.add(disk);
+
+      ServerStatusProto serverStatus = ServerStatusProto.newBuilder()
+          .setQueryMasterMode(queryMasterMode ? BOOL_TRUE : BOOL_FALSE)
+          .setTaskRunnerMode(BOOL_TRUE)
+          .setDiskSlots(workerDiskSlots)
+          .setMemoryResourceMB(workerMemoryMB)
+          .setJvmHeap(jvmHeap)
+          .setSystem(system)
+          .addAllDisk(disks)
+          .setRunningTaskNum(0)
+          .build();
+
+      TajoHeartbeat tajoHeartbeat = TajoHeartbeat.newBuilder()
+          .setTajoWorkerHost("host" + (i + 1))
+          .setQueryId(QueryIdFactory.newQueryId(queryIdTime, i + 1).getProto())
+          .setTajoQueryMasterPort(21000)
+          .setPeerRpcPort(29000 + i)
+          .setTajoWorkerHttpPort(28080 + i)
+          .setServerStatus(serverStatus)
+          .build();
+
+      tajoWorkerResourceManager.workerHeartbeat(tajoHeartbeat);
+    }
+  }
+
+
+  @Test
+  public void testHeartbeat() throws Exception {
+    initResourceManager(false);
+    assertEquals(numWorkers, tajoWorkerResourceManager.getWorkers().size());
+    for(WorkerResource eachWorker: tajoWorkerResourceManager.getWorkers().values()) {
+      assertEquals(workerMemoryMB, eachWorker.getAvailableMemoryMB());
+      assertEquals(workerDiskSlots, eachWorker.getAvailableDiskSlots(), 0);
+    }
+  }
+
+  @Test
+  public void testMemoryResource() throws Exception {
+    initResourceManager(false);
+
+    final int minMemory = 256;
+    final int maxMemory = 512;
+    float diskSlots = 1.0f;
+
+    QueryId queryId = QueryIdFactory.newQueryId(queryIdTime, 1);
+    ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId(queryId);
+
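+    // 60 containers of 256-512MB are requested, but total cluster memory (5 x 5120MB)
+    // limits the grant to 10 containers per worker, i.e. 50 in total (asserted below).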
+    WorkerResourceAllocationRequest request = WorkerResourceAllocationRequest.newBuilder()
+        .setResourceRequestPriority(ResourceRequestPriority.MEMORY)
+        .setNumContainers(60)
+        .setExecutionBlockId(ebId.getProto())
+        .setMaxDiskSlotPerContainer(diskSlots)
+        .setMinDiskSlotPerContainer(diskSlots)
+        .setMinMemoryMBPerContainer(minMemory)
+        .setMaxMemoryMBPerContainer(maxMemory)
+        .build();
+
+    final Object monitor = new Object();
+    final List<YarnProtos.ContainerIdProto> containerIds = new ArrayList<YarnProtos.ContainerIdProto>();
+
+
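+    // allocateWorkerResources() is asynchronous; wait on the monitor until the callback delivers the response.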
+    RpcCallback<WorkerResourceAllocationResponse> callBack = new RpcCallback<WorkerResourceAllocationResponse>() {
+
+      @Override
+      public void run(WorkerResourceAllocationResponse response) {
+        TestTajoResourceManager.this.response = response;
+        synchronized(monitor) {
+          monitor.notifyAll();
+        }
+      }
+    };
+
+    tajoWorkerResourceManager.allocateWorkerResources(request, callBack);
+    synchronized(monitor) {
+      monitor.wait();
+    }
+
+
+    // assert after callback
+    int totalUsedMemory = 0;
+    int totalUsedDisks = 0;
+    for(WorkerResource eachWorker: tajoWorkerResourceManager.getWorkers().values()) {
+      assertEquals(0, eachWorker.getAvailableMemoryMB());
+      assertEquals(0, eachWorker.getAvailableDiskSlots(), 0);
+      assertEquals(5.0f, eachWorker.getUsedDiskSlots(), 0);
+
+      totalUsedMemory += eachWorker.getUsedMemoryMB();
+      totalUsedDisks += eachWorker.getUsedDiskSlots();
+    }
+
+    assertEquals(workerMemoryMB * numWorkers, totalUsedMemory);
+    assertEquals(workerDiskSlots * numWorkers, totalUsedDisks, 0);
+
+    assertEquals(numWorkers * 10, response.getWorkerAllocatedResourceList().size());
+
+    for(WorkerAllocatedResource eachResource: response.getWorkerAllocatedResourceList()) {
+      assertTrue(
+          eachResource.getAllocatedMemoryMB() >= minMemory &&  eachResource.getAllocatedMemoryMB() <= maxMemory);
+      containerIds.add(eachResource.getContainerId());
+    }
+
+    for(YarnProtos.ContainerIdProto eachContainerId: containerIds) {
+      tajoWorkerResourceManager.releaseWorkerResource(ebId, eachContainerId);
+    }
+
+    for(WorkerResource eachWorker: tajoWorkerResourceManager.getWorkers().values()) {
+      assertEquals(workerMemoryMB, eachWorker.getAvailableMemoryMB());
+      assertEquals(0, eachWorker.getUsedMemoryMB());
+
+      assertEquals(workerDiskSlots, eachWorker.getAvailableDiskSlots(), 0);
+      assertEquals(0.0f, eachWorker.getUsedDiskSlots(), 0);
+    }
+  }
+
+  @Test
+  public void testMemoryNotCommensurable() throws Exception {
+    initResourceManager(false);
+
+    final int minMemory = 200;
+    final int maxMemory = 500;
+    float diskSlots = 1.0f;
+
+    QueryId queryId = QueryIdFactory.newQueryId(queryIdTime, 2);
+    ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId(queryId);
+
+    int requiredContainers = 60;
+
+    int numAllocatedContainers = 0;
+
+    int loopCount = 0;
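+    // Worker memory (5120MB) is not an exact multiple of the 200-500MB container range,
+    // so the test expects all 60 containers to be granted within two allocation rounds,
+    // releasing every allocated resource between rounds.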
+    while(true) {
+      WorkerResourceAllocationRequest request = WorkerResourceAllocationRequest.newBuilder()
+          .setResourceRequestPriority(ResourceRequestPriority.MEMORY)
+          .setNumContainers(requiredContainers - numAllocatedContainers)
+          .setExecutionBlockId(ebId.getProto())
+          .setMaxDiskSlotPerContainer(diskSlots)
+          .setMinDiskSlotPerContainer(diskSlots)
+          .setMinMemoryMBPerContainer(minMemory)
+          .setMaxMemoryMBPerContainer(maxMemory)
+          .build();
+
+      final Object monitor = new Object();
+
+      RpcCallback<WorkerResourceAllocationResponse> callBack = new RpcCallback<WorkerResourceAllocationResponse>() {
+        @Override
+        public void run(WorkerResourceAllocationResponse response) {
+          TestTajoResourceManager.this.response = response;
+          synchronized(monitor) {
+            monitor.notifyAll();
+          }
+        }
+      };
+
+      tajoWorkerResourceManager.allocateWorkerResources(request, callBack);
+      synchronized(monitor) {
+        monitor.wait();
+      }
+
+      numAllocatedContainers += TestTajoResourceManager.this.response.getWorkerAllocatedResourceList().size();
+
+      // release the allocated resources
+      for(WorkerAllocatedResource eachResource: TestTajoResourceManager.this.response.getWorkerAllocatedResourceList()) {
+        assertTrue(
+            eachResource.getAllocatedMemoryMB() >= minMemory &&  eachResource.getAllocatedMemoryMB() <= maxMemory);
+        tajoWorkerResourceManager.releaseWorkerResource(ebId, eachResource.getContainerId());
+      }
+
+      for(WorkerResource eachWorker: tajoWorkerResourceManager.getWorkers().values()) {
+        assertEquals(0, eachWorker.getUsedMemoryMB());
+        assertEquals(workerMemoryMB, eachWorker.getAvailableMemoryMB());
+
+        assertEquals(0.0f, eachWorker.getUsedDiskSlots(), 0);
+        assertEquals(workerDiskSlots, eachWorker.getAvailableDiskSlots(), 0);
+      }
+
+      loopCount++;
+
+      if(loopCount == 2) {
+        assertEquals(requiredContainers, numAllocatedContainers);
+        break;
+      }
+    }
+
+    for(WorkerResource eachWorker: tajoWorkerResourceManager.getWorkers().values()) {
+      assertEquals(0, eachWorker.getUsedMemoryMB());
+      assertEquals(workerMemoryMB, eachWorker.getAvailableMemoryMB());
+
+      assertEquals(0.0f, eachWorker.getUsedDiskSlots(), 0);
+      assertEquals(workerDiskSlots, eachWorker.getAvailableDiskSlots(), 0);
+    }
+  }
+
+  @Test
+  public void testDiskResource() throws Exception {
+    initResourceManager(false);
+    final float minDiskSlots = 1.0f;
+    final float maxDiskSlots = 2.0f;
+    int memoryMB = 256;
+
+    QueryId queryId = QueryIdFactory.newQueryId(queryIdTime, 3);
+    ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId(queryId);
+
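+    // With DISK priority, each worker's 5 disk slots are expected to yield
+    // 3 containers (2 + 2 + 1 slots), 15 in total (asserted below).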
+    WorkerResourceAllocationRequest request = WorkerResourceAllocationRequest.newBuilder()
+        .setResourceRequestPriority(ResourceRequestPriority.DISK)
+        .setNumContainers(60)
+        .setExecutionBlockId(ebId.getProto())
+        .setMaxDiskSlotPerContainer(maxDiskSlots)
+        .setMinDiskSlotPerContainer(minDiskSlots)
+        .setMinMemoryMBPerContainer(memoryMB)
+        .setMaxMemoryMBPerContainer(memoryMB)
+        .build();
+
+    final Object monitor = new Object();
+    final List<YarnProtos.ContainerIdProto> containerIds = new ArrayList<YarnProtos.ContainerIdProto>();
+
+
+    RpcCallback<WorkerResourceAllocationResponse> callBack = new RpcCallback<WorkerResourceAllocationResponse>() {
+
+      @Override
+      public void run(WorkerResourceAllocationResponse response) {
+        TestTajoResourceManager.this.response = response;
+        synchronized(monitor) {
+          monitor.notifyAll();
+        }
+      }
+    };
+
+    tajoWorkerResourceManager.allocateWorkerResources(request, callBack);
+    synchronized(monitor) {
+      monitor.wait();
+    }
+    for(WorkerAllocatedResource eachResource: response.getWorkerAllocatedResourceList()) {
+      assertTrue("AllocatedDiskSlot:" + eachResource.getAllocatedDiskSlots(),
+          eachResource.getAllocatedDiskSlots() >= minDiskSlots &&
+              eachResource.getAllocatedDiskSlots() <= maxDiskSlots);
+      containerIds.add(eachResource.getContainerId());
+    }
+
+    // assert after callback
+    int totalUsedDisks = 0;
+    for(WorkerResource eachWorker: tajoWorkerResourceManager.getWorkers().values()) {
+      // each worker should be allocated 3 containers: two using 2 disk slots and one using 1 disk slot
+      assertEquals(0, eachWorker.getAvailableDiskSlots(), 0);
+      assertEquals(5.0f, eachWorker.getUsedDiskSlots(), 0);
+      assertEquals(256 * 3, eachWorker.getUsedMemoryMB());
+
+      totalUsedDisks += eachWorker.getUsedDiskSlots();
+    }
+
+    assertEquals(workerDiskSlots * numWorkers, totalUsedDisks, 0);
+
+    assertEquals(numWorkers * 3, response.getWorkerAllocatedResourceList().size());
+
+    for(YarnProtos.ContainerIdProto eachContainerId: containerIds) {
+      tajoWorkerResourceManager.releaseWorkerResource(ebId, eachContainerId);
+    }
+
+    for(WorkerResource eachWorker: tajoWorkerResourceManager.getWorkers().values()) {
+      assertEquals(workerMemoryMB, eachWorker.getAvailableMemoryMB());
+      assertEquals(0, eachWorker.getUsedMemoryMB());
+
+      assertEquals(workerDiskSlots, eachWorker.getAvailableDiskSlots(), 0);
+      assertEquals(0.0f, eachWorker.getUsedDiskSlots(), 0);
+    }
+  }
+
+  @Test
+  public void testQueryMasterResource() throws Exception {
+    initResourceManager(true);
+
+    int qmDefaultMemoryMB = tajoConf.getIntVar(TajoConf.ConfVars.TAJO_QUERYMASTER_MEMORY_MB);
+    float qmDefaultDiskSlots = tajoConf.getFloatVar(TajoConf.ConfVars.TAJO_QUERYMASTER_DISK_SLOT);
+
+    QueryId queryId = QueryIdFactory.newQueryId(queryIdTime, 4);
+
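+    // Allocate a QueryMaster; the assertions below expect exactly one worker to hold its memory and disk-slot reservation.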
+    tajoWorkerResourceManager.allocateQueryMaster(queryId);
+
+    // assert after callback
+    int totalUsedMemory = 0;
+    int totalUsedDisks = 0;
+    for(WorkerResource eachWorker: tajoWorkerResourceManager.getWorkers().values()) {
+      if(eachWorker.getUsedMemoryMB() > 0) {
+        // the worker to which the QueryMaster was allocated
+        assertEquals(qmDefaultMemoryMB, eachWorker.getUsedMemoryMB());
+        assertEquals(qmDefaultDiskSlots, eachWorker.getUsedDiskSlots(), 0);
+      } else {
+        assertEquals(0, eachWorker.getUsedMemoryMB());
+        assertEquals(0, eachWorker.getUsedDiskSlots(), 0);
+      }
+
+      totalUsedMemory += eachWorker.getUsedMemoryMB();
+      totalUsedDisks += eachWorker.getUsedDiskSlots();
+    }
+
+    assertEquals(qmDefaultMemoryMB, totalUsedMemory);
+    assertEquals(qmDefaultDiskSlots, totalUsedDisks, 0);
+
+    //release
+    tajoWorkerResourceManager.stopQueryMaster(queryId);
+    for(WorkerResource eachWorker: tajoWorkerResourceManager.getWorkers().values()) {
+      assertEquals(0, eachWorker.getUsedMemoryMB());
+      assertEquals(0, eachWorker.getUsedDiskSlots(), 0);
+      totalUsedMemory += eachWorker.getUsedMemoryMB();
+      totalUsedDisks += eachWorker.getUsedDiskSlots();
+    }
+  }
+}