You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/03/01 05:45:06 UTC

[1/2] TAJO-589: Add fine grained progress indicator for each task. (hyoungjunkim via hyunsik)

Repository: incubator-tajo
Updated Branches:
  refs/heads/master f5945424e -> c573b6fcf


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/queryunit.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/queryunit.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/queryunit.jsp
index 8317ee6..3e8dfef 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/queryunit.jsp
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/queryunit.jsp
@@ -38,6 +38,8 @@
 <%@ page import="java.text.SimpleDateFormat" %>
 <%@ page import="java.util.Map" %>
 <%@ page import="java.util.Set" %>
+<%@ page import="org.apache.tajo.catalog.statistics.TableStats" %>
+<%@ page import="org.apache.tajo.worker.TaskHistory" %>
 
 <%
     String paramQueryId = request.getParameter("queryId");
@@ -121,16 +123,18 @@
         delim = "<br/>";
     }
 
-    int numPartitions = queryUnit.getShuffleOutpuNum();
-    String partitionKey = "-";
-    String partitionFileName = "-";
-    if(numPartitions > 0) {
+    int numShuffles = queryUnit.getShuffleOutpuNum();
+    String shuffleKey = "-";
+    String shuffleFileName = "-";
+    if(numShuffles > 0) {
         TajoWorkerProtocol.ShuffleFileOutput shuffleFileOutputs = queryUnit.getShuffleFileOutputs().get(0);
-        partitionKey = "" + shuffleFileOutputs.getPartId();
-        partitionFileName = shuffleFileOutputs.getFileName();
+        shuffleKey = "" + shuffleFileOutputs.getPartId();
+        shuffleFileName = shuffleFileOutputs.getFileName();
     }
 
     //int numIntermediateData = queryUnit.getIntermediateData() == null ? 0 : queryUnit.getIntermediateData().size();
+    TableStats inputStat = queryUnit.getLastAttempt().getInputStats();
+    TableStats outputStat = queryUnit.getLastAttempt().getResultStats();
 %>
 
 <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
@@ -149,14 +153,17 @@
     <hr/>
     <table border="1" width="100%" class="border_table">
         <tr><td width="200" align="right">ID</td><td><%=queryUnit.getId()%></td></tr>
+        <tr><td align="right">Progress</td><td><%=JSPUtil.percentFormat(queryUnit.getLastAttempt().getProgress())%>%</td></tr>
         <tr><td align="right">State</td><td><%=queryUnit.getState()%></td></tr>
         <tr><td align="right">Launch Time</td><td><%=queryUnit.getLaunchTime() == 0 ? "-" : df.format(queryUnit.getLaunchTime())%></td></tr>
         <tr><td align="right">Finish Time</td><td><%=queryUnit.getFinishTime() == 0 ? "-" : df.format(queryUnit.getFinishTime())%></td></tr>
         <tr><td align="right">Running Time</td><td><%=queryUnit.getLaunchTime() == 0 ? "-" : queryUnit.getRunningTime() + " ms"%></td></tr>
         <tr><td align="right">Host</td><td><%=queryUnit.getSucceededHost() == null ? "-" : queryUnit.getSucceededHost()%></td></tr>
-        <tr><td align="right">Partitions</td><td># Partitions: <%=numPartitions%>, Partition Key: <%=partitionKey%>, Partition file: <%=partitionFileName%></td></tr>
+        <tr><td align="right">Shuffles</td><td># Shuffle Outputs: <%=numShuffles%>, Shuffle Key: <%=shuffleKey%>, Shuffle file: <%=shuffleFileName%></td></tr>
         <tr><td align="right">Data Locations</td><td><%=dataLocationInfos%></td></tr>
         <tr><td align="right">Fragment</td><td><%=fragmentInfo%></td></tr>
+        <tr><td align="right">Input Statistics</td><td><%=TaskHistory.toInputStatsString(inputStat)%></td></tr>
+        <tr><td align="right">Output Statistics</td><td><%=TaskHistory.toOutputStatsString(outputStat)%></td></tr>
         <tr><td align="right">Fetches</td><td><%=fetchInfo%></td></tr>
     </table>
 </div>

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/taskdetail.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/taskdetail.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/taskdetail.jsp
index 4a64d0b..b264081 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/taskdetail.jsp
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/taskdetail.jsp
@@ -86,9 +86,11 @@
         <tr><td align="right">Start Time</td><td><%=taskHistory.getStartTime() == 0 ? "-" : df.format(taskHistory.getStartTime())%></td></tr>
         <tr><td align="right">Finish Time</td><td><%=taskHistory.getFinishTime() == 0 ? "-" : df.format(taskHistory.getFinishTime())%></td></tr>
         <tr><td align="right">Running Time</td><td><%=JSPUtil.getElapsedTime(taskHistory.getStartTime(), taskHistory.getFinishTime())%></td></tr>
-        <tr><td align="right">Progress</td><td><%=taskHistory.getProgress() * 100.0%> %</td></tr>
+        <tr><td align="right">Progress</td><td><%=JSPUtil.percentFormat(taskHistory.getProgress())%>%</td></tr>
         <tr><td align="right">Output Path</td><td><%=taskHistory.getOutputPath()%></td></tr>
         <tr><td align="right">Working Path</td><td><%=taskHistory.getWorkingPath()%></td></tr>
+        <tr><td align="right">Input Statistics</td><td><%=TaskHistory.toInputStatsString(taskHistory.getInputStats())%></td></tr>
+        <tr><td align="right">Output Statistics</td><td><%=TaskHistory.toOutputStatsString(taskHistory.getOutputStats())%></td></tr>
     </table>
 
 <%

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/QueryTestCaseBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/QueryTestCaseBase.java
index e1a231a..ddb9952 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/QueryTestCaseBase.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/QueryTestCaseBase.java
@@ -254,6 +254,9 @@ public class QueryTestCaseBase {
    * @param resultSet ResultSet
    */
   public final void cleanupQuery(ResultSet resultSet) throws IOException {
+    if (resultSet == null) {
+      return;
+    }
     try {
       resultSet.close();
     } catch (SQLException e) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
index aa72b06..323a7cf 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -336,6 +336,10 @@ public class TajoTestingCluster {
     return this.tajoMaster;
   }
 
+  public List<TajoWorker> getTajoWorkers() {
+    return this.tajoWorkers;
+  }
+
   public void shutdownMiniTajoCluster() {
     if(this.tajoMaster != null) {
       this.tajoMaster.stop();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
new file mode 100644
index 0000000..cb9c419
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.LogicalPlan;
+import org.apache.tajo.engine.planner.LogicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Random;
+
+import static junit.framework.Assert.assertNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestProgressExternalSortExec {
+  private TajoConf conf;
+  private TajoTestingCluster util;
+  private final String TEST_PATH = "target/test-data/TestProgressExternalSortExec";
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private AbstractStorageManager sm;
+  private Path testDir;
+
+  private final int numTuple = 100000;
+  private Random rnd = new Random(System.currentTimeMillis());
+
+  private TableDesc employee;
+
+  private TableStats testDataStats;
+  @Before
+  public void setUp() throws Exception {
+    this.conf = new TajoConf();
+    util = new TajoTestingCluster();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    conf.setVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR, testDir.toString());
+    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+
+    Schema schema = new Schema();
+    schema.addColumn("managerId", TajoDataTypes.Type.INT4);
+    schema.addColumn("empId", TajoDataTypes.Type.INT4);
+    schema.addColumn("deptName", TajoDataTypes.Type.TEXT);
+
+    TableMeta employeeMeta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.RAW);
+    Path employeePath = new Path(testDir, "employee.csv");
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, schema, employeePath);
+    appender.enableStats();
+    appender.init();
+    Tuple tuple = new VTuple(schema.size());
+    for (int i = 0; i < numTuple; i++) {
+      tuple.put(new Datum[] {
+          DatumFactory.createInt4(rnd.nextInt(50)),
+          DatumFactory.createInt4(rnd.nextInt(100)),
+          DatumFactory.createText("dept_" + i),
+      });
+      appender.addTuple(tuple);
+    }
+    appender.flush();
+    appender.close();
+
+    System.out.println(appender.getStats().getNumRows() + " rows (" + appender.getStats().getNumBytes() + " Bytes)");
+
+    testDataStats = appender.getStats();
+    employee = new TableDesc("employee", schema, employeeMeta, employeePath);
+    catalog.addTable(employee);
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    CommonTestingUtil.cleanupTestDir(TEST_PATH);
+    util.shutdownCatalogCluster();
+  }
+
+  String[] QUERIES = {
+      "select managerId, empId from employee order by managerId, empId"
+  };
+
+  @Test
+  public void testExternalSortExecProgressWithMemTableScanner() throws Exception {
+    testProgress(testDataStats.getNumBytes().intValue() * 20);    //multiply 20 for memory fit
+  }
+
+  @Test
+  public void testExternalSortExecProgressWithPairWiseMerger() throws Exception {
+    testProgress(testDataStats.getNumBytes().intValue());
+  }
+
+  private void testProgress(int sortBufferBytesNum) throws Exception {
+    FileFragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
+        Integer.MAX_VALUE);
+    Path workDir = new Path(testDir, TestExternalSortExec.class.getName());
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), new FileFragment[] { frags[0] }, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalPlan plan = planner.createPlan(expr);
+    LogicalNode rootNode = plan.getRootBlock().getRoot();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+
+    // TODO - should be planed with user's optimization hint
+    if (!(proj.getChild() instanceof ExternalSortExec)) {
+      UnaryPhysicalExec sortExec = proj.getChild();
+      SeqScanExec scan = sortExec.getChild();
+
+      ExternalSortExec extSort = new ExternalSortExec(ctx, sm,
+          ((MemSortExec)sortExec).getPlan(), scan);
+
+      extSort.setSortBufferBytesNum(sortBufferBytesNum);
+      proj.setChild(extSort);
+    } else {
+      ((ExternalSortExec)proj.getChild()).setSortBufferBytesNum(sortBufferBytesNum);
+    }
+
+    Tuple tuple;
+    Tuple preVal = null;
+    Tuple curVal;
+    int cnt = 0;
+    exec.init();
+    TupleComparator comparator = new TupleComparator(proj.getSchema(),
+        new SortSpec[]{
+            new SortSpec(new Column("managerId", TajoDataTypes.Type.INT4)),
+            new SortSpec(new Column("empId", TajoDataTypes.Type.INT4))
+        });
+
+    float initProgress = 0.0f;
+    while ((tuple = exec.next()) != null) {
+      if (cnt == 0) {
+        initProgress = exec.getProgress();
+        assertTrue(initProgress > 0.5f && initProgress < 1.0f);
+      }
+
+      if (cnt == testDataStats.getNumRows() / 2) {
+        float progress = exec.getProgress();
+
+        assertTrue(progress > initProgress);
+      }
+      curVal = tuple;
+      if (preVal != null) {
+        assertTrue("prev: " + preVal + ", but cur: " + curVal, comparator.compare(preVal, curVal) <= 0);
+      }
+      preVal = curVal;
+      cnt++;
+    }
+
+    assertEquals(1.0f, exec.getProgress(), 0);
+    assertEquals(numTuple, cnt);
+
+    TableStats tableStats = exec.getInputStats();
+    assertNotNull(tableStats);
+    assertEquals(testDataStats.getNumBytes().longValue(), tableStats.getNumBytes().longValue());
+    assertEquals(cnt, testDataStats.getNumRows().longValue());
+    assertEquals(cnt, tableStats.getNumRows().longValue());
+    assertEquals(testDataStats.getNumBytes().longValue(), tableStats.getReadBytes().longValue());
+
+    // for rescan test
+    preVal = null;
+    exec.rescan();
+
+    cnt = 0;
+    while ((tuple = exec.next()) != null) {
+      curVal = tuple;
+      if (preVal != null) {
+        assertTrue("prev: " + preVal + ", but cur: " + curVal, comparator.compare(preVal, curVal) <= 0);
+      }
+      preVal = curVal;
+      cnt++;
+    }
+    assertEquals(1.0f, exec.getProgress(), 0);
+    assertEquals(numTuple, cnt);
+    exec.close();
+    assertEquals(1.0f, exec.getProgress(), 0);
+
+    tableStats = exec.getInputStats();
+    assertNotNull(tableStats);
+    assertEquals(testDataStats.getNumBytes().longValue(), tableStats.getNumBytes().longValue());
+    assertEquals(cnt, testDataStats.getNumRows().longValue());
+    assertEquals(cnt, tableStats.getNumRows().longValue());
+    assertEquals(testDataStats.getNumBytes().longValue(), tableStats.getReadBytes().longValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java
new file mode 100644
index 0000000..5d3f7bf
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import org.apache.tajo.IntegrationTest;
+import org.apache.tajo.QueryTestCaseBase;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.worker.TajoWorker;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.sql.ResultSet;
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+@Category(IntegrationTest.class)
+public class TestQueryUnitStatusUpdate extends QueryTestCaseBase {
+  @Test
+  public final void case1() throws Exception {
+    // select l_linenumber, count(1) as unique_key from lineitem group by l_linenumber;
+    ResultSet res = null;
+    try {
+      res = executeQuery();
+
+      // tpch/lineitem.tbl
+      long[] expectedNumRows = new long[]{5, 2, 2, 2};
+      long[] expectedNumBytes = new long[]{604, 18, 18, 8};
+      long[] expectedReadBytes = new long[]{604, 0, 18, 0};
+
+      assertStatus(2, expectedNumRows, expectedNumBytes, expectedReadBytes);
+    } finally {
+      cleanupQuery(res);
+    }
+  }
+
+  @Test
+  public final void case2() throws Exception {
+    // ExternalMergeSort
+    ResultSet res = null;
+    try {
+      res = executeQuery();
+
+      // tpch/lineitem.tbl
+      long[] expectedNumRows = new long[]{5, 2, 2, 2, 2, 2};
+      long[] expectedNumBytes = new long[]{604, 162, 162, 138, 138, 194};
+      long[] expectedReadBytes = new long[]{604, 0, 162, 0, 138, 0};
+
+      assertStatus(3, expectedNumRows, expectedNumBytes, expectedReadBytes);
+    } finally {
+      cleanupQuery(res);
+    }
+  }
+
+
+  @Test
+  public final void case3() throws Exception {
+    // Partition Scan
+    ResultSet res = null;
+    try {
+      createColumnPartitionedTable();
+
+      res = executeQuery();
+
+      long[] expectedNumRows = new long[]{7, 2, 2, 2};
+      long[] expectedNumBytes = new long[]{63, 34, 34, 18};
+      long[] expectedReadBytes = new long[]{63, 0, 34, 0};
+
+      assertStatus(2, expectedNumRows, expectedNumBytes, expectedReadBytes);
+    } finally {
+      cleanupQuery(res);
+    }
+  }
+
+  private void createColumnPartitionedTable() throws Exception {
+    String tableName ="ColumnPartitionedTable";
+    ResultSet res = executeString(
+        "create table " + tableName + " (col1 int4, col2 int4) partition by column(key float8) ");
+    res.close();
+
+    assertTrue(catalog.existsTable(tableName));
+    assertEquals(2, catalog.getTableDesc(tableName).getSchema().size());
+    assertEquals(3, catalog.getTableDesc(tableName).getLogicalSchema().size());
+
+    res = testBase.execute(
+        "insert overwrite into " + tableName + " select l_orderkey, l_partkey, l_quantity from lineitem");
+    res.close();
+  }
+
+  private void assertStatus(int numSubQueries,
+                            long[] expectedNumRows,
+                            long[] expectedNumBytes,
+                            long[] expectedReadBytes) throws Exception {
+      List<TajoWorker> tajoWorkers = testingCluster.getTajoWorkers();
+      Collection<QueryMasterTask> finishedTasks = null;
+      for (TajoWorker eachWorker: tajoWorkers) {
+        finishedTasks = eachWorker.getWorkerContext().getQueryMaster().getFinishedQueryMasterTasks();
+        if (finishedTasks != null && !finishedTasks.isEmpty()) {
+          break;
+        }
+      }
+
+      assertNotNull(finishedTasks);
+      assertTrue(!finishedTasks.isEmpty());
+
+      List<QueryMasterTask> finishedTaskList = new ArrayList<QueryMasterTask>(finishedTasks);
+
+      Collections.sort(finishedTaskList, new Comparator<QueryMasterTask>() {
+        @Override
+        public int compare(QueryMasterTask o1, QueryMasterTask o2) {
+          return o2.getQueryId().compareTo(o1.getQueryId());
+        }
+      });
+
+      Query query = finishedTaskList.get(0).getQuery();
+
+      assertNotNull(query);
+
+      List<SubQuery> subQueries = new ArrayList<SubQuery>(query.getSubQueries());
+      assertEquals(numSubQueries, subQueries.size());
+
+      Collections.sort(subQueries, new Comparator<SubQuery>() {
+        @Override
+        public int compare(SubQuery o1, SubQuery o2) {
+          return o1.getId().compareTo(o2.getId());
+        }
+      });
+
+      int index = 0;
+      for (SubQuery eachSubQuery: subQueries) {
+        TableStats inputStats = eachSubQuery.getInputStats();
+        TableStats resultStats = eachSubQuery.getResultStats();
+
+        assertNotNull(inputStats);
+        assertEquals(expectedNumRows[index], inputStats.getNumRows().longValue());
+        assertEquals(expectedNumBytes[index], inputStats.getNumBytes().longValue());
+        assertEquals(expectedReadBytes[index], inputStats.getReadBytes().longValue());
+
+        index++;
+
+        assertNotNull(resultStats);
+        assertEquals(expectedNumRows[index], resultStats.getNumRows().longValue());
+        assertEquals(expectedNumBytes[index], resultStats.getNumBytes().longValue());
+        assertEquals(expectedReadBytes[index], resultStats.getReadBytes().longValue());
+
+        index++;
+      }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/util/TestJSPUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/util/TestJSPUtil.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/util/TestJSPUtil.java
new file mode 100644
index 0000000..74c2856
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/util/TestJSPUtil.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryUnitId;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent;
+import org.apache.tajo.master.querymaster.QueryUnit;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestJSPUtil {
+  @Test
+  public void testSortQueryUnit() throws Exception {
+    List<QueryUnit> queryUnits = new ArrayList<QueryUnit>();
+
+    Configuration conf = new TajoConf();
+
+    QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext scheduleContext =
+        new QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext();
+
+    ExecutionBlockId ebId = TajoIdUtils.createExecutionBlockId("eb_000001_00001_00001");
+
+    for (int i = 0; i < 10; i++) {
+      QueryUnitId id = new QueryUnitId(ebId, i);
+      QueryUnit queryUnit = new QueryUnit(conf, scheduleContext, id, true, null);
+      queryUnits.add(queryUnit);
+
+      int launchTime = i + 1;
+      int runningTime = i + 1;
+      if(i < 9) {
+        queryUnit.setLaunchTime(launchTime);
+        queryUnit.setFinishTime(launchTime + runningTime);
+      }
+    }
+
+    Collections.shuffle(queryUnits);
+
+    QueryUnit[] queryUnitArray = queryUnits.toArray(new QueryUnit[]{});
+    JSPUtil.sortQueryUnit(queryUnitArray, "id", "asc");
+    for (int i = 0; i < 10; i++) {
+      assertEquals(i, queryUnitArray[i].getId().getId());
+    }
+
+    queryUnitArray = queryUnits.toArray(new QueryUnit[]{});
+    JSPUtil.sortQueryUnit(queryUnitArray, "id", "desc");
+    for (int i = 0; i < 10; i++) {
+      assertEquals(9 - i, queryUnitArray[i].getId().getId());
+    }
+
+    queryUnitArray = queryUnits.toArray(new QueryUnit[]{});
+    JSPUtil.sortQueryUnit(queryUnitArray, "runTime", "asc");
+    assertEquals(0, queryUnitArray[0].getId().getId());
+    assertEquals(9, queryUnitArray[9].getId().getId());
+
+    queryUnitArray = queryUnits.toArray(new QueryUnit[]{});
+    JSPUtil.sortQueryUnit(queryUnitArray, "runTime", "desc");
+    assertEquals(8, queryUnitArray[0].getId().getId());
+    assertEquals(9, queryUnitArray[9].getId().getId());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/test/resources/queries/TestQueryUnitStatusUpdate/case1.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestQueryUnitStatusUpdate/case1.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestQueryUnitStatusUpdate/case1.sql
new file mode 100644
index 0000000..f6d0eb3
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestQueryUnitStatusUpdate/case1.sql
@@ -0,0 +1 @@
+select l_linenumber, count(1) as unique_key from lineitem group by l_linenumber;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/test/resources/queries/TestQueryUnitStatusUpdate/case2.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestQueryUnitStatusUpdate/case2.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestQueryUnitStatusUpdate/case2.sql
new file mode 100644
index 0000000..c3e09f1
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestQueryUnitStatusUpdate/case2.sql
@@ -0,0 +1,5 @@
+select l_returnflag, l_linestatus, sum(l_quantity) as sum_qty, sum(l_extendedprice) as sum_base_price, sum(l_extendedprice*(1-l_discount)) as sum_disc_price, sum
+(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge, avg(l_quantity) as avg_qty, avg(l_extendedprice) as avg_price, avg(l_discount) as avg_disc, count(*) as
+ count_order
+from lineitem
+group by l_returnflag, l_linestatus order by l_returnflag, l_linestatus;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/test/resources/queries/TestQueryUnitStatusUpdate/case3.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestQueryUnitStatusUpdate/case3.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestQueryUnitStatusUpdate/case3.sql
new file mode 100644
index 0000000..9c9362e
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestQueryUnitStatusUpdate/case3.sql
@@ -0,0 +1,11 @@
+select *
+from (
+  select a.col1, a.col2, a.key
+  from ColumnPartitionedTable a
+  join ColumnPartitionedTable b on a.key = b.key
+  where
+    (a.key = 45.0 or a.key = 38.0)
+) test
+order by
+  col1, col2
+;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
index ed65af6..6c1923c 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
@@ -381,7 +381,9 @@ public class CSVFile {
       rowLengthList.clear();
       fileOffsets.clear();
 
-      if(eof) return;
+      if(eof) {
+        return;
+      }
 
       while (DEFAULT_PAGE_SIZE >= bufferedSize){
 
@@ -404,6 +406,34 @@ public class CSVFile {
           break;
         }
       }
+      if (tableStats != null) {
+        tableStats.setReadBytes(getFilePosition() - startOffset);
+        tableStats.setNumRows(recordCount);
+      }
+    }
+
+    @Override
+    public float getProgress() {
+      try {
+        if(eof) {
+          return 1.0f;
+        }
+        long filePos = getFilePosition();
+
+        if (tableStats != null) {
+          tableStats.setReadBytes(filePos - startOffset);
+          tableStats.setNumRows(recordCount);
+        }
+
+        if (startOffset == filePos) {
+          return 0.0f;
+        } else {
+          return Math.min(1.0f, (float)(filePos - startOffset) / (float)(end - startOffset));
+        }
+      } catch (IOException e) {
+        LOG.error(e.getMessage(), e);
+        return 0.0f;
+      }
     }
 
     @Override
@@ -454,6 +484,11 @@ public class CSVFile {
     @Override
     public void close() throws IOException {
       try {
+        if (tableStats != null) {
+          tableStats.setReadBytes(fragment.getEndKey());
+          tableStats.setNumRows(recordCount);
+        }
+
         IOUtils.cleanup(LOG, reader, is, fis);
         fs = null;
         is = null;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
index 553fec9..f15c4c9 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
@@ -26,6 +26,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.ColumnStats;
+import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.storage.fragment.FileFragment;
 
@@ -42,17 +44,35 @@ public abstract class FileScanner implements Scanner {
   protected final int columnNum;
 
   protected Column [] targets;
-  
+
+  protected float progress;
+
+  protected TableStats tableStats;
+
   public FileScanner(Configuration conf, final Schema schema, final TableMeta meta, final FileFragment fragment) {
     this.conf = conf;
     this.meta = meta;
     this.schema = schema;
     this.fragment = fragment;
+    this.tableStats = new TableStats();
     this.columnNum = this.schema.size();
   }
 
   public void init() throws IOException {
     inited = true;
+    progress = 0.0f;
+
+    if (fragment != null) {
+      tableStats.setNumBytes(fragment.getEndKey());
+      tableStats.setNumBlocks(1);
+    }
+
+    if (schema != null) {
+      for(Column eachColumn: schema.getColumns()) {
+        ColumnStats columnStats = new ColumnStats(eachColumn);
+        tableStats.addColumnStat(columnStats);
+      }
+    }
   }
 
   @Override
@@ -90,4 +110,14 @@ public abstract class FileScanner implements Scanner {
 
     return fs;
   }
+
+  @Override
+  public float getProgress() {
+    return progress;
+  }
+
+  @Override
+  public TableStats getInputStats() {
+    return tableStats;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
index 1d4963d..0235ce9 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
@@ -23,6 +23,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.ColumnStats;
+import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.storage.fragment.FileFragment;
 
@@ -42,6 +44,8 @@ public class MergeScanner implements Scanner {
   private boolean projectable = false;
   private boolean selectable = false;
   private Schema target;
+  private float progress;
+  protected TableStats tableStats;
 
   public MergeScanner(Configuration conf, Schema schema, TableMeta meta, List<FileFragment> rawFragmentList)
       throws IOException {
@@ -64,10 +68,25 @@ public class MergeScanner implements Scanner {
       this.projectable = currentScanner.isProjectable();
       this.selectable = currentScanner.isSelectable();
     }
+
+    tableStats = new TableStats();
+    long numBytes = 0;
+
+    for (FileFragment eachFileFragment: rawFragmentList) {
+      numBytes += (eachFileFragment.getEndKey() - eachFileFragment.getStartKey());
+    }
+    tableStats.setNumBytes(numBytes);
+    tableStats.setNumBlocks(rawFragmentList.size());
+
+    for(Column eachColumn: schema.getColumns()) {
+      ColumnStats columnStats = new ColumnStats(eachColumn);
+      tableStats.addColumnStat(columnStats);
+    }
   }
 
   @Override
   public void init() throws IOException {
+    progress = 0.0f;
   }
 
   @Override
@@ -80,6 +99,11 @@ public class MergeScanner implements Scanner {
     } else {
       if (currentScanner != null) {
         currentScanner.close();
+        TableStats scannerTableStsts = currentScanner.getInputStats();
+        if (scannerTableStsts != null) {
+          tableStats.setReadBytes(tableStats.getReadBytes() + scannerTableStsts.getReadBytes());
+          tableStats.setNumRows(tableStats.getNumRows() + scannerTableStsts.getNumRows());
+        }
       }
       currentScanner = getNextScanner();
       if (currentScanner != null) {
@@ -113,6 +137,7 @@ public class MergeScanner implements Scanner {
       currentScanner.close();
     }
     iterator = null;
+    progress = 1.0f;
   }
 
   @Override
@@ -143,4 +168,24 @@ public class MergeScanner implements Scanner {
   public boolean isSplittable(){
     return false;
   }
+
+  @Override
+  public float getProgress() {
+    if (currentScanner != null && iterator != null && tableStats.getNumBytes() > 0) {
+      TableStats scannerTableStsts = currentScanner.getInputStats();
+      long currentScannerReadBytes = 0;
+      if (scannerTableStsts != null) {
+        currentScannerReadBytes = scannerTableStsts.getReadBytes();
+      }
+
+      return (float)(tableStats.getReadBytes() + currentScannerReadBytes) / (float)tableStats.getNumBytes();
+    } else {
+      return progress;
+    }
+  }
+
+  @Override
+  public TableStats getInputStats() {
+    return tableStats;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
index 5c29928..22757b5 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
@@ -57,6 +57,7 @@ public class RawFile {
     private boolean eof = false;
     private long fileSize;
     private FileInputStream fis;
+    private long recordCount;
 
     public RawFileScanner(Configuration conf, Schema schema, TableMeta meta, Path path) throws IOException {
       super(conf, schema, meta, null);
@@ -85,6 +86,9 @@ public class RawFile {
       channel = fis.getChannel();
       fileSize = channel.size();
 
+      if (tableStats != null) {
+        tableStats.setNumBytes(fileSize);
+      }
       if (LOG.isDebugEnabled()) {
         LOG.debug("RawFileScanner open:" + path + "," + channel.position() + ", size :" + channel.size());
       }
@@ -246,6 +250,8 @@ public class RawFile {
         }
       }
 
+      recordCount++;
+
       for (int i = 0; i < columnTypes.length; i++) {
         // check if the i'th column is null
         if (nullFlags.get(i)) {
@@ -369,6 +375,10 @@ public class RawFile {
 
     @Override
     public void close() throws IOException {
+      if (tableStats != null) {
+        tableStats.setReadBytes(fileSize);
+        tableStats.setNumRows(recordCount);
+      }
       buffer.clear();
       channel.close();
       fis.close();
@@ -388,6 +398,32 @@ public class RawFile {
     public boolean isSplittable(){
       return false;
     }
+
+    @Override
+    public float getProgress() {
+      try {
+        tableStats.setNumRows(recordCount);
+        long filePos = 0;
+        if (channel != null) {
+          filePos = channel.position();
+          tableStats.setReadBytes(filePos);
+        }
+
+        if(eof || channel == null) {
+          tableStats.setReadBytes(fileSize);
+          return 1.0f;
+        }
+
+        if (filePos == 0) {
+          return 0.0f;
+        } else {
+          return Math.min(1.0f, ((float)filePos / (float)fileSize));
+        }
+      } catch (IOException e) {
+        LOG.error(e.getMessage(), e);
+        return 0.0f;
+      }
+    }
   }
 
   public static class RawFileAppender extends FileAppender {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java
index 6dca3f2..16c4faa 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java
@@ -20,6 +20,7 @@ package org.apache.tajo.storage;
 
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.SchemaObject;
+import org.apache.tajo.catalog.statistics.TableStats;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -91,4 +92,12 @@ public interface Scanner extends SchemaObject, Closeable {
    * @return true if this scanner can split the a file.
    */
   boolean isSplittable();
+
+  /**
+   * How much of the input has the Scanner consumed
+   * @return progress from <code>0.0</code> to <code>1.0</code>.
+   */
+  float getProgress();
+
+  TableStats getInputStats();
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java b/tajo-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java
index f9ad2f7..da7084c 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java
@@ -26,6 +26,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.ColumnStats;
+import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.storage.Scanner;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.fragment.FileFragment;
@@ -53,6 +55,10 @@ public abstract class FileScannerV2 implements Scanner {
   protected StorageManagerV2.StorgaeManagerContext smContext;
 
   protected AtomicBoolean firstSchdeuled = new AtomicBoolean(true);
+  
+  protected float progress;
+  
+  protected TableStats tableStats;
 
   protected abstract boolean scanNext(int length) throws IOException;
 
@@ -91,6 +97,20 @@ public abstract class FileScannerV2 implements Scanner {
       smContext.requestFileScan(this);
     }
     inited = true;
+    progress = 0.0f;
+
+    tableStats = new TableStats();
+    if (fragment != null) {
+      tableStats.setNumBytes(fragment.getEndKey());
+      tableStats.setNumBlocks(1);
+    }
+
+    if (schema != null) {
+      for(Column eachColumn: schema.getColumns()) {
+        ColumnStats columnStats = new ColumnStats(eachColumn);
+        tableStats.addColumnStat(columnStats);
+      }
+    }
   }
 
   @Override
@@ -182,6 +202,7 @@ public abstract class FileScannerV2 implements Scanner {
     long[] readBytes = reportReadBytes();
     smContext.incrementReadBytes(allocatedDiskId, readBytes);
     closed.set(true);
+    progress = 1.0f;
     LOG.info(toString() + " closed, totalScanTime=" + totalScanTime);
   }
 
@@ -200,4 +221,14 @@ public abstract class FileScannerV2 implements Scanner {
     }
     return nextTuple();
   }
+
+  @Override
+  public float getProgress() {
+    return progress;
+  }
+
+  @Override
+  public TableStats getInputStats() {
+    return tableStats;
+  }
 }


[2/2] git commit: TAJO-589: Add fine grained progress indicator for each task. (hyoungjunkim via hyunsik)

Posted by hy...@apache.org.
TAJO-589: Add fine grained progress indicator for each task. (hyoungjunkim via hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/c573b6fc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/c573b6fc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/c573b6fc

Branch: refs/heads/master
Commit: c573b6fcf7ad588c4fd7c2baf5b9047c97117038
Parents: f594542
Author: Hyunsik Choi <hy...@apache.org>
Authored: Sat Mar 1 13:44:43 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Sat Mar 1 13:44:43 2014 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../tajo/catalog/statistics/StatisticsUtil.java |   2 +
 .../tajo/catalog/statistics/TableStats.java     |  56 ++++-
 .../src/main/proto/CatalogProtos.proto          |   3 +-
 .../main/java/org/apache/tajo/cli/TajoCli.java  |   1 +
 .../planner/physical/BSTIndexScanExec.java      |  11 +-
 .../planner/physical/BinaryPhysicalExec.java    |  47 +++-
 .../engine/planner/physical/EvalExprExec.java   |   8 +
 .../planner/physical/ExternalSortExec.java      | 155 ++++++++++++-
 .../physical/HashShuffleFileWriteExec.java      |   2 +
 .../physical/PartitionMergeScanExec.java        |  36 +++
 .../engine/planner/physical/PhysicalExec.java   |   7 +
 .../engine/planner/physical/SeqScanExec.java    |  31 +++
 .../planner/physical/UnaryPhysicalExec.java     |  32 +++
 .../tajo/master/AbstractTaskScheduler.java      |  16 ++
 .../tajo/master/DefaultTaskScheduler.java       |   3 -
 .../apache/tajo/master/TajoContainerProxy.java  |  10 +-
 .../tajo/master/TajoMasterClientService.java    |   1 -
 .../apache/tajo/master/querymaster/Query.java   |   6 +-
 .../tajo/master/querymaster/QueryUnit.java      |  11 +
 .../master/querymaster/QueryUnitAttempt.java    |  37 ++-
 .../tajo/master/querymaster/Repartitioner.java  |   4 +-
 .../tajo/master/querymaster/SubQuery.java       | 114 +++++++---
 .../main/java/org/apache/tajo/util/JSPUtil.java |   2 +
 .../tajo/worker/TajoResourceAllocator.java      |   9 +-
 .../main/java/org/apache/tajo/worker/Task.java  | 114 +++++++---
 .../apache/tajo/worker/TaskAttemptContext.java  |  24 +-
 .../org/apache/tajo/worker/TaskHistory.java     |  45 ++++
 .../java/org/apache/tajo/worker/TaskRunner.java |   6 +-
 .../src/main/proto/TajoWorkerProtocol.proto     |  10 +-
 .../resources/webapps/worker/querytasks.jsp     |  66 +++++-
 .../main/resources/webapps/worker/queryunit.jsp |  21 +-
 .../resources/webapps/worker/taskdetail.jsp     |   4 +-
 .../java/org/apache/tajo/QueryTestCaseBase.java |   3 +
 .../org/apache/tajo/TajoTestingCluster.java     |   4 +
 .../physical/TestProgressExternalSortExec.java  | 226 +++++++++++++++++++
 .../querymaster/TestQueryUnitStatusUpdate.java  | 168 ++++++++++++++
 .../java/org/apache/tajo/util/TestJSPUtil.java  |  84 +++++++
 .../queries/TestQueryUnitStatusUpdate/case1.sql |   1 +
 .../queries/TestQueryUnitStatusUpdate/case2.sql |   5 +
 .../queries/TestQueryUnitStatusUpdate/case3.sql |  11 +
 .../java/org/apache/tajo/storage/CSVFile.java   |  37 ++-
 .../org/apache/tajo/storage/FileScanner.java    |  32 ++-
 .../org/apache/tajo/storage/MergeScanner.java   |  45 ++++
 .../java/org/apache/tajo/storage/RawFile.java   |  36 +++
 .../java/org/apache/tajo/storage/Scanner.java   |   9 +
 .../apache/tajo/storage/v2/FileScannerV2.java   |  31 +++
 47 files changed, 1469 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 75a1b74..bbf66a4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -130,6 +130,9 @@ Release 0.8.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-589: Add fine grained progress indicator for each task.
+    (hyoungjunkim via hyunsik)
+
     TAJO-614: Explaning a logical node should use ExplainLogicalPlanVisitor.
     (hyunsik)
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/StatisticsUtil.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/StatisticsUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/StatisticsUtil.java
index 8593db6..01316bc 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/StatisticsUtil.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/StatisticsUtil.java
@@ -81,6 +81,7 @@ public class StatisticsUtil {
 
     result.setNumRows(result.getNumRows() + stats.getNumRows());
     result.setNumBytes(result.getNumBytes() + stats.getNumBytes());
+    result.setReadBytes(result.getReadBytes() + stats.getReadBytes());
     result.setNumBlocks(result.getNumBlocks() + stats.getNumBlocks());
     result.setNumShuffleOutputs(result.getNumShuffleOutputs() + stats.getNumShuffleOutputs());
   }
@@ -132,6 +133,7 @@ public class StatisticsUtil {
       // aggregate table stats for each table
       aggregated.setNumRows(aggregated.getNumRows() + ts.getNumRows());
       aggregated.setNumBytes(aggregated.getNumBytes() + ts.getNumBytes());
+      aggregated.setReadBytes(aggregated.getReadBytes() + ts.getReadBytes());
       aggregated.setNumBlocks(aggregated.getNumBlocks() + ts.getNumBlocks());
       aggregated.setNumShuffleOutputs(aggregated.getNumShuffleOutputs() + ts.getNumShuffleOutputs());
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/TableStats.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/TableStats.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/TableStats.java
index 9a72da6..de2922e 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/TableStats.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/TableStats.java
@@ -43,6 +43,7 @@ public class TableStats implements ProtoObject<TableStatsProto>, Cloneable, Gson
   @Expose private Integer numBlocks = null; // optional
   @Expose private Integer numShuffleOutputs = null; // optional
   @Expose private Long avgRows = null; // optional
+  @Expose private Long readBytes = null; //optional
   @Expose private List<ColumnStats> columnStatses = null; // repeated
 
   public TableStats() {
@@ -51,6 +52,7 @@ public class TableStats implements ProtoObject<TableStatsProto>, Cloneable, Gson
     numBlocks = 0;
     numShuffleOutputs = 0;
     avgRows = 0l;
+    readBytes = 0l;
     columnStatses = TUtil.newList();
   }
 
@@ -73,6 +75,11 @@ public class TableStats implements ProtoObject<TableStatsProto>, Cloneable, Gson
     } else {
       this.avgRows = 0l;
     }
+    if (proto.hasReadBytes()) {
+      this.readBytes = proto.getReadBytes();
+    } else {
+      this.readBytes = 0l;
+    }
 
     this.columnStatses = TUtil.newList();
     for (CatalogProtos.ColumnStatsProto colProto : proto.getColStatList()) {
@@ -123,6 +130,14 @@ public class TableStats implements ProtoObject<TableStatsProto>, Cloneable, Gson
     this.avgRows = avgRows;
   }
 
+  public Long getReadBytes() {
+    return readBytes;
+  }
+
+  public void setReadBytes(long readBytes) {
+    this.readBytes = readBytes;
+  }
+
   public List<ColumnStats> getColumnStats() {
     return this.columnStatses;
   }
@@ -144,6 +159,7 @@ public class TableStats implements ProtoObject<TableStatsProto>, Cloneable, Gson
       eq = eq && TUtil.checkEquals(this.numBlocks, other.numBlocks);
       eq = eq && TUtil.checkEquals(this.numShuffleOutputs, other.numShuffleOutputs);
       eq = eq && TUtil.checkEquals(this.avgRows, other.avgRows);
+      eq = eq && TUtil.checkEquals(this.readBytes, other.readBytes);
       eq = eq && TUtil.checkEquals(this.columnStatses, other.columnStatses);
       return eq;
     } else {
@@ -159,15 +175,44 @@ public class TableStats implements ProtoObject<TableStatsProto>, Cloneable, Gson
   public Object clone() throws CloneNotSupportedException {
     TableStats stat = (TableStats) super.clone();
     stat.builder = CatalogProtos.TableStatsProto.newBuilder();
-    stat.numRows = numRows != null ? numRows.longValue() : null;
-    stat.numBytes = numBytes != null ? numBytes.longValue() : null;
-    stat.numBlocks = numBlocks != null ? numBlocks.intValue() : null;
-    stat.numShuffleOutputs = numShuffleOutputs != null ? numShuffleOutputs.intValue() : null;
+    stat.numRows = numRows != null ? numRows : null;
+    stat.numBytes = numBytes != null ? numBytes : null;
+    stat.numBlocks = numBlocks != null ? numBlocks : null;
+    stat.numShuffleOutputs = numShuffleOutputs != null ? numShuffleOutputs : null;
+    stat.avgRows = avgRows != null ? avgRows : null;
+    stat.readBytes = readBytes != null ? readBytes : null;
+
     stat.columnStatses = new ArrayList<ColumnStats>(this.columnStatses);
 
     return stat;
   }
 
+  public void merge(TableStats stat) {
+    if(stat == null) {
+      return;
+    }
+
+    numRows = stat.numRows != null ? stat.numRows + numRows : numRows;
+    numBytes = stat.numBytes != null ? stat.numBytes + numBytes : numBytes;
+    numBlocks = stat.numBlocks != null ? stat.numBlocks + numBlocks : numBlocks;
+    numShuffleOutputs = stat.numShuffleOutputs != null ? stat.numShuffleOutputs + numShuffleOutputs : numShuffleOutputs;
+    avgRows = stat.avgRows != null ? stat.avgRows + avgRows : avgRows;
+    readBytes = stat.readBytes != null ? stat.readBytes + readBytes : readBytes;
+  }
+
+  public void setValues(TableStats stat) {
+    if(stat == null) {
+      return;
+    }
+
+    numRows = stat.numRows != null ? stat.numRows : 0;
+    numBytes = stat.numBytes != null ? stat.numBytes : 0;
+    numBlocks = stat.numBlocks != null ? stat.numBlocks : 0;
+    numShuffleOutputs = stat.numShuffleOutputs != null ? stat.numShuffleOutputs : 0;
+    avgRows = stat.avgRows != null ? stat.avgRows : 0;
+    readBytes = stat.readBytes != null ? stat.readBytes : 0;
+  }
+
   public String toString() {
     Gson gson = CatalogGsonHelper.getPrettyInstance();
     return gson.toJson(this);
@@ -198,6 +243,9 @@ public class TableStats implements ProtoObject<TableStatsProto>, Cloneable, Gson
     if (this.avgRows != null) {
       builder.setAvgRows(this.avgRows);
     }
+    if (this.readBytes != null) {
+      builder.setReadBytes(this.readBytes);
+    }
     if (this.columnStatses != null) {
       for (ColumnStats colStat : columnStatses) {
         builder.addColStat(colStat.getProto());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
index f5fff2c..35171cc 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
+++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
@@ -187,7 +187,8 @@ message TableStatsProto {
 	optional int32 numBlocks = 4;
 	optional int32 numShuffleOutputs = 5;
 	optional int64 avgRows = 6;
-	repeated ColumnStatsProto colStat = 7;
+	optional int64 readBytes = 7;
+	repeated ColumnStatsProto colStat = 8;
 }
 
 message ColumnStatsProto {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
index f107c51..3026d9c 100644
--- a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
@@ -430,6 +430,7 @@ public class TajoCli {
                   sout.print("continue... ('q' is quit)");
                   sout.flush();
                   if (sin.read() == 'q') {
+                    sout.println();
                     break;
                   }
                   numOfPrintedRows = 0;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
index d2f0922..35de707 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
@@ -44,7 +44,9 @@ public class BSTIndexScanExec extends PhysicalExec {
   private Datum[] datum = null;
   
   private boolean initialize = true;
-  
+
+  private float progress;
+
   public BSTIndexScanExec(TaskAttemptContext context,
                           AbstractStorageManager sm , ScanNode scanNode ,
        FileFragment fragment, Path fileName , Schema keySchema,
@@ -66,7 +68,7 @@ public class BSTIndexScanExec extends PhysicalExec {
 
   @Override
   public void init() throws IOException {
-
+    progress = 0.0f;
   }
 
   @Override
@@ -133,4 +135,9 @@ public class BSTIndexScanExec extends PhysicalExec {
     qual = null;
     projector = null;
   }
+
+  @Override
+  public float getProgress() {
+    return progress;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java
index fc8d25d..35c8f6f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java
@@ -18,14 +18,17 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.worker.TaskAttemptContext;
+import org.apache.tajo.catalog.Schema;
 
 import java.io.IOException;
 
 public abstract class BinaryPhysicalExec extends PhysicalExec {
   protected PhysicalExec leftChild;
   protected PhysicalExec rightChild;
+  protected float progress;
+  protected TableStats inputStats;
 
   public BinaryPhysicalExec(final TaskAttemptContext context,
                             final Schema inSchema, final Schema outSchema,
@@ -47,6 +50,9 @@ public abstract class BinaryPhysicalExec extends PhysicalExec {
   public void init() throws IOException {
     leftChild.init();
     rightChild.init();
+    progress = 0.0f;
+
+    inputStats = new TableStats();
   }
 
   @Override
@@ -59,7 +65,46 @@ public abstract class BinaryPhysicalExec extends PhysicalExec {
   public void close() throws IOException {
     leftChild.close();
     rightChild.close();
+
+    getInputStats();
+    
     leftChild = null;
     rightChild = null;
+    
+    progress = 1.0f;
+  }
+
+  @Override
+  public float getProgress() {
+    if (leftChild == null) {
+      return progress;
+    }
+    return leftChild.getProgress() * 0.5f + rightChild.getProgress() * 0.5f;
+  }
+
+  @Override
+  public TableStats getInputStats() {
+    if (leftChild == null) {
+      return inputStats;
+    }
+    TableStats leftInputStats = leftChild.getInputStats();
+    inputStats.setNumBytes(0);
+    inputStats.setReadBytes(0);
+    inputStats.setNumRows(0);
+
+    if (leftInputStats != null) {
+      inputStats.setNumBytes(leftInputStats.getNumBytes());
+      inputStats.setReadBytes(leftInputStats.getReadBytes());
+      inputStats.setNumRows(leftInputStats.getNumRows());
+    }
+
+    TableStats rightInputStats = rightChild.getInputStats();
+    if (rightInputStats != null) {
+      inputStats.setNumBytes(inputStats.getNumBytes() + rightInputStats.getNumBytes());
+      inputStats.setReadBytes(inputStats.getReadBytes() + rightInputStats.getReadBytes());
+      inputStats.setNumRows(inputStats.getNumRows() + rightInputStats.getNumRows());
+    }
+
+    return inputStats;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
index 83580f9..a843bce 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
@@ -28,6 +28,7 @@ import java.io.IOException;
 
 public class EvalExprExec extends PhysicalExec {
   private final EvalExprNode plan;
+  private float progress;
 
   public EvalExprExec(final TaskAttemptContext context, final EvalExprNode plan) {
     super(context, plan.getInSchema(), plan.getOutSchema());
@@ -36,6 +37,7 @@ public class EvalExprExec extends PhysicalExec {
 
   @Override
   public void init() throws IOException {
+    progress = 0.0f;
   }
 
   @Override
@@ -54,5 +56,11 @@ public class EvalExprExec extends PhysicalExec {
 
   @Override
   public void close() throws IOException {
+    progress = 1.0f;
+  }
+
+  @Override
+  public float getProgress() {
+    return progress;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index 2dfbef4..4ceb3fc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.engine.planner.physical;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.LocalDirAllocator;
@@ -30,6 +31,7 @@ import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.planner.logical.SortNode;
 import org.apache.tajo.storage.*;
@@ -67,7 +69,7 @@ public class ExternalSortExec extends SortExec {
   /** the defaultFanout of external sort */
   private final int defaultFanout;
   /** It's the size of in-memory table. If memory consumption exceeds it, store the memory table into a disk. */
-  private final int sortBufferBytesNum;
+  private int sortBufferBytesNum;
   /** the number of available cores */
   private final int allocatedCoreNum;
   /** If there are available multiple cores, it tries parallel merge. */
@@ -94,6 +96,8 @@ public class ExternalSortExec extends SortExec {
   private boolean memoryResident = true;
   /** the final result */
   private Scanner result;
+  /** total bytes of input data */
+  private long sortAndStoredBytes;
 
   private ExternalSortExec(final TaskAttemptContext context, final AbstractStorageManager sm, final SortNode plan)
       throws PhysicalPlanningException {
@@ -136,7 +140,13 @@ public class ExternalSortExec extends SortExec {
     setChild(child);
   }
 
+  @VisibleForTesting
+  public void setSortBufferBytesNum(int sortBufferBytesNum) {
+    this.sortBufferBytesNum = sortBufferBytesNum;
+  }
+
   public void init() throws IOException {
+    inputStats = new TableStats();
     super.init();
   }
 
@@ -205,6 +215,19 @@ public class ExternalSortExec extends SortExec {
 
         memoryConsumption = 0;
         chunkId++;
+
+        // When the volume of sorting data once exceed the size of sort buffer,
+        // the total progress of this external sort is divided into two parts.
+        // In contrast, if the data fits in memory, the progress is only one part.
+        //
+        // When the progress is divided into two parts, the first part sorts tuples on memory and stores them
+        // into a chunk. The second part merges stored chunks into fewer chunks, and it continues until the number
+        // of merged chunks is fewer than the default fanout.
+        //
+        // The fact that the code reach here means that the first chunk has been just stored.
+        // That is, the progress was divided into two parts.
+        // So, it multiply the progress of the children operator and 0.5f.
+        progress = child.getProgress() * 0.5f;
       }
     }
 
@@ -222,6 +245,11 @@ public class ExternalSortExec extends SortExec {
       }
     }
 
+    // get total loaded (or stored) bytes and total row numbers
+    TableStats childTableStats = child.getInputStats();
+    if (childTableStats != null) {
+      sortAndStoredBytes = childTableStats.getNumBytes();
+    }
     return chunkPaths;
   }
 
@@ -266,6 +294,9 @@ public class ExternalSortExec extends SortExec {
 
       sorted = true;
       result.init();
+
+      // if loaded and sorted, we assume that it proceeds the half of one entire external sort operation.
+      progress = 0.5f;
     }
 
     return result.next();
@@ -294,11 +325,13 @@ public class ExternalSortExec extends SortExec {
     return computedFanout;
   }
 
-  private Scanner externalMergeAndSort(List<Path> chunks) throws IOException, ExecutionException, InterruptedException {
+  private Scanner externalMergeAndSort(List<Path> chunks)
+      throws IOException, ExecutionException, InterruptedException {
     int level = 0;
     final List<Path> inputFiles = TUtil.newList(chunks);
     final List<Path> outputFiles = TUtil.newList();
     int remainRun = inputFiles.size();
+    int chunksSize = chunks.size();
 
     long mergeStart = System.currentTimeMillis();
 
@@ -310,13 +343,18 @@ public class ExternalSortExec extends SortExec {
       int outChunkId = 0;
       int outputFileNum = 0;
       List<Future> futures = TUtil.newList();
+      // the number of files being merged in threads.
+      List<Integer> numberOfMergingFiles = TUtil.newList();
 
       for (int startIdx = 0; startIdx < inputFiles.size();) {
 
         // calculate proper fanout
         int fanout = calculateFanout(remainInputRuns, inputFiles.size(), outputFileNum, startIdx);
+        // how many files are merged in ith thread?
+        numberOfMergingFiles.add(fanout);
         // launch a merger runner
-        futures.add(executorService.submit(new KWayMergerCaller(level, outChunkId++, inputFiles, startIdx, fanout)));
+        futures.add(executorService.submit(
+            new KWayMergerCaller(level, outChunkId++, inputFiles, startIdx, fanout, false)));
         outputFileNum++;
 
         startIdx += fanout;
@@ -340,8 +378,14 @@ public class ExternalSortExec extends SortExec {
       }
 
       // wait for all sort runners
+      int finishedMerger = 0;
+      int index = 0;
       for (Future<Path> future : futures) {
         outputFiles.add(future.get());
+        // Getting the number of merged files
+        finishedMerger += numberOfMergingFiles.get(index++);
+        // progress = (# number of merged files / total number of files) * 0.5;
+        progress = ((float)finishedMerger/(float)chunksSize) * 0.5f;
       }
 
       // delete merged intermediate files
@@ -363,6 +407,7 @@ public class ExternalSortExec extends SortExec {
 
     // final result
     finalOutputFiles = inputFiles;
+
     result = createFinalMerger(inputFiles);
     return result;
   }
@@ -376,14 +421,16 @@ public class ExternalSortExec extends SortExec {
     final List<Path> inputFiles;
     final int startIdx;
     final int mergeFanout;
+    final boolean updateInputStats;
 
     public KWayMergerCaller(final int level, final int nextRunId, final List<Path> inputFiles,
-                            final int startIdx, final int mergeFanout) {
+                            final int startIdx, final int mergeFanout, final boolean updateInputStats) {
       this.level = level;
       this.nextRunId = nextRunId;
       this.inputFiles = inputFiles;
       this.startIdx = startIdx;
       this.mergeFanout = mergeFanout;
+      this.updateInputStats = updateInputStats;
     }
 
     @Override
@@ -456,14 +503,31 @@ public class ExternalSortExec extends SortExec {
   private class MemTableScanner implements Scanner {
     Iterator<Tuple> iterator;
 
+    // for input stats
+    float scannerProgress;
+    int numRecords;
+    int totalRecords;
+    TableStats scannerTableStats;
+
     @Override
     public void init() throws IOException {
       iterator = inMemoryTable.iterator();
+
+      totalRecords = inMemoryTable.size();
+      scannerProgress = 0.0f;
+      numRecords = 0;
+
+      // it will be returned as the final stats
+      scannerTableStats = new TableStats();
+      scannerTableStats.setNumBytes(sortAndStoredBytes);
+      scannerTableStats.setReadBytes(sortAndStoredBytes);
+      scannerTableStats.setNumRows(totalRecords);
     }
 
     @Override
     public Tuple next() throws IOException {
       if (iterator.hasNext()) {
+        numRecords++;
         return iterator.next();
       } else {
         return null;
@@ -478,6 +542,7 @@ public class ExternalSortExec extends SortExec {
     @Override
     public void close() throws IOException {
       iterator = null;
+      scannerProgress = 1.0f;
     }
 
     @Override
@@ -507,6 +572,21 @@ public class ExternalSortExec extends SortExec {
     public Schema getSchema() {
       return null;
     }
+
+    @Override
+    public float getProgress() {
+      if (iterator != null && numRecords > 0) {
+        return (float)numRecords / (float)totalRecords;
+
+      } else { // if an input is empty
+        return scannerProgress;
+      }
+    }
+
+    @Override
+    public TableStats getInputStats() {
+      return scannerTableStats;
+    }
   }
 
   /**
@@ -521,6 +601,9 @@ public class ExternalSortExec extends SortExec {
 
     private final Comparator<Tuple> comparator = getComparator();
 
+    private float mergerProgress;
+    private TableStats mergerInputStats;
+
     public PairWiseMerger(Scanner leftScanner, Scanner rightScanner) throws IOException {
       this.leftScan = leftScanner;
       this.rightScan = rightScanner;
@@ -533,6 +616,9 @@ public class ExternalSortExec extends SortExec {
 
       leftTuple = leftScan.next();
       rightTuple = rightScan.next();
+
+      mergerInputStats = new TableStats();
+      mergerProgress = 0.0f;
     }
 
     public Tuple next() throws IOException {
@@ -565,11 +651,12 @@ public class ExternalSortExec extends SortExec {
       init();
     }
 
-    @Override
     public void close() throws IOException {
       IOUtils.cleanup(LOG, leftScan, rightScan);
+      getInputStats();
       leftScan = null;
       rightScan = null;
+      mergerProgress = 1.0f;
     }
 
     @Override
@@ -599,12 +686,51 @@ public class ExternalSortExec extends SortExec {
     public Schema getSchema() {
       return inSchema;
     }
+
+    @Override
+    public float getProgress() {
+      if (leftScan == null) {
+        return mergerProgress;
+      }
+      return leftScan.getProgress() * 0.5f + rightScan.getProgress() * 0.5f;
+    }
+
+    @Override
+    public TableStats getInputStats() {
+      if (leftScan == null) {
+        return mergerInputStats;
+      }
+      TableStats leftInputStats = leftScan.getInputStats();
+      mergerInputStats.setNumBytes(0);
+      mergerInputStats.setReadBytes(0);
+      mergerInputStats.setNumRows(0);
+
+      if (leftInputStats != null) {
+        mergerInputStats.setNumBytes(leftInputStats.getNumBytes());
+        mergerInputStats.setReadBytes(leftInputStats.getReadBytes());
+        mergerInputStats.setNumRows(leftInputStats.getNumRows());
+      }
+
+      TableStats rightInputStats = rightScan.getInputStats();
+      if (rightInputStats != null) {
+        mergerInputStats.setNumBytes(mergerInputStats.getNumBytes() + rightInputStats.getNumBytes());
+        mergerInputStats.setReadBytes(mergerInputStats.getReadBytes() + rightInputStats.getReadBytes());
+        mergerInputStats.setNumRows(mergerInputStats.getNumRows() + rightInputStats.getNumRows());
+      }
+
+      return mergerInputStats;
+    }
   }
 
   @Override
   public void close() throws IOException {
     if (result != null) {
       result.close();
+      try {
+        inputStats = (TableStats)result.getInputStats().clone();
+      } catch (CloneNotSupportedException e) {
+        LOG.warn(e.getMessage());
+      }
       result = null;
     }
 
@@ -632,6 +758,25 @@ public class ExternalSortExec extends SortExec {
   public void rescan() throws IOException {
     if (result != null) {
       result.reset();
+      progress = 0.5f;
+    }
+  }
+
+  @Override
+  public float getProgress() {
+    if (result != null) {
+      return progress + result.getProgress() * 0.5f;
+    } else {
+      return progress;
+    }
+  }
+
+  @Override
+  public TableStats getInputStats() {
+    if (result != null) {
+      return result.getInputStats();
+    } else {
+      return inputStats;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
index e2b926d..678b745 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
@@ -154,5 +154,7 @@ public final class HashShuffleFileWriteExec extends UnaryPhysicalExec {
 
     partitioner = null;
     plan = null;
+
+    progress = 1.0f;
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
index a39f4be..7f86ba2 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
@@ -20,6 +20,7 @@ package org.apache.tajo.engine.planner.physical;
 
 import com.google.common.collect.Lists;
 import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.logical.ScanNode;
 import org.apache.tajo.storage.AbstractStorageManager;
@@ -45,6 +46,9 @@ public class PartitionMergeScanExec extends PhysicalExec {
 
   private AbstractStorageManager sm;
 
+  private float progress;
+  protected TableStats inputStats;
+
   public PartitionMergeScanExec(TaskAttemptContext context, AbstractStorageManager sm,
                                 ScanNode plan, CatalogProtos.FragmentProto[] fragments) throws IOException {
     super(context, plan.getInSchema(), plan.getOutSchema());
@@ -52,6 +56,8 @@ public class PartitionMergeScanExec extends PhysicalExec {
     this.plan = plan;
     this.fragments = fragments;
     this.sm = sm;
+
+    inputStats = new TableStats();
   }
 
   public void init() throws IOException {
@@ -59,6 +65,7 @@ public class PartitionMergeScanExec extends PhysicalExec {
       scanners.add(new SeqScanExec(context, sm, (ScanNode) PlannerUtil.clone(null, plan),
           new CatalogProtos.FragmentProto[] {fragment}));
     }
+    progress = 0.0f;
     rescan();
   }
 
@@ -98,10 +105,39 @@ public class PartitionMergeScanExec extends PhysicalExec {
   public void close() throws IOException {
     for (SeqScanExec scanner : scanners) {
       scanner.close();
+      TableStats scannerTableStsts = scanner.getInputStats();
+      if (scannerTableStsts != null) {
+        inputStats.merge(scannerTableStsts);
+      }
     }
+    iterator = null;
+    progress = 1.0f;
   }
 
   public String getTableName() {
     return plan.getTableName();
   }
+
+  @Override
+  public float getProgress() {
+    if (iterator != null) {
+      float progressSum = 0.0f;
+      for (SeqScanExec scanner : scanners) {
+        progressSum += scanner.getProgress();
+      }
+      if (progressSum > 0) {
+        // get a average progress - divide progress summary by the number of scanners
+        return progressSum / (float)(scanners.size());
+      } else {
+        return 0.0f;
+      }
+    } else {
+      return progress;
+    }
+  }
+
+  @Override
+  public TableStats getInputStats() {
+    return inputStats;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
index 0b9bc95..e30a10b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
@@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SchemaObject;
+import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.worker.TaskAttemptContext;
 
@@ -54,6 +55,8 @@ public abstract class PhysicalExec implements SchemaObject {
 
   public abstract void close() throws IOException;
 
+  public abstract float getProgress();
+
   protected void info(Log log, String message) {
     log.info("["+ context.getTaskId() + "] " + message);
   }
@@ -69,4 +72,8 @@ public abstract class PhysicalExec implements SchemaObject {
   protected Path getExecutorTmpDir() {
     return new Path(UUID.randomUUID().toString());
   }
+
+  public TableStats getInputStats() {
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index c495470..a59cc2f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -23,6 +23,7 @@ import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.engine.eval.ConstEval;
 import org.apache.tajo.engine.eval.EvalNode;
@@ -54,6 +55,8 @@ public class SeqScanExec extends PhysicalExec {
 
   private Projector projector;
 
+  private TableStats inputStats;
+
   public SeqScanExec(TaskAttemptContext context, AbstractStorageManager sm,
                      ScanNode plan, CatalogProtos.FragmentProto [] fragments) throws IOException {
     super(context, plan.getInSchema(), plan.getOutSchema());
@@ -189,6 +192,16 @@ public class SeqScanExec extends PhysicalExec {
   @Override
   public void close() throws IOException {
     IOUtils.cleanup(null, scanner);
+    if (scanner != null) {
+      try {
+        TableStats stat = scanner.getInputStats();
+        if (stat != null) {
+          inputStats = (TableStats)(stat.clone());
+        }
+      } catch (CloneNotSupportedException e) {
+        e.printStackTrace();
+      }
+    }
     scanner = null;
     plan = null;
     qual = null;
@@ -198,4 +211,22 @@ public class SeqScanExec extends PhysicalExec {
   public String getTableName() {
     return plan.getTableName();
   }
+
+  @Override
+  public float getProgress() {
+    if (scanner == null) {
+      return 1.0f;
+    } else {
+      return scanner.getProgress();
+    }
+  }
+
+  @Override
+  public TableStats getInputStats() {
+    if (scanner != null) {
+      return scanner.getInputStats();
+    } else {
+      return inputStats;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
index ceeca06..ab67d7b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
@@ -20,12 +20,15 @@ package org.apache.tajo.engine.planner.physical;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 
 public abstract class UnaryPhysicalExec extends PhysicalExec {
   protected PhysicalExec child;
+  protected float progress;
+  protected TableStats inputStats;
 
   public UnaryPhysicalExec(TaskAttemptContext context,
                            Schema inSchema, Schema outSchema,
@@ -44,21 +47,50 @@ public abstract class UnaryPhysicalExec extends PhysicalExec {
   }
 
   public void init() throws IOException {
+    progress = 0.0f;
     if (child != null) {
       child.init();
     }
   }
 
   public void rescan() throws IOException {
+    progress = 0.0f;
     if (child != null) {
       child.rescan();
     }
   }
 
   public void close() throws IOException {
+    progress = 1.0f;
     if (child != null) {
       child.close();
+      try {
+        TableStats stat = child.getInputStats();
+        if (stat != null) {
+          inputStats = (TableStats)(stat.clone());
+        }
+      } catch (CloneNotSupportedException e) {
+        e.printStackTrace();
+      }
       child = null;
     }
   }
+
+  @Override
+  public float getProgress() {
+    if (child != null) {
+      return child.getProgress();
+    } else {
+      return progress;
+    }
+  }
+
+  @Override
+  public TableStats getInputStats() {
+    if (child != null) {
+      return child.getInputStats();
+    } else {
+      return inputStats;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java
index 6c187b6..320a5aa 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java
@@ -26,6 +26,10 @@ import org.apache.tajo.master.event.TaskSchedulerEvent;
 
 public abstract class AbstractTaskScheduler extends AbstractService implements EventHandler<TaskSchedulerEvent> {
 
+  protected int hostLocalAssigned;
+  protected int rackLocalAssigned;
+  protected int totalAssigned;
+
   /**
    * Construct the service.
    *
@@ -35,6 +39,18 @@ public abstract class AbstractTaskScheduler extends AbstractService implements E
     super(name);
   }
 
+  public int getHostLocalAssigned() {
+    return hostLocalAssigned;
+  }
+
+  public int getRackLocalAssigned() {
+    return rackLocalAssigned;
+  }
+
+  public int getTotalAssigned() {
+    return totalAssigned;
+  }
+
   public abstract void handleTaskRequestEvent(TaskRequestEvent event);
   public abstract int remainingScheduledObjectNum();
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
index cd18e10..3ee93ac 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
@@ -64,9 +64,6 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
   private ScheduledRequests scheduledRequests;
   private TaskRequests taskRequests;
 
-  private int hostLocalAssigned = 0;
-  private int rackLocalAssigned = 0;
-  private int totalAssigned = 0;
   private int nextTaskId = 0;
   private int scheduledObjectNum = 0;
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index e326128..7f1eac6 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -54,8 +54,10 @@ public class TajoContainerProxy extends ContainerProxy {
     this.port = ((TajoWorkerContainer)container).getWorkerResource().getPullServerPort();
     this.state = ContainerState.RUNNING;
 
-    LOG.info("Launch Container:" + executionBlockId + "," + containerID.getId() + "," +
-        container.getId() + "," + container.getNodeId() + ", pullServer=" + port);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Launch Container:" + executionBlockId + "," + containerID.getId() + "," +
+          container.getId() + "," + container.getNodeId() + ", pullServer=" + port);
+    }
 
     assignExecutionBlock(executionBlockId, container);
   }
@@ -109,7 +111,9 @@ public class TajoContainerProxy extends ContainerProxy {
 
   @Override
   public synchronized void stopContainer() {
-    LOG.info("Release TajoWorker Resource: " + executionBlockId + "," + containerID + ", state:" + this.state);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Release TajoWorker Resource: " + executionBlockId + "," + containerID + ", state:" + this.state);
+    }
     if(isCompletelyDone()) {
       LOG.info("Container already stopped:" + containerID);
       return;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 3350447..60f705f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -114,7 +114,6 @@ public class TajoMasterClientService extends AbstractService {
   public int getHttpPort() {
     return 0;
   }
-
   /////////////////////////////////////////////////////////////////////////////
   // TajoMasterClientProtocolService
   /////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
index 5fafe51..02ed34e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -438,7 +438,7 @@ public class Query implements EventHandler<QueryEvent> {
                           Path finalOutputDir) throws Exception {
         SubQuery lastStage = query.getSubQuery(finalExecBlockId);
         TableMeta meta = lastStage.getTableMeta();
-        TableStats stats = lastStage.getTableStat();
+        TableStats stats = lastStage.getResultStats();
 
         TableDesc resultTableDesc =
             new TableDesc(
@@ -469,7 +469,7 @@ public class Query implements EventHandler<QueryEvent> {
         CatalogService catalog = context.getWorkerContext().getCatalog();
         SubQuery lastStage = query.getSubQuery(finalExecBlockId);
         TableMeta meta = lastStage.getTableMeta();
-        TableStats stats = lastStage.getTableStat();
+        TableStats stats = lastStage.getResultStats();
 
         CreateTableNode createTableNode = (CreateTableNode) lastStage.getBlock().getPlan();
 
@@ -510,7 +510,7 @@ public class Query implements EventHandler<QueryEvent> {
         CatalogService catalog = context.getWorkerContext().getCatalog();
         SubQuery lastStage = query.getSubQuery(finalExecBlockId);
         TableMeta meta = lastStage.getTableMeta();
-        TableStats stats = lastStage.getTableStat();
+        TableStats stats = lastStage.getResultStats();
 
         InsertNode insertNode = (InsertNode) lastStage.getBlock().getPlan();
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
index 2e4bd70..57b3db4 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.master.querymaster;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -407,6 +408,16 @@ public class QueryUnit implements EventHandler<TaskEvent> {
     return finishTime;
   }
 
+  @VisibleForTesting
+  public void setLaunchTime(long launchTime) {
+    this.launchTime = launchTime;
+  }
+
+  @VisibleForTesting
+  public void setFinishTime(long finishTime) {
+    this.finishTime = finishTime;
+  }
+
   public long getRunningTime() {
     if(finishTime > 0) {
       return finishTime - launchTime;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
index 8a68c26..24fc6ad 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
@@ -25,13 +25,13 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.state.*;
 import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.TajoProtos.TaskAttemptState;
+import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.ipc.TajoWorkerProtocol.TaskCompletionReport;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext;
 import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
 import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
-import org.apache.tajo.util.TajoIdUtils;
 
 import java.util.ArrayList;
 import java.util.EnumSet;
@@ -64,6 +64,10 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
 
   private final QueryUnitAttemptScheduleContext scheduleContext;
 
+  private float progress;
+  private CatalogProtos.TableStatsProto inputStats;
+  private CatalogProtos.TableStatsProto resultStats;
+
   protected static final StateMachineFactory
       <QueryUnitAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
       stateMachineFactory = new StateMachineFactory
@@ -235,7 +239,28 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
     return this.expire;
   }
 
+  public float getProgress() {
+    return progress;
+  }
+
+  public TableStats getInputStats() {
+    if (inputStats == null) {
+      return null;
+    }
+
+    return new TableStats(inputStats);
+  }
+
+  public TableStats getResultStats() {
+    if (resultStats == null) {
+      return null;
+    }
+    return new TableStats(resultStats);
+  }
+
   private void fillTaskStatistics(TaskCompletionReport report) {
+    this.progress = 1.0f;
+
     if (report.getShuffleFileOutputsCount() > 0) {
       this.getQueryUnit().setShuffleFileOutputs(report.getShuffleFileOutputsList());
 
@@ -247,8 +272,12 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
       }
       this.getQueryUnit().setIntermediateData(partitions);
     }
+    if (report.hasInputStats()) {
+      this.inputStats = report.getInputStats();
+    }
     if (report.hasResultStats()) {
-      this.getQueryUnit().setStats(new TableStats(report.getResultStats()));
+      this.resultStats = report.getResultStats();
+      this.getQueryUnit().setStats(new TableStats(resultStats));
     }
   }
 
@@ -309,6 +338,10 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
                                        TaskAttemptEvent event) {
       TaskAttemptStatusUpdateEvent updateEvent = (TaskAttemptStatusUpdateEvent) event;
 
+      taskAttempt.progress = updateEvent.getStatus().getProgress();
+      taskAttempt.inputStats = updateEvent.getStatus().getInputStats();
+      taskAttempt.resultStats = updateEvent.getStatus().getResultStats();
+
       switch (updateEvent.getStatus().getState()) {
         case TA_PENDING:
         case TA_RUNNING:

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 0503706..7d7ecad 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -92,7 +92,7 @@ public class Repartitioner {
         childBlocks[1] = masterPlan.getChild(execBlock.getId(), 1);
 
         tablePath = storageManager.getTablePath(scans[i].getTableName());
-        stats[i] = masterContext.getSubQuery(childBlocks[i].getId()).getTableStat().getNumBytes();
+        stats[i] = masterContext.getSubQuery(childBlocks[i].getId()).getResultStats().getNumBytes();
         fragments[i] = new FileFragment(scans[i].getCanonicalName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST});
       } else {
         tablePath = tableDesc.getPath();
@@ -309,7 +309,7 @@ public class Repartitioner {
     List<ExecutionBlock> childBlocks = masterPlan.getChilds(parentBlockId);
     for (ExecutionBlock childBlock : childBlocks) {
       SubQuery childExecSM = context.getSubQuery(childBlock.getId());
-      tableStatses.add(childExecSM.getTableStat());
+      tableStatses.add(childExecSM.getResultStats());
     }
     return StatisticsUtil.aggregateTableStat(tableStatses);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 7e1a9bd..790d30b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -81,7 +81,8 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
   private int priority;
   private Schema schema;
   private TableMeta meta;
-  private TableStats statistics;
+  private TableStats resultStatistics;
+  private TableStats inputStatistics;
   private EventHandler<Event> eventHandler;
   private final AbstractStorageManager sm;
   private AbstractTaskScheduler taskScheduler;
@@ -304,7 +305,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     return this.finishTime;
   }
 
-  public float getProgress() {
+  public float getTaskProgress() {
     readLock.lock();
     try {
       if (getState() == SubQueryState.NEW) {
@@ -317,6 +318,29 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     }
   }
 
+  public float getProgress() {
+    List<QueryUnit> tempTasks = null;
+    readLock.lock();
+    try {
+      if (getState() == SubQueryState.NEW) {
+        return 0;
+      } else {
+        tempTasks = new ArrayList<QueryUnit>(tasks.values());
+      }
+    } finally {
+      readLock.unlock();
+    }
+
+    float totalProgress = 0.0f;
+    for (QueryUnit eachQueryUnit: tempTasks) {
+      if (eachQueryUnit.getLastAttempt() != null) {
+        totalProgress += eachQueryUnit.getLastAttempt().getProgress();
+      }
+    }
+
+    return totalProgress/(float)tempTasks.size();
+  }
+
   public int getSucceededObjectCount() {
     return succeededObjectCount;
   }
@@ -397,8 +421,12 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     return meta;
   }
 
-  public TableStats getTableStat() {
-    return statistics;
+  public TableStats getResultStats() {
+    return resultStatistics;
+  }
+
+  public TableStats getInputStats() {
+    return inputStatistics;
   }
 
   public List<String> getDiagnostics() {
@@ -447,11 +475,15 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     }
   }
 
-  private static TableStats computeStatFromUnionBlock(SubQuery subQuery) {
-    TableStats stat = new TableStats();
-    TableStats childStat;
-    long avgRows = 0, numBytes = 0, numRows = 0;
-    int numBlocks = 0, numOutputs = 0;
+  public static TableStats[] computeStatFromUnionBlock(SubQuery subQuery) {
+    TableStats[] stat = new TableStats[]{new TableStats(), new TableStats()};
+    long[] avgRows = new long[]{0, 0};
+    long[] numBytes = new long[]{0, 0};
+    long[] readBytes = new long[]{0, 0};
+    long[] numRows = new long[]{0, 0};
+    int[] numBlocks = new int[]{0, 0};
+    int[] numOutputs = new int[]{0, 0};
+
     List<ColumnStats> columnStatses = Lists.newArrayList();
 
     MasterPlan masterPlan = subQuery.getMasterPlan();
@@ -459,31 +491,48 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     while (it.hasNext()) {
       ExecutionBlock block = it.next();
       SubQuery childSubQuery = subQuery.context.getSubQuery(block.getId());
-      childStat = childSubQuery.getTableStat();
-      avgRows += childStat.getAvgRows();
-      columnStatses.addAll(childStat.getColumnStats());
-      numBlocks += childStat.getNumBlocks();
-      numBytes += childStat.getNumBytes();
-      numOutputs += childStat.getNumShuffleOutputs();
-      numRows += childStat.getNumRows();
+      TableStats[] childStatArray = new TableStats[]{
+          childSubQuery.getInputStats(), childSubQuery.getResultStats()
+      };
+      for (int i = 0; i < 2; i++) {
+        if (childStatArray[i] == null) {
+          continue;
+        }
+        avgRows[i] += childStatArray[i].getAvgRows();
+        numBlocks[i] += childStatArray[i].getNumBlocks();
+        numBytes[i] += childStatArray[i].getNumBytes();
+        readBytes[i] += childStatArray[i].getReadBytes();
+        numOutputs[i] += childStatArray[i].getNumShuffleOutputs();
+        numRows[i] += childStatArray[i].getNumRows();
+      }
+      columnStatses.addAll(childStatArray[1].getColumnStats());
     }
 
-    stat.setColumnStats(columnStatses);
-    stat.setNumBlocks(numBlocks);
-    stat.setNumBytes(numBytes);
-    stat.setNumShuffleOutputs(numOutputs);
-    stat.setNumRows(numRows);
-    stat.setAvgRows(avgRows);
+    for (int i = 0; i < 2; i++) {
+      stat[i].setNumBlocks(numBlocks[i]);
+      stat[i].setNumBytes(numBytes[i]);
+      stat[i].setReadBytes(readBytes[i]);
+      stat[i].setNumShuffleOutputs(numOutputs[i]);
+      stat[i].setNumRows(numRows[i]);
+      stat[i].setAvgRows(avgRows[i]);
+    }
+    stat[1].setColumnStats(columnStatses);
+
     return stat;
   }
 
-  private TableStats computeStatFromTasks() {
-    List<TableStats> stats = Lists.newArrayList();
+  private TableStats[] computeStatFromTasks() {
+    List<TableStats> inputStatsList = Lists.newArrayList();
+    List<TableStats> resultStatsList = Lists.newArrayList();
     for (QueryUnit unit : getQueryUnits()) {
-      stats.add(unit.getStats());
+      resultStatsList.add(unit.getStats());
+      if (unit.getLastAttempt().getInputStats() != null) {
+        inputStatsList.add(unit.getLastAttempt().getInputStats());
+      }
     }
-    TableStats tableStats = StatisticsUtil.aggregateTableStat(stats);
-    return tableStats;
+    TableStats inputStats = StatisticsUtil.aggregateTableStat(inputStatsList);
+    TableStats resultStats = StatisticsUtil.aggregateTableStat(resultStatsList);
+    return new TableStats[]{inputStats, resultStats};
   }
 
   private void stopScheduler() {
@@ -503,11 +552,11 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
    * It computes all stats and sets the intermediate result.
    */
   private void finalizeStats() {
-    TableStats stats;
+    TableStats[] statsArray;
     if (block.hasUnion()) {
-      stats = computeStatFromUnionBlock(this);
+      statsArray = computeStatFromUnionBlock(this);
     } else {
-      stats = computeStatFromTasks();
+      statsArray = computeStatFromTasks();
     }
 
     DataChannel channel = masterPlan.getOutgoingChannels(getId()).get(0);
@@ -521,7 +570,8 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     }
     schema = channel.getSchema();
     meta = CatalogUtil.newTableMeta(storeType, new Options());
-    statistics = stats;
+    inputStatistics = statsArray[0];
+    resultStatistics = statsArray[1];
   }
 
   @Override
@@ -766,7 +816,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
           if (subquery == null || subquery.getState() != SubQueryState.SUCCEEDED) {
             aggregatedVolume += getInputVolume(masterPlan, context, childBlock);
           } else {
-            aggregatedVolume += subquery.getTableStat().getNumBytes();
+            aggregatedVolume += subquery.getResultStats().getNumBytes();
           }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/JSPUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/JSPUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/JSPUtil.java
index 281290c..c73647e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/JSPUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/JSPUtil.java
@@ -154,6 +154,8 @@ public class JSPUtil {
         } else if("runTime".equals(sortField)) {
           if(queryUnit2.getLaunchTime() == 0) {
             return -1;
+          } else if(queryUnit.getLaunchTime() == 0) {
+            return 1;
           }
           return compareLong(queryUnit2.getRunningTime(), queryUnit.getRunningTime());
         } else if("startTime".equals(sortField)) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index 440887a..bcf10dd 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -126,6 +126,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
       try {
         eachProxy.stopContainer();
       } catch (Exception e) {
+        LOG.warn(e.getMessage());
       }
     }
     super.stop();
@@ -167,7 +168,9 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
     @Override
     public void run() {
       proxy.launch(null);
-      LOG.info("ContainerProxy started:" + id);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("ContainerProxy started:" + id);
+      }
     }
   }
 
@@ -188,7 +191,9 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
 
     @Override
     public void run() {
-      LOG.info("ContainerProxy stopped:" + id + "," + proxy.getId());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("ContainerProxy stopped:" + id + "," + proxy.getId());
+      }
       proxy.stopContainer();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index 4125236..56e5391 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -34,6 +34,7 @@ import org.apache.tajo.TajoProtos.TaskAttemptState;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.json.CoreGsonHelper;
@@ -67,6 +68,7 @@ import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
 
 public class Task {
   private static final Log LOG = LogFactory.getLog(Task.class);
+  private static final float FETCHER_PROGRESS = 0.5f;
 
   private final TajoConf systemConf;
   private final QueryContext queryContext;
@@ -87,7 +89,6 @@ public class Task {
   private boolean killed = false;
   private boolean aborted = false;
   private boolean stopped = false;
-  private float progress = 0;
   private final Reporter reporter;
   private Path inputTableBaseDir;
 
@@ -99,12 +100,7 @@ public class Task {
   private long startTime;
   private long finishTime;
 
-  /**
-   * flag that indicates whether progress update needs to be sent to parent.
-   * If true, it has been set. If false, it has been reset.
-   * Using AtomicBoolean since we need an atomic read & reset method.
-   */
-  private AtomicBoolean progressFlag = new AtomicBoolean(false);
+  private final TableStats inputStats;
 
   // TODO - to be refactored
   private ShuffleType shuffleType = null;
@@ -156,6 +152,7 @@ public class Task {
         request.getFragments().toArray(new FragmentProto[request.getFragments().size()]), taskDir);
     this.context.setDataChannel(request.getDataChannel());
     this.context.setEnforcer(request.getEnforcer());
+    this.inputStats = new TableStats();
 
     plan = CoreGsonHelper.fromJson(request.getSerializedData(), LogicalNode.class);
     LogicalNode [] scanNode = PlannerUtil.findAllNodes(plan, NodeType.SCAN);
@@ -234,17 +231,6 @@ public class Task {
     return LOG;
   }
 
-  // getters and setters for flag
-  void setProgressFlag() {
-    progressFlag.set(true);
-  }
-  boolean resetProgressFlag() {
-    return progressFlag.getAndSet(false);
-  }
-  boolean getProgressFlag() {
-    return progressFlag.get();
-  }
-
   public void localize(QueryUnitRequest request) throws IOException {
     fetcherRunners = getFetchRunners(context, request.getFetches());
   }
@@ -263,7 +249,6 @@ public class Task {
 
   public void setState(TaskAttemptState status) {
     context.setState(status);
-    setProgressFlag();
   }
 
   public TaskAttemptContext getContext() {
@@ -283,7 +268,7 @@ public class Task {
   public void kill() {
     killed = true;
     context.stop();
-    setProgressFlag();
+    context.setState(TaskAttemptState.TA_KILLED);
     releaseChannelFactory();
   }
 
@@ -295,7 +280,6 @@ public class Task {
 
   public void cleanUp() {
     // remove itself from worker
-
     if (context.getState() == TaskAttemptState.TA_SUCCEEDED) {
       try {
         localFS.delete(context.getWorkDir(), true);
@@ -303,7 +287,7 @@ public class Task {
           taskRunnerContext.getTasks().remove(this.getId());
         }
       } catch (IOException e) {
-        e.printStackTrace();
+        LOG.error(e.getMessage(), e);
       }
     } else {
       LOG.error("QueryUnitAttemptId: " + context.getTaskId() + " status: " + context.getState());
@@ -314,15 +298,38 @@ public class Task {
     TaskStatusProto.Builder builder = TaskStatusProto.newBuilder();
     builder.setWorkerName(taskRunnerContext.getNodeId());
     builder.setId(context.getTaskId().getProto())
-        .setProgress(context.getProgress()).setState(context.getState());
+        .setProgress(context.getProgress())
+        .setState(context.getState());
 
+    builder.setInputStats(reloadInputStats());
+
+    if (context.getResultStats() != null) {
+      builder.setResultStats(context.getResultStats().getProto());
+    }
     return builder.build();
   }
 
+  private CatalogProtos.TableStatsProto reloadInputStats() {
+    synchronized(inputStats) {
+      if (this.executor == null) {
+        return inputStats.getProto();
+      }
+
+      TableStats executorInputStats = this.executor.getInputStats();
+
+      if (executorInputStats != null) {
+        inputStats.setValues(executorInputStats);
+      }
+      return inputStats.getProto();
+    }
+  }
+
   private TaskCompletionReport getTaskCompletionReport() {
     TaskCompletionReport.Builder builder = TaskCompletionReport.newBuilder();
     builder.setId(context.getTaskId().getProto());
 
+    builder.setInputStats(reloadInputStats());
+
     if (context.hasResultStats()) {
       builder.setResultStats(context.getResultStats().getProto());
     } else {
@@ -359,12 +366,13 @@ public class Task {
     String errorMessage = null;
     try {
       context.setState(TaskAttemptState.TA_RUNNING);
-      setProgressFlag();
 
       if (context.hasFetchPhase()) {
         // If the fetch is still in progress, the query unit must wait for
         // complete.
         waitForFetch();
+        context.setFetcherProgress(FETCHER_PROGRESS);
+        context.setProgress(FETCHER_PROGRESS);
       }
 
       if (context.getFragmentSize() > 0) {
@@ -372,9 +380,9 @@ public class Task {
             createPlan(context, plan);
         this.executor.init();
         while(!killed && executor.next() != null) {
-          ++progress;
         }
         this.executor.close();
+        reloadInputStats();
         this.executor = null;
       }
     } catch (Exception e) {
@@ -383,11 +391,12 @@ public class Task {
       LOG.error(errorMessage);
       aborted = true;
     } finally {
-      setProgressFlag();
+      context.setProgress(1.0f);
       stopped = true;
       completedTasksNum++;
 
       if (killed || aborted) {
+        context.setExecutorProgress(0.0f);
         context.setProgress(0.0f);
         if(killed) {
           context.setState(TaskAttemptState.TA_KILLED);
@@ -466,6 +475,11 @@ public class Task {
       taskHistory.setStatus(getStatus().toString());
       taskHistory.setProgress(context.getProgress());
 
+      taskHistory.setInputStats(new TableStats(reloadInputStats()));
+      if (context.getResultStats() != null) {
+        taskHistory.setOutputStats((TableStats)context.getResultStats().clone());
+      }
+
       if (hasFetchPhase()) {
         Map<URI, TaskHistory.FetcherHistory> fetcherHistories = new HashMap<URI, TaskHistory.FetcherHistory>();
 
@@ -528,7 +542,7 @@ public class Task {
     return tablets;
   }
 
-  private static class FetchRunner implements Runnable {
+  private class FetchRunner implements Runnable {
     private final TaskAttemptContext ctx;
     private final Fetcher fetcher;
 
@@ -564,7 +578,7 @@ public class Task {
           retryNum++;
         }
       } finally {
-        ctx.getFetchLatch().countDown();
+        fetcherFinished(ctx);
       }
 
       if (retryNum == maxRetryNum) {
@@ -573,6 +587,24 @@ public class Task {
     }
   }
 
+  private synchronized void fetcherFinished(TaskAttemptContext ctx) {
+    int fetcherSize = fetcherRunners.size();
+    if(fetcherSize == 0) {
+      return;
+    }
+    try {
+      int numRunningFetcher = (int)(ctx.getFetchLatch().getCount()) - 1;
+
+      if (numRunningFetcher == 0) {
+        context.setProgress(FETCHER_PROGRESS);
+      } else {
+        context.setProgress(((float)(fetcherSize - numRunningFetcher)) / numRunningFetcher * FETCHER_PROGRESS);
+      }
+    } finally {
+      ctx.getFetchLatch().countDown();
+    }
+  }
+
   private void releaseChannelFactory(){
     if(channelFactory != null) {
       channelFactory.shutdown();
@@ -637,19 +669,20 @@ public class Task {
         public void run() {
           while (!stop.get() && !stopped) {
             try {
+              if(executor != null && context.getProgress() < 1.0f) {
+                float progress = executor.getProgress();
+                context.setExecutorProgress(progress);
+              }
+            } catch (Throwable t) {
+              LOG.error("Get progress error: " + t.getMessage(), t);
+            }
 
-              resetProgressFlag();
-
-              if (getProgressFlag()) {
-                resetProgressFlag();
+            try {
+              if (context.isPorgressChanged()) {
                 masterStub.statusUpdate(null, getReport(), NullCallback.get());
               } else {
                 masterStub.ping(null, taskId.getProto(), NullCallback.get());
               }
-              synchronized (pingThread) {
-                pingThread.wait(PROGRESS_INTERVAL);
-              }
-
             } catch (Throwable t) {
               LOG.error(t.getMessage(), t);
               remainingRetries -=1;
@@ -658,6 +691,15 @@ public class Task {
                 LOG.warn("Last retry, exiting ");
                 throw new RuntimeException(t);
               }
+            } finally {
+              if (remainingRetries > 0) {
+                synchronized (pingThread) {
+                  try {
+                    pingThread.wait(PROGRESS_INTERVAL);
+                  } catch (InterruptedException e) {
+                  }
+                }
+              }
             }
           }
         }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index c39c06e..92762c9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -37,6 +37,7 @@ import java.io.File;
 import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
 
@@ -55,7 +56,10 @@ public class TaskAttemptContext {
   private final Path workDir;
   private boolean needFetch = false;
   private CountDownLatch doneFetchPhaseSignal;
-  private float progress = 0;
+  private float progress = 0.0f;
+  private float fetcherProgress = 0.0f;
+  private AtomicBoolean progressChanged = new AtomicBoolean(false);
+
   /** a map of shuffled file outputs */
   private Map<Integer, String> shuffleFileOutputs;
   private File fetchIn;
@@ -83,7 +87,7 @@ public class TaskAttemptContext {
 
     this.workDir = workDir;
     this.shuffleFileOutputs = Maps.newHashMap();
-    
+
     state = TaskAttemptState.TA_PENDING;
   }
 
@@ -133,7 +137,7 @@ public class TaskAttemptContext {
   public TableStats getResultStats() {
     return this.resultStats;
   }
-  
+
   public boolean isStopped() {
     return this.stopped;
   }
@@ -210,7 +214,21 @@ public class TaskAttemptContext {
   }
   
   public void setProgress(float progress) {
+    float previousProgress = this.progress;
     this.progress = progress;
+    progressChanged.set(previousProgress != progress);
+  }
+
+  public boolean isPorgressChanged() {
+    return progressChanged.get();
+  }
+  public void setExecutorProgress(float executorProgress) {
+    float adjustProgress = executorProgress * (1 - fetcherProgress);
+    setProgress(fetcherProgress + adjustProgress);
+  }
+
+  public void setFetcherProgress(float fetcherProgress) {
+    this.fetcherProgress = fetcherProgress;
   }
 
   public FragmentProto getTable(String id) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskHistory.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskHistory.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskHistory.java
index 2650c4a..0973aa7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskHistory.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskHistory.java
@@ -18,6 +18,9 @@
 
 package org.apache.tajo.worker;
 
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.util.FileUtil;
+
 import java.net.URI;
 import java.util.Collection;
 import java.util.Map;
@@ -31,6 +34,9 @@ public class TaskHistory {
   private String workingPath;
   private float progress;
 
+  private TableStats inputStats;
+  private TableStats outputStats;
+
   Map<URI, FetcherHistory> fetchers;
 
   public static class FetcherHistory {
@@ -150,4 +156,43 @@ public class TaskHistory {
   public boolean hasFetcher() {
     return fetchers != null && !fetchers.isEmpty();
   }
+
+  public TableStats getInputStats() {
+    return inputStats;
+  }
+
+  public void setInputStats(TableStats inputStats) {
+    this.inputStats = inputStats;
+  }
+
+  public TableStats getOutputStats() {
+    return outputStats;
+  }
+
+  public void setOutputStats(TableStats outputStats) {
+    this.outputStats = outputStats;
+  }
+
+  public static String toInputStatsString(TableStats tableStats) {
+    if (tableStats == null) {
+      return "No input statistics";
+    }
+
+    String result = "";
+    result += "TotalBytes: " + FileUtil.humanReadableByteCount(tableStats.getNumBytes(), false) + " ("
+        + tableStats.getNumBytes() + " B)";
+    result += ", ReadBytes: " + FileUtil.humanReadableByteCount(tableStats.getReadBytes(), false) + " ("
+        + tableStats.getReadBytes() + " B)";
+    result += ", ReadRows: " + (tableStats.getNumRows() == 0 ? "-" : tableStats.getNumRows());
+
+    return result;
+  }
+
+  public static String toOutputStatsString(TableStats tableStats) {
+    if (tableStats == null) {
+      return "No output statistics";
+    }
+
+    return tableStats.toJson();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
index 9a38aef..d74e0df 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -302,6 +302,9 @@ public class TaskRunner extends AbstractService {
 
   static void fatalError(QueryMasterProtocolService.Interface qmClientService,
                          QueryUnitAttemptId taskAttemptId, String message) {
+    if (message == null) {
+       message = "No error message";
+    }
     TaskFatalErrorReport.Builder builder = TaskFatalErrorReport.newBuilder()
         .setId(taskAttemptId.getProto())
         .setErrorMessage(message);
@@ -372,6 +375,7 @@ public class TaskRunner extends AbstractService {
 
                   QueryUnitAttemptId taskAttemptId = new QueryUnitAttemptId(taskRequest.getId());
                   if (tasks.containsKey(taskAttemptId)) {
+                    LOG.error("Duplicate Task Attempt: " + taskAttemptId);
                     fatalError(qmClientService, taskAttemptId, "Duplicate Task Attempt: " + taskAttemptId);
                     continue;
                   }
@@ -390,8 +394,8 @@ public class TaskRunner extends AbstractService {
                     // task.run() is a blocking call.
                     task.run();
                   } catch (Throwable t) {
+                    LOG.error(t.getMessage(), t);
                     fatalError(qmClientService, taskAttemptId, t.getMessage());
-                    t.printStackTrace();
                   } finally {
                     callFuture = null;
                     taskRequest = null;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
index 3fdd221..10e8ec2 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
@@ -34,15 +34,17 @@ message TaskStatusProto {
   required float progress = 3;
   required TaskAttemptState state = 4;
   optional StatSetProto stats = 5;
-  optional TableStatsProto resultStats = 6;
-  repeated ShuffleFileOutput shuffleFileOutputs = 7;
+  optional TableStatsProto inputStats = 6;
+  optional TableStatsProto resultStats = 7;
+  repeated ShuffleFileOutput shuffleFileOutputs = 8;
 }
 
 message TaskCompletionReport {
   required QueryUnitAttemptIdProto id = 1;
   optional StatSetProto stats = 2;
-  optional TableStatsProto resultStats = 3;
-  repeated ShuffleFileOutput shuffleFileOutputs = 4;
+  optional TableStatsProto inputStats = 3;
+  optional TableStatsProto resultStats = 4;
+  repeated ShuffleFileOutput shuffleFileOutputs = 5;
 }
 
 message TaskFatalErrorReport {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querytasks.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querytasks.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querytasks.jsp
index 5ba83ab..0a287ee 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querytasks.jsp
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querytasks.jsp
@@ -31,6 +31,11 @@
 <%@ page import="java.util.Map" %>
 <%@ page import="java.util.HashMap" %>
 <%@ page import="org.apache.tajo.QueryUnitAttemptId" %>
+<%@ page import="org.apache.tajo.catalog.statistics.TableStats" %>
+<%@ page import="java.util.Locale" %>
+<%@ page import="java.text.NumberFormat" %>
+<%@ page import="org.apache.tajo.engine.planner.PlannerUtil" %>
+<%@ page import="org.apache.tajo.util.FileUtil" %>
 
 <%
   String paramQueryId = request.getParameter("queryId");
@@ -92,9 +97,43 @@
 <%
     return;
   }
+
   SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 
   String url = "querytasks.jsp?queryId=" + queryId + "&ebid=" + ebid + "&sortOrder=" + nextSortOrder + "&sort=";
+  QueryUnit[] queryUnits = subQuery.getQueryUnits();
+
+
+  long totalInputBytes = 0;
+  long totalReadBytes = 0;
+  long totalReadRows = 0;
+  long totalWriteBytes = 0;
+  long totalWriteRows = 0;
+  int numTasks = queryUnits.length;
+//  int numSucceededTasks = 0;
+//  int localReadTasks = subQuery.;
+  int numShuffles = 0;
+
+  float totalProgress = 0.0f;
+  for(QueryUnit eachQueryUnit: queryUnits) {
+    totalProgress += eachQueryUnit.getLastAttempt() != null ? eachQueryUnit.getLastAttempt().getProgress(): 0.0f;
+    numShuffles = eachQueryUnit.getShuffleOutpuNum();
+    if (eachQueryUnit.getLastAttempt() != null) {
+      TableStats inputStats = eachQueryUnit.getLastAttempt().getInputStats();
+      if (inputStats != null) {
+        totalInputBytes += inputStats.getNumBytes();
+        totalReadBytes += inputStats.getReadBytes();
+        totalReadRows += inputStats.getNumRows();
+      }
+      TableStats outputStats = eachQueryUnit.getLastAttempt().getResultStats();
+      if (outputStats != null) {
+        totalWriteBytes += outputStats.getNumBytes();
+        totalWriteRows += outputStats.getNumRows();
+      }
+    }
+  }
+
+    NumberFormat nf = NumberFormat.getInstance(Locale.US);
 %>
 
 <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
@@ -109,9 +148,26 @@
 <div class='contents'>
   <h2>Tajo Worker: <a href='index.jsp'><%=tajoWorker.getWorkerContext().getWorkerName()%></a></h2>
   <hr/>
-  <h3><a href='querydetail.jsp?queryId=<%=paramQueryId%>'><%=ebid.toString()%></a>(<%=subQuery.getState()%>)</h3>
-  <div>Started:<%=df.format(subQuery.getStartTime())%> ~ <%=subQuery.getFinishTime() == 0 ? "-" : df.format(subQuery.getFinishTime())%></div>
+  <h3><a href='querydetail.jsp?queryId=<%=paramQueryId%>'><%=ebid.toString()%></a></h3>
   <hr/>
+  <p/>
+  <pre><%=PlannerUtil.buildExplainString(subQuery.getBlock().getPlan())%></pre>
+  <p/>
+  <table border="1" width="100%" class="border_table">
+    <tr><td align='right' width='150px'>Status:</td><td><%=subQuery.getState()%></td></tr>
+    <tr><td align='right'>Started:</td><td><%=df.format(subQuery.getStartTime())%> ~ <%=subQuery.getFinishTime() == 0 ? "-" : df.format(subQuery.getFinishTime())%></td></tr>
+    <tr><td align='right'># Tasks:</td><td><%=numTasks%> (Local Tasks: <%=subQuery.getTaskScheduler().getHostLocalAssigned()%>, Rack Local Tasks: <%=subQuery.getTaskScheduler().getRackLocalAssigned()%>)</td></tr>
+    <tr><td align='right'>Progress:</td><td><%=JSPUtil.percentFormat((float)(totalProgress/numTasks))%>%</td></tr>
+    <tr><td align='right'># Shuffles:</td><td><%=numShuffles%></td></tr>
+    <tr><td align='right'>Input Bytes:</td><td><%=FileUtil.humanReadableByteCount(totalInputBytes, false) + " (" + nf.format(totalInputBytes) + " B)"%></td></tr>
+    <tr><td align='right'>Read Bytes:</td><td><%=totalReadBytes == 0 ? "-" : FileUtil.humanReadableByteCount(totalReadBytes, false) + " (" + nf.format(totalReadRows) + " B)"%></td></tr>
+    <tr><td align='right'>Input Rows:</td><td><%=nf.format(totalReadRows)%></td></tr>
+    <tr><td align='right'>Output Bytes:</td><td><%=FileUtil.humanReadableByteCount(totalWriteBytes, false) + " (" + nf.format(totalWriteBytes) + " B)"%></td></tr>
+    <tr><td align='right'>Output Rows:</td><td><%=nf.format(totalWriteRows)%></td></tr>
+  </table>
+  <hr/>
+
+
   <form action='querytasks.jsp' method='GET'>
   Status:
     <select name="status" onchange="this.form.submit()">
@@ -126,9 +182,8 @@
     <input type="hidden" name="sortOrder" value="<%=sortOrder%>"/>
   </form>
   <table border="1" width="100%" class="border_table">
-    <tr><th>No</th><th><a href='<%=url%>id'>Id</a></th><th>Status</th><th><a href='<%=url%>startTime'>Start Time</a></th><th><a href='<%=url%>runTime'>Running Time</a></th><th><a href='<%=url%>host'>Host</a></th></tr>
+    <tr><th>No</th><th><a href='<%=url%>id'>Id</a></th><th>Status</th><th>Progress</th><th><a href='<%=url%>startTime'>Started</a></th><th><a href='<%=url%>runTime'>Running Time</a></th><th><a href='<%=url%>host'>Host</a></th></tr>
     <%
-      QueryUnit[] queryUnits = subQuery.getQueryUnits();
       JSPUtil.sortQueryUnit(queryUnits, sort, sortOrder);
       int rowNo = 1;
       for(QueryUnit eachQueryUnit: queryUnits) {
@@ -158,8 +213,9 @@
       <td><%=rowNo%></td>
       <td><a href="<%=queryUnitDetailUrl%>"><%=eachQueryUnit.getId()%></a></td>
       <td><%=eachQueryUnit.getState()%></td>
+      <td><%=JSPUtil.percentFormat(eachQueryUnit.getLastAttempt().getProgress())%>%</td>
       <td><%=eachQueryUnit.getLaunchTime() == 0 ? "-" : df.format(eachQueryUnit.getLaunchTime())%></td>
-      <td><%=eachQueryUnit.getLaunchTime() == 0 ? "-" : eachQueryUnit.getRunningTime() + " ms"%></td>
+      <td align='right'><%=eachQueryUnit.getLaunchTime() == 0 ? "-" : eachQueryUnit.getRunningTime() + " ms"%></td>
       <td><%=queryUnitHost%></td>
     </tr>
     <%