Posted to commits@tajo.apache.org by hy...@apache.org on 2015/08/14 16:30:08 UTC

[29/51] [partial] tajo git commit: TAJO-1761: Separate an integration unit test kit into an independent module.

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/util/TestJSPUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/util/TestJSPUtil.java b/tajo-core-tests/src/test/java/org/apache/tajo/util/TestJSPUtil.java
new file mode 100644
index 0000000..f3b6936
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/util/TestJSPUtil.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.TaskId;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent;
+import org.apache.tajo.querymaster.Task;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestJSPUtil {
+  @Test
+  public void testSortTask() throws Exception {
+    List<Task> tasks = new ArrayList<Task>();
+
+    Configuration conf = new TajoConf();
+
+    TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext scheduleContext =
+        new TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext();
+
+    ExecutionBlockId ebId = TajoIdUtils.createExecutionBlockId("eb_000001_00001_00001");
+
+    for (int i = 0; i < 10; i++) {
+      TaskId id = new TaskId(ebId, i);
+      Task task = new Task(conf, scheduleContext, id, true, null);
+      tasks.add(task);
+
+      int launchTime = i + 1;
+      int runningTime = i + 1;
+      if (i < 9) { // the last task (id 9) is left without launch/finish times
+        task.setLaunchTime(launchTime);
+        task.setFinishTime(launchTime + runningTime);
+      }
+    }
+
+    Collections.shuffle(tasks);
+
+    Task[] taskArray = tasks.toArray(new Task[]{});
+    JSPUtil.sortTaskArray(taskArray, "id", "asc");
+    for (int i = 0; i < 10; i++) {
+      assertEquals(i, taskArray[i].getId().getId());
+    }
+
+    taskArray = tasks.toArray(new Task[]{});
+    JSPUtil.sortTaskArray(taskArray, "id", "desc");
+    for (int i = 0; i < 10; i++) {
+      assertEquals(9 - i, taskArray[i].getId().getId());
+    }
+
+    taskArray = tasks.toArray(new Task[]{});
+    JSPUtil.sortTaskArray(taskArray, "runTime", "asc");
+    assertEquals(0, taskArray[0].getId().getId());
+    assertEquals(9, taskArray[9].getId().getId());
+
+    taskArray = tasks.toArray(new Task[]{});
+    JSPUtil.sortTaskArray(taskArray, "runTime", "desc");
+    assertEquals(8, taskArray[0].getId().getId());
+    assertEquals(9, taskArray[9].getId().getId());
+  }
+
+  @Test
+  public void testGetPageNavigationList() {
+    List<String> originList = new ArrayList<String>();
+
+    for (int i = 0; i < 35; i++) {
+      originList.add("Data" + (i + 1));
+    }
+
+    List<String> pageList = JSPUtil.getPageNavigationList(originList, 1, 10);
+    assertEquals(10, pageList.size());
+    assertEquals("Data1", pageList.get(0));
+    assertEquals("Data10", pageList.get(9));
+
+    pageList = JSPUtil.getPageNavigationList(originList, 2, 10);
+    assertEquals(10, pageList.size());
+    assertEquals("Data11", pageList.get(0));
+    assertEquals("Data20", pageList.get(9));
+
+    pageList = JSPUtil.getPageNavigationList(originList, 3, 10);
+    assertEquals(10, pageList.size());
+    assertEquals("Data21", pageList.get(0));
+    assertEquals("Data30", pageList.get(9));
+
+    pageList = JSPUtil.getPageNavigationList(originList, 4, 10);
+    assertEquals(5, pageList.size());
+    assertEquals("Data31", pageList.get(0));
+    assertEquals("Data35", pageList.get(4));
+  }
+}
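
    [Editor's note] The assertions in testGetPageNavigationList above pin down the pagination
    contract: page numbers start at 1 and the final page may be short. Below is a minimal
    standalone sketch of an equivalent helper that satisfies the same assertions; the class and
    method names are illustrative only and are not taken from JSPUtil.

    import java.util.Collections;
    import java.util.List;

    public class PagedSliceExample {
      // Hypothetical equivalent of JSPUtil.getPageNavigationList, for illustration:
      // page numbers start at 1; the final page may hold fewer than pageSize items.
      public static <T> List<T> pagedSlice(List<T> list, int page, int pageSize) {
        int from = (page - 1) * pageSize;
        int to = Math.min(from + pageSize, list.size());
        if (from >= to) {
          return Collections.emptyList();
        }
        return list.subList(from, to);
      }
    }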

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java b/tajo-core-tests/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java
new file mode 100644
index 0000000..3d2578c
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java
@@ -0,0 +1,313 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.history;
+
+import com.google.common.io.Files;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.*;
+import org.apache.tajo.TajoProtos.QueryState;
+import org.apache.tajo.TajoProtos.TaskAttemptState;
+import org.apache.tajo.catalog.proto.CatalogProtos.TableStatsProto;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.master.QueryInfo;
+import org.apache.tajo.util.TajoIdUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+public class TestHistoryWriterReader extends QueryTestCaseBase {
+  public static final String HISTORY_DIR = "/tmp/tajo-test-history";
+  TajoConf tajoConf;
+
+  @Before
+  public void setUp() throws Exception {
+    tajoConf = new TajoConf(testingCluster.getConfiguration());
+    tajoConf.setVar(ConfVars.HISTORY_QUERY_DIR, HISTORY_DIR);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    Path path = TajoConf.getQueryHistoryDir(tajoConf);
+    FileSystem fs = path.getFileSystem(tajoConf);
+    fs.delete(path, true);
+  }
+
+  @Test
+  public void testQueryInfoReadAndWrite() throws Exception {
+    HistoryWriter writer = new HistoryWriter("127.0.0.1:28090", true);
+    try {
+      writer.init(tajoConf);
+      writer.start();
+
+      long startTime = System.currentTimeMillis();
+      QueryInfo queryInfo1 = new QueryInfo(QueryIdFactory.newQueryId(startTime, 1));
+      queryInfo1.setStartTime(startTime);
+      queryInfo1.setProgress(1.0f);
+      queryInfo1.setQueryState(QueryState.QUERY_SUCCEEDED);
+      writer.appendHistory(queryInfo1);
+
+      QueryInfo queryInfo2 = new QueryInfo(QueryIdFactory.newQueryId(startTime, 2));
+      queryInfo2.setStartTime(startTime);
+      queryInfo2.setProgress(0.5f);
+      queryInfo2.setQueryState(QueryState.QUERY_FAILED);
+      writer.appendAndSync(queryInfo2);
+
+      SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd");
+      Path path = new Path(tajoConf.getVar(ConfVars.HISTORY_QUERY_DIR));
+
+      FileSystem fs = path.getFileSystem(tajoConf);
+      Path parentPath = new Path(path, df.format(startTime) + "/query-list");
+      FileStatus[] histFiles = fs.listStatus(parentPath);
+      assertNotNull(histFiles);
+      assertEquals(1, histFiles.length);
+      assertTrue(histFiles[0].isFile());
+      assertTrue(histFiles[0].getPath().getName().endsWith(".hist"));
+
+      HistoryReader reader = new HistoryReader("127.0.0.1:28090", tajoConf);
+      List<QueryInfo> queryInfos = reader.getQueriesInHistory(1, 2);
+      assertNotNull(queryInfos);
+      assertEquals(2, queryInfos.size());
+
+      QueryInfo foundQueryInfo = queryInfos.get(0);
+      assertEquals(queryInfo2.getQueryId(), foundQueryInfo.getQueryId());
+      assertEquals(queryInfo2.getQueryState(), foundQueryInfo.getQueryState());
+      assertEquals(queryInfo2.getProgress(), foundQueryInfo.getProgress(), 0);
+
+      foundQueryInfo = reader.getQueryByQueryId(queryInfo2.getQueryId());
+      assertEquals(queryInfo2.getQueryId(), foundQueryInfo.getQueryId());
+      assertEquals(queryInfo2.getQueryState(), foundQueryInfo.getQueryState());
+      assertEquals(queryInfo2.getProgress(), foundQueryInfo.getProgress(), 0);
+
+      foundQueryInfo = queryInfos.get(1);
+      assertEquals(queryInfo1.getQueryId(), foundQueryInfo.getQueryId());
+      assertEquals(queryInfo1.getQueryState(), foundQueryInfo.getQueryState());
+      assertEquals(queryInfo1.getProgress(), foundQueryInfo.getProgress(), 0);
+
+      foundQueryInfo = reader.getQueryByQueryId(queryInfo1.getQueryId());
+      assertEquals(queryInfo1.getQueryId(), foundQueryInfo.getQueryId());
+      assertEquals(queryInfo1.getQueryState(), foundQueryInfo.getQueryState());
+      assertEquals(queryInfo1.getProgress(), foundQueryInfo.getProgress(), 0);
+    } finally {
+      writer.stop();
+    }
+  }
+
+  @Test
+  public void testQueryInfoPagination() throws Exception {
+    HistoryWriter writer = new HistoryWriter("127.0.0.1:28090", true);
+    try {
+      writer.init(tajoConf);
+      writer.start();
+
+      long startTime = System.currentTimeMillis();
+      int testSize = 10;
+      QueryInfo queryInfo;
+
+      for (int i = 1; i < testSize + 1; i++) {
+        queryInfo = new QueryInfo(QueryIdFactory.newQueryId(startTime, i));
+        queryInfo.setStartTime(startTime);
+        queryInfo.setProgress(1.0f);
+        queryInfo.setQueryState(QueryState.QUERY_SUCCEEDED);
+
+        if (testSize == i) {
+          writer.appendAndSync(queryInfo);
+        } else {
+          writer.appendHistory(queryInfo);
+        }
+      }
+
+      SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd");
+      Path path = new Path(tajoConf.getVar(ConfVars.HISTORY_QUERY_DIR));
+
+      FileSystem fs = path.getFileSystem(tajoConf);
+      Path parentPath = new Path(path, df.format(startTime) + "/query-list");
+      FileStatus[] histFiles = fs.listStatus(parentPath);
+      assertNotNull(histFiles);
+      assertEquals(1, histFiles.length);
+      assertTrue(histFiles[0].isFile());
+      assertTrue(histFiles[0].getPath().getName().endsWith(".hist"));
+
+      HistoryReader reader = new HistoryReader("127.0.0.1:28090", tajoConf);
+      List<QueryInfo> queryInfos = reader.getQueriesInHistory(1, testSize);
+      assertNotNull(queryInfos);
+      assertEquals(testSize, queryInfos.size());
+
+      // the pagination API returns a list in descending order
+      for (int i = 0; i < testSize; i++) {
+        assertEquals(testSize - i, queryInfos.get(i).getQueryId().getSeq());
+      }
+
+      int pages = 5;
+      int pageSize = testSize / pages;
+      int expectIdSequence = testSize;
+      // the minimum page startIndex is 1
+      for (int i = 1; i < pages + 1; i++) {
+        queryInfos = reader.getQueriesInHistory(i, pageSize);
+        assertNotNull(queryInfos);
+        assertEquals(pageSize, queryInfos.size());
+
+        for (QueryInfo qInfo : queryInfos) {
+          assertEquals(expectIdSequence--, qInfo.getQueryId().getSeq());
+        }
+      }
+    } finally {
+      writer.stop();
+    }
+  }
+
+  @Test
+  public void testQueryHistoryReadAndWrite() throws Exception {
+    HistoryWriter writer = new HistoryWriter("127.0.0.1:28090", true);
+    writer.init(tajoConf);
+    writer.start();
+
+    try {
+      long startTime = System.currentTimeMillis();
+
+      QueryHistory queryHistory = new QueryHistory();
+      QueryId queryId = QueryIdFactory.newQueryId(startTime, 1);
+      queryHistory.setQueryId(queryId.toString());
+      queryHistory.setLogicalPlan("LogicalPlan");
+      List<StageHistory> stages = new ArrayList<StageHistory>();
+      for (int i = 0; i < 3; i++) {
+        ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId(queryId, i);
+        StageHistory stageHistory = new StageHistory();
+        stageHistory.setExecutionBlockId(ebId.toString());
+        stageHistory.setStartTime(startTime + i);
+
+        List<TaskHistory> taskHistories = new ArrayList<TaskHistory>();
+        for (int j = 0; j < 5; j++) {
+          TaskHistory taskHistory = new TaskHistory();
+          taskHistory.setId(QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(ebId), 1).toString());
+          taskHistories.add(taskHistory);
+        }
+        stageHistory.setTasks(taskHistories);
+        stages.add(stageHistory);
+      }
+      queryHistory.setStageHistories(stages);
+
+      writer.appendAndSync(queryHistory);
+
+      SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd");
+      Path path = new Path(tajoConf.getVar(ConfVars.HISTORY_QUERY_DIR));
+
+      FileSystem fs = path.getFileSystem(tajoConf);
+
+      assertTrue(fs.exists(new Path(path,
+          df.format(startTime) + "/query-detail/" + queryId.toString() + "/query.hist")));
+      for (int i = 0; i < 3; i++) {
+        String ebId = QueryIdFactory.newExecutionBlockId(queryId, i).toString();
+        assertTrue(fs.exists(new Path(path,
+            df.format(startTime) + "/query-detail/" + queryId.toString() + "/" + ebId + ".hist")));
+      }
+
+      HistoryReader reader = new HistoryReader("127.0.0.1:28090", tajoConf);
+      QueryHistory foundQueryHistory = reader.getQueryHistory(queryId.toString());
+      assertNotNull(foundQueryHistory);
+      assertEquals(queryId.toString(), foundQueryHistory.getQueryId());
+      assertEquals(3, foundQueryHistory.getStageHistories().size());
+
+      for (int i = 0; i < 3; i++) {
+        String ebId = QueryIdFactory.newExecutionBlockId(queryId, i).toString();
+        StageHistory stageHistory = foundQueryHistory.getStageHistories().get(i);
+        assertEquals(ebId, stageHistory.getExecutionBlockId());
+        assertEquals(startTime + i, stageHistory.getStartTime());
+
+        // TaskHistory entries are stored in a separate file per stage.
+        assertNull(stageHistory.getTasks());
+
+        List<TaskHistory> tasks = reader.getTaskHistory(queryId.toString(), ebId);
+        assertNotNull(tasks);
+        assertEquals(5, tasks.size());
+
+        for (int j = 0; j < 5; j++) {
+          TaskHistory taskHistory = tasks.get(j);
+          assertEquals(stages.get(i).getTasks().get(j).getId(), taskHistory.getId());
+        }
+      }
+    } finally {
+      writer.stop();
+    }
+  }
+
+  @Test
+  public void testTaskHistoryReadAndWrite() throws Exception {
+    TajoConf tajoConf = new TajoConf();
+    File historyParentDir = Files.createTempDir();
+    historyParentDir.deleteOnExit();
+    tajoConf.setVar(ConfVars.HISTORY_TASK_DIR, "file://" + historyParentDir.getCanonicalPath());
+
+    HistoryWriter writer = new HistoryWriter("127.0.0.1:28090", false);
+    writer.init(tajoConf);
+    writer.start();
+
+    try {
+      // Write TaskHistory
+      TableStatsProto tableStats = TableStatsProto.newBuilder()
+          .setNumRows(10)
+          .setNumBytes(100)
+          .build();
+      long startTime = System.currentTimeMillis() - 2000;
+      TaskAttemptId id1 = TajoIdUtils.parseTaskAttemptId("ta_1412326813565_0001_000001_000001_00");
+      org.apache.tajo.worker.TaskHistory taskHistory1 = new org.apache.tajo.worker.TaskHistory(
+          id1, TaskAttemptState.TA_SUCCEEDED, 1.0f, startTime, System.currentTimeMillis(), tableStats);
+      writer.appendHistory(taskHistory1);
+
+      TaskAttemptId id2 = TajoIdUtils.parseTaskAttemptId("ta_1412326813565_0001_000001_000002_00");
+      org.apache.tajo.worker.TaskHistory taskHistory2 = new org.apache.tajo.worker.TaskHistory(
+          id2, TaskAttemptState.TA_SUCCEEDED, 1.0f, startTime, System.currentTimeMillis() - 500, tableStats);
+      writer.appendAndSync(taskHistory2);
+
+      SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHH");
+      String startDate = df.format(new Date(startTime));
+      Path taskParentPath = new Path(tajoConf.getVar(ConfVars.HISTORY_TASK_DIR),
+          startDate.substring(0, 8) + "/tasks/127.0.0.1_28090");
+
+      FileSystem fs = taskParentPath.getFileSystem(tajoConf);
+      assertTrue(fs.exists(taskParentPath));
+
+      HistoryReader reader = new HistoryReader("127.0.0.1:28090", tajoConf);
+      org.apache.tajo.worker.TaskHistory foundTaskHistory = reader.getTaskHistory(id1.toString(), startTime);
+      assertNotNull(foundTaskHistory);
+      assertEquals(id1, foundTaskHistory.getTaskAttemptId());
+      assertEquals(taskHistory1, foundTaskHistory);
+
+      foundTaskHistory = reader.getTaskHistory(id2.toString(), startTime);
+      assertNotNull(foundTaskHistory);
+      assertEquals(id2, foundTaskHistory.getTaskAttemptId());
+      assertEquals(taskHistory2, foundTaskHistory);
+
+      foundTaskHistory = reader.getTaskHistory("ta_1412326813565_0001_000001_000003_00", startTime);
+      assertNull(foundTaskHistory);
+    } finally {
+      writer.stop();
+    }
+  }
+}
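
    [Editor's note] The path assertions in these tests imply how query history is laid out on
    disk: organized under HISTORY_QUERY_DIR by day, with a query-list directory of appended
    .hist files and a query-detail directory per query. The sketch below rebuilds the same
    paths; the helper name and shape are illustrative and not taken from the Tajo source.

    import java.text.SimpleDateFormat;
    import java.util.Date;

    public class HistoryPathExample {
      // Layout asserted by the tests above (illustrative only):
      //   <dir>/<yyyyMMdd>/query-list/*.hist                    appended QueryInfo records
      //   <dir>/<yyyyMMdd>/query-detail/<queryId>/query.hist    the QueryHistory itself
      //   <dir>/<yyyyMMdd>/query-detail/<queryId>/<ebId>.hist   per-stage task histories
      public static String queryDetailPath(String historyQueryDir, long startTime, String queryId) {
        String day = new SimpleDateFormat("yyyyMMdd").format(new Date(startTime));
        return historyQueryDir + "/" + day + "/query-detail/" + queryId + "/query.hist";
      }
    }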

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/util/metrics/TestMetricsFilter.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/util/metrics/TestMetricsFilter.java b/tajo-core-tests/src/test/java/org/apache/tajo/util/metrics/TestMetricsFilter.java
new file mode 100644
index 0000000..b70512c
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/util/metrics/TestMetricsFilter.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestMetricsFilter {
+  @Test
+  public void testGroupNameMetricsFilter() {
+    GroupNameMetricsFilter filter = new GroupNameMetricsFilter("tajomaster");
+
+    assertTrue(filter.matches("tajomaster.JVM.Heap.memFree", null));
+    assertTrue(!filter.matches("tajomaster01.JVM.Heap.memFree", null));
+    assertTrue(!filter.matches("server.tajomaster.JVM.Heap.memFree", null));
+    assertTrue(!filter.matches("tajworker.JVM.Heap.memFree", null));
+  }
+
+  @Test
+  public void testRegexpMetricsFilter() {
+    List<String> filterExpressions = new ArrayList<String>();
+    filterExpressions.add("JVM");
+    filterExpressions.add("Query");
+
+    RegexpMetricsFilter filter = new RegexpMetricsFilter(filterExpressions);
+
+    assertTrue(filter.matches("tajomaster.JVM.Heap.memFree", null));
+    assertTrue(filter.matches("tajomaster.Query.numQuery", null));
+
+    assertTrue(!filter.matches("tajomaster.resource.numWorker", null));
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/util/metrics/TestSystemMetrics.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/util/metrics/TestSystemMetrics.java b/tajo-core-tests/src/test/java/org/apache/tajo/util/metrics/TestSystemMetrics.java
new file mode 100644
index 0000000..8751df9
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/util/metrics/TestSystemMetrics.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics;
+
+import com.codahale.metrics.Counter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.metrics.Master;
+import org.apache.tajo.metrics.MetricsUtil;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.metrics.reporter.TajoMetricsScheduledReporter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.InputStreamReader;
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class TestSystemMetrics {
+  Path testPropertyFile;
+  Path metricsOutputFile;
+  @Before
+  public void setUp() throws Exception {
+    testPropertyFile =
+        new Path(CommonTestingUtil.getTestDir(), System.currentTimeMillis() + ".properties");
+
+    metricsOutputFile =
+        new Path(CommonTestingUtil.getTestDir(), System.currentTimeMillis() + ".out");
+
+    FileOutputStream out = new FileOutputStream(testPropertyFile.toUri().getPath());
+    out.write("reporter.null=org.apache.tajo.util.metrics.reporter.NullReporter\n".getBytes());
+    out.write("reporter.file=org.apache.tajo.util.metrics.reporter.MetricsFileScheduledReporter\n".getBytes());
+    out.write("reporter.console=org.apache.tajo.util.metrics.reporter.MetricsConsoleScheduledReporter\n".getBytes());
+
+    out.write("MASTER-JVM.reporters=console\n".getBytes());
+    out.write("MASTER.reporters=file\n".getBytes());
+    out.write("test-console-group.reporters=console\n".getBytes());
+    out.write("test-find-console-group.reporters=console,file\n".getBytes());
+
+    out.write(("MASTER.file.filename=" + metricsOutputFile.toUri().getPath() + "\n").getBytes());
+    out.write("MASTER.file.period=5\n".getBytes());
+    out.close();
+  }
+
+  @Test
+  public void testMetricsReporter() throws Exception {
+    TajoConf tajoConf = new TajoConf();
+    tajoConf.set("tajo.metrics.property.file", testPropertyFile.toUri().getPath());
+    TajoSystemMetrics tajoSystemMetrics = new TajoSystemMetrics(tajoConf, org.apache.tajo.metrics.Master.class,
+        "localhost");
+    tajoSystemMetrics.start();
+
+    Collection<TajoMetricsScheduledReporter> reporters = tajoSystemMetrics.getMetricsReporters();
+
+    assertEquals(2, reporters.size());
+
+    TajoMetricsScheduledReporter reporter = reporters.iterator().next();
+    assertEquals(5, reporter.getPeriod());
+
+    for(int i = 0; i < 10; i++) {
+      tajoSystemMetrics.counter(Master.Query.FAILED).inc();
+      tajoSystemMetrics.counter(Master.Query.COMPLETED).inc(2);
+      tajoSystemMetrics.counter(Master.Cluster.ACTIVE_NODES).inc(3);
+    }
+
+    SortedMap<String, Counter> counterMap = tajoSystemMetrics.getRegistry().getCounters();
+    Counter counter1 = counterMap.get("MASTER.QUERY.FAILED");
+    assertNotNull(counter1);
+    assertEquals(10, counter1.getCount());
+
+    Counter counter2 = counterMap.get("MASTER.QUERY.COMPLETED");
+    assertNotNull(counter2);
+    assertEquals(20, counter2.getCount());
+
+    Counter counter3 = counterMap.get("MASTER.CLUSTER.ACTIVE_NODES");
+    assertNotNull(counter3);
+    assertEquals(30, counter3.getCount());
+
+    //test findMetricsItemGroup method
+    Map<String, Map<String, Counter>> groupItems = reporter.findMetricsItemGroup(counterMap);
+    assertEquals(2, groupItems.size());
+
+    Map<String, Counter> group01Items = groupItems.get(MetricsUtil.getCanonicalContextName(Master.Query.class));
+    assertEquals(2, group01Items.size());
+
+    counter1 = group01Items.get(Master.Query.FAILED.name());
+    assertNotNull(counter1);
+    assertEquals(10, counter1.getCount());
+
+    counter2 = group01Items.get(Master.Query.COMPLETED.name());
+    assertNotNull(counter2);
+    assertEquals(20, counter2.getCount());
+
+    Map<String, Counter> group02Items = groupItems.get(MetricsUtil.getCanonicalContextName(Master.Cluster.class));
+    assertEquals(1, group02Items.size());
+
+    reporter.report();
+
+    BufferedReader reader = new BufferedReader(new InputStreamReader(
+        new FileInputStream(metricsOutputFile.toUri().getPath())));
+
+    String line;
+
+    List<String> lines = new ArrayList<String>();
+    while((line = reader.readLine()) != null) {
+      lines.add(line);
+    }
+
+    assertEquals(2, lines.size());
+    tajoSystemMetrics.stop();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    FileSystem fs = testPropertyFile.getFileSystem(new Configuration());
+    fs.delete(testPropertyFile, false);
+    fs.delete(metricsOutputFile, false);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java
new file mode 100644
index 0000000..7d7fb1a
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.tajo.ResourceProtos.ExecutionBlockContextResponse;
+import org.apache.tajo.TaskAttemptId;
+
+import java.io.IOException;
+
+public class MockExecutionBlock extends ExecutionBlockContext {
+
+  public MockExecutionBlock(TajoWorker.WorkerContext workerContext,
+                            ExecutionBlockContextResponse request) throws IOException {
+    super(workerContext, request, null);
+  }
+
+  @Override
+  public void init() throws Throwable {
+    //skip
+  }
+
+  @Override
+  public void fatalError(TaskAttemptId taskAttemptId, String message) {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockNodeResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockNodeResourceManager.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockNodeResourceManager.java
new file mode 100644
index 0000000..8c8427d
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockNodeResourceManager.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.ResourceProtos.TaskAllocationProto;
+import org.apache.tajo.ResourceProtos.TaskRequestProto;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.plan.serder.PlanProto;
+import org.apache.tajo.resource.NodeResource;
+import org.apache.tajo.resource.NodeResources;
+import org.apache.tajo.worker.event.NodeResourceEvent;
+
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+
+public class MockNodeResourceManager extends NodeResourceManager {
+
+  volatile boolean enableTaskHandlerEvent = true;
+  private final Semaphore barrier;
+
+  public MockNodeResourceManager(Semaphore barrier, Dispatcher dispatcher, TajoWorker.WorkerContext workerContext) {
+    super(dispatcher, workerContext);
+    this.barrier = barrier;
+  }
+
+  @Override
+  public void handle(NodeResourceEvent event) {
+    super.handle(event);
+    barrier.release();
+  }
+
+  @Override
+  protected void startTask(TaskRequestProto request, NodeResource resource) {
+    if(enableTaskHandlerEvent) {
+      super.startTask(request, resource);
+    }
+  }
+
+  /**
+   * skip task execution and deallocation for testing
+   */
+  public void setTaskHandlerEvent(boolean flag) {
+    enableTaskHandlerEvent = flag;
+  }
+
+  protected static Queue<TaskAllocationProto> createTaskRequests(
+      ExecutionBlockId ebId, int memory, int size) {
+
+    Queue<TaskAllocationProto>
+        requestProtoList = new LinkedBlockingQueue<TaskAllocationProto>();
+    for (int i = 0; i < size; i++) {
+
+      TaskAttemptId taskAttemptId = QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(ebId, i), 0);
+      TaskRequestProto.Builder builder = TaskRequestProto.newBuilder();
+      builder.setQueryMasterHostAndPort("localhost:0");
+      builder.setId(taskAttemptId.getProto());
+      builder.setOutputTable("");
+      builder.setPlan(PlanProto.LogicalNodeTree.newBuilder());
+      builder.setClusteredOutput(false);
+
+
+      requestProtoList.add(TaskAllocationProto.newBuilder()
+          .setResource(NodeResources.createResource(memory).getProto())
+          .setTaskRequest(builder.build()).build());
+    }
+    return requestProtoList;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java
new file mode 100644
index 0000000..634398f
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.collect.Maps;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import org.apache.tajo.ipc.TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService;
+import org.apache.tajo.resource.NodeResource;
+import org.apache.tajo.resource.NodeResources;
+
+import java.net.ConnectException;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import static org.apache.tajo.ResourceProtos.*;
+
+public class MockNodeStatusUpdater extends NodeStatusUpdater {
+
+  private CountDownLatch barrier;
+  private Map<Integer, NodeResource> membership = Maps.newHashMap();
+  private Map<Integer, NodeResource> resources = Maps.newHashMap();
+  private MockResourceTracker resourceTracker;
+
+  public MockNodeStatusUpdater(CountDownLatch barrier, TajoWorker.WorkerContext workerContext) {
+    super(workerContext);
+    this.barrier = barrier;
+    this.resourceTracker = new MockResourceTracker();
+  }
+
+  @Override
+  protected TajoResourceTrackerProtocolService.Interface newStub()
+      throws NoSuchMethodException, ConnectException, ClassNotFoundException {
+
+    return resourceTracker;
+  }
+
+  protected MockResourceTracker getResourceTracker() {
+    return resourceTracker;
+  }
+
+  class MockResourceTracker implements TajoResourceTrackerProtocolService.Interface {
+    private NodeHeartbeatRequest lastRequest;
+
+    protected Map<Integer, NodeResource> getTotalResource() {
+      return membership;
+    }
+
+    protected Map<Integer, NodeResource> getAvailableResource() {
+      return membership;
+    }
+
+    protected NodeHeartbeatRequest getLastRequest() {
+      return lastRequest;
+    }
+
+    @Override
+    public void nodeHeartbeat(RpcController controller, NodeHeartbeatRequest request,
+                              RpcCallback<NodeHeartbeatResponse> done) {
+
+      NodeHeartbeatResponse.Builder response = NodeHeartbeatResponse.newBuilder();
+      if (membership.containsKey(request.getWorkerId())) {
+        if (request.hasAvailableResource()) {
+          NodeResource resource = resources.get(request.getWorkerId());
+          NodeResources.update(resource, new NodeResource(request.getAvailableResource()));
+        }
+        done.run(response.setCommand(ResponseCommand.NORMAL).build());
+      } else {
+        if (request.hasConnectionInfo()) {
+          membership.put(request.getWorkerId(), new NodeResource(request.getTotalResource()));
+          resources.put(request.getWorkerId(), new NodeResource(request.getAvailableResource()));
+          done.run(response.setCommand(ResponseCommand.NORMAL).build());
+        } else {
+          done.run(response.setCommand(ResponseCommand.MEMBERSHIP).build());
+        }
+      }
+      lastRequest = request;
+      barrier.countDown();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java
new file mode 100644
index 0000000..071d26a
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.tajo.ResourceProtos.TaskRequestProto;
+import org.apache.tajo.ResourceProtos.TaskStatusProto;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.worker.event.TaskStartEvent;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Semaphore;
+
+public class MockTaskExecutor extends TaskExecutor {
+
+  protected final Semaphore barrier;
+
+  public MockTaskExecutor(Semaphore barrier, TajoWorker.WorkerContext workerContext) {
+    super(workerContext);
+    this.barrier = barrier;
+  }
+
+  @Override
+  public void handle(TaskStartEvent event) {
+    super.handle(event);
+    barrier.release();
+  }
+
+  @Override
+  protected Task createTask(final ExecutionBlockContext context, TaskRequestProto taskRequest) {
+    final TaskAttemptId taskAttemptId = new TaskAttemptId(taskRequest.getId());
+
+    // keep state locally so that status-change logging is skipped
+    final TaskAttemptContext taskAttemptContext = new TaskAttemptContext(null, context, taskAttemptId, null, null) {
+      private TajoProtos.TaskAttemptState state;
+
+      @Override
+      public TajoProtos.TaskAttemptState getState() {
+        return state;
+      }
+
+      @Override
+      public void setState(TajoProtos.TaskAttemptState state) {
+        this.state = state;
+      }
+    };
+
+    return new Task() {
+      @Override
+      public void init() throws IOException {
+
+      }
+
+      @Override
+      public void fetch(ExecutorService executorService) {
+
+      }
+
+      @Override
+      public void run() throws Exception {
+        taskAttemptContext.stop();
+        taskAttemptContext.setProgress(1.0f);
+        taskAttemptContext.setState(TajoProtos.TaskAttemptState.TA_SUCCEEDED);
+      }
+
+      @Override
+      public void kill() {
+
+      }
+
+      @Override
+      public void abort() {
+
+      }
+
+      @Override
+      public void cleanup() {
+
+      }
+
+      @Override
+      public boolean hasFetchPhase() {
+        return false;
+      }
+
+      @Override
+      public boolean isProgressChanged() {
+        return false;
+      }
+
+      @Override
+      public boolean isStopped() {
+        return taskAttemptContext.isStopped();
+      }
+
+      @Override
+      public void updateProgress() {
+
+      }
+
+      @Override
+      public TaskAttemptContext getTaskContext() {
+        return taskAttemptContext;
+      }
+
+      @Override
+      public ExecutionBlockContext getExecutionBlockContext() {
+        return context;
+      }
+
+      @Override
+      public TaskStatusProto getReport() {
+        TaskStatusProto.Builder builder = TaskStatusProto.newBuilder();
+        builder.setWorkerName("localhost:0");
+        builder.setId(taskAttemptContext.getTaskId().getProto())
+            .setProgress(taskAttemptContext.getProgress())
+            .setState(taskAttemptContext.getState());
+
+        builder.setInputStats(new TableStats().getProto());
+        return builder.build();
+      }
+
+      @Override
+      public TaskHistory createTaskHistory() {
+        return null;
+      }
+
+      @Override
+      public List<Fetcher> getFetchers() {
+        return null;
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java
new file mode 100644
index 0000000..76ce9f7
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.ResourceProtos.ExecutionBlockContextResponse;
+import org.apache.tajo.ResourceProtos.ExecutionBlockListProto;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.plan.serder.PlanProto;
+import org.apache.tajo.worker.event.TaskManagerEvent;
+
+import java.io.IOException;
+import java.util.concurrent.Semaphore;
+
+public class MockTaskManager extends TaskManager {
+
+  private final Semaphore barrier;
+
+  public MockTaskManager(Semaphore barrier, Dispatcher dispatcher, TajoWorker.WorkerContext workerContext) {
+    super(dispatcher, workerContext);
+    this.barrier = barrier;
+  }
+
+  @Override
+  protected ExecutionBlockContext createExecutionBlock(ExecutionBlockId executionBlockId, String queryMaster) {
+    try {
+      ExecutionBlockContextResponse.Builder builder = ExecutionBlockContextResponse.newBuilder();
+      builder.setExecutionBlockId(executionBlockId.getProto())
+          .setPlanJson("test")
+          .setQueryContext(new QueryContext(new TajoConf()).getProto())
+          .setQueryOutputPath("testpath")
+          .setShuffleType(PlanProto.ShuffleType.HASH_SHUFFLE);
+      return new MockExecutionBlock(getWorkerContext(), builder.build());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  protected void stopExecutionBlock(ExecutionBlockContext context,
+                                    ExecutionBlockListProto cleanupList) {
+    //skip for testing
+  }
+
+  @Override
+  public void handle(TaskManagerEvent event) {
+    super.handle(event);
+    barrier.release();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockWorkerContext.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockWorkerContext.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockWorkerContext.java
new file mode 100644
index 0000000..25f3dca
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockWorkerContext.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.querymaster.QueryMaster;
+import org.apache.tajo.querymaster.QueryMasterManagerService;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.storage.HashShuffleAppenderManager;
+import org.apache.tajo.util.history.HistoryReader;
+import org.apache.tajo.util.history.HistoryWriter;
+import org.apache.tajo.util.metrics.TajoSystemMetrics;
+
+public abstract class MockWorkerContext implements TajoWorker.WorkerContext {
+  TajoSystemMetrics tajoSystemMetrics;
+
+  @Override
+  public QueryMaster getQueryMaster() {
+    return null;
+  }
+
+  public abstract TajoConf getConf();
+
+  @Override
+  public ServiceTracker getServiceTracker() {
+    return null;
+  }
+
+  @Override
+  public QueryMasterManagerService getQueryMasterManagerService() {
+    return null;
+  }
+
+  @Override
+  public CatalogService getCatalog() {
+    return null;
+  }
+
+  @Override
+  public WorkerConnectionInfo getConnectionInfo() {
+    return null;
+  }
+
+  @Override
+  public String getWorkerName() {
+    return null;
+  }
+
+  @Override
+  public LocalDirAllocator getLocalDirAllocator() {
+    return null;
+  }
+
+  @Override
+  public TajoSystemMetrics getMetrics() {
+
+    if (tajoSystemMetrics == null) {
+      tajoSystemMetrics = new TajoSystemMetrics(getConf(), org.apache.tajo.metrics.Node.class, "localhost");
+      tajoSystemMetrics.start();
+    }
+    return tajoSystemMetrics;
+  }
+
+  @Override
+  public HashShuffleAppenderManager getHashShuffleAppenderManager() {
+    return null;
+  }
+
+  @Override
+  public HistoryWriter getTaskHistoryWriter() {
+    return null;
+  }
+
+  @Override
+  public HistoryReader getHistoryReader() {
+    return null;
+  }
+
+  @Override
+  public void cleanup(String strPath) {
+
+  }
+
+  @Override
+  public void cleanupTemporalDirectories() {
+
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestDeletionService.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestDeletionService.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestDeletionService.java
new file mode 100644
index 0000000..98251c1
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestDeletionService.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestDeletionService {
+  DeletionService deletionService;
+
+  @Before
+  public void setUp() throws Exception {
+  }
+
+  @After
+  public void tearDown() {
+    if(deletionService != null){
+      deletionService.stop();
+    }
+  }
+
+  @Test
+  public final void testTemporalDirectory() throws IOException, InterruptedException {
+    int delay = 1;
+    deletionService = new DeletionService(1, delay);
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    Path tempPath = CommonTestingUtil.getTestDir();
+    assertTrue(fs.exists(tempPath));
+    deletionService.delete(tempPath);
+    assertTrue(fs.exists(tempPath));
+
+    Thread.sleep(delay * 2 * 1000);
+    assertFalse(fs.exists(tempPath));
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java
new file mode 100644
index 0000000..a91fc30
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.hadoop.fs.*;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.pullserver.TajoPullServerService;
+import org.apache.tajo.pullserver.retriever.FileChunk;
+import org.apache.tajo.storage.HashShuffleAppenderManager;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Random;
+
+import static org.junit.Assert.*;
+
+public class TestFetcher {
+  private String TEST_DATA = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestFetcher";
+  private String INPUT_DIR = TEST_DATA+"/in/";
+  private String OUTPUT_DIR = TEST_DATA+"/out/";
+  private TajoConf conf = new TajoConf();
+  private TajoPullServerService pullServerService;
+
+  @Before
+  public void setUp() throws Exception {
+    CommonTestingUtil.getTestDir(TEST_DATA);
+    CommonTestingUtil.getTestDir(INPUT_DIR);
+    CommonTestingUtil.getTestDir(OUTPUT_DIR);
+    conf.setVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR, INPUT_DIR);
+    conf.setIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_TIMEOUT, 1);
+    conf.setIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE, 127);
+
+    pullServerService = new TajoPullServerService();
+    pullServerService.init(conf);
+    pullServerService.start();
+  }
+
+  @After
+  public void tearDown(){
+    pullServerService.stop();
+  }
+
+  @Test
+  public void testGet() throws IOException {
+    Random rnd = new Random();
+    QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
+    String sid = "1";
+    String partId = "1";
+
+    int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
+    String dataPath = conf.getVar(ConfVars.WORKER_TEMPORAL_DIR) +
+       queryId.toString() + "/output/" + sid + "/hash-shuffle/" + partParentId + "/" + partId;
+
+    String params = String.format("qid=%s&sid=%s&p=%s&type=%s", queryId, sid, partId, "h");
+
+    Path inputPath = new Path(dataPath);
+    FSDataOutputStream stream = FileSystem.getLocal(conf).create(inputPath, true);
+    for (int i = 0; i < 100; i++) {
+      String data = ""+rnd.nextInt();
+      stream.write(data.getBytes());
+    }
+    stream.flush();
+    stream.close();
+
+    URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
+    FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
+    storeChunk.setFromRemote(true);
+    final Fetcher fetcher = new Fetcher(conf, uri, storeChunk);
+    FileChunk chunk = fetcher.get();
+    assertNotNull(chunk);
+    assertNotNull(chunk.getFile());
+
+    FileSystem fs = FileSystem.getLocal(new TajoConf());
+    FileStatus inStatus = fs.getFileStatus(inputPath);
+    FileStatus outStatus = fs.getFileStatus(new Path(OUTPUT_DIR, "data"));
+
+    assertEquals(inStatus.getLen(), outStatus.getLen());
+    assertEquals(TajoProtos.FetcherState.FETCH_FINISHED, fetcher.getState());
+  }
+
+  @Test
+  public void testAdjustFetchProcess() {
+    assertEquals(0.0f, TaskImpl.adjustFetchProcess(0, 0), 0);
+    assertEquals(0.0f, TaskImpl.adjustFetchProcess(10, 10), 0);
+    assertEquals(0.05f, TaskImpl.adjustFetchProcess(10, 9), 0);
+    assertEquals(0.1f, TaskImpl.adjustFetchProcess(10, 8), 0);
+    assertEquals(0.25f, TaskImpl.adjustFetchProcess(10, 5), 0);
+    assertEquals(0.45f, TaskImpl.adjustFetchProcess(10, 1), 0);
+    assertEquals(0.5f, TaskImpl.adjustFetchProcess(10, 0), 0);
+  }
+
+  @Test
+  public void testStatus() throws Exception {
+    Random rnd = new Random();
+    QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
+    String sid = "1";
+    String ta = "1_0";
+    String partId = "1";
+
+    String dataPath = INPUT_DIR + queryId.toString() + "/output"+ "/" + sid + "/" +ta + "/output/" + partId;
+    String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, "h", ta);
+
+    FSDataOutputStream stream =  FileSystem.getLocal(conf).create(new Path(dataPath), true);
+    for (int i = 0; i < 100; i++) {
+      String data = ""+rnd.nextInt();
+      stream.write(data.getBytes());
+    }
+    stream.flush();
+    stream.close();
+
+    URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
+    FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
+    storeChunk.setFromRemote(true);
+    final Fetcher fetcher = new Fetcher(conf, uri, storeChunk);
+    assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
+
+    fetcher.get();
+    assertEquals(TajoProtos.FetcherState.FETCH_FINISHED, fetcher.getState());
+  }
+
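+  // Fetching a partition that has no data should still finish cleanly in FETCH_FINISHED.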
+  @Test
+  public void testNoContentFetch() throws Exception {
+
+    QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
+    String sid = "1";
+    String ta = "1_0";
+    String partId = "1";
+
+    String dataPath = INPUT_DIR + queryId.toString() + "/output/" + sid + "/" + ta + "/output/" + partId;
+    String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, "h", ta);
+
+    Path inputPath = new Path(dataPath);
+    FileSystem fs = FileSystem.getLocal(conf);
+    if(fs.exists(inputPath)){
+      fs.delete(new Path(dataPath), true);
+    }
+
+    FSDataOutputStream stream = FileSystem.getLocal(conf).create(new Path(dataPath).getParent(), true);
+    stream.close();
+
+    URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
+    FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
+    storeChunk.setFromRemote(true);
+    final Fetcher fetcher = new Fetcher(conf, uri, storeChunk);
+    assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
+
+    fetcher.get();
+    assertEquals(TajoProtos.FetcherState.FETCH_FINISHED, fetcher.getState());
+  }
+
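+  // A rejected request should move the fetcher from FETCH_INIT to FETCH_FAILED.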
+  @Test
+  public void testFailureStatus() throws Exception {
+    Random rnd = new Random();
+
+    QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
+    String sid = "1";
+    String ta = "1_0";
+    String partId = "1";
+
+    String dataPath = INPUT_DIR + queryId.toString() + "/output/" + sid + "/" + ta + "/output/" + partId;
+
+    // TajoPullServerService is expected to reject this request with BAD_REQUEST because of the unknown shuffle type
+    String shuffleType = "x";
+    String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, shuffleType, ta);
+
+    FSDataOutputStream stream = FileSystem.getLocal(conf).create(new Path(dataPath), true);
+
+    for (int i = 0; i < 100; i++) {
+      String data = params + rnd.nextInt();
+      stream.write(data.getBytes());
+    }
+    stream.flush();
+    stream.close();
+
+    URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
+    FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
+    storeChunk.setFromRemote(true);
+    final Fetcher fetcher = new Fetcher(conf, uri, storeChunk);
+    assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
+
+    fetcher.get();
+    assertEquals(TajoProtos.FetcherState.FETCH_FAILED, fetcher.getState());
+  }
+
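+  // If the pull server is down, get() should fail and leave the fetcher in FETCH_FAILED.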
+  @Test
+  public void testServerFailure() throws Exception {
+    QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
+    String sid = "1";
+    String ta = "1_0";
+    String partId = "1";
+
+    String dataPath = INPUT_DIR + queryId.toString() + "/output/" + sid + "/" + ta + "/output/" + partId;
+    String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, "h", ta);
+
+    URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
+    FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
+    storeChunk.setFromRemote(true);
+    final Fetcher fetcher = new Fetcher(conf, uri, storeChunk);
+    assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
+
+    pullServerService.stop();
+
+    boolean failure = false;
+    try{
+      fetcher.get();
+    } catch (Throwable e){
+      failure = true;
+    }
+    assertTrue(failure);
+    assertEquals(TajoProtos.FetcherState.FETCH_FAILED, fetcher.getState());
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java
new file mode 100644
index 0000000..1193478
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.rpc.CallFuture;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.worker.event.NodeResourceAllocateEvent;
+import org.apache.tajo.worker.event.NodeResourceDeallocateEvent;
+import org.apache.tajo.worker.event.NodeResourceEvent;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.tajo.ResourceProtos.*;
+import static org.junit.Assert.*;
+
+public class TestNodeResourceManager {
+
+  private MockNodeResourceManager resourceManager;
+  private NodeStatusUpdater statusUpdater;
+  private TaskManager taskManager;
+  private TaskExecutor taskExecutor;
+  private AsyncDispatcher dispatcher;
+  private AsyncDispatcher taskDispatcher;
+  private TajoWorker.WorkerContext workerContext;
+
+  private CompositeService service;
+  private int taskMemory;
+  private TajoConf conf;
+
+  @Before
+  public void setup() {
+    conf = new TajoConf();
+    conf.set(CommonTestingUtil.TAJO_TEST_KEY, CommonTestingUtil.TAJO_TEST_TRUE);
+
+    taskMemory = 512;
+    conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES, 4);
+    conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB,
+        taskMemory * conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES));
+    conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS, 4);
+    conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM, 1);
+    conf.setIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM, 2);
+
+    dispatcher = new AsyncDispatcher();
+    taskDispatcher = new AsyncDispatcher();
+
+    workerContext = new MockWorkerContext() {
+      WorkerConnectionInfo workerConnectionInfo;
+      @Override
+      public TajoConf getConf() {
+        return conf;
+      }
+
+      @Override
+      public TaskManager getTaskManager() {
+        return taskManager;
+      }
+
+      @Override
+      public TaskExecutor getTaskExecuor() {
+        return taskExecutor;
+      }
+
+      @Override
+      public NodeResourceManager getNodeResourceManager() {
+        return resourceManager;
+      }
+
+      @Override
+      public WorkerConnectionInfo getConnectionInfo() {
+        if (workerConnectionInfo == null) {
+          workerConnectionInfo = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080);
+        }
+        return workerConnectionInfo;
+      }
+    };
+
+    taskManager = new MockTaskManager(new Semaphore(0), taskDispatcher, workerContext);
+    taskExecutor = new MockTaskExecutor(new Semaphore(0), workerContext);
+    resourceManager = new MockNodeResourceManager(new Semaphore(0), dispatcher, workerContext);
+    statusUpdater = new MockNodeStatusUpdater(new CountDownLatch(0), workerContext);
+
+    service = new CompositeService("MockService") {
+      @Override
+      protected void serviceInit(Configuration conf) throws Exception {
+        addIfService(dispatcher);
+        addIfService(taskDispatcher);
+        addIfService(taskManager);
+        addIfService(taskExecutor);
+        addIfService(resourceManager);
+        addIfService(statusUpdater);
+        super.serviceInit(conf);
+      }
+
+      @Override
+      protected void serviceStop() throws Exception {
+        workerContext.getMetrics().stop();
+        super.serviceStop();
+      }
+    };
+
+    service.init(conf);
+    service.start();
+  }
+
+  @After
+  public void tearDown() {
+    service.stop();
+  }
+
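+  // A batch request that fits into the available resources should be granted without
+  // any cancellations and should reduce the available resource below the total.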
+  @Test
+  public void testNodeResourceAllocateEvent() throws Exception {
+    int requestSize = 4;
+    resourceManager.setTaskHandlerEvent(false); //skip task execution
+
+    CallFuture<BatchAllocationResponse> callFuture  = new CallFuture<BatchAllocationResponse>();
+    BatchAllocationRequest.Builder requestProto = BatchAllocationRequest.newBuilder();
+    ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
+    requestProto.setExecutionBlockId(ebId.getProto());
+
+    assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
+    requestProto.addAllTaskRequest(MockNodeResourceManager.createTaskRequests(ebId, taskMemory, requestSize));
+
+    dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture));
+
+    BatchAllocationResponse responseProto = callFuture.get();
+    assertNotEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
+    // all requested tasks should have been allocated
+    assertEquals(0, responseProto.getCancellationTaskCount());
+  }
+
+
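+  // Requesting more tasks than the node can hold should cancel exactly the overflow.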
+  @Test
+  public void testNodeResourceCancellation() throws Exception {
+    int requestSize = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES);
+    int overSize = 10;
+    resourceManager.setTaskHandlerEvent(false); //skip task execution
+
+    CallFuture<BatchAllocationResponse> callFuture = new CallFuture<BatchAllocationResponse>();
+    BatchAllocationRequest.Builder requestProto = BatchAllocationRequest.newBuilder();
+    ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
+    requestProto.setExecutionBlockId(ebId.getProto());
+
+    assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
+    requestProto.addAllTaskRequest(
+        MockNodeResourceManager.createTaskRequests(ebId, taskMemory, requestSize + overSize));
+
+    dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture));
+    BatchAllocationResponse responseProto = callFuture.get();
+
+    assertEquals(overSize, responseProto.getCancellationTaskCount());
+  }
+
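+  // Deallocating every granted task resource should restore the node to its total resource.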
+  @Test
+  public void testNodeResourceDeallocateEvent() throws Exception {
+    int requestSize = 4;
+    resourceManager.setTaskHandlerEvent(false); //skip task execution
+
+    CallFuture<BatchAllocationResponse> callFuture  = new CallFuture<BatchAllocationResponse>();
+    BatchAllocationRequest.Builder requestProto = BatchAllocationRequest.newBuilder();
+    ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
+    requestProto.setExecutionBlockId(ebId.getProto());
+
+    assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
+    requestProto.addAllTaskRequest(MockNodeResourceManager.createTaskRequests(ebId, taskMemory, requestSize));
+
+    dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture));
+
+    BatchAllocationResponse responseProto = callFuture.get();
+    assertNotEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
+    assertEquals(0, responseProto.getCancellationTaskCount());
+
+    // deallocate every granted resource
+    for(TaskAllocationProto allocationRequestProto : requestProto.getTaskRequestList()) {
+      // direct invoke handler for testing
+      resourceManager.handle(new NodeResourceDeallocateEvent(
+          allocationRequestProto.getResource(), NodeResourceEvent.ResourceType.TASK));
+    }
+
+    assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
+  }
+
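+  // Issues allocation requests from multiple threads; cancelled tasks are re-queued
+  // and retried until every task has eventually been allocated.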
+  @Test(timeout = 30000)
+  public void testParallelRequest() throws Exception {
+    final int parallelCount = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES) * 2;
+    final int taskSize = 100000;
+    resourceManager.setTaskHandlerEvent(true);
+
+    final AtomicInteger totalComplete = new AtomicInteger();
+    final AtomicInteger totalCanceled = new AtomicInteger();
+
+    final ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
+    final Queue<TaskAllocationProto>
+        totalTasks = MockNodeResourceManager.createTaskRequests(ebId, taskMemory, taskSize);
+
+
+    TaskAllocationProto task = totalTasks.poll();
+    BatchAllocationRequest.Builder requestProto = BatchAllocationRequest.newBuilder();
+    requestProto.addTaskRequest(task);
+    requestProto.setExecutionBlockId(ebId.getProto());
+    CallFuture<BatchAllocationResponse> callFuture = new CallFuture<BatchAllocationResponse>();
+    dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture));
+    assertEquals(0, callFuture.get().getCancellationTaskCount());
+    totalComplete.incrementAndGet();
+
+    // start parallel request
+    ExecutorService executor = Executors.newFixedThreadPool(parallelCount);
+
+    List<Future> futureList = Lists.newArrayList();
+
+    long startTime = System.currentTimeMillis();
+    for (int i = 0; i < parallelCount; i++) {
+      futureList.add(executor.submit(new Runnable() {
+            @Override
+            public void run() {
+              int complete = 0;
+              while (true) {
+                TaskAllocationProto task = totalTasks.poll();
+                if (task == null) break;
+
+
+                BatchAllocationRequest.Builder requestProto = BatchAllocationRequest.newBuilder();
+                requestProto.addTaskRequest(task);
+                requestProto.setExecutionBlockId(ebId.getProto());
+
+                CallFuture<BatchAllocationResponse> callFuture = new CallFuture<BatchAllocationResponse>();
+                dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture));
+                try {
+                  BatchAllocationResponse proto = callFuture.get();
+                  if (proto.getCancellationTaskCount() > 0) {
+                    totalTasks.addAll(proto.getCancellationTaskList());
+                    totalCanceled.addAndGet(proto.getCancellationTaskCount());
+                  } else {
+                    complete++;
+                  }
+                } catch (Exception e) {
+                  fail(e.getMessage());
+                }
+              }
+              totalComplete.addAndGet(complete);
+            }
+          })
+      );
+    }
+
+    for (Future future : futureList) {
+      future.get();
+    }
+
+    executor.shutdown();
+    assertEquals(taskSize, totalComplete.get());
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java
new file mode 100644
index 0000000..ac4b7dd
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.worker.event.NodeStatusEvent;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+
+import static org.apache.tajo.ResourceProtos.NodeHeartbeatRequest;
+import static org.junit.Assert.*;
+
+public class TestNodeStatusUpdater {
+
+  private NodeResourceManager resourceManager;
+  private MockNodeStatusUpdater statusUpdater;
+  private MockTaskManager taskManager;
+  private AsyncDispatcher dispatcher;
+  private AsyncDispatcher taskDispatcher;
+  private CompositeService service;
+  private TajoConf conf;
+  private TajoWorker.WorkerContext workerContext;
+
+
+  @Before
+  public void setup() {
+    conf = new TajoConf();
+    conf.set(CommonTestingUtil.TAJO_TEST_KEY, CommonTestingUtil.TAJO_TEST_TRUE);
+    conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES, 2);
+    conf.setIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM, 2);
+
+    workerContext = new MockWorkerContext() {
+      WorkerConnectionInfo workerConnectionInfo;
+
+      @Override
+      public TajoConf getConf() {
+        return conf;
+      }
+
+      @Override
+      public TaskManager getTaskManager() {
+        return taskManager;
+      }
+
+      @Override
+      public TaskExecutor getTaskExecuor() {
+        return null;
+      }
+
+      @Override
+      public NodeResourceManager getNodeResourceManager() {
+        return resourceManager;
+      }
+
+      @Override
+      public WorkerConnectionInfo getConnectionInfo() {
+        if (workerConnectionInfo == null) {
+          workerConnectionInfo = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080);
+        }
+        return workerConnectionInfo;
+      }
+    };
+
+    conf.setIntVar(TajoConf.ConfVars.WORKER_HEARTBEAT_IDLE_INTERVAL, 1000);
+    dispatcher = new AsyncDispatcher();
+    resourceManager = new NodeResourceManager(dispatcher, workerContext);
+    taskDispatcher = new AsyncDispatcher();
+    taskManager = new MockTaskManager(new Semaphore(0), taskDispatcher, workerContext) {
+      @Override
+      public int getRunningTasks() {
+        return 0;
+      }
+    };
+
+    service = new CompositeService("MockService") {
+      @Override
+      protected void serviceInit(Configuration conf) throws Exception {
+        addIfService(dispatcher);
+        addIfService(taskDispatcher);
+        addIfService(taskManager);
+        addIfService(resourceManager);
+        addIfService(statusUpdater);
+        super.serviceInit(conf);
+      }
+
+      @Override
+      protected void serviceStop() throws Exception {
+        workerContext.getMetrics().stop();
+        super.serviceStop();
+      }
+    };
+
+    service.init(conf);
+    service.start();
+  }
+
+  @After
+  public void tearDown() {
+    service.stop();
+  }
+
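+  // After the first heartbeat, the resource tracker should report this worker's
+  // total and available resources.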
+  @Test(timeout = 20000)
+  public void testNodeMembership() throws Exception {
+    CountDownLatch barrier = new CountDownLatch(1);
+    statusUpdater = new MockNodeStatusUpdater(barrier, workerContext);
+    statusUpdater.init(conf);
+    statusUpdater.start();
+
+    MockNodeStatusUpdater.MockResourceTracker resourceTracker = statusUpdater.getResourceTracker();
+    barrier.await();
+
+    assertTrue(resourceTracker.getTotalResource().containsKey(workerContext.getConnectionInfo().getId()));
+    assertEquals(resourceManager.getTotalResource(),
+        resourceTracker.getTotalResource().get(workerContext.getConnectionInfo().getId()));
+
+    assertEquals(resourceManager.getAvailableResource(),
+        resourceTracker.getAvailableResource().get(workerContext.getConnectionInfo().getId()));
+  }
+
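+  // A periodic heartbeat should carry the worker id, available resource, and running
+  // task/query-master counts, but not the total resource or connection info.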
+  @Test(timeout = 20000)
+  public void testPing() throws Exception {
+    CountDownLatch barrier = new CountDownLatch(2);
+    statusUpdater = new MockNodeStatusUpdater(barrier, workerContext);
+    statusUpdater.init(conf);
+    statusUpdater.start();
+
+    MockNodeStatusUpdater.MockResourceTracker resourceTracker = statusUpdater.getResourceTracker();
+    barrier.await();
+
+    NodeHeartbeatRequest lastRequest = resourceTracker.getLastRequest();
+    assertTrue(lastRequest.hasWorkerId());
+    assertTrue(lastRequest.hasAvailableResource());
+    assertTrue(lastRequest.hasRunningTasks());
+    assertTrue(lastRequest.hasRunningQueryMasters());
+    assertFalse(lastRequest.hasTotalResource());
+    assertFalse(lastRequest.hasConnectionInfo());
+  }
+
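+  // REPORT_RESOURCE events queued up to the queueing threshold should all be drained
+  // after the next heartbeat.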
+  @Test(timeout = 20000)
+  public void testResourceReport() throws Exception {
+    CountDownLatch barrier = new CountDownLatch(2);
+    statusUpdater = new MockNodeStatusUpdater(barrier, workerContext);
+    statusUpdater.init(conf);
+    statusUpdater.start();
+
+    assertEquals(0, statusUpdater.getQueueSize());
+    for (int i = 0; i < statusUpdater.getQueueingThreshold(); i++) {
+      dispatcher.getEventHandler().handle(new NodeStatusEvent(NodeStatusEvent.EventType.REPORT_RESOURCE));
+    }
+    barrier.await();
+    assertEquals(0, statusUpdater.getQueueSize());
+  }
+
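+  // A FLUSH_REPORTS event should leave the report queue empty after the next heartbeat.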
+  @Test(timeout = 20000)
+  public void testFlushResourceReport() throws Exception {
+    CountDownLatch barrier = new CountDownLatch(2);
+    statusUpdater = new MockNodeStatusUpdater(barrier, workerContext);
+    statusUpdater.init(conf);
+    statusUpdater.start();
+
+    assertEquals(0, statusUpdater.getQueueSize());
+    dispatcher.getEventHandler().handle(new NodeStatusEvent(NodeStatusEvent.EventType.FLUSH_REPORTS));
+
+    barrier.await();
+    assertEquals(0, statusUpdater.getQueueSize());
+  }
+}