You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/05/30 06:05:04 UTC
[02/24] tajo git commit: TAJO-1599: Implement NodeResourceManager and
Status updater. (jinho)
TAJO-1599: Implement NodeResourceManager and Status updater. (jinho)
Closes #577
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/25bd5cb4
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/25bd5cb4
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/25bd5cb4
Branch: refs/heads/index_support
Commit: 25bd5cb44a03ee425b02e2bc2553f7d0f8affff5
Parents: 4b2ab61
Author: Jinho Kim <jh...@apache.org>
Authored: Tue May 26 16:46:17 2015 +0900
Committer: Jinho Kim <jh...@apache.org>
Committed: Tue May 26 16:46:17 2015 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../java/org/apache/tajo/conf/TajoConf.java | 9 +-
tajo-common/src/main/proto/tajo_protos.proto | 6 +
.../tajo/master/rm/TajoResourceTracker.java | 9 +
.../resource/DefaultResourceCalculator.java | 109 ++++++++
.../org/apache/tajo/resource/NodeResource.java | 188 +++++++++++++
.../org/apache/tajo/resource/NodeResources.java | 195 +++++++++++++
.../tajo/resource/ResourceCalculator.java | 169 ++++++++++++
.../apache/tajo/worker/NodeResourceManager.java | 148 ++++++++++
.../apache/tajo/worker/NodeStatusUpdater.java | 274 +++++++++++++++++++
.../tajo/worker/WorkerHeartbeatService.java | 1 +
.../worker/event/NodeResourceAllocateEvent.java | 46 ++++
.../event/NodeResourceDeallocateEvent.java | 40 +++
.../worker/event/NodeResourceManagerEvent.java | 34 +++
.../tajo/worker/event/NodeStatusEvent.java | 40 +++
.../main/proto/ResourceTrackerProtocol.proto | 27 ++
.../src/main/proto/TajoWorkerProtocol.proto | 16 ++
.../org/apache/tajo/resource/TestResources.java | 48 ++++
.../tajo/worker/MockNodeStatusUpdater.java | 105 +++++++
.../tajo/worker/TestNodeResourceManager.java | 235 ++++++++++++++++
.../tajo/worker/TestNodeStatusUpdater.java | 115 ++++++++
.../java/org/apache/tajo/storage/DiskUtil.java | 4 +-
22 files changed, 1816 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 0d71d2f..c79a185 100644
--- a/CHANGES
+++ b/CHANGES
@@ -310,6 +310,8 @@ Release 0.11.0 - unreleased
SUB TASKS
+ TAJO-1599: Implement NodeResourceManager and Status updater. (jinho)
+
TAJO-1613: Rename StorageManager to Tablespace. (hyunsik)
TAJO-1359: Add nested field projector and language extension to project
http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 59b1f43..e20658b 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -171,9 +171,14 @@ public class TajoConf extends Configuration {
WORKER_TEMPORAL_DIR_CLEANUP("tajo.worker.tmpdir.cleanup-at-startup", false, Validators.bool()),
// Tajo Worker Resources
- WORKER_RESOURCE_AVAILABLE_CPU_CORES("tajo.worker.resource.cpu-cores", 1, Validators.min("1")),
+ WORKER_RESOURCE_AVAILABLE_CPU_CORES("tajo.worker.resource.cpu-cores",
+ Runtime.getRuntime().availableProcessors(), Validators.min("1")),
WORKER_RESOURCE_AVAILABLE_MEMORY_MB("tajo.worker.resource.memory-mb", 1024, Validators.min("64")),
+ @Deprecated
WORKER_RESOURCE_AVAILABLE_DISKS("tajo.worker.resource.disks", 1.0f),
+ WORKER_RESOURCE_AVAILABLE_DISKS_NUM("tajo.worker.resource.disks.num", 1, Validators.min("1")),
+ WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM("tajo.worker.resource.disk.parallel-execution.num", 2,
+ Validators.min("1")),
WORKER_EXECUTION_MAX_SLOTS("tajo.worker.parallel-execution.max-num", 2),
WORKER_RESOURCE_DFS_DIR_AWARE("tajo.worker.resource.dfs-dir-aware", false, Validators.bool()),
@@ -186,7 +191,7 @@ public class TajoConf extends Configuration {
WORKER_HISTORY_EXPIRE_PERIOD("tajo.worker.history.expire-interval-minutes", 60), // 1 hours
QUERYMASTER_HISTORY_EXPIRE_PERIOD("tajo.qm.history.expire-interval-minutes", 6 * 60), // 6 hours
- WORKER_HEARTBEAT_TIMEOUT("tajo.worker.heartbeat.timeout", 120 * 1000), // 120 sec
+ WORKER_HEARTBEAT_INTERVAL("tajo.worker.heartbeat.interval", 10 * 1000), // 10 sec
// Resource Manager
RESOURCE_MANAGER_CLASS("tajo.resource.manager", "org.apache.tajo.master.rm.TajoWorkerResourceManager",
http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-common/src/main/proto/tajo_protos.proto
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/proto/tajo_protos.proto b/tajo-common/src/main/proto/tajo_protos.proto
index b6cd9ef..8474f54 100644
--- a/tajo-common/src/main/proto/tajo_protos.proto
+++ b/tajo-common/src/main/proto/tajo_protos.proto
@@ -62,4 +62,10 @@ message WorkerConnectionInfoProto {
optional int32 queryMasterPort = 5;
required int32 clientPort = 6;
required int32 httpInfoPort = 7;
+}
+
+// Wire form of org.apache.tajo.resource.NodeResource (see NodeResource#getProto and its proto constructor).
+message NodeResourceProto {
+ optional int32 memory = 1; // memory in megabytes
+ optional int32 virtual_cores = 2; // number of virtual CPU cores
+ optional int32 disks = 3; // number of disks
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
index 4f3b66a..af28886 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
@@ -20,10 +20,12 @@ package org.apache.tajo.master.rm;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
+import org.apache.tajo.common.exception.NotImplementedException;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.QueryCoordinatorProtocol.TajoHeartbeatResponse;
import org.apache.tajo.ipc.TajoResourceTrackerProtocol;
@@ -182,6 +184,13 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
}
}
+  /**
+   * Node heartbeat handler; not implemented yet.
+   * Always throws a {@link RuntimeException} wrapping a {@link ServiceException},
+   * whose cause is a {@link NotImplementedException} so the origin is not lost.
+   */
+  @Override
+  public void nodeHeartbeat(RpcController controller, TajoResourceTrackerProtocol.NodeHeartbeatRequestProto request,
+                            RpcCallback<TajoResourceTrackerProtocol.NodeHeartbeatResponseProto> done) {
+    //TODO implement with ResourceManager for scheduler
+    NotImplementedException cause = new NotImplementedException();
+    throw new RuntimeException(new ServiceException(cause.getMessage(), cause));
+  }
+
private Worker createWorkerResource(NodeHeartbeat request) {
WorkerResource workerResource = new WorkerResource();
http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/main/java/org/apache/tajo/resource/DefaultResourceCalculator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/resource/DefaultResourceCalculator.java b/tajo-core/src/main/java/org/apache/tajo/resource/DefaultResourceCalculator.java
new file mode 100644
index 0000000..58b8a26
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/resource/DefaultResourceCalculator.java
@@ -0,0 +1,109 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.tajo.resource;
+
+
+/**
+ * Default {@link ResourceCalculator}.
+ * {@link #compare} and {@link #computeAvailableContainers} look at memory, disks and
+ * virtual cores. Ratios, division, normalization and rounding look only at memory.
+ * Those memory-only methods return a resource built from the memory value alone:
+ * 0 disks, and 1 virtual core when memory is positive (otherwise 0).
+ */
+public class DefaultResourceCalculator extends ResourceCalculator {
+
+  /**
+   * Orders resources with {@link NodeResource#compareTo}: memory first, then disks, then virtual cores.
+   * The cluster resource is not used.
+   */
+  @Override
+  public int compare(NodeResource unused, NodeResource lhs, NodeResource rhs) {
+    return lhs.compareTo(rhs);
+  }
+
+  /**
+   * Computes how many times <code>required</code> fits into <code>available</code>.
+   * The result is bounded by the scarcest of memory, disks and virtual cores.
+   * A dimension whose required amount is zero places no constraint, instead of
+   * dividing by zero. Such resources are common: the memory-only results of this
+   * class carry 0 disks. If no dimension is constrained, {@link Integer#MAX_VALUE}
+   * is returned.
+   */
+  @Override
+  public int computeAvailableContainers(NodeResource available, NodeResource required) {
+    int containers = Integer.MAX_VALUE;
+    containers = limitBy(containers, available.getMemory(), required.getMemory());
+    containers = limitBy(containers, available.getDisks(), required.getDisks());
+    containers = limitBy(containers, available.getVirtualCores(), required.getVirtualCores());
+    return containers;
+  }
+
+  /** Narrows <code>current</code> to <code>available / required</code> unless <code>required</code> is zero. */
+  private static int limitBy(int current, int available, int required) {
+    if (required == 0) {
+      return current;
+    }
+    return Math.min(current, available / required);
+  }
+
+  /** Same as {@link #ratio}; the cluster resource is not used. */
+  @Override
+  public float divide(NodeResource unused,
+                      NodeResource numerator, NodeResource denominator) {
+    return ratio(numerator, denominator);
+  }
+
+  /** A resource is an invalid divisor when its memory is zero, because {@link #ratio} divides by memory. */
+  @Override
+  public boolean isInvalidDivisor(NodeResource r) {
+    return r.getMemory() == 0;
+  }
+
+  /** Memory of <code>a</code> divided by memory of <code>b</code>, in floating point. */
+  @Override
+  public float ratio(NodeResource a, NodeResource b) {
+    return (float) a.getMemory() / b.getMemory();
+  }
+
+  /** Divides memory by <code>denominator</code>, rounding up; other dimensions are dropped. */
+  @Override
+  public NodeResource divideAndCeil(NodeResource numerator, int denominator) {
+    return NodeResources.createResource(
+        divideAndCeil(numerator.getMemory(), denominator));
+  }
+
+  /**
+   * Clamps memory to at least the minimum, rounds it up to a multiple of the step factor's
+   * memory, and caps it at the maximum. Other dimensions are dropped.
+   */
+  @Override
+  public NodeResource normalize(NodeResource r, NodeResource minimumResource,
+                                NodeResource maximumResource, NodeResource stepFactor) {
+    int normalizedMemory = Math.min(
+        roundUp(
+            Math.max(r.getMemory(), minimumResource.getMemory()),
+            stepFactor.getMemory()),
+        maximumResource.getMemory());
+    return NodeResources.createResource(normalizedMemory);
+  }
+
+  /** Normalizes using <code>minimumResource</code> as both the lower bound and the step factor. */
+  @Override
+  public NodeResource normalize(NodeResource r, NodeResource minimumResource,
+                                NodeResource maximumResource) {
+    return normalize(r, minimumResource, maximumResource, minimumResource);
+  }
+
+  /** Rounds memory up to a multiple of the step factor's memory. */
+  @Override
+  public NodeResource roundUp(NodeResource r, NodeResource stepFactor) {
+    return NodeResources.createResource(
+        roundUp(r.getMemory(), stepFactor.getMemory())
+    );
+  }
+
+  /** Rounds memory down to a multiple of the step factor's memory. */
+  @Override
+  public NodeResource roundDown(NodeResource r, NodeResource stepFactor) {
+    return NodeResources.createResource(
+        roundDown(r.getMemory(), stepFactor.getMemory()));
+  }
+
+  /**
+   * Multiplies memory by <code>by</code>, then rounds it up to a multiple of the step factor's memory.
+   * The product is first rounded to the nearest integer (+0.5 then truncation).
+   */
+  @Override
+  public NodeResource multiplyAndNormalizeUp(NodeResource r, double by,
+                                             NodeResource stepFactor) {
+    return NodeResources.createResource(
+        roundUp((int) (r.getMemory() * by + 0.5), stepFactor.getMemory())
+    );
+  }
+
+  /**
+   * Multiplies memory by <code>by</code>, truncating the product.
+   * The result is then rounded down to a multiple of the step factor's memory.
+   */
+  @Override
+  public NodeResource multiplyAndNormalizeDown(NodeResource r, double by,
+                                               NodeResource stepFactor) {
+    return NodeResources.createResource(
+        roundDown(
+            (int) (r.getMemory() * by),
+            stepFactor.getMemory()
+        )
+    );
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/main/java/org/apache/tajo/resource/NodeResource.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/resource/NodeResource.java b/tajo-core/src/main/java/org/apache/tajo/resource/NodeResource.java
new file mode 100644
index 0000000..f51fc07
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/resource/NodeResource.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.resource;
+
+import com.google.common.base.Objects;
+import io.netty.util.internal.PlatformDependent;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.common.ProtoObject;
+
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+/**
+ * <p><code>NodeResource</code> models a set of computer resources in the
+ * cluster.</p>
+ *
+ * <p>Currently it models <em>memory</em>, <em>disks</em> and <em>CPU</em>.</p>
+ *
+ * <p>The unit for memory is megabytes. The unit for disks is the number of disks.
+ * CPU is modeled with virtual cores (vcores), a unit for expressing parallelism.
+ * A node's capacity should be configured with virtual cores equal to its number of physical cores.
+ * A task should be requested with the number of cores it can saturate.</p>
+ *
+ * <p>Every field is <code>volatile</code> and written through an {@link AtomicIntegerFieldUpdater},
+ * so single-field reads and writes are visible across threads. Nothing here updates several
+ * fields atomically or offers an atomic read-modify-write. Callers that combine reads and
+ * writes must synchronize externally.</p>
+ *
+ * <p>Ordering ({@link #compareTo}) and equality consider memory, then disks, then virtual cores.</p>
+ */
+public class NodeResource implements ProtoObject<TajoProtos.NodeResourceProto>, Comparable<NodeResource> {
+
+  private static final AtomicIntegerFieldUpdater<NodeResource> MEMORY_UPDATER = newFieldUpdater("memory");
+  private static final AtomicIntegerFieldUpdater<NodeResource> DISKS_UPDATER = newFieldUpdater("disks");
+  private static final AtomicIntegerFieldUpdater<NodeResource> VCORES_UPDATER = newFieldUpdater("vCores");
+
+  private volatile int memory;
+  private volatile int disks;
+  private volatile int vCores;
+
+  /**
+   * Creates an updater for one of this class's <code>volatile int</code> fields.
+   * Netty's platform-dependent updater is preferred. The JDK updater is used when
+   * Netty returns <code>null</code>.
+   */
+  private static AtomicIntegerFieldUpdater<NodeResource> newFieldUpdater(String fieldName) {
+    AtomicIntegerFieldUpdater<NodeResource> updater =
+        PlatformDependent.newAtomicIntegerFieldUpdater(NodeResource.class, fieldName);
+    if (updater == null) {
+      updater = AtomicIntegerFieldUpdater.newUpdater(NodeResource.class, fieldName);
+    }
+    return updater;
+  }
+
+  /**
+   * Creates a resource from its protobuf representation.
+   *
+   * @param proto source message; memory, disks and virtual cores are copied from it
+   */
+  public NodeResource(TajoProtos.NodeResourceProto proto) {
+    setMemory(proto.getMemory());
+    setDisks(proto.getDisks());
+    setVirtualCores(proto.getVirtualCores());
+  }
+
+  private NodeResource() {
+  }
+
+  /**
+   * Creates a resource with the given amounts.
+   *
+   * @param memory memory in megabytes
+   * @param disks number of disks
+   * @param vCores number of virtual cores
+   * @return a new resource
+   */
+  public static NodeResource createResource(int memory, int disks, int vCores) {
+    return new NodeResource().setMemory(memory).setDisks(disks).setVirtualCores(vCores);
+  }
+
+  /**
+   * Get <em>memory</em> of the resource.
+   *
+   * @return <em>memory</em> of the resource
+   */
+  public int getMemory() {
+    return memory;
+  }
+
+  /**
+   * Set <em>memory</em> of the resource.
+   *
+   * @param memory <em>memory</em> of the resource
+   * @return this resource, for chaining
+   */
+  public NodeResource setMemory(int memory) {
+    MEMORY_UPDATER.lazySet(this, memory);
+    return this;
+  }
+
+  /**
+   * Get <em>number of disks</em> of the resource.
+   *
+   * @return <em>number of disks</em> of the resource
+   */
+  public int getDisks() {
+    return disks;
+  }
+
+  /**
+   * Set <em>number of disks</em> of the resource.
+   *
+   * @param disks <em>number of disks</em> of the resource
+   * @return this resource, for chaining
+   */
+  public NodeResource setDisks(int disks) {
+    DISKS_UPDATER.lazySet(this, disks);
+    return this;
+  }
+
+  /**
+   * Get <em>number of virtual cpu cores</em> of the resource.
+   * Virtual cores are a unit for expressing CPU parallelism. A node's capacity
+   * should be configured with virtual cores equal to its number of physical cores.
+   *
+   * @return <em>num of virtual cpu cores</em> of the resource
+   */
+  public int getVirtualCores() {
+    return vCores;
+  }
+
+  /**
+   * Set <em>number of virtual cpu cores</em> of the resource.
+   *
+   * @param vCores <em>number of virtual cpu cores</em> of the resource
+   * @return this resource, for chaining
+   */
+  public NodeResource setVirtualCores(int vCores) {
+    VCORES_UPDATER.lazySet(this, vCores);
+    return this;
+  }
+
+  @Override
+  public TajoProtos.NodeResourceProto getProto() {
+    TajoProtos.NodeResourceProto.Builder builder = TajoProtos.NodeResourceProto.newBuilder();
+    builder.setMemory(memory)
+        .setDisks(disks)
+        .setVirtualCores(vCores);
+    return builder.build();
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(getMemory(), getDisks(), getVirtualCores());
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof NodeResource)) {
+      return false;
+    }
+    NodeResource other = (NodeResource) obj;
+    return getMemory() == other.getMemory()
+        && getDisks() == other.getDisks()
+        && getVirtualCores() == other.getVirtualCores();
+  }
+
+  /**
+   * Orders by memory, then disks, then virtual cores.
+   * Uses {@link Integer#compare} rather than subtraction so extreme values cannot overflow
+   * and flip the sign.
+   */
+  @Override
+  public int compareTo(NodeResource other) {
+    int diff = Integer.compare(getMemory(), other.getMemory());
+    if (diff == 0) {
+      diff = Integer.compare(getDisks(), other.getDisks());
+    }
+    if (diff == 0) {
+      diff = Integer.compare(getVirtualCores(), other.getVirtualCores());
+    }
+    return diff;
+  }
+
+  @Override
+  public String toString() {
+    return "<memory:" + getMemory() + ", disks:" + getDisks() + ", vCores:" + getVirtualCores() + ">";
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/main/java/org/apache/tajo/resource/NodeResources.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/resource/NodeResources.java b/tajo-core/src/main/java/org/apache/tajo/resource/NodeResources.java
new file mode 100644
index 0000000..01e9dcf
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/resource/NodeResources.java
@@ -0,0 +1,195 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tajo.resource;
+
+
+public class NodeResources {
+
+ public static NodeResource createResource(int memory) {
+ return createResource(memory, 0);
+ }
+
+ public static NodeResource createResource(int memory, int disks) {
+ return NodeResource.createResource(memory, disks, (memory > 0) ? 1 : 0);
+ }
+
+ public static NodeResource createResource(int memory, int disks, int vCores) {
+ return NodeResource.createResource(memory, disks, vCores);
+ }
+
+ public static NodeResource clone(NodeResource res) {
+ return NodeResource.createResource(res.getMemory(), res.getDisks(), res.getVirtualCores());
+ }
+
+ public static NodeResource update(NodeResource lhs, NodeResource rhs) {
+ return lhs.setMemory(rhs.getMemory()).setDisks(rhs.getDisks()).setVirtualCores(rhs.getVirtualCores());
+ }
+
+ public static NodeResource addTo(NodeResource lhs, NodeResource rhs) {
+ lhs.setMemory(lhs.getMemory() + rhs.getMemory())
+ .setVirtualCores(lhs.getVirtualCores() + rhs.getVirtualCores())
+ .setDisks(lhs.getDisks() + rhs.getDisks());
+ return lhs;
+ }
+
+ public static NodeResource add(NodeResource lhs, NodeResource rhs) {
+ return addTo(clone(lhs), rhs);
+ }
+
+ public static NodeResource subtractFrom(NodeResource lhs, NodeResource rhs) {
+ lhs.setMemory(lhs.getMemory() - rhs.getMemory())
+ .setVirtualCores(lhs.getVirtualCores() - rhs.getVirtualCores())
+ .setDisks(lhs.getDisks() - rhs.getDisks());
+ return lhs;
+ }
+
+ public static NodeResource subtract(NodeResource lhs, NodeResource rhs) {
+ return subtractFrom(clone(lhs), rhs);
+ }
+
+ public static NodeResource multiplyTo(NodeResource lhs, double by) {
+ lhs.setMemory((int) (lhs.getMemory() * by))
+ .setVirtualCores((int) (lhs.getVirtualCores() * by))
+ .setDisks((int) (lhs.getDisks() * by));
+ return lhs;
+ }
+
+ public static NodeResource multiply(NodeResource lhs, double by) {
+ return multiplyTo(clone(lhs), by);
+ }
+
+ public static NodeResource multiplyAndNormalizeUp(
+ ResourceCalculator calculator,NodeResource lhs, double by, NodeResource factor) {
+ return calculator.multiplyAndNormalizeUp(lhs, by, factor);
+ }
+
+ public static NodeResource multiplyAndNormalizeDown(
+ ResourceCalculator calculator,NodeResource lhs, double by, NodeResource factor) {
+ return calculator.multiplyAndNormalizeDown(lhs, by, factor);
+ }
+
+ public static NodeResource multiplyAndRoundDown(NodeResource lhs, double by) {
+ NodeResource out = clone(lhs);
+ out.setMemory((int)(lhs.getMemory() * by));
+ out.setDisks((int)(lhs.getDisks() * by));
+ out.setVirtualCores((int)(lhs.getVirtualCores() * by));
+ return out;
+ }
+
+ public static NodeResource normalize(
+ ResourceCalculator calculator, NodeResource lhs, NodeResource min,
+ NodeResource max, NodeResource increment) {
+ return calculator.normalize(lhs, min, max, increment);
+ }
+
+ public static NodeResource roundUp(
+ ResourceCalculator calculator, NodeResource lhs, NodeResource factor) {
+ return calculator.roundUp(lhs, factor);
+ }
+
+ public static NodeResource roundDown(
+ ResourceCalculator calculator, NodeResource lhs, NodeResource factor) {
+ return calculator.roundDown(lhs, factor);
+ }
+
+ public static boolean isInvalidDivisor(
+ ResourceCalculator resourceCalculator, NodeResource divisor) {
+ return resourceCalculator.isInvalidDivisor(divisor);
+ }
+
+ public static float ratio(
+ ResourceCalculator resourceCalculator, NodeResource lhs, NodeResource rhs) {
+ return resourceCalculator.ratio(lhs, rhs);
+ }
+
+ public static float divide(
+ ResourceCalculator resourceCalculator,
+ NodeResource clusterResource, NodeResource lhs, NodeResource rhs) {
+ return resourceCalculator.divide(clusterResource, lhs, rhs);
+ }
+
+ public static NodeResource divideAndCeil(
+ ResourceCalculator resourceCalculator, NodeResource lhs, int rhs) {
+ return resourceCalculator.divideAndCeil(lhs, rhs);
+ }
+
+ public static boolean equals(NodeResource lhs, NodeResource rhs) {
+ return lhs.equals(rhs);
+ }
+
+ public static boolean lessThan(
+ ResourceCalculator resourceCalculator,
+ NodeResource clusterResource,
+ NodeResource lhs, NodeResource rhs) {
+ return (resourceCalculator.compare(clusterResource, lhs, rhs) < 0);
+ }
+
+ public static boolean lessThanOrEqual(
+ ResourceCalculator resourceCalculator,
+ NodeResource clusterResource,
+ NodeResource lhs, NodeResource rhs) {
+ return (resourceCalculator.compare(clusterResource, lhs, rhs) <= 0);
+ }
+
+ public static boolean greaterThan(
+ ResourceCalculator resourceCalculator,
+ NodeResource clusterResource,
+ NodeResource lhs, NodeResource rhs) {
+ return resourceCalculator.compare(clusterResource, lhs, rhs) > 0;
+ }
+
+ public static boolean greaterThanOrEqual(
+ ResourceCalculator resourceCalculator,
+ NodeResource clusterResource,
+ NodeResource lhs, NodeResource rhs) {
+ return resourceCalculator.compare(clusterResource, lhs, rhs) >= 0;
+ }
+
+ public static NodeResource min(
+ ResourceCalculator resourceCalculator,
+ NodeResource clusterResource,
+ NodeResource lhs, NodeResource rhs) {
+ return resourceCalculator.compare(clusterResource, lhs, rhs) <= 0 ? lhs : rhs;
+ }
+
+ public static NodeResource max(
+ ResourceCalculator resourceCalculator,
+ NodeResource clusterResource,
+ NodeResource lhs, NodeResource rhs) {
+ return resourceCalculator.compare(clusterResource, lhs, rhs) >= 0 ? lhs : rhs;
+ }
+
+ public static boolean fitsIn(NodeResource smaller, NodeResource bigger) {
+ return smaller.getMemory() <= bigger.getMemory() &&
+ smaller.getDisks() <= bigger.getDisks() &&
+ smaller.getVirtualCores() <= bigger.getVirtualCores();
+ }
+
+ public static NodeResource componentwiseMin(NodeResource lhs, NodeResource rhs) {
+ return createResource(Math.min(lhs.getMemory(), rhs.getMemory()),
+ Math.min(lhs.getDisks(), rhs.getDisks()),
+ Math.min(lhs.getVirtualCores(), rhs.getVirtualCores()));
+ }
+
+ public static NodeResource componentwiseMax(NodeResource lhs, NodeResource rhs) {
+ return createResource(Math.max(lhs.getMemory(), rhs.getMemory()),
+ Math.max(lhs.getDisks(), rhs.getDisks()),
+ Math.max(lhs.getVirtualCores(), rhs.getVirtualCores()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/main/java/org/apache/tajo/resource/ResourceCalculator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/resource/ResourceCalculator.java b/tajo-core/src/main/java/org/apache/tajo/resource/ResourceCalculator.java
new file mode 100644
index 0000000..b08228f
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/resource/ResourceCalculator.java
@@ -0,0 +1,169 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.tajo.resource;
+
+
+/**
+ * A set of {@link NodeResource} comparison and manipulation interfaces.
+ */
+
+public abstract class ResourceCalculator {
+
+  /**
+   * Compares <code>lhs</code> with <code>rhs</code> under this calculator's policy.
+   * <code>clusterResource</code> is provided for context.
+   *
+   * @param clusterResource cluster resources
+   * @param lhs first resource
+   * @param rhs second resource
+   * @return a negative value, zero, or a positive value as <code>lhs</code> is less than,
+   *         equal to, or greater than <code>rhs</code>
+   */
+  public abstract int
+  compare(NodeResource clusterResource, NodeResource lhs, NodeResource rhs);
+
+  /**
+   * Integer division of <code>a</code> by <code>b</code>, rounded up.
+   * Intended for non-negative <code>a</code> and positive <code>b</code>.
+   * The intermediate sum is computed as a <code>long</code>, so <code>a</code> close to
+   * {@link Integer#MAX_VALUE} does not overflow.
+   *
+   * @return the rounded-up quotient, or 0 when <code>b</code> is 0
+   */
+  public static int divideAndCeil(int a, int b) {
+    if (b == 0) {
+      return 0;
+    }
+    return (int) (((long) a + b - 1) / b);
+  }
+
+  /**
+   * Rounds <code>a</code> up to a multiple of <code>b</code>; returns 0 when <code>b</code> is 0.
+   * The multiplication is done in <code>int</code> and may overflow when the rounded value
+   * exceeds {@link Integer#MAX_VALUE}.
+   */
+  public static int roundUp(int a, int b) {
+    return divideAndCeil(a, b) * b;
+  }
+
+  /**
+   * Rounds <code>a</code> down to a multiple of <code>b</code>.
+   * A zero <code>b</code> yields 0, matching {@link #divideAndCeil} and {@link #roundUp},
+   * instead of throwing {@link ArithmeticException}.
+   */
+  public static int roundDown(int a, int b) {
+    if (b == 0) {
+      return 0;
+    }
+    return (a / b) * b;
+  }
+
+  /**
+   * Compute the number of containers which can be allocated given
+   * <code>available</code> and <code>required</code> resources.
+   *
+   * @param available available resources
+   * @param required required resources
+   * @return number of containers which can be allocated
+   */
+  public abstract int computeAvailableContainers(
+      NodeResource available, NodeResource required);
+
+  /**
+   * Multiply resource <code>r</code> by factor <code>by</code>
+   * and normalize up using step-factor <code>stepFactor</code>.
+   *
+   * @param r resource to be multiplied
+   * @param by multiplier
+   * @param stepFactor factor by which to normalize up
+   * @return resulting normalized resource
+   */
+  public abstract NodeResource multiplyAndNormalizeUp(
+      NodeResource r, double by, NodeResource stepFactor);
+
+  /**
+   * Multiply resource <code>r</code> by factor <code>by</code>
+   * and normalize down using step-factor <code>stepFactor</code>.
+   *
+   * @param r resource to be multiplied
+   * @param by multiplier
+   * @param stepFactor factor by which to normalize down
+   * @return resulting normalized resource
+   */
+  public abstract NodeResource multiplyAndNormalizeDown(
+      NodeResource r, double by, NodeResource stepFactor);
+
+  /**
+   * Normalize resource <code>r</code> given the base
+   * <code>minimumResource</code> and verify against max allowed
+   * <code>maximumResource</code>.
+   *
+   * @param r resource
+   * @param minimumResource minimum value, also used as the step factor
+   * @param maximumResource the upper bound of the resource to be allocated
+   * @return normalized resource
+   */
+  public NodeResource normalize(NodeResource r, NodeResource minimumResource,
+                                NodeResource maximumResource) {
+    return normalize(r, minimumResource, maximumResource, minimumResource);
+  }
+
+  /**
+   * Normalize resource <code>r</code> given the base
+   * <code>minimumResource</code> and verify against max allowed
+   * <code>maximumResource</code> using a step factor for the normalization.
+   *
+   * @param r resource
+   * @param minimumResource minimum value
+   * @param maximumResource the upper bound of the resource to be allocated
+   * @param stepFactor the increment for resources to be allocated
+   * @return normalized resource
+   */
+  public abstract NodeResource normalize(NodeResource r, NodeResource minimumResource,
+                                         NodeResource maximumResource,
+                                         NodeResource stepFactor);
+
+  /**
+   * Round-up resource <code>r</code> given factor <code>stepFactor</code>.
+   *
+   * @param r resource
+   * @param stepFactor step-factor
+   * @return rounded resource
+   */
+  public abstract NodeResource roundUp(NodeResource r, NodeResource stepFactor);
+
+  /**
+   * Round-down resource <code>r</code> given factor <code>stepFactor</code>.
+   *
+   * @param r resource
+   * @param stepFactor step-factor
+   * @return rounded resource
+   */
+  public abstract NodeResource roundDown(NodeResource r, NodeResource stepFactor);
+
+  /**
+   * Divide resource <code>numerator</code> by resource <code>denominator</code>
+   * using a specific policy (domination, average, fairness etc.).
+   * The overall <code>clusterResource</code> is therefore provided for context.
+   *
+   * @param clusterResource cluster resources
+   * @param numerator numerator
+   * @param denominator denominator
+   * @return <code>numerator</code>/<code>denominator</code>
+   *         using specific policy
+   */
+  public abstract float divide(
+      NodeResource clusterResource, NodeResource numerator, NodeResource denominator);
+
+  /**
+   * Determine if a resource is not suitable for use as a divisor
+   * (will result in divide by 0, etc).
+   *
+   * @param r resource
+   * @return true if divisor is invalid (should not be used), false else
+   */
+  public abstract boolean isInvalidDivisor(NodeResource r);
+
+  /**
+   * Ratio of resource <code>a</code> to resource <code>b</code>.
+   *
+   * @param a resource
+   * @param b resource
+   * @return ratio of resource <code>a</code> to resource <code>b</code>
+   */
+  public abstract float ratio(NodeResource a, NodeResource b);
+
+  /**
+   * Divide-and-ceil <code>numerator</code> by <code>denominator</code>.
+   *
+   * @param numerator numerator resource
+   * @param denominator denominator
+   * @return resultant resource
+   */
+  public abstract NodeResource divideAndCeil(NodeResource numerator, int denominator);
+
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java
new file mode 100644
index 0000000..20eec6b
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java
@@ -0,0 +1,148 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tajo.worker;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.resource.NodeResource;
+import org.apache.tajo.resource.NodeResources;
+import org.apache.tajo.storage.DiskUtil;
+import org.apache.tajo.unit.StorageUnit;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.worker.event.NodeResourceAllocateEvent;
+import org.apache.tajo.worker.event.NodeResourceDeallocateEvent;
+import org.apache.tajo.worker.event.NodeResourceManagerEvent;
+import org.apache.tajo.worker.event.NodeStatusEvent;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
+
+/**
+ * Tracks the total and available resources of this worker node and grants or rejects task
+ * allocation requests delivered as {@link NodeResourceManagerEvent}s through the dispatcher.
+ * Every deallocation is followed by a {@link NodeStatusEvent} reporting the new available resource.
+ */
+public class NodeResourceManager extends AbstractService implements EventHandler<NodeResourceManagerEvent> {
+  private static final Log LOG = LogFactory.getLog(NodeResourceManager.class);
+
+  private final Dispatcher dispatcher;
+  /** Capacity of this node, computed once in {@link #serviceInit(Configuration)}. */
+  private NodeResource totalResource;
+  /** Capacity not yet handed out; only mutated by {@link #allocate} and {@link #release}. */
+  private NodeResource availableResource;
+  /** Number of outstanding allocations: +1 per granted request, -1 per deallocation. */
+  private AtomicInteger allocatedSize;
+  private TajoConf tajoConf;
+
+  public NodeResourceManager(Dispatcher dispatcher){
+    super(NodeResourceManager.class.getName());
+    this.dispatcher = dispatcher;
+  }
+
+  /**
+   * Computes this node's capacity from {@code conf} and registers this manager for
+   * {@link NodeResourceManagerEvent.EventType} events.
+   *
+   * @throws IllegalArgumentException if {@code conf} is not a {@link TajoConf}
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    if (!(conf instanceof TajoConf)) {
+      throw new IllegalArgumentException("Configuration must be a TajoConf instance");
+    }
+    this.tajoConf = (TajoConf)conf;
+    this.totalResource = createWorkerResource(tajoConf);
+    this.availableResource = NodeResources.clone(totalResource);
+    this.dispatcher.register(NodeResourceManagerEvent.EventType.class, this);
+    this.allocatedSize = new AtomicInteger();
+    super.serviceInit(conf);
+    LOG.info("Initialized NodeResourceManager for " + totalResource);
+  }
+
+  /**
+   * Handles allocation and deallocation requests.
+   *
+   * <p>For an allocation, each task request is granted if its resource fits in the currently
+   * available resource; requests that do not fit are handed back through the event's callback as
+   * cancelled tasks. For a deallocation, the resource is returned to the pool and a snapshot of the
+   * new available resource is published as a {@code REPORT_RESOURCE} {@link NodeStatusEvent}.
+   */
+  @Override
+  public void handle(NodeResourceManagerEvent event) {
+
+    if (event instanceof NodeResourceAllocateEvent) {
+      NodeResourceAllocateEvent allocateEvent = (NodeResourceAllocateEvent) event;
+      BatchAllocationResponseProto.Builder response = BatchAllocationResponseProto.newBuilder();
+      for (TaskAllocationRequestProto request : allocateEvent.getRequest().getTaskRequestList()) {
+        NodeResource resource = new NodeResource(request.getResource());
+        if (allocate(resource)) {
+          allocatedSize.incrementAndGet();
+          //TODO send task event to taskExecutor
+        } else {
+          // not enough resource left: reject this request
+          response.addCancellationTask(request);
+        }
+      }
+      allocateEvent.getCallback().run(response.build());
+
+    } else if (event instanceof NodeResourceDeallocateEvent) {
+      allocatedSize.decrementAndGet();
+      NodeResourceDeallocateEvent deallocateEvent = (NodeResourceDeallocateEvent) event;
+      release(deallocateEvent.getResource());
+
+      // Send a snapshot of the current resource to ResourceTracker. The status event is queued and
+      // read later on NodeStatusUpdater's own thread, while availableResource keeps being mutated
+      // here; handing out the live object would let that thread observe half-updated values.
+      getDispatcher().getEventHandler().handle(
+          new NodeStatusEvent(NodeStatusEvent.EventType.REPORT_RESOURCE,
+              NodeResources.clone(getAvailableResource())));
+    }
+  }
+
+  protected Dispatcher getDispatcher() {
+    return dispatcher;
+  }
+
+  protected NodeResource getTotalResource() {
+    return totalResource;
+  }
+
+  protected NodeResource getAvailableResource() {
+    return availableResource;
+  }
+
+  public int getAllocatedSize() {
+    return allocatedSize.get();
+  }
+
+  /** Reserves {@code resource} if it fits in the available resource; returns whether it did. */
+  private boolean allocate(NodeResource resource) {
+    //TODO consider the jvm free memory
+    if (NodeResources.fitsIn(resource, availableResource)) {
+      NodeResources.subtractFrom(availableResource, resource);
+      return true;
+    }
+    return false;
+  }
+
+  /** Returns {@code resource} to the available pool. */
+  private void release(NodeResource resource) {
+    NodeResources.addTo(availableResource, resource);
+  }
+
+  /**
+   * Computes this worker's capacity from configuration.
+   *
+   * <p>Memory is WORKER_RESOURCE_AVAILABLE_MEMORY_MB, capped by the JVM's max heap unless running
+   * under tests. Disk slots are the number of disks (or the DataNode storage count when
+   * WORKER_RESOURCE_DFS_DIR_AWARE is set and that count is positive) times the per-disk parallelism.
+   */
+  private NodeResource createWorkerResource(TajoConf conf) {
+    int configuredMemoryMb = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB);
+    int memoryMb;
+
+    if (conf.get(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE")) {
+      memoryMb = configuredMemoryMb;
+    } else {
+      // Runtime.maxMemory() returns Long.MAX_VALUE when the heap has no limit. Take the minimum in
+      // long arithmetic first; narrowing that value to int would wrap around to -1.
+      long jvmMaxMemoryMb = Runtime.getRuntime().maxMemory() / StorageUnit.MB;
+      memoryMb = (int) Math.min(jvmMaxMemoryMb, (long) configuredMemoryMb);
+    }
+
+    int vCores = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES);
+    int disks = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS_NUM);
+
+    int dataNodeStorageSize = DiskUtil.getDataNodeStorageSize();
+    if (conf.getBoolVar(TajoConf.ConfVars.WORKER_RESOURCE_DFS_DIR_AWARE) && dataNodeStorageSize > 0) {
+      disks = dataNodeStorageSize;
+    }
+
+    int diskParallels = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM);
+    return NodeResource.createResource(memoryMb, disks * diskParallels, vCores);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java b/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java
new file mode 100644
index 0000000..84ac419
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Queues;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.TajoResourceTrackerProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.resource.NodeResource;
+import org.apache.tajo.rpc.*;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.service.ServiceTrackerFactory;
+import org.apache.tajo.worker.event.NodeStatusEvent;
+
+import java.net.ConnectException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+
+import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.*;
+
+/**
+ * Periodically sends heartbeats to {@link org.apache.tajo.master.rm.TajoResourceTracker} via asynchronous RPC.
+ *
+ * <p>Resource reports arrive as {@link NodeStatusEvent}s, are queued by {@link #handle(NodeStatusEvent)}
+ * and are batched by a background {@link StatusUpdaterThread}. The most recent reported resource of
+ * each batch is sent to the master; an empty batch results in a plain ping.
+ */
+public class NodeStatusUpdater extends AbstractService implements EventHandler<NodeStatusEvent> {
+
+  private final static Log LOG = LogFactory.getLog(NodeStatusUpdater.class);
+
+  private TajoConf tajoConf;
+  private StatusUpdaterThread updaterThread;
+  private volatile boolean isStopped;
+  private volatile long heartBeatInterval;
+  private BlockingQueue<NodeStatusEvent> heartBeatRequestQueue;
+  private final WorkerConnectionInfo connectionInfo;
+  private final NodeResourceManager nodeResourceManager;
+  private AsyncRpcClient rmClient;
+  private ServiceTracker serviceTracker;
+  private TajoResourceTrackerProtocolService.Interface resourceTracker;
+  private int queueingLimit;
+
+  public NodeStatusUpdater(WorkerConnectionInfo connectionInfo, NodeResourceManager resourceManager) {
+    super(NodeStatusUpdater.class.getSimpleName());
+    this.connectionInfo = connectionInfo;
+    this.nodeResourceManager = resourceManager;
+  }
+
+  /**
+   * Sets up the report queue and the heartbeat interval, and registers this updater for
+   * {@link NodeStatusEvent.EventType} events on the resource manager's dispatcher.
+   *
+   * @throws IllegalArgumentException if {@code conf} is not a {@link TajoConf}
+   */
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    if (!(conf instanceof TajoConf)) {
+      throw new IllegalArgumentException("Configuration must be a TajoConf instance");
+    }
+    this.tajoConf = (TajoConf) conf;
+    this.heartBeatRequestQueue = Queues.newLinkedBlockingQueue();
+    this.serviceTracker = ServiceTrackerFactory.get(tajoConf);
+    this.nodeResourceManager.getDispatcher().register(NodeStatusEvent.EventType.class, this);
+    this.heartBeatInterval = tajoConf.getIntVar(TajoConf.ConfVars.WORKER_HEARTBEAT_INTERVAL);
+    this.updaterThread = new StatusUpdaterThread();
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public void serviceStart() throws Exception {
+    // Report as soon as events for half of the virtual cores have queued up
+    // (or when the heartbeat interval elapses, whichever comes first).
+    this.queueingLimit = nodeResourceManager.getTotalResource().getVirtualCores() / 2;
+
+    updaterThread.start();
+    super.serviceStart();
+    LOG.info("NodeStatusUpdater started.");
+  }
+
+  /**
+   * Stops the updater thread and releases the RPC client. The thread is interrupted as well as
+   * notified: it may be blocked polling the report queue or waiting for an RPC response, and
+   * {@code notifyAll} alone does not wake it from either.
+   */
+  @Override
+  public void serviceStop() throws Exception {
+    this.isStopped = true;
+
+    // updaterThread is null when serviceInit failed before creating it
+    if (updaterThread != null) {
+      synchronized (updaterThread) {
+        updaterThread.notifyAll();
+      }
+      updaterThread.interrupt();
+    }
+    RpcClientManager.cleanup(rmClient);
+    super.serviceStop();
+    LOG.info("NodeStatusUpdater stopped.");
+  }
+
+  /** Queues resource reports and flush requests for the updater thread. */
+  @Override
+  public void handle(NodeStatusEvent event) {
+    switch (event.getType()) {
+      case REPORT_RESOURCE:
+        heartBeatRequestQueue.add(event); //batch report to ResourceTracker
+        break;
+      case FLUSH_REPORTS:
+        heartBeatRequestQueue.add(event); //flush report to ResourceTracker
+        break;
+    }
+  }
+
+  public int getQueueSize() {
+    return heartBeatRequestQueue.size();
+  }
+
+  public int getQueueingLimit() {
+    return queueingLimit;
+  }
+
+  /** Heartbeat carrying only the worker id and its available resource. */
+  private NodeHeartbeatRequestProto createResourceReport(NodeResource resource) {
+    NodeHeartbeatRequestProto.Builder requestProto = NodeHeartbeatRequestProto.newBuilder();
+    requestProto.setAvailableResource(resource.getProto());
+    requestProto.setWorkerId(connectionInfo.getId());
+    return requestProto.build();
+  }
+
+  /** Heartbeat carrying only the worker id (a ping). */
+  private NodeHeartbeatRequestProto createHeartBeatReport() {
+    NodeHeartbeatRequestProto.Builder requestProto = NodeHeartbeatRequestProto.newBuilder();
+    requestProto.setWorkerId(connectionInfo.getId());
+    return requestProto.build();
+  }
+
+  /** Full report used for registration: total and available resource plus connection info. */
+  private NodeHeartbeatRequestProto createNodeStatusReport() {
+    NodeHeartbeatRequestProto.Builder requestProto = NodeHeartbeatRequestProto.newBuilder();
+    requestProto.setTotalResource(nodeResourceManager.getTotalResource().getProto());
+    requestProto.setAvailableResource(nodeResourceManager.getAvailableResource().getProto());
+    requestProto.setWorkerId(connectionInfo.getId());
+    requestProto.setConnectionInfo(connectionInfo.getProto());
+
+    //TODO set node status to requestProto.setStatus()
+    return requestProto.build();
+  }
+
+  /**
+   * Returns the resource of the most recent event in {@code events} that carries one,
+   * or {@code null} if none does (including an empty batch).
+   */
+  private static NodeResource latestResource(List<NodeStatusEvent> events) {
+    for (int i = events.size() - 1; i >= 0; i--) {
+      NodeResource resource = events.get(i).getResource();
+      if (resource != null) {
+        return resource;
+      }
+    }
+    return null;
+  }
+
+  protected TajoResourceTrackerProtocolService.Interface newStub()
+      throws NoSuchMethodException, ConnectException, ClassNotFoundException {
+    RpcClientManager.cleanup(rmClient);
+
+    RpcClientManager rpcManager = RpcClientManager.getInstance();
+    rmClient = rpcManager.newClient(serviceTracker.getResourceTrackerAddress(),
+        TajoResourceTrackerProtocol.class, true, rpcManager.getRetries(),
+        rpcManager.getTimeoutSeconds(), TimeUnit.SECONDS, false);
+    return rmClient.getStub();
+  }
+
+  /**
+   * Sends one heartbeat and waits for the reply.
+   *
+   * @return the master's response, or {@code null} if the wait timed out or was interrupted
+   * @throws ExecutionException if the RPC failed; the stub is discarded so the next call reconnects
+   */
+  protected NodeHeartbeatResponseProto sendHeartbeat(NodeHeartbeatRequestProto requestProto)
+      throws NoSuchMethodException, ClassNotFoundException, ConnectException, ExecutionException {
+    if (resourceTracker == null) {
+      resourceTracker = newStub();
+    }
+
+    NodeHeartbeatResponseProto response = null;
+    try {
+      CallFuture<NodeHeartbeatResponseProto> callBack = new CallFuture<NodeHeartbeatResponseProto>();
+
+      resourceTracker.nodeHeartbeat(callBack.getController(), requestProto, callBack);
+      response = callBack.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOG.warn(e.getMessage());
+      // keep the interrupt visible so the updater loop can terminate
+      Thread.currentThread().interrupt();
+    } catch (TimeoutException te) {
+      LOG.warn("Heartbeat response is being delayed.", te);
+    } catch (ExecutionException ee) {
+      LOG.warn("TajoMaster failure: " + ee.getMessage());
+      resourceTracker = null;
+      throw ee;
+    }
+    return response;
+  }
+
+  class StatusUpdaterThread extends Thread {
+
+    public StatusUpdaterThread() {
+      super("NodeStatusUpdater");
+    }
+
+    /**
+     * Moves up to {@code numElements} queued events into {@code buffer}, waiting at most
+     * {@code timeout} for them to arrive. Returns early once a FLUSH_REPORTS event has been
+     * collected, whether it was already queued or arrived while waiting.
+     *
+     * @return number of events added to {@code buffer}
+     */
+    private int drain(Collection<NodeStatusEvent> buffer, int numElements,
+                      long timeout, TimeUnit unit) throws InterruptedException {
+
+      long deadline = System.nanoTime() + unit.toNanos(timeout);
+      int added = 0;
+      while (added < numElements) {
+        List<NodeStatusEvent> available = Lists.newArrayList();
+        added += heartBeatRequestQueue.drainTo(available, numElements - added);
+        buffer.addAll(available);
+        if (containsFlush(available)) {
+          break;
+        }
+        if (added < numElements) { // not enough elements immediately available; will have to wait
+          NodeStatusEvent e = heartBeatRequestQueue.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
+          if (e == null) {
+            break; // we already waited enough, and there are no more elements in sight
+          }
+          buffer.add(e);
+          added++;
+
+          if (e.getType() == NodeStatusEvent.EventType.FLUSH_REPORTS) {
+            break;
+          }
+        }
+      }
+      return added;
+    }
+
+    /** Returns true if any event in {@code events} is a flush request. */
+    private boolean containsFlush(List<NodeStatusEvent> events) {
+      for (NodeStatusEvent event : events) {
+        if (event.getType() == NodeStatusEvent.EventType.FLUSH_REPORTS) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    /* Node sends a heartbeats with its resource and status periodically to master. */
+    @Override
+    public void run() {
+      NodeHeartbeatResponseProto lastResponse = null;
+      while (!isStopped && !Thread.interrupted()) {
+
+        try {
+          if (lastResponse == null) {
+            // Node registration on startup, and again after a heartbeat that got no response
+            lastResponse = sendHeartbeat(createNodeStatusReport());
+          } else if (lastResponse.getCommand() == ResponseCommand.MEMBERSHIP) {
+            // Membership changed
+            lastResponse = sendHeartbeat(createNodeStatusReport());
+          } else {
+            // NORMAL, plus commands not handled yet (TODO: abort failure queries on ABORT_QUERY;
+            // SHUTDOWN is not handled either). Every such command goes through the batching wait
+            // below, so the loop never spins without sleeping on a command it does not act on.
+            List<NodeStatusEvent> events = Lists.newArrayList();
+            try {
+              /* batch update to ResourceTracker */
+              drain(events, Math.max(queueingLimit, 1), heartBeatInterval, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+              break;
+            }
+
+            NodeResource latest = latestResource(events);
+            if (latest != null) {
+              // send last available resource;
+              lastResponse = sendHeartbeat(createResourceReport(latest));
+            } else {
+              // send ping;
+              lastResponse = sendHeartbeat(createHeartBeatReport());
+            }
+          }
+        } catch (NoSuchMethodException nsme) {
+          LOG.fatal(nsme.getMessage(), nsme);
+          Runtime.getRuntime().halt(1);
+        } catch (ClassNotFoundException cnfe) {
+          LOG.fatal(cnfe.getMessage(), cnfe);
+          Runtime.getRuntime().halt(1);
+        } catch (Exception e) {
+          LOG.error(e.getMessage(), e);
+          if (!isStopped) {
+            // back off for one heartbeat interval before retrying
+            synchronized (updaterThread) {
+              try {
+                updaterThread.wait(heartBeatInterval);
+              } catch (InterruptedException ie) {
+                // interrupted by serviceStop(); restore the flag so the loop condition exits
+                Thread.currentThread().interrupt();
+              }
+            }
+          }
+        }
+      }
+
+      LOG.info("Heartbeat Thread stopped.");
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
index bd70d59..050e2b5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
@@ -47,6 +47,7 @@ import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.NodeHeartbeat;
/**
* It periodically sends heartbeat to {@link org.apache.tajo.master.rm.TajoResourceTracker} via asynchronous rpc.
*/
+@Deprecated
public class WorkerHeartbeatService extends AbstractService {
/** class logger */
private final static Log LOG = LogFactory.getLog(WorkerHeartbeatService.class);
http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java
new file mode 100644
index 0000000..2f411e8
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.event;
+
+
+import com.google.protobuf.RpcCallback;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.BatchAllocationRequestProto;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.BatchAllocationResponseProto;
+
+/**
+ * Asks the node resource manager to reserve resources for a batch of task requests.
+ * The outcome (the requests that could not be reserved) is delivered through {@link #getCallback()}.
+ */
+public class NodeResourceAllocateEvent extends NodeResourceManagerEvent {
+
+  private final BatchAllocationRequestProto request;
+  private final RpcCallback<BatchAllocationResponseProto> callback;
+
+  /**
+   * @param request  the batch of task allocation requests
+   * @param callback receives the allocation response for this batch
+   */
+  public NodeResourceAllocateEvent(BatchAllocationRequestProto request,
+                                   RpcCallback<BatchAllocationResponseProto> callback) {
+    super(EventType.ALLOCATE);
+    this.request = request;
+    this.callback = callback;
+  }
+
+  /** @return the batch of task allocation requests carried by this event */
+  public BatchAllocationRequestProto getRequest() {
+    return request;
+  }
+
+  /** @return the callback that receives the allocation response */
+  public RpcCallback<BatchAllocationResponseProto> getCallback() {
+    return callback;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java
new file mode 100644
index 0000000..a298d77
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.event;
+
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.resource.NodeResource;
+
+/**
+ * Returns a previously allocated resource to the node resource manager.
+ */
+public class NodeResourceDeallocateEvent extends NodeResourceManagerEvent {
+
+  private final NodeResource resource;
+
+  /** Creates the event from the protobuf form of the released resource. */
+  public NodeResourceDeallocateEvent(TajoProtos.NodeResourceProto proto) {
+    this(new NodeResource(proto));
+  }
+
+  /** Creates the event for the given released resource. */
+  public NodeResourceDeallocateEvent(NodeResource resource) {
+    super(EventType.DEALLOCATE);
+    this.resource = resource;
+  }
+
+  /** @return the resource being released */
+  public NodeResource getResource() {
+    return resource;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceManagerEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceManagerEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceManagerEvent.java
new file mode 100644
index 0000000..bcb3448
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceManagerEvent.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.resource.NodeResource;
+
+/**
+ * Base event handled by the node resource manager; the type selects allocation or deallocation.
+ */
+public class NodeResourceManagerEvent extends AbstractEvent<NodeResourceManagerEvent.EventType> {
+
+  /** Kinds of resource manager requests. */
+  public enum EventType {
+    ALLOCATE,
+    DEALLOCATE
+  }
+
+  public NodeResourceManagerEvent(EventType type) {
+    super(type);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeStatusEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeStatusEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeStatusEvent.java
new file mode 100644
index 0000000..58ab74a
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeStatusEvent.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tajo.resource.NodeResource;
+
+/**
+ * Event consumed by the node status updater: either a resource report to batch up, or a request
+ * to flush the pending reports. It may carry a resource snapshot.
+ */
+public class NodeStatusEvent extends AbstractEvent<NodeStatusEvent.EventType> {
+
+  /** Kinds of status events. */
+  public enum EventType {
+    REPORT_RESOURCE,
+    FLUSH_REPORTS
+  }
+
+  private final NodeResource resource;
+
+  public NodeStatusEvent(EventType type, NodeResource resource) {
+    super(type);
+    this.resource = resource;
+  }
+
+  /** @return the resource passed to the constructor */
+  public NodeResource getResource() {
+    return resource;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/ResourceTrackerProtocol.proto b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
index 40aeab7..dffd8c9 100644
--- a/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
+++ b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
@@ -25,15 +25,42 @@ option java_generate_equals_and_hash = true;
import "QueryCoordinatorProtocol.proto";
import "ContainerProtocol.proto";
import "tajo_protos.proto";
+import "TajoIdProtos.proto";
package hadoop.yarn;
+// deprecated
message NodeHeartbeat {
required WorkerConnectionInfoProto connectionInfo = 1;
optional ServerStatusProto serverStatus = 2;
optional string statusMessage = 3;
}
+// Heartbeat sent by a worker's NodeStatusUpdater. Which optional fields are set depends on the kind:
+// registration / membership reports set totalResource, availableResource and connectionInfo;
+// resource reports set only availableResource; pings set only workerId.
+message NodeHeartbeatRequestProto {
+ required int32 workerId = 1;
+ optional NodeResourceProto totalResource = 2;
+ optional NodeResourceProto availableResource = 3;
+ optional WorkerConnectionInfoProto connectionInfo = 4;
+ optional NodeStatusProto status = 5; // not populated by the worker yet
+}
+
+// Master's reply; command tells the worker what to send next.
+message NodeHeartbeatResponseProto {
+ required ResponseCommand command = 1 [default = NORMAL];
+ repeated QueryIdProto queryId = 2; // NOTE(review): presumably the queries affected by ABORT_QUERY; confirm
+}
+
+enum ResponseCommand {
+ NORMAL = 1; // regular heartbeat: worker keeps sending resource reports or pings
+ MEMBERSHIP = 2; // request membership to worker node: worker re-sends its full status report
+ ABORT_QUERY = 3; // query master failure
+ SHUTDOWN = 4; // black list
+}
+
+//TODO add node health information
+message NodeStatusProto {
+}
+
service TajoResourceTrackerProtocolService {
rpc heartbeat(NodeHeartbeat) returns (TajoHeartbeatResponse);
+ rpc nodeHeartbeat(NodeHeartbeatRequestProto) returns (NodeHeartbeatResponseProto);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
index bf9bbde..2324596 100644
--- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
@@ -116,6 +116,7 @@ message ExecutionBlockReport {
repeated IntermediateEntryProto intermediateEntries = 5;
}
+// deprecated
message TaskResponseProto {
required string id = 1;
required QueryState status = 2;
@@ -161,6 +162,7 @@ message QueryExecutionRequestProto {
optional StringProto logicalPlanJson = 6;
}
+// deprecated
message GetTaskRequestProto {
required int32 workerId = 1;
required TajoContainerIdProto containerId = 2;
@@ -198,6 +200,20 @@ message ExecutionBlockListProto {
repeated ExecutionBlockIdProto executionBlockId = 1;
}
+// One task together with the resource it needs on the worker.
+message TaskAllocationRequestProto {
+ required TaskRequestProto taskRequest = 1;
+ required NodeResourceProto resource = 2; // checked against the worker's available resource
+}
+
+// A batch of task allocation requests for one execution block.
+message BatchAllocationRequestProto {
+ required ExecutionBlockIdProto executionBlockId = 1;
+ repeated TaskAllocationRequestProto taskRequest = 2;
+}
+
+// Worker's answer to a BatchAllocationRequestProto.
+message BatchAllocationResponseProto {
+ // requests the worker rejected because their resource did not fit (field number 1 is unused)
+ repeated TaskAllocationRequestProto cancellationTask = 2;
+}
+
service TajoWorkerProtocolService {
rpc ping (TaskAttemptIdProto) returns (BoolProto);
http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/test/java/org/apache/tajo/resource/TestResources.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/resource/TestResources.java b/tajo-core/src/test/java/org/apache/tajo/resource/TestResources.java
new file mode 100644
index 0000000..eb0d732
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/resource/TestResources.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.resource;
+
+import org.junit.Test;
+
+import static org.apache.tajo.resource.NodeResources.componentwiseMin;
+import static org.apache.tajo.resource.NodeResources.createResource;
+import static org.apache.tajo.resource.NodeResources.fitsIn;
+import static org.junit.Assert.*;
+
+/** Unit tests for the static helpers in {@link NodeResources}. */
+public class TestResources {
+
+  /** A request fits only when every component is less than or equal to the available amount. */
+  @Test
+  public void testFitsIn() {
+    // smaller or equal in every component: fits
+    assertTrue(fitsIn(createResource(512, 1, 1), createResource(1024, 2, 1)));
+    assertTrue(fitsIn(createResource(1024, 2, 1), createResource(1024, 2, 1)));
+
+    // larger in at least one component: does not fit
+    assertFalse(fitsIn(createResource(1024, 2, 1), createResource(512, 1, 1)));
+    assertFalse(fitsIn(createResource(512, 2, 1), createResource(1024, 1, 1)));
+    assertFalse(fitsIn(createResource(1024, 1, 1), createResource(512, 2, 1)));
+    assertFalse(fitsIn(createResource(512, 1, 2), createResource(512, 1, 1)));
+  }
+
+  /** The component-wise minimum picks the smaller value per component, independent of argument order. */
+  @Test
+  public void testComponentwiseMin() {
+    assertEquals(createResource(1, 1), componentwiseMin(createResource(1, 1), createResource(2, 2)));
+    assertEquals(createResource(1, 1), componentwiseMin(createResource(2, 2), createResource(1, 1)));
+    assertEquals(createResource(1, 1), componentwiseMin(createResource(1, 2), createResource(2, 1)));
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java b/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java
new file mode 100644
index 0000000..2d7d0be
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.collect.Maps;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.resource.NodeResource;
+import org.apache.tajo.resource.NodeResources;
+
+import java.net.ConnectException;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.*;
+
+/**
+ * A {@link NodeStatusUpdater} for tests that replaces the RPC stub with an in-process
+ * {@link MockResourceTracker}. Every node heartbeat handled by the tracker counts down the
+ * latch passed to the constructor, so tests can wait for a given number of heartbeats.
+ */
+public class MockNodeStatusUpdater extends NodeStatusUpdater {
+
+  /** Counted down once for every node heartbeat the mock tracker handles. */
+  private final CountDownLatch barrier;
+  /** Total resource registered by each worker, keyed by worker id. */
+  private final Map<Integer, NodeResource> membership = Maps.newHashMap();
+  /** Available resource of each registered worker, keyed by worker id; updated on heartbeats. */
+  private final Map<Integer, NodeResource> resources = Maps.newHashMap();
+  private final MockResourceTracker resourceTracker;
+
+  public MockNodeStatusUpdater(CountDownLatch barrier, WorkerConnectionInfo connectionInfo,
+                               NodeResourceManager resourceManager) {
+    super(connectionInfo, resourceManager);
+    this.barrier = barrier;
+    this.resourceTracker = new MockResourceTracker();
+  }
+
+  /** Returns the in-process tracker instead of creating a real RPC stub. */
+  @Override
+  protected TajoResourceTrackerProtocolService.Interface newStub()
+      throws NoSuchMethodException, ConnectException, ClassNotFoundException {
+
+    return resourceTracker;
+  }
+
+  protected MockResourceTracker getResourceTracker() {
+    return resourceTracker;
+  }
+
+  /**
+   * In-process stand-in for the resource tracker service. It records the membership and
+   * resource reports it receives so tests can inspect them.
+   */
+  class MockResourceTracker implements TajoResourceTrackerProtocolService.Interface {
+    /** The most recent node heartbeat request received. */
+    private NodeHeartbeatRequestProto lastRequest;
+
+    /** Returns the total resource registered by each worker, keyed by worker id. */
+    protected Map<Integer, NodeResource> getTotalResource() {
+      return membership;
+    }
+
+    /** Returns the latest available resource reported by each worker, keyed by worker id. */
+    protected Map<Integer, NodeResource> getAvailableResource() {
+      // must expose the available-resource map, not the membership (total) map;
+      // otherwise tests cannot tell total and available resources apart
+      return resources;
+    }
+
+    protected NodeHeartbeatRequestProto getLastRequest() {
+      return lastRequest;
+    }
+
+    /**
+     * Legacy heartbeat; intentionally a no-op. Note that {@code done} is never invoked,
+     * so a caller waiting on it would not receive a response.
+     */
+    @Override
+    public void heartbeat(RpcController controller, NodeHeartbeat request,
+                          RpcCallback<QueryCoordinatorProtocol.TajoHeartbeatResponse> done) {
+
+    }
+
+    /**
+     * Handles a node heartbeat:
+     * <ul>
+     *   <li>registered worker: applies the reported available resource (if present) via
+     *       {@link NodeResources#update} and replies NORMAL;</li>
+     *   <li>unknown worker with connection info: registers its total and available
+     *       resources and replies NORMAL;</li>
+     *   <li>unknown worker without connection info: replies MEMBERSHIP (presumably asking
+     *       the worker to re-send its membership information).</li>
+     * </ul>
+     * Afterwards the request is recorded as the last request and the barrier is counted down.
+     */
+    @Override
+    public void nodeHeartbeat(RpcController controller, NodeHeartbeatRequestProto request,
+                              RpcCallback<NodeHeartbeatResponseProto> done) {
+
+      NodeHeartbeatResponseProto.Builder response = NodeHeartbeatResponseProto.newBuilder();
+      if (membership.containsKey(request.getWorkerId())) {
+        if (request.hasAvailableResource()) {
+          NodeResource resource = resources.get(request.getWorkerId());
+          NodeResources.update(resource, new NodeResource(request.getAvailableResource()));
+        }
+        done.run(response.setCommand(ResponseCommand.NORMAL).build());
+      } else {
+        if (request.hasConnectionInfo()) {
+          membership.put(request.getWorkerId(), new NodeResource(request.getTotalResource()));
+          resources.put(request.getWorkerId(), new NodeResource(request.getAvailableResource()));
+          done.run(response.setCommand(ResponseCommand.NORMAL).build());
+        } else {
+          done.run(response.setCommand(ResponseCommand.MEMBERSHIP).build());
+        }
+      }
+      lastRequest = request;
+      // count down last, so a test released by the barrier sees this request's effects
+      barrier.countDown();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java
new file mode 100644
index 0000000..7407acc
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.tajo.*;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.plan.serder.PlanProto;
+import org.apache.tajo.resource.NodeResources;
+import org.apache.tajo.rpc.CallFuture;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.worker.event.NodeResourceAllocateEvent;
+import org.apache.tajo.worker.event.NodeResourceDeallocateEvent;
+import org.junit.*;
+
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.*;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
+/**
+ * Tests for {@link NodeResourceManager}: allocation, cancellation of requests beyond the
+ * node's capacity, deallocation, and many concurrent allocate/deallocate round-trips
+ * through the dispatcher.
+ */
+public class TestNodeResourceManager {
+
+  private NodeResourceManager resourceManager;
+  private MockNodeStatusUpdater statusUpdater;
+  private AsyncDispatcher dispatcher;
+  private int taskMemory;
+  private TajoConf conf;
+
+  @Before
+  public void setup() {
+    conf = new TajoConf();
+    conf.set(CommonTestingUtil.TAJO_TEST_KEY, CommonTestingUtil.TAJO_TEST_TRUE);
+
+    // node memory is sized as exactly taskMemory per CPU core
+    taskMemory = 512;
+    conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES, 4);
+    conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB,
+        taskMemory * conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES));
+    conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS_NUM, 4);
+    conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM, 1);
+
+    dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+
+    resourceManager = new NodeResourceManager(dispatcher);
+    resourceManager.init(conf);
+    resourceManager.start();
+
+    WorkerConnectionInfo worker = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080);
+    // zero-count latch: these tests never wait for heartbeats
+    statusUpdater = new MockNodeStatusUpdater(new CountDownLatch(0), worker, resourceManager);
+    statusUpdater.init(conf);
+    statusUpdater.start();
+  }
+
+  @After
+  public void tearDown() {
+    resourceManager.stop();
+    statusUpdater.stop();
+    dispatcher.stop();
+  }
+
+  @Test(timeout = 30000)
+  public void testNodeResourceAllocateEvent() throws Exception {
+    int requestSize = 4;
+
+    assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
+    BatchAllocationResponseProto responseProto = allocate(newBatchRequest(taskMemory, requestSize));
+
+    assertNotEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
+    assertEquals(0, responseProto.getCancellationTaskCount());
+    assertEquals(requestSize, resourceManager.getAllocatedSize());
+  }
+
+  @Test(timeout = 30000)
+  public void testNodeResourceCancellation() throws Exception {
+    int requestSize = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES);
+    int overSize = 10;
+
+    assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
+    BatchAllocationResponseProto responseProto =
+        allocate(newBatchRequest(taskMemory, requestSize + overSize));
+
+    // only requestSize tasks are expected to be allocated; the surplus comes back as cancellations
+    assertEquals(overSize, responseProto.getCancellationTaskCount());
+    assertEquals(requestSize, resourceManager.getAllocatedSize());
+  }
+
+  @Test(timeout = 30000)
+  public void testNodeResourceDeallocateEvent() throws Exception {
+    int requestSize = 4;
+
+    assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
+    BatchAllocationRequestProto.Builder requestProto = newBatchRequest(taskMemory, requestSize);
+    BatchAllocationResponseProto responseProto = allocate(requestProto);
+
+    assertNotEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
+    assertEquals(0, responseProto.getCancellationTaskCount());
+    assertEquals(requestSize, resourceManager.getAllocatedSize());
+
+    // deallocate every task by invoking the handler directly (bypassing the async
+    // dispatcher) so the resulting state can be asserted right away
+    for (TaskAllocationRequestProto allocationRequestProto : requestProto.getTaskRequestList()) {
+      resourceManager.handle(new NodeResourceDeallocateEvent(allocationRequestProto.getResource()));
+    }
+    assertEquals(0, resourceManager.getAllocatedSize());
+    assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
+  }
+
+  @Test(timeout = 30000)
+  public void testParallelRequest() throws Exception {
+    final int parallelCount = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES) * 2;
+    final int taskSize = 100000;
+    final AtomicInteger totalComplete = new AtomicInteger();
+    final AtomicInteger totalCanceled = new AtomicInteger();
+
+    final ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
+    // shared work queue: every thread polls from it, and canceled tasks are pushed back for a retry
+    final Queue<TaskAllocationRequestProto> totalTasks = createTaskRequests(taskMemory, taskSize);
+
+    ExecutorService executor = Executors.newFixedThreadPool(parallelCount);
+    List<Future<?>> futureList = Lists.newArrayList();
+
+    long startTime = System.currentTimeMillis();
+    try {
+      for (int i = 0; i < parallelCount; i++) {
+        futureList.add(executor.submit(new Runnable() {
+          @Override
+          public void run() {
+            int complete = 0;
+            while (true) {
+              TaskAllocationRequestProto task = totalTasks.poll();
+              if (task == null) break;
+
+              BatchAllocationRequestProto.Builder requestProto = BatchAllocationRequestProto.newBuilder();
+              requestProto.addTaskRequest(task);
+              requestProto.setExecutionBlockId(ebId.getProto());
+
+              CallFuture<BatchAllocationResponseProto> callFuture =
+                  new CallFuture<BatchAllocationResponseProto>();
+              dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture));
+
+              BatchAllocationResponseProto proto;
+              try {
+                proto = callFuture.get();
+              } catch (Exception e) {
+                if (e instanceof InterruptedException) {
+                  Thread.currentThread().interrupt();
+                }
+                // keep the cause so Future#get in the test thread reports the real failure
+                throw new RuntimeException("Allocation request failed: " + e.getMessage(), e);
+              }
+
+              if (proto.getCancellationTaskCount() > 0) {
+                // node was full: requeue the canceled task(s) for a later retry
+                totalTasks.addAll(proto.getCancellationTaskList());
+                totalCanceled.addAndGet(proto.getCancellationTaskCount());
+              } else {
+                complete++;
+                // release the resource immediately so other threads can allocate
+                dispatcher.getEventHandler().handle(new NodeResourceDeallocateEvent(task.getResource()));
+              }
+            }
+            System.out.println(Thread.currentThread().getName() + " complete requests: " + complete);
+            totalComplete.addAndGet(complete);
+          }
+        }));
+      }
+
+      // propagates any worker failure (ExecutionException) into the test
+      for (Future<?> future : futureList) {
+        future.get();
+      }
+    } finally {
+      // release the pool threads even when a worker failed or the test timed out
+      executor.shutdownNow();
+    }
+
+    System.out.println(parallelCount + " Thread, completed requests: " + totalComplete.get() + ", canceled requests:"
+        + totalCanceled.get() + ", " + (System.currentTimeMillis() - startTime) + " ms elapsed");
+    assertEquals(taskSize, totalComplete.get());
+  }
+
+  /**
+   * Builds a batch allocation request for {@code size} tasks of {@code memory} each,
+   * tagged with a fresh execution block id.
+   */
+  private static BatchAllocationRequestProto.Builder newBatchRequest(int memory, int size) {
+    BatchAllocationRequestProto.Builder requestProto = BatchAllocationRequestProto.newBuilder();
+    ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
+    requestProto.setExecutionBlockId(ebId.getProto());
+    requestProto.addAllTaskRequest(createTaskRequests(memory, size));
+    return requestProto;
+  }
+
+  /**
+   * Sends {@code requestProto} through the dispatcher as a {@link NodeResourceAllocateEvent}
+   * and blocks on the returned {@link CallFuture} until the response arrives.
+   */
+  private BatchAllocationResponseProto allocate(BatchAllocationRequestProto.Builder requestProto)
+      throws Exception {
+    CallFuture<BatchAllocationResponseProto> callFuture = new CallFuture<BatchAllocationResponseProto>();
+    dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture));
+    return callFuture.get();
+  }
+
+  /**
+   * Creates {@code size} task allocation requests. Each one asks for
+   * {@code NodeResources.createResource(memory)} and carries a minimal task request with a
+   * distinct task id. The result is a thread-safe queue because testParallelRequest polls
+   * and refills it from several threads.
+   */
+  protected static Queue<TaskAllocationRequestProto> createTaskRequests(int memory, int size) {
+    Queue<TaskAllocationRequestProto> requestProtoList = new LinkedBlockingQueue<TaskAllocationRequestProto>();
+    // all tasks share one placeholder execution block; only the task index differs
+    ExecutionBlockId nullStage = QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0);
+    for (int i = 0; i < size; i++) {
+      TaskAttemptId taskAttemptId = QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(nullStage, i), 0);
+
+      TajoWorkerProtocol.TaskRequestProto.Builder builder =
+          TajoWorkerProtocol.TaskRequestProto.newBuilder();
+      builder.setId(taskAttemptId.getProto());
+      builder.setShouldDie(true);
+      builder.setOutputTable("");
+      builder.setPlan(PlanProto.LogicalNodeTree.newBuilder());
+      builder.setClusteredOutput(false);
+
+      requestProtoList.add(TaskAllocationRequestProto.newBuilder()
+          .setResource(NodeResources.createResource(memory).getProto())
+          .setTaskRequest(builder.build()).build());
+    }
+    return requestProtoList;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java
new file mode 100644
index 0000000..fb3c77e
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.TajoResourceTrackerProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.worker.event.NodeStatusEvent;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import static org.junit.Assert.*;
+
+/**
+ * Tests for {@link NodeStatusUpdater}, driven through {@link MockNodeStatusUpdater} so that
+ * heartbeats go to an in-process tracker rather than over RPC.
+ */
+public class TestNodeStatusUpdater {
+
+  private NodeResourceManager resourceManager;
+  private MockNodeStatusUpdater statusUpdater;
+  private AsyncDispatcher dispatcher;
+  private TajoConf conf;
+
+  @Before
+  public void setup() {
+    conf = new TajoConf();
+    conf.set(CommonTestingUtil.TAJO_TEST_KEY, CommonTestingUtil.TAJO_TEST_TRUE);
+    conf.setIntVar(TajoConf.ConfVars.WORKER_HEARTBEAT_INTERVAL, 1000);
+
+    dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+
+    resourceManager = new NodeResourceManager(dispatcher);
+    resourceManager.init(conf);
+    resourceManager.start();
+  }
+
+  @After
+  public void tearDown() {
+    resourceManager.stop();
+    if (statusUpdater != null) {
+      statusUpdater.stop();
+    }
+    dispatcher.stop();
+  }
+
+  /** Connection info of the worker each test reports for. */
+  private static WorkerConnectionInfo newWorker() {
+    return new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080);
+  }
+
+  /**
+   * Creates, initializes and starts the status updater for {@code worker} and keeps it in
+   * {@link #statusUpdater} so tearDown stops it. The mock counts {@code barrier} down once
+   * per node heartbeat it handles.
+   */
+  private MockNodeStatusUpdater startStatusUpdater(CountDownLatch barrier, WorkerConnectionInfo worker) {
+    statusUpdater = new MockNodeStatusUpdater(barrier, worker, resourceManager);
+    statusUpdater.init(conf);
+    statusUpdater.start();
+    return statusUpdater;
+  }
+
+  @Test(timeout = 20000)
+  public void testNodeMembership() throws Exception {
+    // wait for the first heartbeat, which is expected to register the worker
+    CountDownLatch firstHeartbeat = new CountDownLatch(1);
+    WorkerConnectionInfo worker = newWorker();
+    MockNodeStatusUpdater.MockResourceTracker tracker =
+        startStatusUpdater(firstHeartbeat, worker).getResourceTracker();
+    firstHeartbeat.await();
+
+    assertTrue(tracker.getTotalResource().containsKey(worker.getId()));
+    assertEquals(resourceManager.getTotalResource(), tracker.getTotalResource().get(worker.getId()));
+    assertEquals(resourceManager.getAvailableResource(), tracker.getAvailableResource().get(worker.getId()));
+  }
+
+  @Test(timeout = 20000)
+  public void testPing() throws Exception {
+    // after registration, the next heartbeat should be a bare ping carrying only the worker id
+    CountDownLatch twoHeartbeats = new CountDownLatch(2);
+    MockNodeStatusUpdater.MockResourceTracker tracker =
+        startStatusUpdater(twoHeartbeats, newWorker()).getResourceTracker();
+    twoHeartbeats.await();
+
+    TajoResourceTrackerProtocol.NodeHeartbeatRequestProto ping = tracker.getLastRequest();
+    assertTrue(ping.hasWorkerId());
+    assertFalse(ping.hasAvailableResource());
+    assertFalse(ping.hasTotalResource());
+    assertFalse(ping.hasConnectionInfo());
+  }
+
+  @Test(timeout = 20000)
+  public void testResourceReport() throws Exception {
+    CountDownLatch twoHeartbeats = new CountDownLatch(2);
+    MockNodeStatusUpdater updater = startStatusUpdater(twoHeartbeats, newWorker());
+
+    // enqueue as many resource reports as the updater's queueing limit
+    for (int sent = 0; sent < updater.getQueueingLimit(); sent++) {
+      dispatcher.getEventHandler().handle(
+          new NodeStatusEvent(NodeStatusEvent.EventType.REPORT_RESOURCE, resourceManager.getAvailableResource()));
+    }
+    twoHeartbeats.await();
+
+    // the queued reports are expected to have been drained by now
+    assertEquals(0, updater.getQueueSize());
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskUtil.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskUtil.java
index 0bcd5ec..19e08e8 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskUtil.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskUtil.java
@@ -189,10 +189,10 @@ public class DiskUtil {
}
public static int getDataNodeStorageSize(){
- return getStorageDirs().size();
+ return getDataNodeStorageDirs().size();
}
- public static List<URI> getStorageDirs(){
+ public static List<URI> getDataNodeStorageDirs(){
Configuration conf = new HdfsConfiguration();
Collection<String> dirNames = conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY);
return Util.stringCollectionAsURIs(dirNames);