You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2015/08/14 16:30:08 UTC
[29/51] [partial] tajo git commit: TAJO-1761: Separate an integration
unit test kit into an independent module.
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/util/TestJSPUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/util/TestJSPUtil.java b/tajo-core-tests/src/test/java/org/apache/tajo/util/TestJSPUtil.java
new file mode 100644
index 0000000..f3b6936
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/util/TestJSPUtil.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.TaskId;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent;
+import org.apache.tajo.querymaster.Task;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for the task-sorting and page-slicing helpers of {@link JSPUtil}.
+ */
+public class TestJSPUtil {
+  /** Number of tasks created for the sorting test. */
+  private static final int NUM_TASKS = 10;
+
+  /** Page size used by the pagination test. */
+  private static final int PAGE_SIZE = 10;
+
+  /**
+   * Sorts a shuffled set of tasks by "id" and by "runTime" in both directions.
+   *
+   * Tasks 0..8 receive launch time (i + 1) and finish time 2 * (i + 1); task 9 gets
+   * neither. The runTime assertions expect task 9 to be placed last for both
+   * "asc" and "desc".
+   */
+  @Test
+  public void testSortTask() throws Exception {
+    Configuration conf = new TajoConf();
+    TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext context =
+        new TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext();
+    ExecutionBlockId blockId = TajoIdUtils.createExecutionBlockId("eb_000001_00001_00001");
+
+    List<Task> shuffled = new ArrayList<Task>();
+    for (int seq = 0; seq < NUM_TASKS; seq++) {
+      Task task = new Task(conf, context, new TaskId(blockId, seq), true, null);
+      // The last task is deliberately left without launch/finish times.
+      if (seq < NUM_TASKS - 1) {
+        int launchTime = seq + 1;
+        task.setLaunchTime(launchTime);
+        task.setFinishTime(launchTime + launchTime);
+      }
+      shuffled.add(task);
+    }
+    Collections.shuffle(shuffled);
+
+    Task[] sorted = sortedCopy(shuffled, "id", "asc");
+    for (int pos = 0; pos < NUM_TASKS; pos++) {
+      assertEquals(pos, sorted[pos].getId().getId());
+    }
+
+    sorted = sortedCopy(shuffled, "id", "desc");
+    for (int pos = 0; pos < NUM_TASKS; pos++) {
+      assertEquals(NUM_TASKS - 1 - pos, sorted[pos].getId().getId());
+    }
+
+    sorted = sortedCopy(shuffled, "runTime", "asc");
+    assertEquals(0, sorted[0].getId().getId());
+    assertEquals(9, sorted[NUM_TASKS - 1].getId().getId());
+
+    sorted = sortedCopy(shuffled, "runTime", "desc");
+    assertEquals(8, sorted[0].getId().getId());
+    assertEquals(9, sorted[NUM_TASKS - 1].getId().getId());
+  }
+
+  /**
+   * Slices a 35-element list ("Data1".."Data35") into pages of ten and checks the
+   * size plus the first and last element of pages 1 through 4.
+   */
+  @Test
+  public void testGetPageNavigationList() {
+    List<String> originList = new ArrayList<String>();
+    for (int n = 1; n <= 35; n++) {
+      originList.add("Data" + n);
+    }
+
+    assertPage(originList, 1, 10, "Data1", "Data10");
+    assertPage(originList, 2, 10, "Data11", "Data20");
+    assertPage(originList, 3, 10, "Data21", "Data30");
+    // The final page only holds the five remaining elements.
+    assertPage(originList, 4, 5, "Data31", "Data35");
+  }
+
+  /** Copies the tasks into a fresh array and sorts that array with JSPUtil. */
+  private static Task[] sortedCopy(List<Task> tasks, String sortField, String sortOrder)
+      throws Exception {
+    Task[] copy = tasks.toArray(new Task[]{});
+    JSPUtil.sortTaskArray(copy, sortField, sortOrder);
+    return copy;
+  }
+
+  /** Fetches one page and asserts its size, first element and last element. */
+  private static void assertPage(List<String> source, int page, int expectedSize,
+                                 String expectedFirst, String expectedLast) {
+    List<String> pageList = JSPUtil.getPageNavigationList(source, page, PAGE_SIZE);
+    assertEquals(expectedSize, pageList.size());
+    assertEquals(expectedFirst, pageList.get(0));
+    assertEquals(expectedLast, pageList.get(expectedSize - 1));
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java b/tajo-core-tests/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java
new file mode 100644
index 0000000..3d2578c
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java
@@ -0,0 +1,313 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.history;
+
+import com.google.common.io.Files;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.*;
+import org.apache.tajo.TajoProtos.QueryState;
+import org.apache.tajo.TajoProtos.TaskAttemptState;
+import org.apache.tajo.catalog.proto.CatalogProtos.TableStatsProto;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.master.QueryInfo;
+import org.apache.tajo.util.TajoIdUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+/**
+ * Writes query, stage and task history with {@link HistoryWriter} and verifies that
+ * {@link HistoryReader} returns the same data and that the expected files exist.
+ */
+public class TestHistoryWriterReader extends QueryTestCaseBase {
+  /** Directory configured as the query history location for the tests using the shared conf. */
+  public static final String HISTORY_DIR = "/tmp/tajo-test-history";
+  /** Copy of the testing cluster configuration, pointed at {@link #HISTORY_DIR}. */
+  TajoConf tajoConf;
+
+  /** Builds a configuration from the testing cluster and redirects the query history dir. */
+  @Before
+  public void setUp() throws Exception {
+    tajoConf = new TajoConf(testingCluster.getConfiguration());
+    tajoConf.setVar(ConfVars.HISTORY_QUERY_DIR, HISTORY_DIR);
+  }
+
+  /** Recursively removes the query history directory resolved from the shared configuration. */
+  @After
+  public void tearDown() throws Exception {
+    Path path = TajoConf.getQueryHistoryDir(tajoConf);
+    FileSystem fs = path.getFileSystem(tajoConf);
+    fs.delete(path, true);
+  }
+
+  /**
+   * Appends two QueryInfo entries and checks that a single ".hist" file is produced
+   * under "yyyyMMdd/query-list" and that both entries can be read back, both through
+   * the paged listing and by query id.
+   */
+  @Test
+  public void testQueryInfoReadAndWrite() throws Exception {
+    HistoryWriter writer = new HistoryWriter("127.0.0.1:28090", true);
+    try {
+      writer.init(tajoConf);
+      writer.start();
+
+      long startTime = System.currentTimeMillis();
+      QueryInfo queryInfo1 = new QueryInfo(QueryIdFactory.newQueryId(startTime, 1));
+      queryInfo1.setStartTime(startTime);
+      queryInfo1.setProgress(1.0f);
+      queryInfo1.setQueryState(QueryState.QUERY_SUCCEEDED);
+      writer.appendHistory(queryInfo1);
+
+      QueryInfo queryInfo2 = new QueryInfo(QueryIdFactory.newQueryId(startTime, 2));
+      queryInfo2.setStartTime(startTime);
+      queryInfo2.setProgress(0.5f);
+      queryInfo2.setQueryState(QueryState.QUERY_FAILED);
+      writer.appendAndSync(queryInfo2);
+
+      SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd");
+      Path path = new Path(tajoConf.getVar(ConfVars.HISTORY_QUERY_DIR));
+
+      FileSystem fs = path.getFileSystem(tajoConf);
+      Path parentPath = new Path(path, df.format(startTime) + "/query-list");
+      FileStatus[] histFiles = fs.listStatus(parentPath);
+      assertNotNull(histFiles);
+      assertEquals(1, histFiles.length);
+      assertTrue(histFiles[0].isFile());
+      assertTrue(histFiles[0].getPath().getName().endsWith(".hist"));
+
+      HistoryReader reader = new HistoryReader("127.0.0.1:28090", tajoConf);
+      List<QueryInfo> queryInfos = reader.getQueriesInHistory(1, 2);
+      assertNotNull(queryInfos);
+      assertEquals(2, queryInfos.size());
+
+      // The listing is expected newest-first: queryInfo2 precedes queryInfo1.
+      QueryInfo foundQueryInfo = queryInfos.get(0);
+      assertEquals(queryInfo2.getQueryId(), foundQueryInfo.getQueryId());
+      assertEquals(queryInfo2.getQueryState(), foundQueryInfo.getQueryState());
+      assertEquals(queryInfo2.getProgress(), foundQueryInfo.getProgress(), 0);
+
+      foundQueryInfo = reader.getQueryByQueryId(queryInfo2.getQueryId());
+      assertEquals(queryInfo2.getQueryId(), foundQueryInfo.getQueryId());
+      assertEquals(queryInfo2.getQueryState(), foundQueryInfo.getQueryState());
+      assertEquals(queryInfo2.getProgress(), foundQueryInfo.getProgress(), 0);
+
+      foundQueryInfo = queryInfos.get(1);
+      assertEquals(queryInfo1.getQueryId(), foundQueryInfo.getQueryId());
+      assertEquals(queryInfo1.getQueryState(), foundQueryInfo.getQueryState());
+      assertEquals(queryInfo1.getProgress(), foundQueryInfo.getProgress(), 0);
+
+      foundQueryInfo = reader.getQueryByQueryId(queryInfo1.getQueryId());
+      assertEquals(queryInfo1.getQueryId(), foundQueryInfo.getQueryId());
+      assertEquals(queryInfo1.getQueryState(), foundQueryInfo.getQueryState());
+      assertEquals(queryInfo1.getProgress(), foundQueryInfo.getProgress(), 0);
+    } finally {
+      writer.stop();
+    }
+  }
+
+  /**
+   * Appends ten QueryInfo entries (sequence 1..10) and checks that the paged listing
+   * returns them in descending sequence order, both as one page of ten and as five
+   * pages of two.
+   */
+  @Test
+  public void testQueryInfoPagination() throws Exception {
+    HistoryWriter writer = new HistoryWriter("127.0.0.1:28090", true);
+    try {
+      writer.init(tajoConf);
+      writer.start();
+
+      long startTime = System.currentTimeMillis();
+      int testSize = 10;
+      QueryInfo queryInfo;
+
+      for (int i = 1; i < testSize + 1; i++) {
+        queryInfo = new QueryInfo(QueryIdFactory.newQueryId(startTime, i));
+        queryInfo.setStartTime(startTime);
+        queryInfo.setProgress(1.0f);
+        queryInfo.setQueryState(QueryState.QUERY_SUCCEEDED);
+
+        // Only the last entry is appended with appendAndSync.
+        if (testSize == i) {
+          writer.appendAndSync(queryInfo);
+        } else {
+          writer.appendHistory(queryInfo);
+        }
+      }
+
+      SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd");
+      Path path = new Path(tajoConf.getVar(ConfVars.HISTORY_QUERY_DIR));
+
+      FileSystem fs = path.getFileSystem(tajoConf);
+      Path parentPath = new Path(path, df.format(startTime) + "/query-list");
+      FileStatus[] histFiles = fs.listStatus(parentPath);
+      assertNotNull(histFiles);
+      assertEquals(1, histFiles.length);
+      assertTrue(histFiles[0].isFile());
+      assertTrue(histFiles[0].getPath().getName().endsWith(".hist"));
+
+      HistoryReader reader = new HistoryReader("127.0.0.1:28090", tajoConf);
+      List<QueryInfo> queryInfos = reader.getQueriesInHistory(1, testSize);
+      assertNotNull(queryInfos);
+      assertEquals(testSize, queryInfos.size());
+
+      // the pagination api returns a descending ordered list
+      for (int i = 0; i < testSize; i++) {
+        assertEquals(testSize - i, queryInfos.get(i).getQueryId().getSeq());
+      }
+
+      int pages = 5;
+      int pageSize = testSize / pages;
+      int expectIdSequence = testSize;
+      // min startIndex of page is 1
+      for (int i = 1; i < pages + 1; i++) {
+        queryInfos = reader.getQueriesInHistory(i, pageSize);
+        assertNotNull(queryInfos);
+        assertEquals(pageSize, queryInfos.size());
+
+        // The expected sequence keeps counting down across page boundaries.
+        for (QueryInfo qInfo : queryInfos) {
+          assertEquals(expectIdSequence--, qInfo.getQueryId().getSeq());
+        }
+      }
+    } finally {
+      writer.stop();
+    }
+  }
+
+  /**
+   * Writes one QueryHistory with three stages of five tasks each, then checks that
+   * "query.hist" and one "&lt;ebId&gt;.hist" per stage exist under
+   * "yyyyMMdd/query-detail/&lt;queryId&gt;" and that the reader returns the stages and,
+   * separately, their task histories.
+   */
+  @Test
+  public void testQueryHistoryReadAndWrite() throws Exception {
+    HistoryWriter writer = new HistoryWriter("127.0.0.1:28090", true);
+    try {
+      // init/start live inside the try so that a failure here still reaches writer.stop(),
+      // matching the other tests of this class.
+      writer.init(tajoConf);
+      writer.start();
+
+      long startTime = System.currentTimeMillis();
+
+      QueryHistory queryHistory = new QueryHistory();
+      QueryId queryId = QueryIdFactory.newQueryId(startTime, 1);
+      queryHistory.setQueryId(queryId.toString());
+      queryHistory.setLogicalPlan("LogicalPlan");
+      List<StageHistory> stages = new ArrayList<StageHistory>();
+      for (int i = 0; i < 3; i++) {
+        ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId(queryId, i);
+        StageHistory stageHistory = new StageHistory();
+        stageHistory.setExecutionBlockId(ebId.toString());
+        stageHistory.setStartTime(startTime + i);
+
+        List<TaskHistory> taskHistories = new ArrayList<TaskHistory>();
+        for (int j = 0; j < 5; j++) {
+          TaskHistory taskHistory = new TaskHistory();
+          taskHistory.setId(QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(ebId), 1).toString());
+          taskHistories.add(taskHistory);
+        }
+        stageHistory.setTasks(taskHistories);
+        stages.add(stageHistory);
+      }
+      queryHistory.setStageHistories(stages);
+
+      writer.appendAndSync(queryHistory);
+
+      SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd");
+      Path path = new Path(tajoConf.getVar(ConfVars.HISTORY_QUERY_DIR));
+
+      FileSystem fs = path.getFileSystem(tajoConf);
+
+      assertTrue(fs.exists(new Path(path,
+          df.format(startTime) + "/query-detail/" + queryId.toString() + "/query.hist")));
+      for (int i = 0; i < 3; i++) {
+        String ebId = QueryIdFactory.newExecutionBlockId(queryId, i).toString();
+        assertTrue(fs.exists(new Path(path,
+            df.format(startTime) + "/query-detail/" + queryId.toString() + "/" + ebId + ".hist")));
+      }
+
+      HistoryReader reader = new HistoryReader("127.0.0.1:28090", tajoConf);
+      QueryHistory foundQueryHistory = reader.getQueryHistory(queryId.toString());
+      assertNotNull(foundQueryHistory);
+      assertEquals(queryId.toString(), foundQueryHistory.getQueryId());
+      assertEquals(3, foundQueryHistory.getStageHistories().size());
+
+      for (int i = 0; i < 3; i++) {
+        String ebId = QueryIdFactory.newExecutionBlockId(queryId, i).toString();
+        StageHistory stageHistory = foundQueryHistory.getStageHistories().get(i);
+        assertEquals(ebId, stageHistory.getExecutionBlockId());
+        assertEquals(startTime + i, stageHistory.getStartTime());
+
+        // TaskHistory is stored in the other file.
+        assertNull(stageHistory.getTasks());
+
+        List<TaskHistory> tasks = reader.getTaskHistory(queryId.toString(), ebId);
+        assertNotNull(tasks);
+        assertEquals(5, tasks.size());
+
+        for (int j = 0; j < 5; j++) {
+          TaskHistory taskHistory = tasks.get(j);
+          assertEquals(stages.get(i).getTasks().get(j).getId(), taskHistory.getId());
+        }
+      }
+    } finally {
+      writer.stop();
+    }
+  }
+
+  /**
+   * Writes two worker-side TaskHistory records into a temporary local directory and
+   * checks that they can be read back by task attempt id, and that an unknown id
+   * yields null.
+   *
+   * This test uses its own configuration and HISTORY_TASK_DIR, so the shared
+   * {@link #tearDown()} does not clean up after it; the temporary directory is
+   * removed here instead.
+   */
+  @Test
+  public void testTaskHistoryReadAndWrite() throws Exception {
+    // Named differently from the tajoConf field to avoid shadowing it.
+    TajoConf taskConf = new TajoConf();
+    File historyParentDir = Files.createTempDir();
+    // Fallback only: deleteOnExit cannot remove a directory that still has content.
+    historyParentDir.deleteOnExit();
+    taskConf.setVar(ConfVars.HISTORY_TASK_DIR, "file://" + historyParentDir.getCanonicalPath());
+
+    HistoryWriter writer = new HistoryWriter("127.0.0.1:28090", false);
+    try {
+      writer.init(taskConf);
+      writer.start();
+
+      // Write TaskHistory
+      TableStatsProto tableStats = TableStatsProto.newBuilder()
+          .setNumRows(10)
+          .setNumBytes(100)
+          .build();
+      long startTime = System.currentTimeMillis() - 2000;
+      TaskAttemptId id1 = TajoIdUtils.parseTaskAttemptId("ta_1412326813565_0001_000001_000001_00");
+      org.apache.tajo.worker.TaskHistory taskHistory1 = new org.apache.tajo.worker.TaskHistory(
+          id1, TaskAttemptState.TA_SUCCEEDED, 1.0f, startTime, System.currentTimeMillis(), tableStats);
+      writer.appendHistory(taskHistory1);
+
+      TaskAttemptId id2 = TajoIdUtils.parseTaskAttemptId("ta_1412326813565_0001_000001_000002_00");
+      org.apache.tajo.worker.TaskHistory taskHistory2 = new org.apache.tajo.worker.TaskHistory(
+          id2, TaskAttemptState.TA_SUCCEEDED, 1.0f, startTime, System.currentTimeMillis() - 500, tableStats);
+      writer.appendAndSync(taskHistory2);
+
+      SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHH");
+      String startDate = df.format(new Date(startTime));
+      // Only the yyyyMMdd part of the formatted date is used for the directory name.
+      Path taskParentPath = new Path(taskConf.getVar(ConfVars.HISTORY_TASK_DIR),
+          startDate.substring(0, 8) + "/tasks/127.0.0.1_28090");
+
+      FileSystem fs = taskParentPath.getFileSystem(taskConf);
+      assertTrue(fs.exists(taskParentPath));
+
+      HistoryReader reader = new HistoryReader("127.0.0.1:28090", taskConf);
+      org.apache.tajo.worker.TaskHistory foundTaskHistory = reader.getTaskHistory(id1.toString(), startTime);
+      assertNotNull(foundTaskHistory);
+      assertEquals(id1, foundTaskHistory.getTaskAttemptId());
+      assertEquals(taskHistory1, foundTaskHistory);
+
+      foundTaskHistory = reader.getTaskHistory(id2.toString(), startTime);
+      assertNotNull(foundTaskHistory);
+      assertEquals(id2, foundTaskHistory.getTaskAttemptId());
+      assertEquals(taskHistory2, foundTaskHistory);
+
+      foundTaskHistory = reader.getTaskHistory("ta_1412326813565_0001_000001_000003_00", startTime);
+      assertNull(foundTaskHistory);
+    } finally {
+      try {
+        writer.stop();
+      } finally {
+        // Remove the temporary history directory together with everything written into it.
+        Path historyRoot = new Path(taskConf.getVar(ConfVars.HISTORY_TASK_DIR));
+        historyRoot.getFileSystem(taskConf).delete(historyRoot, true);
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/util/metrics/TestMetricsFilter.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/util/metrics/TestMetricsFilter.java b/tajo-core-tests/src/test/java/org/apache/tajo/util/metrics/TestMetricsFilter.java
new file mode 100644
index 0000000..b70512c
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/util/metrics/TestMetricsFilter.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the metric name filters {@link GroupNameMetricsFilter} and
+ * {@link RegexpMetricsFilter}.
+ */
+public class TestMetricsFilter {
+  /**
+   * A filter built for group "tajomaster" must accept a metric name that starts with
+   * that group and reject names where the group is only a prefix of the first
+   * segment, appears in a later segment, or is absent.
+   */
+  @Test
+  public void testGroupNameMetricsFilter() {
+    GroupNameMetricsFilter groupFilter = new GroupNameMetricsFilter("tajomaster");
+
+    assertTrue(groupFilter.matches("tajomaster.JVM.Heap.memFree", null));
+
+    String[] rejectedNames = {
+        "tajomaster01.JVM.Heap.memFree",
+        "server.tajomaster.JVM.Heap.memFree",
+        "tajworker.JVM.Heap.memFree"
+    };
+    for (String metricName : rejectedNames) {
+      assertTrue(!groupFilter.matches(metricName, null));
+    }
+  }
+
+  /**
+   * A filter built from the expressions "JVM" and "Query" must accept metric names
+   * containing either one and reject a name containing neither.
+   */
+  @Test
+  public void testRegexpMetricsFilter() {
+    List<String> expressions = new ArrayList<String>();
+    expressions.add("JVM");
+    expressions.add("Query");
+    RegexpMetricsFilter regexpFilter = new RegexpMetricsFilter(expressions);
+
+    String[] acceptedNames = {
+        "tajomaster.JVM.Heap.memFree",
+        "tajomaster.Query.numQuery"
+    };
+    for (String metricName : acceptedNames) {
+      assertTrue(regexpFilter.matches(metricName, null));
+    }
+
+    assertTrue(!regexpFilter.matches("tajomaster.resource.numWorker", null));
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/util/metrics/TestSystemMetrics.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/util/metrics/TestSystemMetrics.java b/tajo-core-tests/src/test/java/org/apache/tajo/util/metrics/TestSystemMetrics.java
new file mode 100644
index 0000000..8751df9
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/util/metrics/TestSystemMetrics.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics;
+
+import com.codahale.metrics.Counter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.metrics.Master;
+import org.apache.tajo.metrics.MetricsUtil;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.metrics.reporter.TajoMetricsScheduledReporter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.InputStreamReader;
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Starts a {@link TajoSystemMetrics} instance against a generated metrics properties
+ * file and checks counters, metric grouping and the output of a reporter.
+ */
+public class TestSystemMetrics {
+  /** Generated metrics properties file; created in setUp, deleted in tearDown. */
+  Path testPropertyFile;
+  /** File named by the "MASTER.file.filename" property; deleted in tearDown. */
+  Path metricsOutputFile;
+
+  /**
+   * Writes the metrics properties file: three reporter definitions, the reporter
+   * assignments per group, and the file reporter's output file and period.
+   */
+  @Before
+  public void setUp() throws Exception {
+    testPropertyFile =
+        new Path(CommonTestingUtil.getTestDir(), System.currentTimeMillis() + ".properties");
+
+    metricsOutputFile =
+        new Path(CommonTestingUtil.getTestDir(), System.currentTimeMillis() + ".out");
+
+    FileOutputStream out = new FileOutputStream(testPropertyFile.toUri().getPath());
+    try {
+      out.write("reporter.null=org.apache.tajo.util.metrics.reporter.NullReporter\n".getBytes());
+      out.write("reporter.file=org.apache.tajo.util.metrics.reporter.MetricsFileScheduledReporter\n".getBytes());
+      out.write("reporter.console=org.apache.tajo.util.metrics.reporter.MetricsConsoleScheduledReporter\n".getBytes());
+
+      out.write("MASTER-JVM.reporters=console\n".getBytes());
+      out.write("MASTER.reporters=file\n".getBytes());
+      out.write("test-console-group.reporters=console\n".getBytes());
+      out.write("test-find-console-group.reporters=console,file\n".getBytes());
+
+      out.write(("MASTER.file.filename=" + metricsOutputFile.toUri().getPath() + "\n").getBytes());
+      out.write("MASTER.file.period=5\n".getBytes());
+    } finally {
+      // Close the stream even when a write fails so the file handle is not leaked.
+      out.close();
+    }
+  }
+
+  /**
+   * Increments three counters ten times each, then verifies their totals in the
+   * registry, their grouping via findMetricsItemGroup, and that one call to
+   * report() leaves two lines in the metrics output file.
+   */
+  @Test
+  public void testMetricsReporter() throws Exception {
+    TajoConf tajoConf = new TajoConf();
+    tajoConf.set("tajo.metrics.property.file", testPropertyFile.toUri().getPath());
+    TajoSystemMetrics tajoSystemMetrics = new TajoSystemMetrics(tajoConf, org.apache.tajo.metrics.Master.class,
+        "localhost");
+    tajoSystemMetrics.start();
+
+    // Everything after start() runs inside try/finally so the metrics system is
+    // stopped even when an assertion fails.
+    try {
+      Collection<TajoMetricsScheduledReporter> reporters = tajoSystemMetrics.getMetricsReporters();
+
+      assertEquals(2, reporters.size());
+
+      // NOTE(review): the assertions below rely on the first reporter returned being the
+      // one configured with period 5 and the output file -- confirm the iteration order.
+      TajoMetricsScheduledReporter reporter = reporters.iterator().next();
+      assertEquals(5, reporter.getPeriod());
+
+      for (int i = 0; i < 10; i++) {
+        tajoSystemMetrics.counter(Master.Query.FAILED).inc();
+        tajoSystemMetrics.counter(Master.Query.COMPLETED).inc(2);
+        tajoSystemMetrics.counter(Master.Cluster.ACTIVE_NODES).inc(3);
+      }
+
+      SortedMap<String, Counter> counterMap = tajoSystemMetrics.getRegistry().getCounters();
+      Counter counter1 = counterMap.get("MASTER.QUERY.FAILED");
+      assertNotNull(counter1);
+      assertEquals(10, counter1.getCount());
+
+      Counter counter2 = counterMap.get("MASTER.QUERY.COMPLETED");
+      assertNotNull(counter2);
+      assertEquals(20, counter2.getCount());
+
+      Counter counter3 = counterMap.get("MASTER.CLUSTER.ACTIVE_NODES");
+      assertNotNull(counter3);
+      assertEquals(30, counter3.getCount());
+
+      //test findMetricsItemGroup method
+      Map<String, Map<String, Counter>> groupItems = reporter.findMetricsItemGroup(counterMap);
+      assertEquals(2, groupItems.size());
+
+      Map<String, Counter> group01Items = groupItems.get(MetricsUtil.getCanonicalContextName(Master.Query.class));
+      assertEquals(2, group01Items.size());
+
+      counter1 = group01Items.get(Master.Query.FAILED.name());
+      assertNotNull(counter1);
+      assertEquals(10, counter1.getCount());
+
+      counter2 = group01Items.get(Master.Query.COMPLETED.name());
+      assertNotNull(counter2);
+      assertEquals(20, counter2.getCount());
+
+      Map<String, Counter> group02Items = groupItems.get(MetricsUtil.getCanonicalContextName(Master.Cluster.class));
+      assertEquals(1, group02Items.size());
+
+      reporter.report();
+
+      List<String> lines = new ArrayList<String>();
+      BufferedReader reader = new BufferedReader(new InputStreamReader(
+          new FileInputStream(metricsOutputFile.toUri().getPath())));
+      try {
+        String line;
+        while ((line = reader.readLine()) != null) {
+          lines.add(line);
+        }
+      } finally {
+        // The reader was previously never closed.
+        reader.close();
+      }
+
+      assertEquals(2, lines.size());
+    } finally {
+      tajoSystemMetrics.stop();
+    }
+  }
+
+  /** Deletes the generated properties file and the metrics output file. */
+  @After
+  public void tearDown() throws Exception {
+    FileSystem fs = testPropertyFile.getFileSystem(new Configuration());
+    try {
+      fs.delete(testPropertyFile, false);
+    } finally {
+      // Still attempt the second delete when the first one throws.
+      fs.delete(metricsOutputFile, false);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java
new file mode 100644
index 0000000..7d7fb1a
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.tajo.ResourceProtos.ExecutionBlockContextResponse;
+import org.apache.tajo.TaskAttemptId;
+
+import java.io.IOException;
+
+/**
+ * Test double for {@link ExecutionBlockContext} whose {@link #init()} and
+ * {@link #fatalError(TaskAttemptId, String)} overrides do nothing.
+ */
+public class MockExecutionBlock extends ExecutionBlockContext {
+
+  /**
+   * Forwards the worker context and the response to the parent constructor, passing
+   * {@code null} as its third argument.
+   */
+  public MockExecutionBlock(TajoWorker.WorkerContext workerContext,
+                            ExecutionBlockContextResponse request) throws IOException {
+    super(workerContext, request, null);
+  }
+
+  /** Intentionally empty: the parent's initialization is not run. */
+  @Override
+  public void init() throws Throwable {
+    // skip
+  }
+
+  /** Intentionally empty: reported fatal errors are dropped. */
+  @Override
+  public void fatalError(TaskAttemptId taskAttemptId, String message) {
+    // ignored in this mock
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockNodeResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockNodeResourceManager.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockNodeResourceManager.java
new file mode 100644
index 0000000..8c8427d
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockNodeResourceManager.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.ResourceProtos.TaskAllocationProto;
+import org.apache.tajo.ResourceProtos.TaskRequestProto;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.plan.serder.PlanProto;
+import org.apache.tajo.resource.NodeResource;
+import org.apache.tajo.resource.NodeResources;
+import org.apache.tajo.worker.event.NodeResourceEvent;
+
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+
+/**
+ * {@link NodeResourceManager} for tests: it signals a semaphore after each handled
+ * event and can be told not to start tasks.
+ */
+public class MockNodeResourceManager extends NodeResourceManager {
+
+  /** While true, startTask delegates to the parent implementation. */
+  volatile boolean enableTaskHandlerEvent = true;
+  /** Receives one permit per handled event. */
+  private final Semaphore barrier;
+
+  public MockNodeResourceManager(Semaphore barrier, Dispatcher dispatcher, TajoWorker.WorkerContext workerContext) {
+    super(dispatcher, workerContext);
+    this.barrier = barrier;
+  }
+
+  /** Lets the parent handle the event, then releases one permit on the barrier. */
+  @Override
+  public void handle(NodeResourceEvent event) {
+    super.handle(event);
+    barrier.release();
+  }
+
+  /** Starts the task through the parent only while task handling is enabled. */
+  @Override
+  protected void startTask(TaskRequestProto request, NodeResource resource) {
+    if (!enableTaskHandlerEvent) {
+      return;
+    }
+    super.startTask(request, resource);
+  }
+
+  /**
+   * Enables or disables task starting for testing. With {@code false}, startTask
+   * returns without calling the parent implementation.
+   */
+  public void setTaskHandlerEvent(boolean flag) {
+    enableTaskHandlerEvent = flag;
+  }
+
+  /**
+   * Builds {@code size} task allocations for the given execution block. Task i gets
+   * task id i with attempt 0, and every allocation carries a resource created from
+   * {@code memory}.
+   *
+   * @param ebId   execution block the task ids are created under
+   * @param memory value passed to NodeResources.createResource for each allocation
+   * @param size   number of allocations to create
+   * @return queue holding the allocations in creation order
+   */
+  protected static Queue<TaskAllocationProto> createTaskRequests(
+      ExecutionBlockId ebId, int memory, int size) {
+
+    Queue<TaskAllocationProto> allocations = new LinkedBlockingQueue<TaskAllocationProto>();
+    for (int taskSeq = 0; taskSeq < size; taskSeq++) {
+      TaskAttemptId attemptId =
+          QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(ebId, taskSeq), 0);
+
+      TaskRequestProto taskRequest = TaskRequestProto.newBuilder()
+          .setQueryMasterHostAndPort("localhost:0")
+          .setId(attemptId.getProto())
+          .setOutputTable("")
+          .setPlan(PlanProto.LogicalNodeTree.newBuilder())
+          .setClusteredOutput(false)
+          .build();
+
+      allocations.add(TaskAllocationProto.newBuilder()
+          .setResource(NodeResources.createResource(memory).getProto())
+          .setTaskRequest(taskRequest)
+          .build());
+    }
+    return allocations;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java
new file mode 100644
index 0000000..634398f
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.collect.Maps;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import org.apache.tajo.ipc.TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService;
+import org.apache.tajo.resource.NodeResource;
+import org.apache.tajo.resource.NodeResources;
+
+import java.net.ConnectException;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import static org.apache.tajo.ResourceProtos.*;
+
+/**
+ * Test double for {@link NodeStatusUpdater} that answers heartbeats in-process
+ * instead of talking to a real resource tracker.
+ *
+ * Heartbeats are served by {@link MockResourceTracker}, which records the total and
+ * available resources reported per worker id and counts down the supplied latch once
+ * per heartbeat so that tests can wait for a given number of heartbeats.
+ */
+public class MockNodeStatusUpdater extends NodeStatusUpdater {
+
+ private final CountDownLatch barrier;
+ // worker id -> total resource reported when the worker registered
+ private final Map<Integer, NodeResource> membership = Maps.newHashMap();
+ // worker id -> most recently reported available resource
+ private final Map<Integer, NodeResource> resources = Maps.newHashMap();
+ private final MockResourceTracker resourceTracker;
+
+ /**
+ * @param barrier latch counted down once for every heartbeat the mock tracker receives
+ * @param workerContext passed unchanged to the superclass constructor
+ */
+ public MockNodeStatusUpdater(CountDownLatch barrier, TajoWorker.WorkerContext workerContext) {
+ super(workerContext);
+ this.barrier = barrier;
+ this.resourceTracker = new MockResourceTracker();
+ }
+
+ /**
+ * Returns the in-process mock tracker instead of creating a remote stub.
+ */
+ @Override
+ protected TajoResourceTrackerProtocolService.Interface newStub()
+ throws NoSuchMethodException, ConnectException, ClassNotFoundException {
+
+ return resourceTracker;
+ }
+
+ /**
+ * @return the mock tracker, so tests can inspect what the updater reported
+ */
+ protected MockResourceTracker getResourceTracker() {
+ return resourceTracker;
+ }
+
+ /**
+ * In-memory resource tracker that keeps per-worker total and available resources.
+ */
+ class MockResourceTracker implements TajoResourceTrackerProtocolService.Interface {
+ private NodeHeartbeatRequest lastRequest;
+
+ /**
+ * @return the live map of worker id to total resource recorded at registration
+ */
+ protected Map<Integer, NodeResource> getTotalResource() {
+ return membership;
+ }
+
+ /**
+ * @return the live map of worker id to the last reported available resource
+ */
+ protected Map<Integer, NodeResource> getAvailableResource() {
+ // Previously returned the membership (total) map, which hid every
+ // available-resource update applied in nodeHeartbeat.
+ return resources;
+ }
+
+ /**
+ * @return the most recent heartbeat request, or null if none has arrived yet
+ */
+ protected NodeHeartbeatRequest getLastRequest() {
+ return lastRequest;
+ }
+
+ /**
+ * Handles one heartbeat.
+ * A known worker gets its available resource refreshed (when the request carries one)
+ * and a NORMAL response. An unknown worker is registered and answered with NORMAL
+ * if the request carries connection info; otherwise it is answered with MEMBERSHIP.
+ * In every case the request is remembered and the latch is counted down afterwards.
+ */
+ @Override
+ public void nodeHeartbeat(RpcController controller, NodeHeartbeatRequest request,
+ RpcCallback<NodeHeartbeatResponse> done) {
+
+ NodeHeartbeatResponse.Builder response = NodeHeartbeatResponse.newBuilder();
+ if (membership.containsKey(request.getWorkerId())) {
+ if (request.hasAvailableResource()) {
+ // membership and resources are always populated together, so the entry exists
+ NodeResource resource = resources.get(request.getWorkerId());
+ NodeResources.update(resource, new NodeResource(request.getAvailableResource()));
+ }
+ done.run(response.setCommand(ResponseCommand.NORMAL).build());
+ } else {
+ if (request.hasConnectionInfo()) {
+ membership.put(request.getWorkerId(), new NodeResource(request.getTotalResource()));
+ resources.put(request.getWorkerId(), new NodeResource(request.getAvailableResource()));
+ done.run(response.setCommand(ResponseCommand.NORMAL).build());
+ } else {
+ done.run(response.setCommand(ResponseCommand.MEMBERSHIP).build());
+ }
+ }
+ lastRequest = request;
+ barrier.countDown();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java
new file mode 100644
index 0000000..071d26a
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.tajo.ResourceProtos.TaskRequestProto;
+import org.apache.tajo.ResourceProtos.TaskStatusProto;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.worker.event.TaskStartEvent;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Semaphore;
+
+/**
+ * Test double for {@link TaskExecutor} whose tasks do no real work.
+ * Every created task immediately marks its context as stopped, fully progressed and
+ * succeeded when run, and the executor releases one permit on the supplied semaphore
+ * after each handled {@link TaskStartEvent}.
+ */
+public class MockTaskExecutor extends TaskExecutor {
+
+ // released once per handled TaskStartEvent
+ protected final Semaphore barrier;
+
+ /**
+ * @param barrier semaphore released after each handled start event
+ * @param workerContext passed unchanged to the superclass constructor
+ */
+ public MockTaskExecutor(Semaphore barrier, TajoWorker.WorkerContext workerContext) {
+ super(workerContext);
+ this.barrier = barrier;
+ }
+
+ /**
+ * Delegates to the superclass, then releases one permit on the barrier.
+ * NOTE(review): no permit is released when {@code super.handle} throws.
+ */
+ @Override
+ public void handle(TaskStartEvent event) {
+ super.handle(event);
+ barrier.release();
+ }
+
+ /**
+ * Creates a stub task for the given request.
+ * The task owns a {@link TaskAttemptContext} built with null for every constructor
+ * argument except the execution block context and the attempt id.
+ *
+ * @param context execution block context returned by the task's getExecutionBlockContext
+ * @param taskRequest request whose id becomes the task attempt id
+ * @return a task whose lifecycle methods are no-ops apart from {@code run} and the getters
+ */
+ @Override
+ protected Task createTask(final ExecutionBlockContext context, TaskRequestProto taskRequest) {
+ final TaskAttemptId taskAttemptId = new TaskAttemptId(taskRequest.getId());
+
+ // Keep the state in a plain field; per the original note this is meant to avoid the
+ // status-change logging of the parent implementation (presumably - confirm against TaskAttemptContext).
+ final TaskAttemptContext taskAttemptContext = new TaskAttemptContext(null, context, taskAttemptId, null, null) {
+ private TajoProtos.TaskAttemptState state;
+
+ @Override
+ public TajoProtos.TaskAttemptState getState() {
+ return state;
+ }
+
+ @Override
+ public void setState(TajoProtos.TaskAttemptState state) {
+ this.state = state;
+ }
+ };
+
+ return new Task() {
+ @Override
+ public void init() throws IOException {
+
+ }
+
+ @Override
+ public void fetch(ExecutorService executorService) {
+
+ }
+
+ // Completes instantly: stop the context, report full progress, mark the attempt succeeded.
+ @Override
+ public void run() throws Exception {
+ taskAttemptContext.stop();
+ taskAttemptContext.setProgress(1.0f);
+ taskAttemptContext.setState(TajoProtos.TaskAttemptState.TA_SUCCEEDED);
+ }
+
+ @Override
+ public void kill() {
+
+ }
+
+ @Override
+ public void abort() {
+
+ }
+
+ @Override
+ public void cleanup() {
+
+ }
+
+ @Override
+ public boolean hasFetchPhase() {
+ return false;
+ }
+
+ @Override
+ public boolean isProgressChanged() {
+ return false;
+ }
+
+ @Override
+ public boolean isStopped() {
+ return taskAttemptContext.isStopped();
+ }
+
+ @Override
+ public void updateProgress() {
+
+ }
+
+ @Override
+ public TaskAttemptContext getTaskContext() {
+ return taskAttemptContext;
+ }
+
+ @Override
+ public ExecutionBlockContext getExecutionBlockContext() {
+ return context;
+ }
+
+ // Builds a status report from the context's id, progress and state, with a fixed
+ // worker name and empty input statistics.
+ @Override
+ public TaskStatusProto getReport() {
+ TaskStatusProto.Builder builder = TaskStatusProto.newBuilder();
+ builder.setWorkerName("localhost:0");
+ builder.setId(taskAttemptContext.getTaskId().getProto())
+ .setProgress(taskAttemptContext.getProgress())
+ .setState(taskAttemptContext.getState());
+
+ builder.setInputStats(new TableStats().getProto());
+ return builder.build();
+ }
+
+ // No history is produced by the mock; callers receive null.
+ @Override
+ public TaskHistory createTaskHistory() {
+ return null;
+ }
+
+ // The mock has no fetch phase; callers receive null rather than an empty list.
+ @Override
+ public List<Fetcher> getFetchers() {
+ return null;
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java
new file mode 100644
index 0000000..76ce9f7
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.ResourceProtos.ExecutionBlockContextResponse;
+import org.apache.tajo.ResourceProtos.ExecutionBlockListProto;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.plan.serder.PlanProto;
+import org.apache.tajo.worker.event.TaskManagerEvent;
+
+import java.io.IOException;
+import java.util.concurrent.Semaphore;
+
+/**
+ * Test double for {@link TaskManager} that fabricates execution block contexts locally
+ * and signals a semaphore after every handled event.
+ */
+public class MockTaskManager extends TaskManager {
+
+ private final Semaphore barrier;
+
+ /**
+ * @param barrier semaphore released once per handled {@link TaskManagerEvent}
+ * @param dispatcher passed unchanged to the superclass constructor
+ * @param workerContext passed unchanged to the superclass constructor
+ */
+ public MockTaskManager(Semaphore barrier, Dispatcher dispatcher, TajoWorker.WorkerContext workerContext) {
+ super(dispatcher, workerContext);
+ this.barrier = barrier;
+ }
+
+ /**
+ * Builds a {@link MockExecutionBlock} from a canned context response
+ * (plan "test", output path "testpath", hash shuffle, default query context).
+ * The {@code queryMaster} argument is not used.
+ *
+ * @throws RuntimeException wrapping any {@link IOException} raised while building the block
+ */
+ @Override
+ protected ExecutionBlockContext createExecutionBlock(ExecutionBlockId executionBlockId, String queryMaster) {
+ try {
+ ExecutionBlockContextResponse cannedResponse = ExecutionBlockContextResponse.newBuilder()
+ .setExecutionBlockId(executionBlockId.getProto())
+ .setPlanJson("test")
+ .setQueryContext(new QueryContext(new TajoConf()).getProto())
+ .setQueryOutputPath("testpath")
+ .setShuffleType(PlanProto.ShuffleType.HASH_SHUFFLE)
+ .build();
+ return new MockExecutionBlock(getWorkerContext(), cannedResponse);
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
+ }
+
+ /**
+ * Intentionally empty: execution blocks are not torn down in tests.
+ */
+ @Override
+ protected void stopExecutionBlock(ExecutionBlockContext context,
+ ExecutionBlockListProto cleanupList) {
+ //skip for testing
+ }
+
+ /**
+ * Delegates to the superclass, then releases one permit on the barrier.
+ */
+ @Override
+ public void handle(TaskManagerEvent event) {
+ super.handle(event);
+ barrier.release();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockWorkerContext.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockWorkerContext.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockWorkerContext.java
new file mode 100644
index 0000000..25f3dca
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockWorkerContext.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.querymaster.QueryMaster;
+import org.apache.tajo.querymaster.QueryMasterManagerService;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.storage.HashShuffleAppenderManager;
+import org.apache.tajo.util.history.HistoryReader;
+import org.apache.tajo.util.history.HistoryWriter;
+import org.apache.tajo.util.metrics.TajoSystemMetrics;
+
+/**
+ * Skeleton {@link TajoWorker.WorkerContext} for tests.
+ * Every getter implemented here returns null and both cleanup methods do nothing,
+ * except {@link #getMetrics()}, which lazily creates a metrics system.
+ * Subclasses must supply {@link #getConf()} and whatever interface methods are not
+ * implemented here.
+ */
+public abstract class MockWorkerContext implements TajoWorker.WorkerContext {
+ // created on the first getMetrics() call
+ TajoSystemMetrics tajoSystemMetrics;
+
+ @Override
+ public QueryMaster getQueryMaster() {
+ return null;
+ }
+
+ /**
+ * @return the configuration used by this context; also used to build the metrics system
+ */
+ public abstract TajoConf getConf();
+
+ @Override
+ public ServiceTracker getServiceTracker() {
+ return null;
+ }
+
+ @Override
+ public QueryMasterManagerService getQueryMasterManagerService() {
+ return null;
+ }
+
+ @Override
+ public CatalogService getCatalog() {
+ return null;
+ }
+
+ @Override
+ public WorkerConnectionInfo getConnectionInfo() {
+ return null;
+ }
+
+ @Override
+ public String getWorkerName() {
+ return null;
+ }
+
+ @Override
+ public LocalDirAllocator getLocalDirAllocator() {
+ return null;
+ }
+
+ /**
+ * Returns the metrics system, creating and starting it on first use with
+ * {@link #getConf()}, the Node metrics class and the name "localhost".
+ * NOTE(review): the lazy initialization is not synchronized.
+ */
+ @Override
+ public TajoSystemMetrics getMetrics() {
+
+ if (tajoSystemMetrics == null) {
+ tajoSystemMetrics = new TajoSystemMetrics(getConf(), org.apache.tajo.metrics.Node.class, "localhost");
+ tajoSystemMetrics.start();
+ }
+ return tajoSystemMetrics;
+ }
+
+ @Override
+ public HashShuffleAppenderManager getHashShuffleAppenderManager() {
+ return null;
+ }
+
+ @Override
+ public HistoryWriter getTaskHistoryWriter() {
+ return null;
+ }
+
+ @Override
+ public HistoryReader getHistoryReader() {
+ return null;
+ }
+
+ // no-op in the mock
+ @Override
+ public void cleanup(String strPath) {
+
+ }
+
+ // no-op in the mock
+ @Override
+ public void cleanupTemporalDirectories() {
+
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestDeletionService.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestDeletionService.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestDeletionService.java
new file mode 100644
index 0000000..98251c1
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestDeletionService.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Verifies that {@link DeletionService} removes a directory only after its configured delay.
+ */
+public class TestDeletionService {
+ DeletionService deletionService;
+
+ @Before
+ public void setUp() throws Exception {
+ }
+
+ /**
+ * Stops the service created by the test, if any.
+ */
+ @After
+ public void tearDown() {
+ if (deletionService == null) {
+ return;
+ }
+ deletionService.stop();
+ }
+
+ /**
+ * Schedules a test directory for deletion, checks that it still exists right after the
+ * request, and checks that it is gone after waiting twice the delay.
+ */
+ @Test
+ public final void testTemporalDirectory() throws IOException, InterruptedException {
+ final int delaySeconds = 1;
+ deletionService = new DeletionService(1, delaySeconds);
+
+ FileSystem localFs = FileSystem.getLocal(new Configuration());
+ Path scratchDir = CommonTestingUtil.getTestDir();
+ assertTrue(localFs.exists(scratchDir));
+
+ deletionService.delete(scratchDir);
+ // deletion is deferred, so the directory must still be present
+ assertTrue(localFs.exists(scratchDir));
+
+ long waitMillis = delaySeconds * 2 * 1000;
+ Thread.sleep(waitMillis);
+ assertFalse(localFs.exists(scratchDir));
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java
new file mode 100644
index 0000000..a91fc30
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.hadoop.fs.*;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.pullserver.TajoPullServerService;
+import org.apache.tajo.pullserver.retriever.FileChunk;
+import org.apache.tajo.storage.HashShuffleAppenderManager;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Random;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests {@link Fetcher} against a locally started {@link TajoPullServerService}.
+ * Each test writes (or deliberately omits) shuffle data under the worker temporal
+ * directory, requests it over HTTP and checks the resulting fetcher state.
+ */
+public class TestFetcher {
+ private final String TEST_DATA = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestFetcher";
+ private final String INPUT_DIR = TEST_DATA+"/in/";
+ private final String OUTPUT_DIR = TEST_DATA+"/out/";
+ private final TajoConf conf = new TajoConf();
+ private TajoPullServerService pullServerService;
+
+ @Before
+ public void setUp() throws Exception {
+ CommonTestingUtil.getTestDir(TEST_DATA);
+ CommonTestingUtil.getTestDir(INPUT_DIR);
+ CommonTestingUtil.getTestDir(OUTPUT_DIR);
+ conf.setVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR, INPUT_DIR);
+ conf.setIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_TIMEOUT, 1);
+ conf.setIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE, 127);
+
+ pullServerService = new TajoPullServerService();
+ pullServerService.init(conf);
+ pullServerService.start();
+ }
+
+ @After
+ public void tearDown(){
+ // setUp may fail before the service is created; do not mask that failure with an NPE
+ if (pullServerService != null) {
+ pullServerService.stop();
+ }
+ }
+
+ /**
+ * Writes 100 chunks of {@code prefix + <random int>} to {@code path} on the local
+ * file system, overwriting any existing file. The stream is closed even if a write fails.
+ */
+ private void writeRandomData(Path path, String prefix) throws IOException {
+ Random rnd = new Random();
+ FSDataOutputStream stream = FileSystem.getLocal(conf).create(path, true);
+ try {
+ for (int i = 0; i < 100; i++) {
+ String data = prefix + rnd.nextInt();
+ stream.write(data.getBytes());
+ }
+ stream.flush();
+ } finally {
+ stream.close();
+ }
+ }
+
+ /**
+ * Creates a fetcher that requests {@code params} from the local pull server and
+ * stores the response in {@code OUTPUT_DIR + "data"}.
+ */
+ private Fetcher newFetcher(String params) throws IOException {
+ URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
+ FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
+ storeChunk.setFromRemote(true);
+ return new Fetcher(conf, uri, storeChunk);
+ }
+
+ @Test
+ public void testGet() throws IOException {
+ QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
+ String sid = "1";
+ String partId = "1";
+
+ int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
+ String dataPath = conf.getVar(ConfVars.WORKER_TEMPORAL_DIR) +
+ queryId.toString() + "/output/" + sid + "/hash-shuffle/" + partParentId + "/" + partId;
+
+ String params = String.format("qid=%s&sid=%s&p=%s&type=%s", queryId, sid, partId, "h");
+
+ Path inputPath = new Path(dataPath);
+ writeRandomData(inputPath, "");
+
+ final Fetcher fetcher = newFetcher(params);
+ FileChunk chunk = fetcher.get();
+ assertNotNull(chunk);
+ assertNotNull(chunk.getFile());
+
+ FileSystem fs = FileSystem.getLocal(new TajoConf());
+ FileStatus inStatus = fs.getFileStatus(inputPath);
+ FileStatus outStatus = fs.getFileStatus(new Path(OUTPUT_DIR, "data"));
+
+ // the fetched copy must have the same length as the source file
+ assertEquals(inStatus.getLen(), outStatus.getLen());
+ assertEquals(TajoProtos.FetcherState.FETCH_FINISHED, fetcher.getState());
+ }
+
+ @Test
+ public void testAdjustFetchProcess() {
+ assertEquals(0.0f, TaskImpl.adjustFetchProcess(0, 0), 0);
+ assertEquals(0.0f, TaskImpl.adjustFetchProcess(10, 10), 0);
+ assertEquals(0.05f, TaskImpl.adjustFetchProcess(10, 9), 0);
+ assertEquals(0.1f, TaskImpl.adjustFetchProcess(10, 8), 0);
+ assertEquals(0.25f, TaskImpl.adjustFetchProcess(10, 5), 0);
+ assertEquals(0.45f, TaskImpl.adjustFetchProcess(10, 1), 0);
+ assertEquals(0.5f, TaskImpl.adjustFetchProcess(10, 0), 0);
+ }
+
+ @Test
+ public void testStatus() throws Exception {
+ QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
+ String sid = "1";
+ String ta = "1_0";
+ String partId = "1";
+
+ String dataPath = INPUT_DIR + queryId.toString() + "/output"+ "/" + sid + "/" +ta + "/output/" + partId;
+ String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, "h", ta);
+
+ writeRandomData(new Path(dataPath), "");
+
+ final Fetcher fetcher = newFetcher(params);
+ assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
+
+ fetcher.get();
+ assertEquals(TajoProtos.FetcherState.FETCH_FINISHED, fetcher.getState());
+ }
+
+ @Test
+ public void testNoContentFetch() throws Exception {
+
+ QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
+ String sid = "1";
+ String ta = "1_0";
+ String partId = "1";
+
+ String dataPath = INPUT_DIR + queryId.toString() + "/output"+ "/" + sid + "/" +ta + "/output/" + partId;
+ String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, "h", ta);
+
+ Path inputPath = new Path(dataPath);
+ FileSystem fs = FileSystem.getLocal(conf);
+ if(fs.exists(inputPath)){
+ fs.delete(inputPath, true);
+ }
+
+ // Creates an empty file at the PARENT of the data path, so the requested partition
+ // itself does not exist (presumably what makes the server answer with no content).
+ FSDataOutputStream stream = fs.create(inputPath.getParent(), true);
+ stream.close();
+
+ final Fetcher fetcher = newFetcher(params);
+ assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
+
+ fetcher.get();
+ assertEquals(TajoProtos.FetcherState.FETCH_FINISHED, fetcher.getState());
+ }
+
+ @Test
+ public void testFailureStatus() throws Exception {
+ QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
+ String sid = "1";
+ String ta = "1_0";
+ String partId = "1";
+
+ String dataPath = INPUT_DIR + queryId.toString() + "/output"+ "/" + sid + "/" +ta + "/output/" + partId;
+
+ // TajoPullServerService is expected to answer BAD_REQUEST for an unknown shuffle type
+ String shuffleType = "x";
+ String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, shuffleType, ta);
+
+ writeRandomData(new Path(dataPath), params);
+
+ final Fetcher fetcher = newFetcher(params);
+ assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
+
+ fetcher.get();
+ assertEquals(TajoProtos.FetcherState.FETCH_FAILED, fetcher.getState());
+ }
+
+ @Test
+ public void testServerFailure() throws Exception {
+ QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
+ String sid = "1";
+ String ta = "1_0";
+ String partId = "1";
+
+ String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, "h", ta);
+
+ final Fetcher fetcher = newFetcher(params);
+ assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
+
+ // Take the server down before fetching; tearDown will call stop() a second time.
+ pullServerService.stop();
+
+ boolean failure = false;
+ try{
+ fetcher.get();
+ } catch (Throwable e){
+ // any failure type counts; the state assertion below pins the outcome
+ failure = true;
+ }
+ assertTrue(failure);
+ assertEquals(TajoProtos.FetcherState.FETCH_FAILED, fetcher.getState());
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java
new file mode 100644
index 0000000..1193478
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.rpc.CallFuture;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.worker.event.NodeResourceAllocateEvent;
+import org.apache.tajo.worker.event.NodeResourceDeallocateEvent;
+import org.apache.tajo.worker.event.NodeResourceEvent;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.tajo.ResourceProtos.*;
+import static org.junit.Assert.*;
+public class TestNodeResourceManager {
+
+ private MockNodeResourceManager resourceManager;
+ private NodeStatusUpdater statusUpdater;
+ private TaskManager taskManager;
+ private TaskExecutor taskExecutor;
+ private AsyncDispatcher dispatcher;
+ private AsyncDispatcher taskDispatcher;
+ private TajoWorker.WorkerContext workerContext;
+
+ private CompositeService service;
+ private int taskMemory;
+ private TajoConf conf;
+
+ @Before
+ public void setup() {
+ conf = new TajoConf();
+ conf.set(CommonTestingUtil.TAJO_TEST_KEY, CommonTestingUtil.TAJO_TEST_TRUE);
+
+ taskMemory = 512;
+ conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES, 4);
+ conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB,
+ taskMemory * conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES));
+ conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS, 4);
+ conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM, 1);
+ conf.setIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM, 2);
+
+ dispatcher = new AsyncDispatcher();
+ taskDispatcher = new AsyncDispatcher();
+
+ workerContext = new MockWorkerContext() {
+ WorkerConnectionInfo workerConnectionInfo;
+ @Override
+ public TajoConf getConf() {
+ return conf;
+ }
+
+ @Override
+ public TaskManager getTaskManager() {
+ return taskManager;
+ }
+
+ @Override
+ public TaskExecutor getTaskExecuor() {
+ return taskExecutor;
+ }
+
+ @Override
+ public NodeResourceManager getNodeResourceManager() {
+ return resourceManager;
+ }
+
+ @Override
+ public WorkerConnectionInfo getConnectionInfo() {
+ if (workerConnectionInfo == null) {
+ workerConnectionInfo = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080);
+ }
+ return workerConnectionInfo;
+ }
+ };
+
+ taskManager = new MockTaskManager(new Semaphore(0), taskDispatcher, workerContext);
+ taskExecutor = new MockTaskExecutor(new Semaphore(0), workerContext);
+ resourceManager = new MockNodeResourceManager(new Semaphore(0), dispatcher, workerContext);
+ statusUpdater = new MockNodeStatusUpdater(new CountDownLatch(0), workerContext);
+
+ service = new CompositeService("MockService") {
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ addIfService(dispatcher);
+ addIfService(taskDispatcher);
+ addIfService(taskManager);
+ addIfService(taskExecutor);
+ addIfService(resourceManager);
+ addIfService(statusUpdater);
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ workerContext.getMetrics().stop();
+ super.serviceStop();
+ }
+ };
+
+ service.init(conf);
+ service.start();
+ }
+
+ @After
+ public void tearDown() {
+ service.stop();
+ }
+
+ @Test
+ public void testNodeResourceAllocateEvent() throws Exception {
+ int requestSize = 4;
+ resourceManager.setTaskHandlerEvent(false); //skip task execution
+
+ CallFuture<BatchAllocationResponse> callFuture = new CallFuture<BatchAllocationResponse>();
+ BatchAllocationRequest.Builder requestProto = BatchAllocationRequest.newBuilder();
+ ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
+ requestProto.setExecutionBlockId(ebId.getProto());
+
+ assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
+ requestProto.addAllTaskRequest(MockNodeResourceManager.createTaskRequests(ebId, taskMemory, requestSize));
+
+ dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture));
+
+ BatchAllocationResponse responseProto = callFuture.get();
+ assertNotEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
+ // allocated all
+ assertEquals(0, responseProto.getCancellationTaskCount());
+ }
+
+
+ /**
+  * Requests ten more tasks than the configured number of worker CPU cores in one batch
+  * and checks that the response reports exactly those ten surplus tasks as cancelled.
+  */
+ @Test
+ public void testNodeResourceCancellation() throws Exception {
+   final int coreCount = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES);
+   final int surplus = 10;
+   resourceManager.setTaskHandlerEvent(false); // skip task execution
+
+   // Before any request is dispatched the whole node resource must be available.
+   assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
+
+   ExecutionBlockId blockId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
+   BatchAllocationRequest request = BatchAllocationRequest.newBuilder()
+       .setExecutionBlockId(blockId.getProto())
+       .addAllTaskRequest(
+           MockNodeResourceManager.createTaskRequests(blockId, taskMemory, coreCount + surplus))
+       .build();
+
+   CallFuture<BatchAllocationResponse> callback = new CallFuture<BatchAllocationResponse>();
+   dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(request, callback));
+
+   BatchAllocationResponse response = callback.get();
+   assertEquals(surplus, response.getCancellationTaskCount());
+ }
+
+ /**
+  * Allocates four tasks through the dispatcher, then hands a deallocate event for each
+  * task's resource directly to the resource manager and checks that the available
+  * resource equals the total resource again.
+  */
+ @Test
+ public void testNodeResourceDeallocateEvent() throws Exception {
+   final int taskCount = 4;
+   resourceManager.setTaskHandlerEvent(false); // skip task execution
+
+   // Before any request is dispatched the whole node resource must be available.
+   assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
+
+   ExecutionBlockId blockId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
+   BatchAllocationRequest request = BatchAllocationRequest.newBuilder()
+       .setExecutionBlockId(blockId.getProto())
+       .addAllTaskRequest(MockNodeResourceManager.createTaskRequests(blockId, taskMemory, taskCount))
+       .build();
+
+   CallFuture<BatchAllocationResponse> callback = new CallFuture<BatchAllocationResponse>();
+   dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(request, callback));
+
+   BatchAllocationResponse response = callback.get();
+   assertNotEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
+   assertEquals(0, response.getCancellationTaskCount());
+
+   // Deallocate: the handler is invoked directly (not via the dispatcher) for testing.
+   for (TaskAllocationProto allocated : request.getTaskRequestList()) {
+     NodeResourceDeallocateEvent release =
+         new NodeResourceDeallocateEvent(allocated.getResource(), NodeResourceEvent.ResourceType.TASK);
+     resourceManager.handle(release);
+   }
+
+   assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
+ }
+
+ @Test(timeout = 30000)
+ public void testParallelRequest() throws Exception {
+ final int parallelCount = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES) * 2;
+ final int taskSize = 100000;
+ resourceManager.setTaskHandlerEvent(true);
+
+ final AtomicInteger totalComplete = new AtomicInteger();
+ final AtomicInteger totalCanceled = new AtomicInteger();
+
+ final ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
+ final Queue<TaskAllocationProto>
+ totalTasks = MockNodeResourceManager.createTaskRequests(ebId, taskMemory, taskSize);
+
+
+ TaskAllocationProto task = totalTasks.poll();
+ BatchAllocationRequest.Builder requestProto = BatchAllocationRequest.newBuilder();
+ requestProto.addTaskRequest(task);
+ requestProto.setExecutionBlockId(ebId.getProto());
+ CallFuture<BatchAllocationResponse> callFuture = new CallFuture<BatchAllocationResponse>();
+ dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture));
+ assertTrue(callFuture.get().getCancellationTaskCount() == 0);
+ totalComplete.incrementAndGet();
+
+ // start parallel request
+ ExecutorService executor = Executors.newFixedThreadPool(parallelCount);
+
+ List<Future> futureList = Lists.newArrayList();
+
+ long startTime = System.currentTimeMillis();
+ for (int i = 0; i < parallelCount; i++) {
+ futureList.add(executor.submit(new Runnable() {
+ @Override
+ public void run() {
+ int complete = 0;
+ while (true) {
+ TaskAllocationProto task = totalTasks.poll();
+ if (task == null) break;
+
+
+ BatchAllocationRequest.Builder requestProto = BatchAllocationRequest.newBuilder();
+ requestProto.addTaskRequest(task);
+ requestProto.setExecutionBlockId(ebId.getProto());
+
+ CallFuture<BatchAllocationResponse> callFuture = new CallFuture<BatchAllocationResponse>();
+ dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture));
+ try {
+ BatchAllocationResponse proto = callFuture.get();
+ if (proto.getCancellationTaskCount() > 0) {
+ totalTasks.addAll(proto.getCancellationTaskList());
+ totalCanceled.addAndGet(proto.getCancellationTaskCount());
+ } else {
+ complete++;
+ }
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ }
+ totalComplete.addAndGet(complete);
+ }
+ })
+ );
+ }
+
+ for (Future future : futureList) {
+ future.get();
+ }
+
+ executor.shutdown();
+ assertEquals(taskSize, totalComplete.get());
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java
new file mode 100644
index 0000000..ac4b7dd
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.worker.event.NodeStatusEvent;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+
+import static org.apache.tajo.ResourceProtos.NodeHeartbeatRequest;
+import static org.junit.Assert.*;
+
+public class TestNodeStatusUpdater {
+
+ private NodeResourceManager resourceManager;
+ private MockNodeStatusUpdater statusUpdater;
+ private MockTaskManager taskManager;
+ private AsyncDispatcher dispatcher;
+ private AsyncDispatcher taskDispatcher;
+ private CompositeService service;
+ private TajoConf conf;
+ private TajoWorker.WorkerContext workerContext;
+
+
+ @Before
+ public void setup() {
+ conf = new TajoConf();
+ conf.set(CommonTestingUtil.TAJO_TEST_KEY, CommonTestingUtil.TAJO_TEST_TRUE);
+ conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES, 2);
+ conf.setIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM, 2);
+
+ workerContext = new MockWorkerContext() {
+ WorkerConnectionInfo workerConnectionInfo;
+
+ @Override
+ public TajoConf getConf() {
+ return conf;
+ }
+
+ @Override
+ public TaskManager getTaskManager() {
+ return taskManager;
+ }
+
+ @Override
+ public TaskExecutor getTaskExecuor() {
+ return null;
+ }
+
+ @Override
+ public NodeResourceManager getNodeResourceManager() {
+ return resourceManager;
+ }
+
+ @Override
+ public WorkerConnectionInfo getConnectionInfo() {
+ if (workerConnectionInfo == null) {
+ workerConnectionInfo = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080);
+ }
+ return workerConnectionInfo;
+ }
+ };
+
+ conf.setIntVar(TajoConf.ConfVars.WORKER_HEARTBEAT_IDLE_INTERVAL, 1000);
+ dispatcher = new AsyncDispatcher();
+ resourceManager = new NodeResourceManager(dispatcher, workerContext);
+ taskDispatcher = new AsyncDispatcher();
+ taskManager = new MockTaskManager(new Semaphore(0), taskDispatcher, workerContext) {
+ @Override
+ public int getRunningTasks() {
+ return 0;
+ }
+ };
+
+ service = new CompositeService("MockService") {
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ addIfService(dispatcher);
+ addIfService(taskDispatcher);
+ addIfService(taskManager);
+ addIfService(resourceManager);
+ addIfService(statusUpdater);
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ workerContext.getMetrics().stop();
+ super.serviceStop();
+ }
+ };
+
+ service.init(conf);
+ service.start();
+ }
+
+ @After
+ public void tearDown() {
+ service.stop();
+ }
+
+ @Test(timeout = 20000)
+ public void testNodeMembership() throws Exception {
+ CountDownLatch barrier = new CountDownLatch(1);
+ statusUpdater = new MockNodeStatusUpdater(barrier, workerContext);
+ statusUpdater.init(conf);
+ statusUpdater.start();
+
+ MockNodeStatusUpdater.MockResourceTracker resourceTracker = statusUpdater.getResourceTracker();
+ barrier.await();
+
+ assertTrue(resourceTracker.getTotalResource().containsKey(workerContext.getConnectionInfo().getId()));
+ assertEquals(resourceManager.getTotalResource(),
+ resourceTracker.getTotalResource().get(workerContext.getConnectionInfo().getId()));
+
+ assertEquals(resourceManager.getAvailableResource(),
+ resourceTracker.getAvailableResource().get(workerContext.getConnectionInfo().getId()));
+ }
+
+ @Test(timeout = 20000)
+ public void testPing() throws Exception {
+ CountDownLatch barrier = new CountDownLatch(2);
+ statusUpdater = new MockNodeStatusUpdater(barrier, workerContext);
+ statusUpdater.init(conf);
+ statusUpdater.start();
+
+ MockNodeStatusUpdater.MockResourceTracker resourceTracker = statusUpdater.getResourceTracker();
+ barrier.await();
+
+ NodeHeartbeatRequest lastRequest = resourceTracker.getLastRequest();
+ assertTrue(lastRequest.hasWorkerId());
+ assertTrue(lastRequest.hasAvailableResource());
+ assertTrue(lastRequest.hasRunningTasks());
+ assertTrue(lastRequest.hasRunningQueryMasters());
+ assertFalse(lastRequest.hasTotalResource());
+ assertFalse(lastRequest.hasConnectionInfo());
+ }
+
+ @Test(timeout = 20000)
+ public void testResourceReport() throws Exception {
+ CountDownLatch barrier = new CountDownLatch(2);
+ statusUpdater = new MockNodeStatusUpdater(barrier, workerContext);
+ statusUpdater.init(conf);
+ statusUpdater.start();
+
+ assertEquals(0, statusUpdater.getQueueSize());
+ for (int i = 0; i < statusUpdater.getQueueingThreshold(); i++) {
+ dispatcher.getEventHandler().handle(new NodeStatusEvent(NodeStatusEvent.EventType.REPORT_RESOURCE));
+ }
+ barrier.await();
+ assertEquals(0, statusUpdater.getQueueSize());
+ }
+
+ @Test(timeout = 20000)
+ public void testFlushResourceReport() throws Exception {
+ CountDownLatch barrier = new CountDownLatch(2);
+ statusUpdater = new MockNodeStatusUpdater(barrier, workerContext);
+ statusUpdater.init(conf);
+ statusUpdater.start();
+
+ assertEquals(0, statusUpdater.getQueueSize());
+ dispatcher.getEventHandler().handle(new NodeStatusEvent(NodeStatusEvent.EventType.FLUSH_REPORTS));
+
+ barrier.await();
+ assertEquals(0, statusUpdater.getQueueSize());
+ }
+}