You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/03/01 05:45:07 UTC
[2/2] git commit: TAJO-589: Add fine grained progress indicator for
each task. (hyoungjunkim via hyunsik)
TAJO-589: Add fine grained progress indicator for each task. (hyoungjunkim via hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/c573b6fc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/c573b6fc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/c573b6fc
Branch: refs/heads/master
Commit: c573b6fcf7ad588c4fd7c2baf5b9047c97117038
Parents: f594542
Author: Hyunsik Choi <hy...@apache.org>
Authored: Sat Mar 1 13:44:43 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Sat Mar 1 13:44:43 2014 +0900
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../tajo/catalog/statistics/StatisticsUtil.java | 2 +
.../tajo/catalog/statistics/TableStats.java | 56 ++++-
.../src/main/proto/CatalogProtos.proto | 3 +-
.../main/java/org/apache/tajo/cli/TajoCli.java | 1 +
.../planner/physical/BSTIndexScanExec.java | 11 +-
.../planner/physical/BinaryPhysicalExec.java | 47 +++-
.../engine/planner/physical/EvalExprExec.java | 8 +
.../planner/physical/ExternalSortExec.java | 155 ++++++++++++-
.../physical/HashShuffleFileWriteExec.java | 2 +
.../physical/PartitionMergeScanExec.java | 36 +++
.../engine/planner/physical/PhysicalExec.java | 7 +
.../engine/planner/physical/SeqScanExec.java | 31 +++
.../planner/physical/UnaryPhysicalExec.java | 32 +++
.../tajo/master/AbstractTaskScheduler.java | 16 ++
.../tajo/master/DefaultTaskScheduler.java | 3 -
.../apache/tajo/master/TajoContainerProxy.java | 10 +-
.../tajo/master/TajoMasterClientService.java | 1 -
.../apache/tajo/master/querymaster/Query.java | 6 +-
.../tajo/master/querymaster/QueryUnit.java | 11 +
.../master/querymaster/QueryUnitAttempt.java | 37 ++-
.../tajo/master/querymaster/Repartitioner.java | 4 +-
.../tajo/master/querymaster/SubQuery.java | 114 +++++++---
.../main/java/org/apache/tajo/util/JSPUtil.java | 2 +
.../tajo/worker/TajoResourceAllocator.java | 9 +-
.../main/java/org/apache/tajo/worker/Task.java | 114 +++++++---
.../apache/tajo/worker/TaskAttemptContext.java | 24 +-
.../org/apache/tajo/worker/TaskHistory.java | 45 ++++
.../java/org/apache/tajo/worker/TaskRunner.java | 6 +-
.../src/main/proto/TajoWorkerProtocol.proto | 10 +-
.../resources/webapps/worker/querytasks.jsp | 66 +++++-
.../main/resources/webapps/worker/queryunit.jsp | 21 +-
.../resources/webapps/worker/taskdetail.jsp | 4 +-
.../java/org/apache/tajo/QueryTestCaseBase.java | 3 +
.../org/apache/tajo/TajoTestingCluster.java | 4 +
.../physical/TestProgressExternalSortExec.java | 226 +++++++++++++++++++
.../querymaster/TestQueryUnitStatusUpdate.java | 168 ++++++++++++++
.../java/org/apache/tajo/util/TestJSPUtil.java | 84 +++++++
.../queries/TestQueryUnitStatusUpdate/case1.sql | 1 +
.../queries/TestQueryUnitStatusUpdate/case2.sql | 5 +
.../queries/TestQueryUnitStatusUpdate/case3.sql | 11 +
.../java/org/apache/tajo/storage/CSVFile.java | 37 ++-
.../org/apache/tajo/storage/FileScanner.java | 32 ++-
.../org/apache/tajo/storage/MergeScanner.java | 45 ++++
.../java/org/apache/tajo/storage/RawFile.java | 36 +++
.../java/org/apache/tajo/storage/Scanner.java | 9 +
.../apache/tajo/storage/v2/FileScannerV2.java | 31 +++
47 files changed, 1469 insertions(+), 120 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 75a1b74..bbf66a4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -130,6 +130,9 @@ Release 0.8.0 - unreleased
IMPROVEMENTS
+ TAJO-589: Add fine grained progress indicator for each task.
+ (hyoungjunkim via hyunsik)
+
TAJO-614: Explaning a logical node should use ExplainLogicalPlanVisitor.
(hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/StatisticsUtil.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/StatisticsUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/StatisticsUtil.java
index 8593db6..01316bc 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/StatisticsUtil.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/StatisticsUtil.java
@@ -81,6 +81,7 @@ public class StatisticsUtil {
result.setNumRows(result.getNumRows() + stats.getNumRows());
result.setNumBytes(result.getNumBytes() + stats.getNumBytes());
+ result.setReadBytes(result.getReadBytes() + stats.getReadBytes());
result.setNumBlocks(result.getNumBlocks() + stats.getNumBlocks());
result.setNumShuffleOutputs(result.getNumShuffleOutputs() + stats.getNumShuffleOutputs());
}
@@ -132,6 +133,7 @@ public class StatisticsUtil {
// aggregate table stats for each table
aggregated.setNumRows(aggregated.getNumRows() + ts.getNumRows());
aggregated.setNumBytes(aggregated.getNumBytes() + ts.getNumBytes());
+ aggregated.setReadBytes(aggregated.getReadBytes() + ts.getReadBytes());
aggregated.setNumBlocks(aggregated.getNumBlocks() + ts.getNumBlocks());
aggregated.setNumShuffleOutputs(aggregated.getNumShuffleOutputs() + ts.getNumShuffleOutputs());
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/TableStats.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/TableStats.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/TableStats.java
index 9a72da6..de2922e 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/TableStats.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/TableStats.java
@@ -43,6 +43,7 @@ public class TableStats implements ProtoObject<TableStatsProto>, Cloneable, Gson
@Expose private Integer numBlocks = null; // optional
@Expose private Integer numShuffleOutputs = null; // optional
@Expose private Long avgRows = null; // optional
+ @Expose private Long readBytes = null; //optional
@Expose private List<ColumnStats> columnStatses = null; // repeated
public TableStats() {
@@ -51,6 +52,7 @@ public class TableStats implements ProtoObject<TableStatsProto>, Cloneable, Gson
numBlocks = 0;
numShuffleOutputs = 0;
avgRows = 0l;
+ readBytes = 0l;
columnStatses = TUtil.newList();
}
@@ -73,6 +75,11 @@ public class TableStats implements ProtoObject<TableStatsProto>, Cloneable, Gson
} else {
this.avgRows = 0l;
}
+ if (proto.hasReadBytes()) {
+ this.readBytes = proto.getReadBytes();
+ } else {
+ this.readBytes = 0l;
+ }
this.columnStatses = TUtil.newList();
for (CatalogProtos.ColumnStatsProto colProto : proto.getColStatList()) {
@@ -123,6 +130,14 @@ public class TableStats implements ProtoObject<TableStatsProto>, Cloneable, Gson
this.avgRows = avgRows;
}
+ public Long getReadBytes() {
+ return readBytes;
+ }
+
+ public void setReadBytes(long readBytes) {
+ this.readBytes = readBytes;
+ }
+
public List<ColumnStats> getColumnStats() {
return this.columnStatses;
}
@@ -144,6 +159,7 @@ public class TableStats implements ProtoObject<TableStatsProto>, Cloneable, Gson
eq = eq && TUtil.checkEquals(this.numBlocks, other.numBlocks);
eq = eq && TUtil.checkEquals(this.numShuffleOutputs, other.numShuffleOutputs);
eq = eq && TUtil.checkEquals(this.avgRows, other.avgRows);
+ eq = eq && TUtil.checkEquals(this.readBytes, other.readBytes);
eq = eq && TUtil.checkEquals(this.columnStatses, other.columnStatses);
return eq;
} else {
@@ -159,15 +175,44 @@ public class TableStats implements ProtoObject<TableStatsProto>, Cloneable, Gson
public Object clone() throws CloneNotSupportedException {
TableStats stat = (TableStats) super.clone();
stat.builder = CatalogProtos.TableStatsProto.newBuilder();
- stat.numRows = numRows != null ? numRows.longValue() : null;
- stat.numBytes = numBytes != null ? numBytes.longValue() : null;
- stat.numBlocks = numBlocks != null ? numBlocks.intValue() : null;
- stat.numShuffleOutputs = numShuffleOutputs != null ? numShuffleOutputs.intValue() : null;
+ stat.numRows = numRows != null ? numRows : null;
+ stat.numBytes = numBytes != null ? numBytes : null;
+ stat.numBlocks = numBlocks != null ? numBlocks : null;
+ stat.numShuffleOutputs = numShuffleOutputs != null ? numShuffleOutputs : null;
+ stat.avgRows = avgRows != null ? avgRows : null;
+ stat.readBytes = readBytes != null ? readBytes : null;
+
stat.columnStatses = new ArrayList<ColumnStats>(this.columnStatses);
return stat;
}
+ public void merge(TableStats stat) {
+ if(stat == null) {
+ return;
+ }
+
+ numRows = stat.numRows != null ? stat.numRows + numRows : numRows;
+ numBytes = stat.numBytes != null ? stat.numBytes + numBytes : numBytes;
+ numBlocks = stat.numBlocks != null ? stat.numBlocks + numBlocks : numBlocks;
+ numShuffleOutputs = stat.numShuffleOutputs != null ? stat.numShuffleOutputs + numShuffleOutputs : numShuffleOutputs;
+ avgRows = stat.avgRows != null ? stat.avgRows + avgRows : avgRows;
+ readBytes = stat.readBytes != null ? stat.readBytes + readBytes : readBytes;
+ }
+
+ public void setValues(TableStats stat) {
+ if(stat == null) {
+ return;
+ }
+
+ numRows = stat.numRows != null ? stat.numRows : 0;
+ numBytes = stat.numBytes != null ? stat.numBytes : 0;
+ numBlocks = stat.numBlocks != null ? stat.numBlocks : 0;
+ numShuffleOutputs = stat.numShuffleOutputs != null ? stat.numShuffleOutputs : 0;
+ avgRows = stat.avgRows != null ? stat.avgRows : 0;
+ readBytes = stat.readBytes != null ? stat.readBytes : 0;
+ }
+
public String toString() {
Gson gson = CatalogGsonHelper.getPrettyInstance();
return gson.toJson(this);
@@ -198,6 +243,9 @@ public class TableStats implements ProtoObject<TableStatsProto>, Cloneable, Gson
if (this.avgRows != null) {
builder.setAvgRows(this.avgRows);
}
+ if (this.readBytes != null) {
+ builder.setReadBytes(this.readBytes);
+ }
if (this.columnStatses != null) {
for (ColumnStats colStat : columnStatses) {
builder.addColStat(colStat.getProto());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
index f5fff2c..35171cc 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
+++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
@@ -187,7 +187,8 @@ message TableStatsProto {
optional int32 numBlocks = 4;
optional int32 numShuffleOutputs = 5;
optional int64 avgRows = 6;
- repeated ColumnStatsProto colStat = 7;
+ optional int64 readBytes = 7;
+ repeated ColumnStatsProto colStat = 8;
}
message ColumnStatsProto {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
index f107c51..3026d9c 100644
--- a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
@@ -430,6 +430,7 @@ public class TajoCli {
sout.print("continue... ('q' is quit)");
sout.flush();
if (sin.read() == 'q') {
+ sout.println();
break;
}
numOfPrintedRows = 0;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
index d2f0922..35de707 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
@@ -44,7 +44,9 @@ public class BSTIndexScanExec extends PhysicalExec {
private Datum[] datum = null;
private boolean initialize = true;
-
+
+ private float progress;
+
public BSTIndexScanExec(TaskAttemptContext context,
AbstractStorageManager sm , ScanNode scanNode ,
FileFragment fragment, Path fileName , Schema keySchema,
@@ -66,7 +68,7 @@ public class BSTIndexScanExec extends PhysicalExec {
@Override
public void init() throws IOException {
-
+ progress = 0.0f;
}
@Override
@@ -133,4 +135,9 @@ public class BSTIndexScanExec extends PhysicalExec {
qual = null;
projector = null;
}
+
+ @Override
+ public float getProgress() {
+ return progress;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java
index fc8d25d..35c8f6f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java
@@ -18,14 +18,17 @@
package org.apache.tajo.engine.planner.physical;
-import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.worker.TaskAttemptContext;
+import org.apache.tajo.catalog.Schema;
import java.io.IOException;
public abstract class BinaryPhysicalExec extends PhysicalExec {
protected PhysicalExec leftChild;
protected PhysicalExec rightChild;
+ protected float progress;
+ protected TableStats inputStats;
public BinaryPhysicalExec(final TaskAttemptContext context,
final Schema inSchema, final Schema outSchema,
@@ -47,6 +50,9 @@ public abstract class BinaryPhysicalExec extends PhysicalExec {
public void init() throws IOException {
leftChild.init();
rightChild.init();
+ progress = 0.0f;
+
+ inputStats = new TableStats();
}
@Override
@@ -59,7 +65,46 @@ public abstract class BinaryPhysicalExec extends PhysicalExec {
public void close() throws IOException {
leftChild.close();
rightChild.close();
+
+ getInputStats();
+
leftChild = null;
rightChild = null;
+
+ progress = 1.0f;
+ }
+
+ @Override
+ public float getProgress() {
+ if (leftChild == null) {
+ return progress;
+ }
+ return leftChild.getProgress() * 0.5f + rightChild.getProgress() * 0.5f;
+ }
+
+ @Override
+ public TableStats getInputStats() {
+ if (leftChild == null) {
+ return inputStats;
+ }
+ TableStats leftInputStats = leftChild.getInputStats();
+ inputStats.setNumBytes(0);
+ inputStats.setReadBytes(0);
+ inputStats.setNumRows(0);
+
+ if (leftInputStats != null) {
+ inputStats.setNumBytes(leftInputStats.getNumBytes());
+ inputStats.setReadBytes(leftInputStats.getReadBytes());
+ inputStats.setNumRows(leftInputStats.getNumRows());
+ }
+
+ TableStats rightInputStats = rightChild.getInputStats();
+ if (rightInputStats != null) {
+ inputStats.setNumBytes(inputStats.getNumBytes() + rightInputStats.getNumBytes());
+ inputStats.setReadBytes(inputStats.getReadBytes() + rightInputStats.getReadBytes());
+ inputStats.setNumRows(inputStats.getNumRows() + rightInputStats.getNumRows());
+ }
+
+ return inputStats;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
index 83580f9..a843bce 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
@@ -28,6 +28,7 @@ import java.io.IOException;
public class EvalExprExec extends PhysicalExec {
private final EvalExprNode plan;
+ private float progress;
public EvalExprExec(final TaskAttemptContext context, final EvalExprNode plan) {
super(context, plan.getInSchema(), plan.getOutSchema());
@@ -36,6 +37,7 @@ public class EvalExprExec extends PhysicalExec {
@Override
public void init() throws IOException {
+ progress = 0.0f;
}
@Override
@@ -54,5 +56,11 @@ public class EvalExprExec extends PhysicalExec {
@Override
public void close() throws IOException {
+ progress = 1.0f;
+ }
+
+ @Override
+ public float getProgress() {
+ return progress;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index 2dfbef4..4ceb3fc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -18,6 +18,7 @@
package org.apache.tajo.engine.planner.physical;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.LocalDirAllocator;
@@ -30,6 +31,7 @@ import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.engine.planner.logical.SortNode;
import org.apache.tajo.storage.*;
@@ -67,7 +69,7 @@ public class ExternalSortExec extends SortExec {
/** the defaultFanout of external sort */
private final int defaultFanout;
/** It's the size of in-memory table. If memory consumption exceeds it, store the memory table into a disk. */
- private final int sortBufferBytesNum;
+ private int sortBufferBytesNum;
/** the number of available cores */
private final int allocatedCoreNum;
/** If there are available multiple cores, it tries parallel merge. */
@@ -94,6 +96,8 @@ public class ExternalSortExec extends SortExec {
private boolean memoryResident = true;
/** the final result */
private Scanner result;
+ /** total bytes of input data */
+ private long sortAndStoredBytes;
private ExternalSortExec(final TaskAttemptContext context, final AbstractStorageManager sm, final SortNode plan)
throws PhysicalPlanningException {
@@ -136,7 +140,13 @@ public class ExternalSortExec extends SortExec {
setChild(child);
}
+ @VisibleForTesting
+ public void setSortBufferBytesNum(int sortBufferBytesNum) {
+ this.sortBufferBytesNum = sortBufferBytesNum;
+ }
+
public void init() throws IOException {
+ inputStats = new TableStats();
super.init();
}
@@ -205,6 +215,19 @@ public class ExternalSortExec extends SortExec {
memoryConsumption = 0;
chunkId++;
+
+ // Once the volume of data to be sorted exceeds the size of the sort buffer,
+ // the total progress of this external sort is divided into two parts.
+ // In contrast, if the data fits in memory, the progress consists of only one part.
+ //
+ // When the progress is divided into two parts, the first part sorts tuples in memory and stores them
+ // into a chunk. The second part merges stored chunks into fewer chunks, and it repeats until the number
+ // of merged chunks is fewer than the default fanout.
+ //
+ // Reaching this point means that at least one chunk has just been stored,
+ // that is, the progress has been divided into two parts.
+ // So, the progress of the child operator is multiplied by 0.5f.
+ progress = child.getProgress() * 0.5f;
}
}
@@ -222,6 +245,11 @@ public class ExternalSortExec extends SortExec {
}
}
+ // get the total number of loaded (or stored) bytes from the child's input stats
+ TableStats childTableStats = child.getInputStats();
+ if (childTableStats != null) {
+ sortAndStoredBytes = childTableStats.getNumBytes();
+ }
return chunkPaths;
}
@@ -266,6 +294,9 @@ public class ExternalSortExec extends SortExec {
sorted = true;
result.init();
+
+ // Once the data is loaded and sorted, we assume that half of the entire external sort operation is complete.
+ progress = 0.5f;
}
return result.next();
@@ -294,11 +325,13 @@ public class ExternalSortExec extends SortExec {
return computedFanout;
}
- private Scanner externalMergeAndSort(List<Path> chunks) throws IOException, ExecutionException, InterruptedException {
+ private Scanner externalMergeAndSort(List<Path> chunks)
+ throws IOException, ExecutionException, InterruptedException {
int level = 0;
final List<Path> inputFiles = TUtil.newList(chunks);
final List<Path> outputFiles = TUtil.newList();
int remainRun = inputFiles.size();
+ int chunksSize = chunks.size();
long mergeStart = System.currentTimeMillis();
@@ -310,13 +343,18 @@ public class ExternalSortExec extends SortExec {
int outChunkId = 0;
int outputFileNum = 0;
List<Future> futures = TUtil.newList();
+ // the number of files being merged in threads.
+ List<Integer> numberOfMergingFiles = TUtil.newList();
for (int startIdx = 0; startIdx < inputFiles.size();) {
// calculate proper fanout
int fanout = calculateFanout(remainInputRuns, inputFiles.size(), outputFileNum, startIdx);
+ // record how many files are merged by the i-th merger thread
+ numberOfMergingFiles.add(fanout);
// launch a merger runner
- futures.add(executorService.submit(new KWayMergerCaller(level, outChunkId++, inputFiles, startIdx, fanout)));
+ futures.add(executorService.submit(
+ new KWayMergerCaller(level, outChunkId++, inputFiles, startIdx, fanout, false)));
outputFileNum++;
startIdx += fanout;
@@ -340,8 +378,14 @@ public class ExternalSortExec extends SortExec {
}
// wait for all sort runners
+ int finishedMerger = 0;
+ int index = 0;
for (Future<Path> future : futures) {
outputFiles.add(future.get());
+ // accumulate the number of files merged so far
+ finishedMerger += numberOfMergingFiles.get(index++);
+ // progress = (number of merged files / total number of files) * 0.5;
+ progress = ((float)finishedMerger/(float)chunksSize) * 0.5f;
}
// delete merged intermediate files
@@ -363,6 +407,7 @@ public class ExternalSortExec extends SortExec {
// final result
finalOutputFiles = inputFiles;
+
result = createFinalMerger(inputFiles);
return result;
}
@@ -376,14 +421,16 @@ public class ExternalSortExec extends SortExec {
final List<Path> inputFiles;
final int startIdx;
final int mergeFanout;
+ final boolean updateInputStats;
public KWayMergerCaller(final int level, final int nextRunId, final List<Path> inputFiles,
- final int startIdx, final int mergeFanout) {
+ final int startIdx, final int mergeFanout, final boolean updateInputStats) {
this.level = level;
this.nextRunId = nextRunId;
this.inputFiles = inputFiles;
this.startIdx = startIdx;
this.mergeFanout = mergeFanout;
+ this.updateInputStats = updateInputStats;
}
@Override
@@ -456,14 +503,31 @@ public class ExternalSortExec extends SortExec {
private class MemTableScanner implements Scanner {
Iterator<Tuple> iterator;
+ // for input stats
+ float scannerProgress;
+ int numRecords;
+ int totalRecords;
+ TableStats scannerTableStats;
+
@Override
public void init() throws IOException {
iterator = inMemoryTable.iterator();
+
+ totalRecords = inMemoryTable.size();
+ scannerProgress = 0.0f;
+ numRecords = 0;
+
+ // it will be returned as the final stats
+ scannerTableStats = new TableStats();
+ scannerTableStats.setNumBytes(sortAndStoredBytes);
+ scannerTableStats.setReadBytes(sortAndStoredBytes);
+ scannerTableStats.setNumRows(totalRecords);
}
@Override
public Tuple next() throws IOException {
if (iterator.hasNext()) {
+ numRecords++;
return iterator.next();
} else {
return null;
@@ -478,6 +542,7 @@ public class ExternalSortExec extends SortExec {
@Override
public void close() throws IOException {
iterator = null;
+ scannerProgress = 1.0f;
}
@Override
@@ -507,6 +572,21 @@ public class ExternalSortExec extends SortExec {
public Schema getSchema() {
return null;
}
+
+ @Override
+ public float getProgress() {
+ if (iterator != null && numRecords > 0) {
+ return (float)numRecords / (float)totalRecords;
+
+ } else { // if an input is empty
+ return scannerProgress;
+ }
+ }
+
+ @Override
+ public TableStats getInputStats() {
+ return scannerTableStats;
+ }
}
/**
@@ -521,6 +601,9 @@ public class ExternalSortExec extends SortExec {
private final Comparator<Tuple> comparator = getComparator();
+ private float mergerProgress;
+ private TableStats mergerInputStats;
+
public PairWiseMerger(Scanner leftScanner, Scanner rightScanner) throws IOException {
this.leftScan = leftScanner;
this.rightScan = rightScanner;
@@ -533,6 +616,9 @@ public class ExternalSortExec extends SortExec {
leftTuple = leftScan.next();
rightTuple = rightScan.next();
+
+ mergerInputStats = new TableStats();
+ mergerProgress = 0.0f;
}
public Tuple next() throws IOException {
@@ -565,11 +651,12 @@ public class ExternalSortExec extends SortExec {
init();
}
- @Override
public void close() throws IOException {
IOUtils.cleanup(LOG, leftScan, rightScan);
+ getInputStats();
leftScan = null;
rightScan = null;
+ mergerProgress = 1.0f;
}
@Override
@@ -599,12 +686,51 @@ public class ExternalSortExec extends SortExec {
public Schema getSchema() {
return inSchema;
}
+
+ @Override
+ public float getProgress() {
+ if (leftScan == null) {
+ return mergerProgress;
+ }
+ return leftScan.getProgress() * 0.5f + rightScan.getProgress() * 0.5f;
+ }
+
+ @Override
+ public TableStats getInputStats() {
+ if (leftScan == null) {
+ return mergerInputStats;
+ }
+ TableStats leftInputStats = leftScan.getInputStats();
+ mergerInputStats.setNumBytes(0);
+ mergerInputStats.setReadBytes(0);
+ mergerInputStats.setNumRows(0);
+
+ if (leftInputStats != null) {
+ mergerInputStats.setNumBytes(leftInputStats.getNumBytes());
+ mergerInputStats.setReadBytes(leftInputStats.getReadBytes());
+ mergerInputStats.setNumRows(leftInputStats.getNumRows());
+ }
+
+ TableStats rightInputStats = rightScan.getInputStats();
+ if (rightInputStats != null) {
+ mergerInputStats.setNumBytes(mergerInputStats.getNumBytes() + rightInputStats.getNumBytes());
+ mergerInputStats.setReadBytes(mergerInputStats.getReadBytes() + rightInputStats.getReadBytes());
+ mergerInputStats.setNumRows(mergerInputStats.getNumRows() + rightInputStats.getNumRows());
+ }
+
+ return mergerInputStats;
+ }
}
@Override
public void close() throws IOException {
if (result != null) {
result.close();
+ try {
+ inputStats = (TableStats)result.getInputStats().clone();
+ } catch (CloneNotSupportedException e) {
+ LOG.warn(e.getMessage());
+ }
result = null;
}
@@ -632,6 +758,25 @@ public class ExternalSortExec extends SortExec {
public void rescan() throws IOException {
if (result != null) {
result.reset();
+ progress = 0.5f;
+ }
+ }
+
+ @Override
+ public float getProgress() {
+ if (result != null) {
+ return progress + result.getProgress() * 0.5f;
+ } else {
+ return progress;
+ }
+ }
+
+ @Override
+ public TableStats getInputStats() {
+ if (result != null) {
+ return result.getInputStats();
+ } else {
+ return inputStats;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
index e2b926d..678b745 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
@@ -154,5 +154,7 @@ public final class HashShuffleFileWriteExec extends UnaryPhysicalExec {
partitioner = null;
plan = null;
+
+ progress = 1.0f;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
index a39f4be..7f86ba2 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
@@ -20,6 +20,7 @@ package org.apache.tajo.engine.planner.physical;
import com.google.common.collect.Lists;
import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.logical.ScanNode;
import org.apache.tajo.storage.AbstractStorageManager;
@@ -45,6 +46,9 @@ public class PartitionMergeScanExec extends PhysicalExec {
private AbstractStorageManager sm;
+ private float progress;
+ protected TableStats inputStats;
+
public PartitionMergeScanExec(TaskAttemptContext context, AbstractStorageManager sm,
ScanNode plan, CatalogProtos.FragmentProto[] fragments) throws IOException {
super(context, plan.getInSchema(), plan.getOutSchema());
@@ -52,6 +56,8 @@ public class PartitionMergeScanExec extends PhysicalExec {
this.plan = plan;
this.fragments = fragments;
this.sm = sm;
+
+ inputStats = new TableStats();
}
public void init() throws IOException {
@@ -59,6 +65,7 @@ public class PartitionMergeScanExec extends PhysicalExec {
scanners.add(new SeqScanExec(context, sm, (ScanNode) PlannerUtil.clone(null, plan),
new CatalogProtos.FragmentProto[] {fragment}));
}
+ progress = 0.0f;
rescan();
}
@@ -98,10 +105,39 @@ public class PartitionMergeScanExec extends PhysicalExec {
public void close() throws IOException {
for (SeqScanExec scanner : scanners) {
scanner.close();
+ TableStats scannerTableStats = scanner.getInputStats();
+ if (scannerTableStats != null) {
+ inputStats.merge(scannerTableStats);
+ }
}
+ iterator = null;
+ progress = 1.0f;
}
public String getTableName() {
return plan.getTableName();
}
+
+ @Override
+ public float getProgress() {
+ if (iterator != null) {
+ float progressSum = 0.0f;
+ for (SeqScanExec scanner : scanners) {
+ progressSum += scanner.getProgress();
+ }
+ if (progressSum > 0) {
+ // average progress: divide the progress sum by the number of scanners
+ return progressSum / (float)(scanners.size());
+ } else {
+ return 0.0f;
+ }
+ } else {
+ return progress;
+ }
+ }
+
+ @Override
+ public TableStats getInputStats() {
+ return inputStats;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
index 0b9bc95..e30a10b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
@@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.SchemaObject;
+import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.worker.TaskAttemptContext;
@@ -54,6 +55,8 @@ public abstract class PhysicalExec implements SchemaObject {
public abstract void close() throws IOException;
+ public abstract float getProgress();
+
protected void info(Log log, String message) {
log.info("["+ context.getTaskId() + "] " + message);
}
@@ -69,4 +72,8 @@ public abstract class PhysicalExec implements SchemaObject {
protected Path getExecutorTmpDir() {
return new Path(UUID.randomUUID().toString());
}
+
+ public TableStats getInputStats() {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index c495470..a59cc2f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -23,6 +23,7 @@ import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.engine.eval.ConstEval;
import org.apache.tajo.engine.eval.EvalNode;
@@ -54,6 +55,8 @@ public class SeqScanExec extends PhysicalExec {
private Projector projector;
+ private TableStats inputStats;
+
public SeqScanExec(TaskAttemptContext context, AbstractStorageManager sm,
ScanNode plan, CatalogProtos.FragmentProto [] fragments) throws IOException {
super(context, plan.getInSchema(), plan.getOutSchema());
@@ -189,6 +192,16 @@ public class SeqScanExec extends PhysicalExec {
@Override
public void close() throws IOException {
IOUtils.cleanup(null, scanner);
+ if (scanner != null) {
+ try {
+ TableStats stat = scanner.getInputStats();
+ if (stat != null) {
+ inputStats = (TableStats)(stat.clone());
+ }
+ } catch (CloneNotSupportedException e) {
+ e.printStackTrace();
+ }
+ }
scanner = null;
plan = null;
qual = null;
@@ -198,4 +211,22 @@ public class SeqScanExec extends PhysicalExec {
public String getTableName() {
return plan.getTableName();
}
+
+ @Override
+ public float getProgress() {
+ if (scanner == null) {
+ return 1.0f;
+ } else {
+ return scanner.getProgress();
+ }
+ }
+
+ @Override
+ public TableStats getInputStats() {
+ if (scanner != null) {
+ return scanner.getInputStats();
+ } else {
+ return inputStats;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
index ceeca06..ab67d7b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
@@ -20,12 +20,15 @@ package org.apache.tajo.engine.planner.physical;
import com.google.common.annotations.VisibleForTesting;
import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
public abstract class UnaryPhysicalExec extends PhysicalExec {
protected PhysicalExec child;
+ protected float progress;
+ protected TableStats inputStats;
public UnaryPhysicalExec(TaskAttemptContext context,
Schema inSchema, Schema outSchema,
@@ -44,21 +47,50 @@ public abstract class UnaryPhysicalExec extends PhysicalExec {
}
public void init() throws IOException {
+ progress = 0.0f;
if (child != null) {
child.init();
}
}
public void rescan() throws IOException {
+ progress = 0.0f;
if (child != null) {
child.rescan();
}
}
public void close() throws IOException {
+ progress = 1.0f;
if (child != null) {
child.close();
+ try {
+ TableStats stat = child.getInputStats();
+ if (stat != null) {
+ inputStats = (TableStats)(stat.clone());
+ }
+ } catch (CloneNotSupportedException e) {
+ e.printStackTrace();
+ }
child = null;
}
}
+
+ @Override
+ public float getProgress() {
+ if (child != null) {
+ return child.getProgress();
+ } else {
+ return progress;
+ }
+ }
+
+ @Override
+ public TableStats getInputStats() {
+ if (child != null) {
+ return child.getInputStats();
+ } else {
+ return inputStats;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java
index 6c187b6..320a5aa 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java
@@ -26,6 +26,10 @@ import org.apache.tajo.master.event.TaskSchedulerEvent;
public abstract class AbstractTaskScheduler extends AbstractService implements EventHandler<TaskSchedulerEvent> {
+ protected int hostLocalAssigned;
+ protected int rackLocalAssigned;
+ protected int totalAssigned;
+
/**
* Construct the service.
*
@@ -35,6 +39,18 @@ public abstract class AbstractTaskScheduler extends AbstractService implements E
super(name);
}
+ public int getHostLocalAssigned() {
+ return hostLocalAssigned;
+ }
+
+ public int getRackLocalAssigned() {
+ return rackLocalAssigned;
+ }
+
+ public int getTotalAssigned() {
+ return totalAssigned;
+ }
+
public abstract void handleTaskRequestEvent(TaskRequestEvent event);
public abstract int remainingScheduledObjectNum();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
index cd18e10..3ee93ac 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
@@ -64,9 +64,6 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
private ScheduledRequests scheduledRequests;
private TaskRequests taskRequests;
- private int hostLocalAssigned = 0;
- private int rackLocalAssigned = 0;
- private int totalAssigned = 0;
private int nextTaskId = 0;
private int scheduledObjectNum = 0;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index e326128..7f1eac6 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -54,8 +54,10 @@ public class TajoContainerProxy extends ContainerProxy {
this.port = ((TajoWorkerContainer)container).getWorkerResource().getPullServerPort();
this.state = ContainerState.RUNNING;
- LOG.info("Launch Container:" + executionBlockId + "," + containerID.getId() + "," +
- container.getId() + "," + container.getNodeId() + ", pullServer=" + port);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Launch Container:" + executionBlockId + "," + containerID.getId() + "," +
+ container.getId() + "," + container.getNodeId() + ", pullServer=" + port);
+ }
assignExecutionBlock(executionBlockId, container);
}
@@ -109,7 +111,9 @@ public class TajoContainerProxy extends ContainerProxy {
@Override
public synchronized void stopContainer() {
- LOG.info("Release TajoWorker Resource: " + executionBlockId + "," + containerID + ", state:" + this.state);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Release TajoWorker Resource: " + executionBlockId + "," + containerID + ", state:" + this.state);
+ }
if(isCompletelyDone()) {
LOG.info("Container already stopped:" + containerID);
return;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 3350447..60f705f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -114,7 +114,6 @@ public class TajoMasterClientService extends AbstractService {
public int getHttpPort() {
return 0;
}
-
/////////////////////////////////////////////////////////////////////////////
// TajoMasterClientProtocolService
/////////////////////////////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
index 5fafe51..02ed34e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -438,7 +438,7 @@ public class Query implements EventHandler<QueryEvent> {
Path finalOutputDir) throws Exception {
SubQuery lastStage = query.getSubQuery(finalExecBlockId);
TableMeta meta = lastStage.getTableMeta();
- TableStats stats = lastStage.getTableStat();
+ TableStats stats = lastStage.getResultStats();
TableDesc resultTableDesc =
new TableDesc(
@@ -469,7 +469,7 @@ public class Query implements EventHandler<QueryEvent> {
CatalogService catalog = context.getWorkerContext().getCatalog();
SubQuery lastStage = query.getSubQuery(finalExecBlockId);
TableMeta meta = lastStage.getTableMeta();
- TableStats stats = lastStage.getTableStat();
+ TableStats stats = lastStage.getResultStats();
CreateTableNode createTableNode = (CreateTableNode) lastStage.getBlock().getPlan();
@@ -510,7 +510,7 @@ public class Query implements EventHandler<QueryEvent> {
CatalogService catalog = context.getWorkerContext().getCatalog();
SubQuery lastStage = query.getSubQuery(finalExecBlockId);
TableMeta meta = lastStage.getTableMeta();
- TableStats stats = lastStage.getTableStat();
+ TableStats stats = lastStage.getResultStats();
InsertNode insertNode = (InsertNode) lastStage.getBlock().getPlan();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
index 2e4bd70..57b3db4 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@ -18,6 +18,7 @@
package org.apache.tajo.master.querymaster;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -407,6 +408,16 @@ public class QueryUnit implements EventHandler<TaskEvent> {
return finishTime;
}
+ @VisibleForTesting
+ public void setLaunchTime(long launchTime) {
+ this.launchTime = launchTime;
+ }
+
+ @VisibleForTesting
+ public void setFinishTime(long finishTime) {
+ this.finishTime = finishTime;
+ }
+
public long getRunningTime() {
if(finishTime > 0) {
return finishTime - launchTime;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
index 8a68c26..24fc6ad 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
@@ -25,13 +25,13 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.*;
import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.TajoProtos.TaskAttemptState;
+import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.ipc.TajoWorkerProtocol.TaskCompletionReport;
import org.apache.tajo.master.event.*;
import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext;
import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
-import org.apache.tajo.util.TajoIdUtils;
import java.util.ArrayList;
import java.util.EnumSet;
@@ -64,6 +64,10 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
private final QueryUnitAttemptScheduleContext scheduleContext;
+ private float progress;
+ private CatalogProtos.TableStatsProto inputStats;
+ private CatalogProtos.TableStatsProto resultStats;
+
protected static final StateMachineFactory
<QueryUnitAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
stateMachineFactory = new StateMachineFactory
@@ -235,7 +239,28 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
return this.expire;
}
+ public float getProgress() {
+ return progress;
+ }
+
+ public TableStats getInputStats() {
+ if (inputStats == null) {
+ return null;
+ }
+
+ return new TableStats(inputStats);
+ }
+
+ public TableStats getResultStats() {
+ if (resultStats == null) {
+ return null;
+ }
+ return new TableStats(resultStats);
+ }
+
private void fillTaskStatistics(TaskCompletionReport report) {
+ this.progress = 1.0f;
+
if (report.getShuffleFileOutputsCount() > 0) {
this.getQueryUnit().setShuffleFileOutputs(report.getShuffleFileOutputsList());
@@ -247,8 +272,12 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
}
this.getQueryUnit().setIntermediateData(partitions);
}
+ if (report.hasInputStats()) {
+ this.inputStats = report.getInputStats();
+ }
if (report.hasResultStats()) {
- this.getQueryUnit().setStats(new TableStats(report.getResultStats()));
+ this.resultStats = report.getResultStats();
+ this.getQueryUnit().setStats(new TableStats(resultStats));
}
}
@@ -309,6 +338,10 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
TaskAttemptEvent event) {
TaskAttemptStatusUpdateEvent updateEvent = (TaskAttemptStatusUpdateEvent) event;
+ taskAttempt.progress = updateEvent.getStatus().getProgress();
+ if (updateEvent.getStatus().hasInputStats()) { taskAttempt.inputStats = updateEvent.getStatus().getInputStats(); }
+ if (updateEvent.getStatus().hasResultStats()) { taskAttempt.resultStats = updateEvent.getStatus().getResultStats(); }
+ // absent stats fields keep the last reported values instead of a default (zeroed) proto
switch (updateEvent.getStatus().getState()) {
case TA_PENDING:
case TA_RUNNING:
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 0503706..7d7ecad 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -92,7 +92,7 @@ public class Repartitioner {
childBlocks[1] = masterPlan.getChild(execBlock.getId(), 1);
tablePath = storageManager.getTablePath(scans[i].getTableName());
- stats[i] = masterContext.getSubQuery(childBlocks[i].getId()).getTableStat().getNumBytes();
+ stats[i] = masterContext.getSubQuery(childBlocks[i].getId()).getResultStats().getNumBytes();
fragments[i] = new FileFragment(scans[i].getCanonicalName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST});
} else {
tablePath = tableDesc.getPath();
@@ -309,7 +309,7 @@ public class Repartitioner {
List<ExecutionBlock> childBlocks = masterPlan.getChilds(parentBlockId);
for (ExecutionBlock childBlock : childBlocks) {
SubQuery childExecSM = context.getSubQuery(childBlock.getId());
- tableStatses.add(childExecSM.getTableStat());
+ tableStatses.add(childExecSM.getResultStats());
}
return StatisticsUtil.aggregateTableStat(tableStatses);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 7e1a9bd..790d30b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -81,7 +81,8 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
private int priority;
private Schema schema;
private TableMeta meta;
- private TableStats statistics;
+ private TableStats resultStatistics;
+ private TableStats inputStatistics;
private EventHandler<Event> eventHandler;
private final AbstractStorageManager sm;
private AbstractTaskScheduler taskScheduler;
@@ -304,7 +305,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
return this.finishTime;
}
- public float getProgress() {
+ public float getTaskProgress() {
readLock.lock();
try {
if (getState() == SubQueryState.NEW) {
@@ -317,6 +318,29 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
}
}
+ public float getProgress() {
+ List<QueryUnit> tempTasks = null;
+ readLock.lock();
+ try {
+ if (getState() == SubQueryState.NEW) {
+ return 0;
+ } else {
+ tempTasks = new ArrayList<QueryUnit>(tasks.values());
+ }
+ } finally {
+ readLock.unlock();
+ }
+
+ float totalProgress = 0.0f;
+ for (QueryUnit eachQueryUnit: tempTasks) {
+ QueryUnitAttempt lastAttempt = eachQueryUnit.getLastAttempt(); // look the attempt up once
+ if (lastAttempt != null) {
+ totalProgress += lastAttempt.getProgress();
+ }
+ }
+ return tempTasks.isEmpty() ? 0.0f : totalProgress / tempTasks.size(); // no tasks: avoid 0/0 = NaN
+ }
+
public int getSucceededObjectCount() {
return succeededObjectCount;
}
@@ -397,8 +421,12 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
return meta;
}
- public TableStats getTableStat() {
- return statistics;
+ public TableStats getResultStats() {
+ return resultStatistics;
+ }
+
+ public TableStats getInputStats() {
+ return inputStatistics;
}
public List<String> getDiagnostics() {
@@ -447,11 +475,15 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
}
}
- private static TableStats computeStatFromUnionBlock(SubQuery subQuery) {
- TableStats stat = new TableStats();
- TableStats childStat;
- long avgRows = 0, numBytes = 0, numRows = 0;
- int numBlocks = 0, numOutputs = 0;
+ public static TableStats[] computeStatFromUnionBlock(SubQuery subQuery) {
+ TableStats[] stat = new TableStats[]{new TableStats(), new TableStats()};
+ long[] avgRows = new long[]{0, 0};
+ long[] numBytes = new long[]{0, 0};
+ long[] readBytes = new long[]{0, 0};
+ long[] numRows = new long[]{0, 0};
+ int[] numBlocks = new int[]{0, 0};
+ int[] numOutputs = new int[]{0, 0};
+
List<ColumnStats> columnStatses = Lists.newArrayList();
MasterPlan masterPlan = subQuery.getMasterPlan();
@@ -459,31 +491,48 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
while (it.hasNext()) {
ExecutionBlock block = it.next();
SubQuery childSubQuery = subQuery.context.getSubQuery(block.getId());
- childStat = childSubQuery.getTableStat();
- avgRows += childStat.getAvgRows();
- columnStatses.addAll(childStat.getColumnStats());
- numBlocks += childStat.getNumBlocks();
- numBytes += childStat.getNumBytes();
- numOutputs += childStat.getNumShuffleOutputs();
- numRows += childStat.getNumRows();
+ TableStats[] childStatArray = new TableStats[]{
+ childSubQuery.getInputStats(), childSubQuery.getResultStats()
+ };
+ for (int i = 0; i < 2; i++) {
+ if (childStatArray[i] == null) {
+ continue;
+ }
+ avgRows[i] += childStatArray[i].getAvgRows();
+ numBlocks[i] += childStatArray[i].getNumBlocks();
+ numBytes[i] += childStatArray[i].getNumBytes();
+ readBytes[i] += childStatArray[i].getReadBytes();
+ numOutputs[i] += childStatArray[i].getNumShuffleOutputs();
+ numRows[i] += childStatArray[i].getNumRows();
+ }
+ if (childStatArray[1] != null) { columnStatses.addAll(childStatArray[1].getColumnStats()); } // same null guard as the loop above
}
- stat.setColumnStats(columnStatses);
- stat.setNumBlocks(numBlocks);
- stat.setNumBytes(numBytes);
- stat.setNumShuffleOutputs(numOutputs);
- stat.setNumRows(numRows);
- stat.setAvgRows(avgRows);
+ for (int i = 0; i < 2; i++) {
+ stat[i].setNumBlocks(numBlocks[i]);
+ stat[i].setNumBytes(numBytes[i]);
+ stat[i].setReadBytes(readBytes[i]);
+ stat[i].setNumShuffleOutputs(numOutputs[i]);
+ stat[i].setNumRows(numRows[i]);
+ stat[i].setAvgRows(avgRows[i]);
+ }
+ stat[1].setColumnStats(columnStatses);
+
return stat;
}
- private TableStats computeStatFromTasks() {
- List<TableStats> stats = Lists.newArrayList();
+ private TableStats[] computeStatFromTasks() {
+ List<TableStats> inputStatsList = Lists.newArrayList();
+ List<TableStats> resultStatsList = Lists.newArrayList();
for (QueryUnit unit : getQueryUnits()) {
- stats.add(unit.getStats());
+ resultStatsList.add(unit.getStats());
+ if (unit.getLastAttempt().getInputStats() != null) {
+ inputStatsList.add(unit.getLastAttempt().getInputStats());
+ }
}
- TableStats tableStats = StatisticsUtil.aggregateTableStat(stats);
- return tableStats;
+ TableStats inputStats = StatisticsUtil.aggregateTableStat(inputStatsList);
+ TableStats resultStats = StatisticsUtil.aggregateTableStat(resultStatsList);
+ return new TableStats[]{inputStats, resultStats};
}
private void stopScheduler() {
@@ -503,11 +552,11 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
* It computes all stats and sets the intermediate result.
*/
private void finalizeStats() {
- TableStats stats;
+ TableStats[] statsArray;
if (block.hasUnion()) {
- stats = computeStatFromUnionBlock(this);
+ statsArray = computeStatFromUnionBlock(this);
} else {
- stats = computeStatFromTasks();
+ statsArray = computeStatFromTasks();
}
DataChannel channel = masterPlan.getOutgoingChannels(getId()).get(0);
@@ -521,7 +570,8 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
}
schema = channel.getSchema();
meta = CatalogUtil.newTableMeta(storeType, new Options());
- statistics = stats;
+ inputStatistics = statsArray[0];
+ resultStatistics = statsArray[1];
}
@Override
@@ -766,7 +816,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
if (subquery == null || subquery.getState() != SubQueryState.SUCCEEDED) {
aggregatedVolume += getInputVolume(masterPlan, context, childBlock);
} else {
- aggregatedVolume += subquery.getTableStat().getNumBytes();
+ aggregatedVolume += subquery.getResultStats().getNumBytes();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/JSPUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/JSPUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/JSPUtil.java
index 281290c..c73647e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/JSPUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/JSPUtil.java
@@ -154,6 +154,8 @@ public class JSPUtil {
} else if("runTime".equals(sortField)) {
if(queryUnit2.getLaunchTime() == 0) {
return -1;
+ } else if(queryUnit.getLaunchTime() == 0) {
+ return 1;
}
return compareLong(queryUnit2.getRunningTime(), queryUnit.getRunningTime());
} else if("startTime".equals(sortField)) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index 440887a..bcf10dd 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -126,6 +126,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
try {
eachProxy.stopContainer();
} catch (Exception e) {
+ LOG.warn("Failed to stop container: " + e.getMessage(), e);
}
}
super.stop();
@@ -167,7 +168,9 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
@Override
public void run() {
proxy.launch(null);
- LOG.info("ContainerProxy started:" + id);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("ContainerProxy started:" + id);
+ }
}
}
@@ -188,7 +191,9 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
@Override
public void run() {
- LOG.info("ContainerProxy stopped:" + id + "," + proxy.getId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("ContainerProxy stopped:" + id + "," + proxy.getId());
+ }
proxy.stopContainer();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index 4125236..56e5391 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -34,6 +34,7 @@ import org.apache.tajo.TajoProtos.TaskAttemptState;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.json.CoreGsonHelper;
@@ -67,6 +68,7 @@ import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
public class Task {
private static final Log LOG = LogFactory.getLog(Task.class);
+ private static final float FETCHER_PROGRESS = 0.5f;
private final TajoConf systemConf;
private final QueryContext queryContext;
@@ -87,7 +89,6 @@ public class Task {
private boolean killed = false;
private boolean aborted = false;
private boolean stopped = false;
- private float progress = 0;
private final Reporter reporter;
private Path inputTableBaseDir;
@@ -99,12 +100,7 @@ public class Task {
private long startTime;
private long finishTime;
- /**
- * flag that indicates whether progress update needs to be sent to parent.
- * If true, it has been set. If false, it has been reset.
- * Using AtomicBoolean since we need an atomic read & reset method.
- */
- private AtomicBoolean progressFlag = new AtomicBoolean(false);
+ private final TableStats inputStats;
// TODO - to be refactored
private ShuffleType shuffleType = null;
@@ -156,6 +152,7 @@ public class Task {
request.getFragments().toArray(new FragmentProto[request.getFragments().size()]), taskDir);
this.context.setDataChannel(request.getDataChannel());
this.context.setEnforcer(request.getEnforcer());
+ this.inputStats = new TableStats();
plan = CoreGsonHelper.fromJson(request.getSerializedData(), LogicalNode.class);
LogicalNode [] scanNode = PlannerUtil.findAllNodes(plan, NodeType.SCAN);
@@ -234,17 +231,6 @@ public class Task {
return LOG;
}
- // getters and setters for flag
- void setProgressFlag() {
- progressFlag.set(true);
- }
- boolean resetProgressFlag() {
- return progressFlag.getAndSet(false);
- }
- boolean getProgressFlag() {
- return progressFlag.get();
- }
-
public void localize(QueryUnitRequest request) throws IOException {
fetcherRunners = getFetchRunners(context, request.getFetches());
}
@@ -263,7 +249,6 @@ public class Task {
public void setState(TaskAttemptState status) {
context.setState(status);
- setProgressFlag();
}
public TaskAttemptContext getContext() {
@@ -283,7 +268,7 @@ public class Task {
public void kill() {
killed = true;
context.stop();
- setProgressFlag();
+ context.setState(TaskAttemptState.TA_KILLED);
releaseChannelFactory();
}
@@ -295,7 +280,6 @@ public class Task {
public void cleanUp() {
// remove itself from worker
-
if (context.getState() == TaskAttemptState.TA_SUCCEEDED) {
try {
localFS.delete(context.getWorkDir(), true);
@@ -303,7 +287,7 @@ public class Task {
taskRunnerContext.getTasks().remove(this.getId());
}
} catch (IOException e) {
- e.printStackTrace();
+ LOG.error(e.getMessage(), e);
}
} else {
LOG.error("QueryUnitAttemptId: " + context.getTaskId() + " status: " + context.getState());
@@ -314,15 +298,38 @@ public class Task {
TaskStatusProto.Builder builder = TaskStatusProto.newBuilder();
builder.setWorkerName(taskRunnerContext.getNodeId());
builder.setId(context.getTaskId().getProto())
- .setProgress(context.getProgress()).setState(context.getState());
+ .setProgress(context.getProgress())
+ .setState(context.getState());
+ builder.setInputStats(reloadInputStats());
+
+ if (context.getResultStats() != null) {
+ builder.setResultStats(context.getResultStats().getProto());
+ }
return builder.build();
}
+ /**
+ * Refreshes the cached input statistics from the current executor, if one is set, and returns
+ * a protobuf snapshot of them.
+ *
+ * Reached from getReport() (sent by the ping thread), getTaskCompletionReport(), run() and the
+ * task history builder, so updates to inputStats are serialized on that object.
+ *
+ * @return the latest known input statistics; never null
+ */
+ private CatalogProtos.TableStatsProto reloadInputStats() {
+ synchronized(inputStats) {
+ // Read the field exactly once: run() sets executor to null after close(), so checking
+ // this.executor and dereferencing it again could throw a NullPointerException.
+ PhysicalExec currentExecutor = this.executor;
+ if (currentExecutor != null) {
+ TableStats executorInputStats = currentExecutor.getInputStats();
+ if (executorInputStats != null) {
+ inputStats.setValues(executorInputStats);
+ }
+ }
+ return inputStats.getProto();
+ }
+ }
+
private TaskCompletionReport getTaskCompletionReport() {
TaskCompletionReport.Builder builder = TaskCompletionReport.newBuilder();
builder.setId(context.getTaskId().getProto());
+ builder.setInputStats(reloadInputStats());
+
if (context.hasResultStats()) {
builder.setResultStats(context.getResultStats().getProto());
} else {
@@ -359,12 +366,13 @@ public class Task {
String errorMessage = null;
try {
context.setState(TaskAttemptState.TA_RUNNING);
- setProgressFlag();
if (context.hasFetchPhase()) {
// If the fetch is still in progress, the query unit must wait for
// complete.
waitForFetch();
+ context.setFetcherProgress(FETCHER_PROGRESS);
+ context.setProgress(FETCHER_PROGRESS);
}
if (context.getFragmentSize() > 0) {
@@ -372,9 +380,9 @@ public class Task {
createPlan(context, plan);
this.executor.init();
while(!killed && executor.next() != null) {
- ++progress;
}
this.executor.close();
+ reloadInputStats();
this.executor = null;
}
} catch (Exception e) {
@@ -383,11 +391,12 @@ public class Task {
LOG.error(errorMessage);
aborted = true;
} finally {
- setProgressFlag();
+ context.setProgress(1.0f);
stopped = true;
completedTasksNum++;
if (killed || aborted) {
+ context.setExecutorProgress(0.0f);
context.setProgress(0.0f);
if(killed) {
context.setState(TaskAttemptState.TA_KILLED);
@@ -466,6 +475,11 @@ public class Task {
taskHistory.setStatus(getStatus().toString());
taskHistory.setProgress(context.getProgress());
+ taskHistory.setInputStats(new TableStats(reloadInputStats()));
+ if (context.getResultStats() != null) {
+ taskHistory.setOutputStats((TableStats)context.getResultStats().clone());
+ }
+
if (hasFetchPhase()) {
Map<URI, TaskHistory.FetcherHistory> fetcherHistories = new HashMap<URI, TaskHistory.FetcherHistory>();
@@ -528,7 +542,7 @@ public class Task {
return tablets;
}
- private static class FetchRunner implements Runnable {
+ private class FetchRunner implements Runnable {
private final TaskAttemptContext ctx;
private final Fetcher fetcher;
@@ -564,7 +578,7 @@ public class Task {
retryNum++;
}
} finally {
- ctx.getFetchLatch().countDown();
+ fetcherFinished(ctx);
}
if (retryNum == maxRetryNum) {
@@ -573,6 +587,24 @@ public class Task {
}
}
+ /**
+ * Called by a FetchRunner when it finishes, successfully or not. Advances the fetch-phase part
+ * of the task progress to (finished fetchers / total fetchers) * FETCHER_PROGRESS and releases
+ * one count of the fetch latch.
+ *
+ * The latch is counted down in a finally block on every path, including when there are no
+ * registered fetch runners, so whatever waits on it (presumably waitForFetch()) is released.
+ *
+ * @param ctx the attempt context owning the fetch latch
+ */
+ private synchronized void fetcherFinished(TaskAttemptContext ctx) {
+ try {
+ int fetcherSize = (fetcherRunners == null) ? 0 : fetcherRunners.size();
+ if (fetcherSize > 0) {
+ // The calling fetcher is still counted by the latch, hence the "- 1".
+ int numRunningFetcher = Math.max(0, (int) ctx.getFetchLatch().getCount() - 1);
+ // Divide by the total number of fetchers (not the running ones) so the value stays
+ // within [0, FETCHER_PROGRESS].
+ float finishedRatio = ((float) (fetcherSize - numRunningFetcher)) / fetcherSize;
+ context.setProgress(finishedRatio * FETCHER_PROGRESS);
+ }
+ } finally {
+ ctx.getFetchLatch().countDown();
+ }
+ }
+
private void releaseChannelFactory(){
if(channelFactory != null) {
channelFactory.shutdown();
@@ -637,19 +669,20 @@ public class Task {
public void run() {
while (!stop.get() && !stopped) {
try {
+ if(executor != null && context.getProgress() < 1.0f) {
+ float progress = executor.getProgress();
+ context.setExecutorProgress(progress);
+ }
+ } catch (Throwable t) {
+ LOG.error("Get progress error: " + t.getMessage(), t);
+ }
- resetProgressFlag();
-
- if (getProgressFlag()) {
- resetProgressFlag();
+ try {
+ if (context.isPorgressChanged()) {
masterStub.statusUpdate(null, getReport(), NullCallback.get());
} else {
masterStub.ping(null, taskId.getProto(), NullCallback.get());
}
- synchronized (pingThread) {
- pingThread.wait(PROGRESS_INTERVAL);
- }
-
} catch (Throwable t) {
LOG.error(t.getMessage(), t);
remainingRetries -=1;
@@ -658,6 +691,15 @@ public class Task {
LOG.warn("Last retry, exiting ");
throw new RuntimeException(t);
}
+ } finally {
+ if (remainingRetries > 0) {
+ synchronized (pingThread) {
+ try {
+ pingThread.wait(PROGRESS_INTERVAL);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index c39c06e..92762c9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -37,6 +37,7 @@ import java.io.File;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
@@ -55,7 +56,10 @@ public class TaskAttemptContext {
private final Path workDir;
private boolean needFetch = false;
private CountDownLatch doneFetchPhaseSignal;
- private float progress = 0;
+ private float progress = 0.0f;
+ private float fetcherProgress = 0.0f;
+ private AtomicBoolean progressChanged = new AtomicBoolean(false);
+
/** a map of shuffled file outputs */
private Map<Integer, String> shuffleFileOutputs;
private File fetchIn;
@@ -83,7 +87,7 @@ public class TaskAttemptContext {
this.workDir = workDir;
this.shuffleFileOutputs = Maps.newHashMap();
-
+
state = TaskAttemptState.TA_PENDING;
}
@@ -133,7 +137,7 @@ public class TaskAttemptContext {
public TableStats getResultStats() {
return this.resultStats;
}
-
+
public boolean isStopped() {
return this.stopped;
}
@@ -210,7 +214,21 @@ public class TaskAttemptContext {
}
+ /**
+ * Sets the overall progress of this attempt and, if the value actually changed, marks the
+ * progress as changed so the next isProgressChanged() call reports it.
+ *
+ * @param progress the new overall progress
+ */
public void setProgress(float progress) {
+ float previousProgress = this.progress;
this.progress = progress;
+ // Only raise the flag; never clear it here. Setting the same value again must not discard
+ // a change the status reporter has not observed yet. The flag is cleared by isProgressChanged().
+ if (previousProgress != progress) {
+ progressChanged.set(true);
+ }
+ }
+
+ /**
+ * Returns whether the progress changed since the previous call, and clears the flag, so each
+ * change is reported once.
+ *
+ * @return true if setProgress() changed the value since the last call
+ */
+ public boolean isProgressChanged() {
+ return progressChanged.getAndSet(false);
+ }
+
+ /**
+ * @deprecated misspelled; use {@link #isProgressChanged()}, which has the same semantics.
+ */
+ @Deprecated
+ public boolean isPorgressChanged() {
+ return isProgressChanged();
+ }
+
+ /**
+ * Maps executor progress onto the range left after the fetch phase:
+ * fetcherProgress + executorProgress * (1 - fetcherProgress).
+ *
+ * @param executorProgress executor progress, presumably in [0, 1] -- confirm against PhysicalExec
+ */
+ public void setExecutorProgress(float executorProgress) {
+ // Read once so both uses see the same value even if setFetcherProgress runs concurrently.
+ float fetched = this.fetcherProgress;
+ float adjustProgress = executorProgress * (1 - fetched);
+ setProgress(fetched + adjustProgress);
+ }
+
+ /**
+ * Records the share of overall progress attributed to the completed fetch phase; used as the
+ * base offset by setExecutorProgress().
+ *
+ * @param fetcherProgress progress reached when fetching completed
+ */
+ public void setFetcherProgress(float fetcherProgress) {
+ this.fetcherProgress = fetcherProgress;
}
public FragmentProto getTable(String id) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskHistory.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskHistory.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskHistory.java
index 2650c4a..0973aa7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskHistory.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskHistory.java
@@ -18,6 +18,9 @@
package org.apache.tajo.worker;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.util.FileUtil;
+
import java.net.URI;
import java.util.Collection;
import java.util.Map;
@@ -31,6 +34,9 @@ public class TaskHistory {
private String workingPath;
private float progress;
+ private TableStats inputStats;
+ private TableStats outputStats;
+
Map<URI, FetcherHistory> fetchers;
public static class FetcherHistory {
@@ -150,4 +156,43 @@ public class TaskHistory {
public boolean hasFetcher() {
return fetchers != null && !fetchers.isEmpty();
}
+
+ /** @return the input statistics recorded for this task, or null if none were set */
+ public TableStats getInputStats() {
+ return this.inputStats;
+ }
+
+ /** @param inputStats input statistics to record for this task; may be null */
+ public void setInputStats(TableStats inputStats) {
+ this.inputStats = inputStats;
+ }
+
+ /** @return the output statistics recorded for this task, or null if none were set */
+ public TableStats getOutputStats() {
+ return this.outputStats;
+ }
+
+ /** @param outputStats output statistics to record for this task; may be null */
+ public void setOutputStats(TableStats outputStats) {
+ this.outputStats = outputStats;
+ }
+
+ /**
+ * Formats input statistics as "TotalBytes: ..., ReadBytes: ..., ReadRows: ...". Each byte count
+ * is shown human-readable followed by the exact value; ReadRows is "-" when zero.
+ *
+ * @param tableStats statistics to format; may be null
+ * @return the formatted text, or "No input statistics" when tableStats is null
+ */
+ public static String toInputStatsString(TableStats tableStats) {
+ if (tableStats == null) {
+ return "No input statistics";
+ }
+
+ StringBuilder text = new StringBuilder();
+ text.append("TotalBytes: ")
+ .append(FileUtil.humanReadableByteCount(tableStats.getNumBytes(), false))
+ .append(" (").append(tableStats.getNumBytes()).append(" B)");
+ text.append(", ReadBytes: ")
+ .append(FileUtil.humanReadableByteCount(tableStats.getReadBytes(), false))
+ .append(" (").append(tableStats.getReadBytes()).append(" B)");
+ text.append(", ReadRows: ");
+ if (tableStats.getNumRows() == 0) {
+ text.append("-");
+ } else {
+ text.append(tableStats.getNumRows());
+ }
+ return text.toString();
+ }
+
+ /**
+ * @param tableStats statistics to format; may be null
+ * @return tableStats rendered as JSON, or "No output statistics" when tableStats is null
+ */
+ public static String toOutputStatsString(TableStats tableStats) {
+ return (tableStats == null) ? "No output statistics" : tableStats.toJson();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
index 9a38aef..d74e0df 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -302,6 +302,9 @@ public class TaskRunner extends AbstractService {
static void fatalError(QueryMasterProtocolService.Interface qmClientService,
QueryUnitAttemptId taskAttemptId, String message) {
+ if (message == null) {
+ message = "No error message";
+ }
TaskFatalErrorReport.Builder builder = TaskFatalErrorReport.newBuilder()
.setId(taskAttemptId.getProto())
.setErrorMessage(message);
@@ -372,6 +375,7 @@ public class TaskRunner extends AbstractService {
QueryUnitAttemptId taskAttemptId = new QueryUnitAttemptId(taskRequest.getId());
if (tasks.containsKey(taskAttemptId)) {
+ LOG.error("Duplicate Task Attempt: " + taskAttemptId);
fatalError(qmClientService, taskAttemptId, "Duplicate Task Attempt: " + taskAttemptId);
continue;
}
@@ -390,8 +394,8 @@ public class TaskRunner extends AbstractService {
// task.run() is a blocking call.
task.run();
} catch (Throwable t) {
+ LOG.error(t.getMessage(), t);
fatalError(qmClientService, taskAttemptId, t.getMessage());
- t.printStackTrace();
} finally {
callFuture = null;
taskRequest = null;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
index 3fdd221..10e8ec2 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
@@ -34,15 +34,17 @@ message TaskStatusProto {
required float progress = 3;
required TaskAttemptState state = 4;
optional StatSetProto stats = 5;
- optional TableStatsProto resultStats = 6;
- repeated ShuffleFileOutput shuffleFileOutputs = 7;
+ optional TableStatsProto inputStats = 6;
+ optional TableStatsProto resultStats = 7;
+ repeated ShuffleFileOutput shuffleFileOutputs = 8;
}
message TaskCompletionReport {
+ // NOTE(review): this change renumbers existing tags (resultStats 3 -> 4, shuffleFileOutputs
+ // 4 -> 5). Protobuf field numbers identify fields on the wire, so mixed-version peers would
+ // misread them; consider keeping the old tags and appending inputStats with a new number.
required QueryUnitAttemptIdProto id = 1;
optional StatSetProto stats = 2;
- optional TableStatsProto resultStats = 3;
- repeated ShuffleFileOutput shuffleFileOutputs = 4;
+ optional TableStatsProto inputStats = 3;
+ optional TableStatsProto resultStats = 4;
+ repeated ShuffleFileOutput shuffleFileOutputs = 5;
}
message TaskFatalErrorReport {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c573b6fc/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querytasks.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querytasks.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querytasks.jsp
index 5ba83ab..0a287ee 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querytasks.jsp
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querytasks.jsp
@@ -31,6 +31,11 @@
<%@ page import="java.util.Map" %>
<%@ page import="java.util.HashMap" %>
<%@ page import="org.apache.tajo.QueryUnitAttemptId" %>
+<%@ page import="org.apache.tajo.catalog.statistics.TableStats" %>
+<%@ page import="java.util.Locale" %>
+<%@ page import="java.text.NumberFormat" %>
+<%@ page import="org.apache.tajo.engine.planner.PlannerUtil" %>
+<%@ page import="org.apache.tajo.master.querymaster.QueryUnitAttempt" %>
+<%@ page import="org.apache.tajo.util.FileUtil" %>
<%
String paramQueryId = request.getParameter("queryId");
@@ -92,9 +97,43 @@
<%
return;
}
+
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String url = "querytasks.jsp?queryId=" + queryId + "&ebid=" + ebid + "&sortOrder=" + nextSortOrder + "&sort=";
+ QueryUnit[] queryUnits = subQuery.getQueryUnits();
+
+ // Aggregate progress and I/O statistics over the last attempt of every task of this subquery.
+ long totalInputBytes = 0;
+ long totalReadBytes = 0;
+ long totalReadRows = 0;
+ long totalWriteBytes = 0;
+ long totalWriteRows = 0;
+ int numTasks = queryUnits.length;
+ // NOTE(review): assigned on every iteration, so this shows the value of the last task only;
+ // presumably all tasks of a subquery share the same shuffle output count -- confirm.
+ int numShuffles = 0;
+
+ float totalProgress = 0.0f;
+ for(QueryUnit eachQueryUnit: queryUnits) {
+ numShuffles = eachQueryUnit.getShuffleOutpuNum();
+ // Fetch the last attempt once so progress and statistics come from the same attempt object.
+ // Tasks without an attempt contribute nothing.
+ QueryUnitAttempt lastAttempt = eachQueryUnit.getLastAttempt();
+ if (lastAttempt == null) {
+ continue;
+ }
+ totalProgress += lastAttempt.getProgress();
+
+ TableStats inputStats = lastAttempt.getInputStats();
+ if (inputStats != null) {
+ totalInputBytes += inputStats.getNumBytes();
+ totalReadBytes += inputStats.getReadBytes();
+ totalReadRows += inputStats.getNumRows();
+ }
+ TableStats outputStats = lastAttempt.getResultStats();
+ if (outputStats != null) {
+ totalWriteBytes += outputStats.getNumBytes();
+ totalWriteRows += outputStats.getNumRows();
+ }
+ }
+
+ NumberFormat nf = NumberFormat.getInstance(Locale.US);
%>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
@@ -109,9 +148,26 @@
<div class='contents'>
<h2>Tajo Worker: <a href='index.jsp'><%=tajoWorker.getWorkerContext().getWorkerName()%></a></h2>
<hr/>
- <h3><a href='querydetail.jsp?queryId=<%=paramQueryId%>'><%=ebid.toString()%></a>(<%=subQuery.getState()%>)</h3>
- <div>Started:<%=df.format(subQuery.getStartTime())%> ~ <%=subQuery.getFinishTime() == 0 ? "-" : df.format(subQuery.getFinishTime())%></div>
+ <h3><a href='querydetail.jsp?queryId=<%=paramQueryId%>'><%=ebid.toString()%></a></h3>
<hr/>
+ <p/>
+ <pre><%=PlannerUtil.buildExplainString(subQuery.getBlock().getPlan())%></pre>
+ <p/>
+ <table border="1" width="100%" class="border_table">
+ <tr><td align='right' width='150px'>Status:</td><td><%=subQuery.getState()%></td></tr>
+ <tr><td align='right'>Started:</td><td><%=df.format(subQuery.getStartTime())%> ~ <%=subQuery.getFinishTime() == 0 ? "-" : df.format(subQuery.getFinishTime())%></td></tr>
+ <tr><td align='right'># Tasks:</td><td><%=numTasks%> (Local Tasks: <%=subQuery.getTaskScheduler().getHostLocalAssigned()%>, Rack Local Tasks: <%=subQuery.getTaskScheduler().getRackLocalAssigned()%>)</td></tr>
+ <%-- Average progress; a subquery with no tasks shows 0 instead of NaN (0/0). --%>
+ <tr><td align='right'>Progress:</td><td><%=JSPUtil.percentFormat(numTasks == 0 ? 0.0f : totalProgress / numTasks)%>%</td></tr>
+ <tr><td align='right'># Shuffles:</td><td><%=numShuffles%></td></tr>
+ <tr><td align='right'>Input Bytes:</td><td><%=FileUtil.humanReadableByteCount(totalInputBytes, false) + " (" + nf.format(totalInputBytes) + " B)"%></td></tr>
+ <%-- Both the human-readable value and the exact count in parentheses are read bytes. --%>
+ <tr><td align='right'>Read Bytes:</td><td><%=totalReadBytes == 0 ? "-" : FileUtil.humanReadableByteCount(totalReadBytes, false) + " (" + nf.format(totalReadBytes) + " B)"%></td></tr>
+ <tr><td align='right'>Input Rows:</td><td><%=nf.format(totalReadRows)%></td></tr>
+ <tr><td align='right'>Output Bytes:</td><td><%=FileUtil.humanReadableByteCount(totalWriteBytes, false) + " (" + nf.format(totalWriteBytes) + " B)"%></td></tr>
+ <tr><td align='right'>Output Rows:</td><td><%=nf.format(totalWriteRows)%></td></tr>
+ </table>
+ <hr/>
+
+
<form action='querytasks.jsp' method='GET'>
Status:
<select name="status" onchange="this.form.submit()">
@@ -126,9 +182,8 @@
<input type="hidden" name="sortOrder" value="<%=sortOrder%>"/>
</form>
<table border="1" width="100%" class="border_table">
- <tr><th>No</th><th><a href='<%=url%>id'>Id</a></th><th>Status</th><th><a href='<%=url%>startTime'>Start Time</a></th><th><a href='<%=url%>runTime'>Running Time</a></th><th><a href='<%=url%>host'>Host</a></th></tr>
+ <tr><th>No</th><th><a href='<%=url%>id'>Id</a></th><th>Status</th><th>Progress</th><th><a href='<%=url%>startTime'>Started</a></th><th><a href='<%=url%>runTime'>Running Time</a></th><th><a href='<%=url%>host'>Host</a></th></tr>
<%
- QueryUnit[] queryUnits = subQuery.getQueryUnits();
JSPUtil.sortQueryUnit(queryUnits, sort, sortOrder);
int rowNo = 1;
for(QueryUnit eachQueryUnit: queryUnits) {
@@ -158,8 +213,9 @@
<td><%=rowNo%></td>
<td><a href="<%=queryUnitDetailUrl%>"><%=eachQueryUnit.getId()%></a></td>
<td><%=eachQueryUnit.getState()%></td>
+ <%-- A task may not have an attempt yet (the summary loop checks for this too); show "-" then. --%>
+ <td><%=eachQueryUnit.getLastAttempt() == null ? "-" : JSPUtil.percentFormat(eachQueryUnit.getLastAttempt().getProgress()) + "%"%></td>
<td><%=eachQueryUnit.getLaunchTime() == 0 ? "-" : df.format(eachQueryUnit.getLaunchTime())%></td>
- <td><%=eachQueryUnit.getLaunchTime() == 0 ? "-" : eachQueryUnit.getRunningTime() + " ms"%></td>
+ <td align='right'><%=eachQueryUnit.getLaunchTime() == 0 ? "-" : eachQueryUnit.getRunningTime() + " ms"%></td>
<td><%=queryUnitHost%></td>
</tr>
<%