You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/09/13 05:29:59 UTC
[4/4] git commit: TAJO-178: Implements StorageManager for scanning
asynchronously. (hyoungjunkim via hyunsik)
TAJO-178: Implements StorageManager for scanning asynchronously. (hyoungjunkim via hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/5ad7fbae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/5ad7fbae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/5ad7fbae
Branch: refs/heads/master
Commit: 5ad7fbae98e45307cde51bc58d54e4de95b627ad
Parents: 5d3966a
Author: Hyunsik Choi <hy...@apache.org>
Authored: Fri Sep 13 11:45:21 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Fri Sep 13 11:53:20 2013 +0900
----------------------------------------------------------------------
pom.xml | 1 +
.../src/main/proto/CatalogProtos.proto | 2 +
tajo-core/tajo-core-backend/pom.xml | 1 +
.../main/java/org/apache/tajo/cli/TajoCli.java | 80 ++-
.../engine/planner/PhysicalPlannerImpl.java | 8 +-
.../planner/physical/BSTIndexScanExec.java | 4 +-
.../planner/physical/ExternalSortExec.java | 2 +-
.../planner/physical/IndexedStoreExec.java | 4 +-
.../planner/physical/PartitionedStoreExec.java | 9 +-
.../engine/planner/physical/ProjectionExec.java | 1 +
.../engine/planner/physical/SeqScanExec.java | 6 +-
.../engine/planner/physical/StoreTableExec.java | 13 +-
.../org/apache/tajo/master/GlobalEngine.java | 6 +-
.../org/apache/tajo/master/GlobalPlanner.java | 6 +-
.../java/org/apache/tajo/master/TajoMaster.java | 18 +-
.../apache/tajo/master/TaskSchedulerImpl.java | 21 +-
.../apache/tajo/master/querymaster/Query.java | 13 +-
.../master/querymaster/QueryInProgress.java | 2 +
.../master/querymaster/QueryJobManager.java | 10 +
.../tajo/master/querymaster/QueryMaster.java | 39 +-
.../master/querymaster/QueryMasterTask.java | 4 +-
.../tajo/master/querymaster/QueryUnit.java | 39 +-
.../tajo/master/querymaster/SubQuery.java | 8 +-
.../master/rm/TajoWorkerResourceManager.java | 31 +-
.../apache/tajo/master/rm/WorkerResource.java | 47 ++
.../java/org/apache/tajo/webapp/HttpServer.java | 15 +-
.../org/apache/tajo/worker/TajoQueryEngine.java | 9 +-
.../java/org/apache/tajo/worker/TajoWorker.java | 53 +-
.../tajo/worker/TajoWorkerManagerService.java | 4 +-
.../main/java/org/apache/tajo/worker/Task.java | 1 -
.../apache/tajo/worker/TaskRunnerManager.java | 6 +
.../src/main/proto/TajoMasterProtocol.proto | 11 +
.../resources/webapps/admin/WEB-INF/web.xml | 10 +
.../src/main/resources/webapps/admin/index.jsp | 69 +-
.../src/main/resources/webapps/admin/query.jsp | 38 ++
.../resources/webapps/worker/WEB-INF/web.xml | 10 +
.../src/main/resources/webapps/worker/index.jsp | 60 ++
.../resources/webapps/worker/querydetail.jsp | 60 ++
.../src/main/resources/webapps/worker/style.css | 285 ++++++++
.../org/apache/tajo/BackendTestingUtil.java | 34 +-
.../org/apache/tajo/TajoTestingCluster.java | 3 +-
.../org/apache/tajo/client/TestTajoClient.java | 26 +-
.../plan/global/TestGlobalQueryPlanner.java | 8 +-
.../global/TestGlobalQueryOptimizer.java | 6 +-
.../planner/physical/TestBNLJoinExec.java | 8 +-
.../planner/physical/TestBSTIndexExec.java | 12 +-
.../planner/physical/TestExternalSortExec.java | 7 +-
.../planner/physical/TestHashAntiJoinExec.java | 8 +-
.../planner/physical/TestHashJoinExec.java | 8 +-
.../planner/physical/TestHashSemiJoinExec.java | 8 +-
.../planner/physical/TestMergeJoinExec.java | 8 +-
.../engine/planner/physical/TestNLJoinExec.java | 11 +-
.../planner/physical/TestPhysicalPlanner.java | 18 +-
.../engine/planner/physical/TestSortExec.java | 6 +-
.../tajo/engine/query/TestResultSetImpl.java | 21 +-
.../tajo/master/TestExecutionBlockCursor.java | 5 +-
.../org/apache/tajo/storage/TestRowFile.java | 39 +-
.../tajo/worker/TestRangeRetrieverHandler.java | 18 +-
.../tajo/storage/AbstractStorageManager.java | 669 +++++++++++++++++++
.../java/org/apache/tajo/storage/Fragment.java | 47 +-
.../org/apache/tajo/storage/MergeScanner.java | 5 +-
.../org/apache/tajo/storage/StorageManager.java | 658 +-----------------
.../tajo/storage/StorageManagerFactory.java | 96 +++
.../apache/tajo/storage/v2/CSVFileScanner.java | 383 +++++++++++
.../apache/tajo/storage/v2/DiskDeviceInfo.java | 62 ++
.../tajo/storage/v2/DiskFileScanScheduler.java | 168 +++++
.../org/apache/tajo/storage/v2/DiskInfo.java | 75 +++
.../apache/tajo/storage/v2/DiskMountInfo.java | 86 +++
.../org/apache/tajo/storage/v2/DiskUtil.java | 198 ++++++
.../apache/tajo/storage/v2/FileScanRunner.java | 75 +++
.../apache/tajo/storage/v2/FileScannerV2.java | 253 +++++++
.../apache/tajo/storage/v2/RCFileScanner.java | 256 +++++++
.../apache/tajo/storage/v2/ScanScheduler.java | 148 ++++
.../tajo/storage/v2/StorageManagerV2.java | 135 ++++
.../src/main/resources/storage-default.xml | 49 ++
.../tajo/storage/TestCompressionStorages.java | 10 +-
.../apache/tajo/storage/TestMergeScanner.java | 18 +-
.../apache/tajo/storage/TestStorageManager.java | 14 +-
.../org/apache/tajo/storage/TestStorages.java | 14 +-
.../apache/tajo/storage/index/TestBSTIndex.java | 47 +-
.../index/TestSingleCSVFileBSTIndex.java | 10 +-
.../src/test/resources/storage-default.xml | 30 +
82 files changed, 3758 insertions(+), 1008 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3d50036..136a5f1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -312,6 +312,7 @@
<exclude>**/*.schema</exclude>
<exclude>**/*.tbl</exclude>
<exclude>**/*.jsp</exclude>
+ <exclude>**/web.xml</exclude>
<!-- generated content -->
<exclude>**/target/**</exclude>
<exclude>**/*.log</exclude>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
index 6164553..3d61ebb 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
+++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
@@ -88,6 +88,8 @@ message FragmentProto {
required TableProto meta = 5;
optional TableStatProto stat = 6;
optional bool distCached = 7 [default = false];
+ repeated string hosts = 8;
+ repeated int32 diskIds = 9;
}
message TableProto {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/pom.xml b/tajo-core/tajo-core-backend/pom.xml
index 9c1d458..78b7e6e 100644
--- a/tajo-core/tajo-core-backend/pom.xml
+++ b/tajo-core/tajo-core-backend/pom.xml
@@ -67,6 +67,7 @@
<systemProperties>
<tajo.test>TRUE</tajo.test>
</systemProperties>
+ <argLine>-Xms512m -Xmx1024m</argLine>
</configuration>
</plugin>
<plugin>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java
index c273f6f..ced5e5e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java
@@ -351,49 +351,55 @@ public class TajoCli {
if (status.getState() == QueryState.QUERY_SUCCEEDED) {
if (status.hasResult()) {
ResultSet res = client.getQueryResult(queryId);
- if (res == null) {
- sout.println("OK");
- return;
- }
-
- ResultSetMetaData rsmd = res.getMetaData();
- TableDesc desc = client.getResultDesc(queryId);
- sout.println("final state: " + status.getState()
- + ", init time: " + (((float)(status.getInitTime() - status.getSubmitTime())
- / 1000.0) + " sec")
- + ", execution time: " + (((float)status.getFinishTime() - status.getInitTime())
- / 1000.0) + " sec"
- + ", total response time: " + (((float)(status.getFinishTime() -
- status.getSubmitTime()) / 1000.0) + " sec"));
- sout.println("result: " + desc.getPath() + "\n");
-
- int numOfColumns = rsmd.getColumnCount();
- for (int i = 1; i <= numOfColumns; i++) {
- if (i > 1) sout.print(", ");
- String columnName = rsmd.getColumnName(i);
- sout.print(columnName);
- }
- sout.println("\n-------------------------------");
+ try {
+ if (res == null) {
+ sout.println("OK");
+ return;
+ }
- int numOfPrintedRows = 0;
- while (res.next()) {
- // TODO - to be improved to print more formatted text
+ ResultSetMetaData rsmd = res.getMetaData();
+ TableDesc desc = client.getResultDesc(queryId);
+ sout.println("final state: " + status.getState()
+ + ", init time: " + (((float)(status.getInitTime() - status.getSubmitTime())
+ / 1000.0) + " sec")
+ + ", execution time: " + (((float)status.getFinishTime() - status.getInitTime())
+ / 1000.0) + " sec"
+ + ", total response time: " + (((float)(status.getFinishTime() -
+ status.getSubmitTime()) / 1000.0) + " sec"));
+ sout.println("result: " + desc.getPath() + "\n");
+
+ int numOfColumns = rsmd.getColumnCount();
for (int i = 1; i <= numOfColumns; i++) {
if (i > 1) sout.print(", ");
- String columnValue = res.getObject(i).toString();
- sout.print(columnValue);
+ String columnName = rsmd.getColumnName(i);
+ sout.print(columnName);
}
- sout.println();
- sout.flush();
- numOfPrintedRows++;
- if (numOfPrintedRows >= PRINT_LIMIT) {
- sout.print("continue... ('q' is quit)");
- sout.flush();
- if (sin.read() == 'q') {
- break;
+ sout.println("\n-------------------------------");
+
+ int numOfPrintedRows = 0;
+ while (res.next()) {
+ // TODO - to be improved to print more formatted text
+ for (int i = 1; i <= numOfColumns; i++) {
+ if (i > 1) sout.print(", ");
+ String columnValue = res.getObject(i).toString();
+ sout.print(columnValue);
}
- numOfPrintedRows = 0;
sout.println();
+ sout.flush();
+ numOfPrintedRows++;
+ if (numOfPrintedRows >= PRINT_LIMIT) {
+ sout.print("continue... ('q' is quit)");
+ sout.flush();
+ if (sin.read() == 'q') {
+ break;
+ }
+ numOfPrintedRows = 0;
+ sout.println();
+ }
+ }
+ } finally {
+ if(res != null) {
+ res.close();
}
}
} else {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 2d123bd..7d68e19 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -32,8 +32,8 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.engine.planner.physical.*;
import org.apache.tajo.exception.InternalException;
+import org.apache.tajo.storage.AbstractStorageManager;
import org.apache.tajo.storage.Fragment;
-import org.apache.tajo.storage.StorageManager;
import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.util.IndexUtil;
@@ -42,9 +42,9 @@ import java.io.IOException;
public class PhysicalPlannerImpl implements PhysicalPlanner {
private static final Log LOG = LogFactory.getLog(PhysicalPlannerImpl.class);
protected final TajoConf conf;
- protected final StorageManager sm;
+ protected final AbstractStorageManager sm;
- public PhysicalPlannerImpl(final TajoConf conf, final StorageManager sm) {
+ public PhysicalPlannerImpl(final TajoConf conf, final AbstractStorageManager sm) {
this.conf = conf;
this.sm = sm;
}
@@ -227,7 +227,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
return new TunnelExec(ctx, plan.getOutSchema(), subOp);
}
- return new StoreTableExec(ctx, sm, plan, subOp);
+ return new StoreTableExec(ctx, plan, subOp);
}
public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
index a0d69e0..fe7bd44 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
@@ -47,7 +47,7 @@ public class BSTIndexScanExec extends PhysicalExec {
private boolean initialize = true;
public BSTIndexScanExec(TaskAttemptContext context,
- StorageManager sm , ScanNode scanNode ,
+ AbstractStorageManager sm , ScanNode scanNode ,
Fragment fragment, Path fileName , Schema keySchema,
TupleComparator comparator , Datum[] datum) throws IOException {
super(context, scanNode.getInSchema(), scanNode.getOutSchema());
@@ -60,7 +60,7 @@ public class BSTIndexScanExec extends PhysicalExec {
}
this.datum = datum;
- this.fileScanner = (SeekableScanner)StorageManager.getScanner(context.getConf(),
+ this.fileScanner = StorageManagerFactory.getSeekableScanner(context.getConf(),
fragment.getMeta(), fragment, outSchema);
this.fileScanner.init();
this.projector = new Projector(inSchema, outSchema, scanNode.getTargets());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index 7ba0d8a..6d49880 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -45,7 +45,7 @@ public class ExternalSortExec extends SortExec {
private int SORT_BUFFER_SIZE;
public ExternalSortExec(final TaskAttemptContext context,
- final StorageManager sm, final SortNode plan, final PhysicalExec child)
+ final AbstractStorageManager sm, final SortNode plan, final PhysicalExec child)
throws IOException {
super(context, plan.getInSchema(), plan.getOutSchema(), child,
plan.getSortKeys());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java
index dc7f08c..f68ff59 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java
@@ -44,7 +44,7 @@ public class IndexedStoreExec extends UnaryPhysicalExec {
private FileAppender appender;
private TableMeta meta;
- public IndexedStoreExec(final TaskAttemptContext context, final StorageManager sm,
+ public IndexedStoreExec(final TaskAttemptContext context, final AbstractStorageManager sm,
final PhysicalExec child, final Schema inSchema, final Schema outSchema,
final SortSpec[] sortSpecs) throws IOException {
super(context, inSchema, outSchema, child);
@@ -71,7 +71,7 @@ public class IndexedStoreExec extends UnaryPhysicalExec {
.newTableMeta(this.outSchema, CatalogProtos.StoreType.CSV);
FileSystem fs = new RawLocalFileSystem();
fs.mkdirs(storeTablePath);
- this.appender = (FileAppender) StorageManager.getAppender(context.getConf(), meta,
+ this.appender = (FileAppender) StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta,
new Path(storeTablePath, "output"));
this.appender.enableStats();
this.appender.init();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionedStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionedStoreExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionedStoreExec.java
index f70bf18..a249edc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionedStoreExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionedStoreExec.java
@@ -33,10 +33,7 @@ import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.catalog.statistics.StatisticsUtil;
import org.apache.tajo.catalog.statistics.TableStat;
import org.apache.tajo.engine.planner.logical.StoreTableNode;
-import org.apache.tajo.storage.Appender;
-import org.apache.tajo.storage.StorageManager;
-import org.apache.tajo.storage.StorageUtil;
-import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.*;
import java.io.IOException;
import java.text.NumberFormat;
@@ -64,7 +61,7 @@ public final class PartitionedStoreExec extends UnaryPhysicalExec {
private final Path storeTablePath;
private final Map<Integer, Appender> appenderMap = new HashMap<Integer, Appender>();
- public PartitionedStoreExec(TaskAttemptContext context, final StorageManager sm,
+ public PartitionedStoreExec(TaskAttemptContext context, final AbstractStorageManager sm,
final StoreTableNode plan, final PhysicalExec child) throws IOException {
super(context, plan.getInSchema(), plan.getOutSchema(), child);
Preconditions.checkArgument(plan.hasPartitionKey());
@@ -101,7 +98,7 @@ public final class PartitionedStoreExec extends UnaryPhysicalExec {
FileStatus status = fs.getFileStatus(dataFile);
LOG.info("File size: " + status.getLen());
}
- appender = StorageManager.getAppender(context.getConf(), meta, dataFile);
+ appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, dataFile);
appender.enableStats();
appender.init();
appenderMap.put(partition, appender);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
index 4aa8a1c..bafeeea 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
@@ -55,6 +55,7 @@ public class ProjectionExec extends UnaryPhysicalExec {
@Override
public Tuple next() throws IOException {
Tuple tuple = child.next();
+
if (tuple == null) {
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index 474415e..9051219 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -46,7 +46,7 @@ public class SeqScanExec extends PhysicalExec {
private Projector projector;
private EvalContext [] evalContexts;
- public SeqScanExec(TaskAttemptContext context, StorageManager sm,
+ public SeqScanExec(TaskAttemptContext context, AbstractStorageManager sm,
ScanNode plan, Fragment[] fragments) throws IOException {
super(context, plan.getInSchema(), plan.getOutSchema());
@@ -91,8 +91,8 @@ public class SeqScanExec extends PhysicalExec {
this.scanner = new MergeScanner(context.getConf(), fragments[0].getMeta(),
TUtil.newList(fragments));
} else {
- this.scanner = StorageManager.getScanner(context.getConf(), fragments[0].getMeta(),
- fragments[0], projected);
+ this.scanner = StorageManagerFactory.getStorageManager(
+ context.getConf()).getScanner(fragments[0].getMeta(), fragments[0], projected);
}
scanner.init();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
index 7d6a525..a799694 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
@@ -28,10 +28,7 @@ import org.apache.tajo.TaskAttemptContext;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.engine.planner.logical.StoreTableNode;
-import org.apache.tajo.storage.Appender;
-import org.apache.tajo.storage.StorageManager;
-import org.apache.tajo.storage.StorageUtil;
-import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.*;
import java.io.IOException;
@@ -47,10 +44,8 @@ public class StoreTableExec extends UnaryPhysicalExec {
* @throws java.io.IOException
*
*/
- public StoreTableExec(TaskAttemptContext context, StorageManager sm,
- StoreTableNode plan, PhysicalExec child) throws IOException {
+ public StoreTableExec(TaskAttemptContext context, StoreTableNode plan, PhysicalExec child) throws IOException {
super(context, plan.getInSchema(), plan.getOutSchema(), child);
-
this.plan = plan;
}
@@ -68,10 +63,10 @@ public class StoreTableExec extends UnaryPhysicalExec {
Path storeTablePath = new Path(context.getWorkDir(), "out");
FileSystem fs = new RawLocalFileSystem();
fs.mkdirs(storeTablePath);
- appender = StorageManager.getAppender(context.getConf(), meta,
+ appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta,
StorageUtil.concatPath(storeTablePath, "0"));
} else {
- appender = StorageManager.getAppender(context.getConf(), meta, context.getOutputPath());
+ appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, context.getOutputPath());
}
appender.enableStats();
appender.init();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 76f8571..2ddd891 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -49,7 +49,7 @@ import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.master.TajoMaster.MasterContext;
import org.apache.tajo.master.querymaster.QueryInfo;
import org.apache.tajo.master.querymaster.QueryJobManager;
-import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.AbstractStorageManager;
import org.apache.tajo.storage.StorageUtil;
import java.io.IOException;
@@ -65,7 +65,7 @@ public class GlobalEngine extends AbstractService {
private final static Log LOG = LogFactory.getLog(GlobalEngine.class);
private final MasterContext context;
- private final StorageManager sm;
+ private final AbstractStorageManager sm;
private SQLAnalyzer analyzer;
private HiveConverter converter;
@@ -107,7 +107,7 @@ public class GlobalEngine extends AbstractService {
NoSuchQueryIdException, IllegalQueryStatusException,
UnknownWorkerException, EmptyClusterException {
- LOG.info("SQL: " + sql);
+ LOG.info(">>>>>SQL: " + sql);
try {
// setting environment variables
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
index d58d4db..7c374d2 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
@@ -32,7 +32,7 @@ import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.master.ExecutionBlock.PartitionType;
-import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.AbstractStorageManager;
import org.apache.tajo.util.TajoIdUtils;
import java.io.IOException;
@@ -47,11 +47,11 @@ public class GlobalPlanner {
private static Log LOG = LogFactory.getLog(GlobalPlanner.class);
private TajoConf conf;
- private StorageManager sm;
+ private AbstractStorageManager sm;
private QueryId queryId;
public GlobalPlanner(final TajoConf conf,
- final StorageManager sm,
+ final AbstractStorageManager sm,
final EventHandler eventHandler)
throws IOException {
this.conf = conf;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
index 01f312c..0635156 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -47,7 +47,8 @@ import org.apache.tajo.engine.function.builtin.*;
import org.apache.tajo.master.querymaster.QueryJobManager;
import org.apache.tajo.master.rm.WorkerResourceManager;
import org.apache.tajo.master.rm.YarnTajoResourceManager;
-import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.storage.StorageManagerFactory;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.webapp.StaticHttpServer;
@@ -87,7 +88,7 @@ public class TajoMaster extends CompositeService {
private CatalogServer catalogServer;
private CatalogService catalog;
- private StorageManager storeManager;
+ private AbstractStorageManager storeManager;
private GlobalEngine globalEngine;
private AsyncDispatcher dispatcher;
private TajoMasterClientService tajoMasterClientService;
@@ -121,7 +122,7 @@ public class TajoMaster extends CompositeService {
// check the system directory and create if they are not created.
checkAndInitializeSystemDirectories();
- this.storeManager = new StorageManager(systemConf);
+ this.storeManager = StorageManagerFactory.getStorageManager(systemConf);
catalogServer = new CatalogServer(initBuiltinFunctions());
addIfService(catalogServer);
@@ -140,7 +141,7 @@ public class TajoMaster extends CompositeService {
addIfService(tajoMasterService);
} catch (Exception e) {
- LOG.error(e.getMessage(), e);
+ LOG.error(e.getMessage(), e);
}
super.init(systemConf);
@@ -157,7 +158,8 @@ public class TajoMaster extends CompositeService {
}
private void initWebServer() throws Exception {
- webServer = StaticHttpServer.getInstance(this ,"admin", null, 8080 ,
+ int httpPort = systemConf.getInt("tajo.master.http.port", 8080);
+ webServer = StaticHttpServer.getInstance(this ,"admin", null, httpPort ,
true, null, context.getConf(), null);
webServer.start();
}
@@ -341,7 +343,7 @@ public class TajoMaster extends CompositeService {
} finally {
out.close();
}
- defaultFS.setReplication(systemConfPath, (short)systemConf.getIntVar(ConfVars.SYSTEM_CONF_REPLICA_COUNT));
+ defaultFS.setReplication(systemConfPath, (short) systemConf.getIntVar(ConfVars.SYSTEM_CONF_REPLICA_COUNT));
}
@Override
@@ -368,7 +370,7 @@ public class TajoMaster extends CompositeService {
return this.catalog;
}
- public StorageManager getStorageManager() {
+ public AbstractStorageManager getStorageManager() {
return this.storeManager;
}
@@ -407,7 +409,7 @@ public class TajoMaster extends CompositeService {
return globalEngine;
}
- public StorageManager getStorageManager() {
+ public AbstractStorageManager getStorageManager() {
return storeManager;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
index 71b0114..574122b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
@@ -173,12 +173,12 @@ public class TaskSchedulerImpl extends AbstractService
if (taskRequests.size() > 0) {
if (scheduledRequests.leafTaskNum() > 0) {
- LOG.info("Try to schedule tasks with taskRequestEvents: " +
+ LOG.debug("Try to schedule tasks with taskRequestEvents: " +
taskRequests.size() + ", LeafTask Schedule Request: " +
scheduledRequests.leafTaskNum());
taskRequests.getTaskRequests(taskRequestEvents,
scheduledRequests.leafTaskNum());
- LOG.info("Get " + taskRequestEvents.size() + " taskRequestEvents ");
+ LOG.debug("Get " + taskRequestEvents.size() + " taskRequestEvents ");
if (taskRequestEvents.size() > 0) {
scheduledRequests.assignToLeafTasks(taskRequestEvents);
taskRequestEvents.clear();
@@ -188,7 +188,7 @@ public class TaskSchedulerImpl extends AbstractService
if (taskRequests.size() > 0) {
if (scheduledRequests.nonLeafTaskNum() > 0) {
- LOG.info("Try to schedule tasks with taskRequestEvents: " +
+ LOG.debug("Try to schedule tasks with taskRequestEvents: " +
taskRequests.size() + ", NonLeafTask Schedule Request: " +
scheduledRequests.nonLeafTaskNum());
taskRequests.getTaskRequests(taskRequestEvents,
@@ -390,17 +390,18 @@ public class TaskSchedulerImpl extends AbstractService
public Set<QueryUnitAttemptId> assignedRequest = new HashSet<QueryUnitAttemptId>();
public void assignToLeafTasks(List<TaskRequestEvent> taskRequests) {
+ Collections.shuffle(taskRequests);
Iterator<TaskRequestEvent> it = taskRequests.iterator();
TaskRequestEvent taskRequest;
while (it.hasNext() && leafTasks.size() > 0) {
taskRequest = it.next();
- LOG.info("assignToLeafTasks: " + taskRequest.getExecutionBlockId() + "," +
+ LOG.debug("assignToLeafTasks: " + taskRequest.getExecutionBlockId() + "," +
"containerId=" + taskRequest.getContainerId());
ContainerProxy container = context.getResourceAllocator().getContainer(taskRequest.getContainerId());
-
- if(container == null) continue;
-
+ if(container == null) {
+ continue;
+ }
String host = container.getTaskHostName();
QueryUnitAttemptId attemptId = null;
@@ -479,8 +480,8 @@ public class TaskSchedulerImpl extends AbstractService
}
}
- LOG.info("HostLocalAssigned / Total: " + hostLocalAssigned + " / " + totalAssigned);
- LOG.info("RackLocalAssigned: " + rackLocalAssigned + " / " + totalAssigned);
+ LOG.debug("HostLocalAssigned / Total: " + hostLocalAssigned + " / " + totalAssigned);
+ LOG.debug("RackLocalAssigned: " + rackLocalAssigned + " / " + totalAssigned);
}
public void assignToNonLeafTasks(List<TaskRequestEvent> taskRequests) {
@@ -489,7 +490,7 @@ public class TaskSchedulerImpl extends AbstractService
TaskRequestEvent taskRequest;
while (it.hasNext()) {
taskRequest = it.next();
- LOG.info("assignToNonLeafTasks: " + taskRequest.getExecutionBlockId());
+ LOG.debug("assignToNonLeafTasks: " + taskRequest.getExecutionBlockId());
QueryUnitAttemptId attemptId;
// random allocation
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
index c473586..4ba95b0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -39,14 +39,11 @@ import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.master.ExecutionBlockCursor;
import org.apache.tajo.master.QueryContext;
import org.apache.tajo.master.event.*;
-import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.AbstractStorageManager;
import org.apache.tajo.util.TUtil;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -61,7 +58,7 @@ public class Query implements EventHandler<QueryEvent> {
private Map<ExecutionBlockId, SubQuery> subqueries;
private final EventHandler eventHandler;
private final MasterPlan plan;
- private final StorageManager sm;
+ private final AbstractStorageManager sm;
QueryMasterTask.QueryMasterTaskContext context;
private ExecutionBlockCursor cursor;
@@ -242,6 +239,10 @@ public class Query implements EventHandler<QueryEvent> {
return this.subqueries.get(id);
}
+ public Collection<SubQuery> getSubQueries() {
+ return Collections.unmodifiableCollection(this.subqueries.values());
+ }
+
public QueryState getState() {
readLock.lock();
try {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
index 809dce2..c54f8da 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -235,6 +235,8 @@ public class QueryInProgress extends CompositeService {
this.queryInfo.setQueryMasterResource(queryInfo.getQueryMasterResource());
}
this.queryInfo.setQueryState(queryInfo.getQueryState());
+ this.queryInfo.setProgress(queryInfo.getProgress());
+ this.queryInfo.setFinishTime(queryInfo.getFinishTime());
if(queryInfo.getLastMessage() != null && !queryInfo.getLastMessage().isEmpty()) {
this.queryInfo.setLastMessage(queryInfo.getLastMessage());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
index 424d5bf..669dee3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
@@ -32,6 +32,8 @@ import org.apache.tajo.master.QueryContext;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.master.rm.WorkerResource;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -85,6 +87,14 @@ public class QueryJobManager extends CompositeService {
return dispatcher.getEventHandler();
}
+ public Collection<QueryInProgress> getRunningQueries() {
+ return Collections.unmodifiableCollection(runningQueries.values());
+ }
+
+ public Collection<QueryInProgress> getFinishedQueries() {
+ return Collections.unmodifiableCollection(finishedQueries.values());
+ }
+
public QueryInfo createNewQueryJob(QueryContext queryContext, String sql, LogicalRootNode plan) throws Exception {
QueryId queryId = QueryIdFactory.newQueryId(masterContext.getResourceManager().getSeedQueryId());
QueryInProgress queryInProgress = new QueryInProgress(masterContext, queryContext, queryId, sql, plan);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index d45988c..6611102 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -36,13 +36,11 @@ import org.apache.tajo.master.GlobalPlanner;
import org.apache.tajo.master.TajoAsyncDispatcher;
import org.apache.tajo.master.event.QueryStartEvent;
import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.storage.StorageManagerFactory;
import org.apache.tajo.worker.TajoWorker;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
// TODO - when exception, send error status to QueryJobManager
@@ -59,7 +57,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
private GlobalOptimizer globalOptimizer;
- private StorageManager storageManager;
+ private AbstractStorageManager storageManager;
private TajoConf systemConf;
@@ -93,7 +91,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
this.dispatcher = new TajoAsyncDispatcher("querymaster_" + System.currentTimeMillis());
addIfService(dispatcher);
- this.storageManager = new StorageManager(systemConf);
+ this.storageManager = StorageManagerFactory.getStorageManager(systemConf);
globalPlanner = new GlobalPlanner(systemConf, storageManager, dispatcher.getEventHandler());
globalOptimizer = new GlobalOptimizer();
@@ -185,6 +183,10 @@ public class QueryMaster extends CompositeService implements EventHandler {
return this.queryMasterContext;
}
+ public Collection<QueryMasterTask> getQueryMasterTasks() {
+ return queryMasterTasks.values();
+ }
+
public class QueryMasterContext {
private TajoConf conf;
@@ -204,7 +206,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
return clock;
}
- public StorageManager getStorageManager() {
+ public AbstractStorageManager getStorageManager() {
return storageManager;
}
@@ -272,7 +274,6 @@ public class QueryMaster extends CompositeService implements EventHandler {
public void run() {
LOG.info("Start QueryMaster heartbeat thread");
while(!queryMasterStop.get()) {
- //TODO report all query status
List<QueryMasterTask> tempTasks = new ArrayList<QueryMasterTask>();
synchronized(queryMasterTasks) {
tempTasks.addAll(queryMasterTasks.values());
@@ -285,6 +286,8 @@ public class QueryMaster extends CompositeService implements EventHandler {
.setTajoWorkerClientPort(workerContext.getTajoWorkerClientService().getBindAddr().getPort())
.setState(eachTask.getState())
.setQueryId(eachTask.getQueryId().getProto())
+ .setQueryProgress(eachTask.getQuery().getProgress())
+ .setQueryFinishTime(eachTask.getQuery().getFinishTime())
.build();
workerContext.getTajoMasterRpcClient().heartbeat(null, queryHeartbeat, NullCallback.get());
@@ -318,15 +321,17 @@ public class QueryMaster extends CompositeService implements EventHandler {
}
for(QueryMasterTask eachTask: tempTasks) {
- try {
- long lastHeartbeat = eachTask.getLastClientHeartbeat();
- long time = System.currentTimeMillis() - lastHeartbeat;
- if(lastHeartbeat > 0 && time > QUERY_SESSION_TIMEOUT) {
- LOG.warn("Query " + eachTask.getQueryId() + " stopped cause query sesstion timeout: " + time + " ms");
- eachTask.expiredSessionTimeout();
+ if(!eachTask.isStopped()) {
+ try {
+ long lastHeartbeat = eachTask.getLastClientHeartbeat();
+ long time = System.currentTimeMillis() - lastHeartbeat;
+ if(lastHeartbeat > 0 && time > QUERY_SESSION_TIMEOUT) {
+ LOG.warn("Query " + eachTask.getQueryId() + " stopped cause query sesstion timeout: " + time + " ms");
+ eachTask.expiredSessionTimeout();
+ }
+ } catch (Exception e) {
+ LOG.error(eachTask.getQueryId() + ":" + e.getMessage(), e);
}
- } catch (Exception e) {
- LOG.error(eachTask.getQueryId() + ":" + e.getMessage(), e);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index 2212f82..a4fabcf 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -44,7 +44,7 @@ import org.apache.tajo.master.TajoAsyncDispatcher;
import org.apache.tajo.master.event.*;
import org.apache.tajo.master.rm.TajoWorkerResourceManager;
import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.AbstractStorageManager;
import org.apache.tajo.worker.AbstractResourceAllocator;
import org.apache.tajo.worker.TajoResourceAllocator;
import org.apache.tajo.worker.YarnResourceAllocator;
@@ -404,7 +404,7 @@ public class QueryMasterTask extends CompositeService {
return queryId;
}
- public StorageManager getStorageManager() {
+ public AbstractStorageManager getStorageManager() {
return queryMasterContext.getStorageManager();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
index ec54244..00dcc0b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@ -75,6 +75,9 @@ public class QueryUnit implements EventHandler<TaskEvent> {
private int failedAttempts;
private int finishedAttempts; // finish are total of success, failed and killed
+ private long launchTime;
+ private long finishTime;
+
private static final StateMachineFactory
<QueryUnit, TaskState, TaskEventType, TaskEvent> stateMachineFactory =
new StateMachineFactory
@@ -84,7 +87,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
TaskEventType.T_SCHEDULE, new InitialScheduleTransition())
.addTransition(TaskState.SCHEDULED, TaskState.RUNNING,
- TaskEventType.T_ATTEMPT_LAUNCHED)
+ TaskEventType.T_ATTEMPT_LAUNCHED, new AttemptLaunchedTransition())
.addTransition(TaskState.RUNNING, TaskState.RUNNING,
TaskEventType.T_ATTEMPT_LAUNCHED)
@@ -186,7 +189,11 @@ public class QueryUnit implements EventHandler<TaskEvent> {
this.fragMap.put(fragment.getName(), fragment);
setDataLocations(fragment);
}
-
+
+ public String getSucceededHost() {
+ return succeededHost;
+ }
+
public void addFetch(String tableId, String uri) throws URISyntaxException {
this.addFetch(tableId, new URI(uri));
}
@@ -317,7 +324,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
return this.attempts.get(this.lastAttemptId);
}
- protected QueryUnitAttempt getSuccessfulAttempt() {
+ public QueryUnitAttempt getSuccessfulAttempt() {
readLock.lock();
try {
if (null == successfulAttempt) {
@@ -342,6 +349,22 @@ public class QueryUnit implements EventHandler<TaskEvent> {
}
}
+ public long getLaunchTime() {
+ return launchTime;
+ }
+
+ public long getFinishTime() {
+ return finishTime;
+ }
+
+ public long getRunningTime() {
+ if(finishTime > 0) {
+ return finishTime - launchTime;
+ } else {
+ return System.currentTimeMillis() - launchTime;
+ }
+ }
+
// This is always called in the Write Lock
private void addAndScheduleAttempt() {
// Create new task attempt
@@ -385,14 +408,24 @@ public class QueryUnit implements EventHandler<TaskEvent> {
TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event;
QueryUnitAttempt attempt = task.attempts.get(
attemptEvent.getTaskAttemptId());
+
task.successfulAttempt = attemptEvent.getTaskAttemptId();
task.succeededHost = attempt.getHost();
+ task.finishTime = System.currentTimeMillis();
task.succeededPullServerPort = attempt.getPullServerPort();
task.eventHandler.handle(new SubQueryTaskEvent(event.getTaskId(),
SubQueryEventType.SQ_TASK_COMPLETED));
}
}
+ private static class AttemptLaunchedTransition implements SingleArcTransition<QueryUnit, TaskEvent> {
+ @Override
+ public void transition(QueryUnit task,
+ TaskEvent event) {
+ task.launchTime = System.currentTimeMillis();
+ }
+ }
+
private static class AttemptFailedTransition implements
MultipleArcTransition<QueryUnit, TaskEvent, TaskState> {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index ac92386..1bf45ee 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -48,8 +48,8 @@ import org.apache.tajo.engine.planner.logical.StoreTableNode;
import org.apache.tajo.master.*;
import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
import org.apache.tajo.master.event.*;
+import org.apache.tajo.storage.AbstractStorageManager;
import org.apache.tajo.storage.Fragment;
-import org.apache.tajo.storage.StorageManager;
import java.io.IOException;
import java.util.*;
@@ -73,7 +73,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
private int priority;
private TableMeta meta;
private EventHandler eventHandler;
- private final StorageManager sm;
+ private final AbstractStorageManager sm;
private TaskSchedulerImpl taskScheduler;
private QueryMasterTask.QueryMasterTaskContext context;
@@ -135,7 +135,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
private int completedTaskCount = 0;
- public SubQuery(QueryMasterTask.QueryMasterTaskContext context, ExecutionBlock block, StorageManager sm) {
+ public SubQuery(QueryMasterTask.QueryMasterTaskContext context, ExecutionBlock block, AbstractStorageManager sm) {
this.context = context;
this.block = block;
this.sm = sm;
@@ -230,7 +230,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
return this.priority;
}
- public StorageManager getStorageManager() {
+ public AbstractStorageManager getStorageManager() {
return sm;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
index 7c4bf59..d1a0c96 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -193,23 +193,25 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
class WorkerResourceAllocationThread extends Thread {
@Override
public void run() {
- LOG.info("====>WorkerResourceAllocationThread start");
+ LOG.info("WorkerResourceAllocationThread start");
while(!stopped.get()) {
try {
WorkerResourceRequest resourceRequest = requestQueue.take();
- LOG.info("====> allocateWorkerResources:" +
- (new ExecutionBlockId(resourceRequest.request.getExecutionBlockId())) +
- ", required:" + resourceRequest.request.getNumWorks() +
- ", queryMasterRequest=" + resourceRequest.queryMasterRequest +
- ", liveWorkers=" + liveWorkerResources.size());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("allocateWorkerResources:" +
+ (new ExecutionBlockId(resourceRequest.request.getExecutionBlockId())) +
+ ", required:" + resourceRequest.request.getNumWorks() +
+ ", queryMasterRequest=" + resourceRequest.queryMasterRequest +
+ ", liveWorkers=" + liveWorkerResources.size());
+ }
List<WorkerResource> workerResources = chooseWorkers(false,
resourceRequest.request.getMemoryMBSlots(),
resourceRequest.request.getDiskSlots(),
resourceRequest.request.getNumWorks());
- LOG.debug("====> allocateWorkerResources: allocated:" + workerResources.size());
+ LOG.debug("allocateWorkerResources: allocated:" + workerResources.size());
if(workerResources.size() > 0) {
if(resourceRequest.queryMasterRequest) {
@@ -239,10 +241,11 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
}
LOG.debug("=========================================");
}
- requestQueue.add(resourceRequest);
+ requestQueue.put(resourceRequest);
Thread.sleep(100);
}
} catch(InterruptedException ie) {
+ LOG.error(ie);
}
}
}
@@ -334,13 +337,14 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
queryMasterWorkerResource = queryMasterMap.remove(queryId);
}
}
+
WorkerResource workerResource = new WorkerResource();
workerResource.copyId(queryMasterWorkerResource);
workerResource.setMemoryMBSlots(queryMasterMemoryMB);
workerResource.setDiskSlots(queryMasterDiskSlot);
workerResource.setCpuCoreSlots(0);
releaseWorkerResource(queryId, workerResource);
- LOG.info("released QueryMaster resource:" + queryId + "," + queryMasterWorkerResource);
+ LOG.info("release QueryMaster resource:" + queryId + "," + queryMasterWorkerResource);
}
public void workerHeartbeat(TajoMasterProtocol.TajoHeartbeat request) {
@@ -354,6 +358,10 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
WorkerResource workerResource = allWorkerResourceMap.get(hostAndPort);
workerResource.setLastHeartbeat(System.currentTimeMillis());
workerResource.setWorkerStatus(WorkerStatus.LIVE);
+ workerResource.setNumRunningTasks(request.getServerStatus().getRunningTaskNum());
+ workerResource.setMaxHeap(request.getServerStatus().getJvmHeap().getMaxHeap());
+ workerResource.setFreeHeap(request.getServerStatus().getJvmHeap().getFreeHeap());
+ workerResource.setTotalHeap(request.getServerStatus().getJvmHeap().getTotalHeap());
} else {
WorkerResource workerResource = new WorkerResource();
workerResource.setAllocatedHost(request.getTajoWorkerHost());
@@ -363,6 +371,7 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
workerResource.setManagerPort(request.getTajoWorkerPort());
workerResource.setClientPort(request.getTajoWorkerClientPort());
workerResource.setPullServerPort(request.getTajoWorkerPullServerPort());
+ workerResource.setHttpPort(request.getTajoWorkerHttpPort());
workerResource.setLastHeartbeat(System.currentTimeMillis());
workerResource.setWorkerStatus(WorkerStatus.LIVE);
@@ -370,6 +379,10 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
workerResource.setMemoryMBSlots(request.getServerStatus().getSystem().getTotalMemoryMB());
workerResource.setCpuCoreSlots(request.getServerStatus().getSystem().getAvailableProcessors());
workerResource.setDiskSlots(request.getServerStatus().getDiskSlots());
+ workerResource.setNumRunningTasks(request.getServerStatus().getRunningTaskNum());
+ workerResource.setMaxHeap(request.getServerStatus().getJvmHeap().getMaxHeap());
+ workerResource.setFreeHeap(request.getServerStatus().getJvmHeap().getFreeHeap());
+ workerResource.setTotalHeap(request.getServerStatus().getJvmHeap().getTotalHeap());
} else {
workerResource.setMemoryMBSlots(4096);
workerResource.setDiskSlots(4);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
index b958761..a1a4c3e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
@@ -31,6 +31,7 @@ public class WorkerResource {
private int managerPort;
private int clientPort;
private int pullServerPort;
+ private int httpPort;
private int diskSlots;
private int cpuCoreSlots;
@@ -40,6 +41,12 @@ public class WorkerResource {
private int usedMemoryMBSlots;
private int usedCpuCoreSlots;
+ private long maxHeap;
+ private long freeHeap;
+ private long totalHeap;
+
+ private int numRunningTasks;
+
private boolean queryMasterAllocated;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -251,4 +258,44 @@ public class WorkerResource {
public void setPullServerPort(int pullServerPort) {
this.pullServerPort = pullServerPort;
}
+
+ public int getHttpPort() {
+ return httpPort;
+ }
+
+ public void setHttpPort(int httpPort) {
+ this.httpPort = httpPort;
+ }
+
+ public long getMaxHeap() {
+ return maxHeap;
+ }
+
+ public void setMaxHeap(long maxHeap) {
+ this.maxHeap = maxHeap;
+ }
+
+ public long getFreeHeap() {
+ return freeHeap;
+ }
+
+ public void setFreeHeap(long freeHeap) {
+ this.freeHeap = freeHeap;
+ }
+
+ public long getTotalHeap() {
+ return totalHeap;
+ }
+
+ public void setTotalHeap(long totalHeap) {
+ this.totalHeap = totalHeap;
+ }
+
+ public int getNumRunningTasks() {
+ return numRunningTasks;
+ }
+
+ public void setNumRunningTasks(int numRunningTasks) {
+ this.numRunningTasks = numRunningTasks;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/webapp/HttpServer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/webapp/HttpServer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/webapp/HttpServer.java
index 91fdb29..3164bd0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/webapp/HttpServer.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/webapp/HttpServer.java
@@ -88,13 +88,15 @@ public class HttpServer {
final String appDir = getWebAppsPath(name);
ContextHandlerCollection contexts = new ContextHandlerCollection();
- webServer.setHandler(contexts);
webAppContext = new WebAppContext();
webAppContext.setDisplayName(name);
webAppContext.setContextPath("/");
- webAppContext.setWar(appDir + "/" + name);
- webServer.addHandler(webAppContext);
+ webAppContext.setResourceBase(appDir + "/" + name);
+ webAppContext.setDescriptor(appDir + "/" + name + "/WEB-INF/web.xml");
+
+ contexts.addHandler(webAppContext);
+ webServer.setHandler(contexts);
addDefaultApps(contexts, appDir, conf);
}
@@ -236,11 +238,10 @@ public class HttpServer {
}
}
- protected String getWebAppsPath(String appName) throws FileNotFoundException {
- URL url = getClass().getClassLoader().getResource("webapps/" + appName);
+ protected String getWebAppsPath(String name) throws FileNotFoundException {
+ URL url = getClass().getClassLoader().getResource("webapps/" + name);
if (url == null) {
- throw new FileNotFoundException("webapps/" + appName
- + " not found in CLASSPATH");
+ throw new FileNotFoundException("webapps/" + name + " not found in CLASSPATH");
}
String urlString = url.toString();
return urlString.substring(0, urlString.lastIndexOf('/'));
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java
index 13cd98a..5cf2e7c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java
@@ -27,17 +27,18 @@ import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.engine.planner.physical.PhysicalExec;
import org.apache.tajo.exception.InternalException;
-import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.storage.StorageManagerFactory;
import java.io.IOException;
public class TajoQueryEngine {
- private final static Log LOG = LogFactory.getLog(TajoQueryEngine.class);
- private final StorageManager storageManager;
+
+ private final AbstractStorageManager storageManager;
private final PhysicalPlanner phyPlanner;
public TajoQueryEngine(TajoConf conf) throws IOException {
- this.storageManager = new StorageManager(conf);
+ this.storageManager = StorageManagerFactory.getStorageManager(conf);
this.phyPlanner = new PhysicalPlannerImpl(conf, storageManager);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
index da8c062..05b5416 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -93,6 +93,8 @@ public class TajoWorker extends CompositeService {
private AtomicInteger numClusterSlots = new AtomicInteger();
+ private int httpPort;
+
public TajoWorker(String daemonMode) throws Exception {
super(TajoWorker.class.getName());
this.daemonMode = daemonMode;
@@ -114,7 +116,6 @@ public class TajoWorker extends CompositeService {
if(resourceManagerClassName.indexOf(TajoWorkerResourceManager.class.getName()) >= 0) {
randomPort = false;
}
- int infoPort = tajoConf.getInt("tajo.worker.info.port", 8090);
int clientPort = tajoConf.getInt("tajo.worker.client.rpc.port", 8091);
int managerPort = tajoConf.getInt("tajo.worker.manager.rpc.port", 8092);
@@ -124,14 +125,6 @@ public class TajoWorker extends CompositeService {
tajoConf.setInt(TajoConf.ConfVars.PULLSERVER_PORT.varname, 0);
//infoPort = 0;
}
- try {
- //TODO WebServer port configurable
- webServer = StaticHttpServer.getInstance(this, "admin", null, infoPort,
- true, null, tajoConf, null);
- webServer.start();
- } catch (Exception e) {
- LOG.error("Can' start info http server:" + e.getMessage(), e);
- }
if(!"qm".equals(daemonMode)) {
taskRunnerManager = new TaskRunnerManager(workerContext);
@@ -149,7 +142,22 @@ public class TajoWorker extends CompositeService {
tajoWorkerManagerService = new TajoWorkerManagerService(workerContext, managerPort);
addService(tajoWorkerManagerService);
- LOG.info("Tajo worker started: mode=" + daemonMode + ", clientPort=" + clientPort + ", managerPort=" + managerPort);
+ LOG.info("Tajo worker started: mode=" + daemonMode + ", clientPort=" + clientPort + ", managerPort="
+ + managerPort);
+
+ try {
+ httpPort = tajoConf.getInt("tajo.worker.http.port", 28080);
+ webServer = StaticHttpServer.getInstance(this ,"worker", null, httpPort ,
+ true, null, tajoConf, null);
+ webServer.start();
+ httpPort = webServer.getPort();
+ LOG.info("Worker info server started:" + httpPort);
+ } catch (IOException e) {
+ LOG.error(e.getMessage(), e);
+ }
+ LOG.info("Tajo worker started: mode=" + daemonMode + ", clientPort=" + clientPort + ", managerPort="
+ + managerPort);
+
} else {
LOG.info("Tajo worker started: mode=" + daemonMode);
}
@@ -157,6 +165,10 @@ public class TajoWorker extends CompositeService {
super.init(conf);
}
+ public WorkerContext getWorkerContext() {
+ return workerContext;
+ }
+
@Override
public void start() {
super.start();
@@ -187,6 +199,12 @@ public class TajoWorker extends CompositeService {
tajoMasterRpc.close();
}
+ if(webServer != null) {
+ try {
+ webServer.stop();
+ } catch (Exception e) {
+ }
+ }
super.stop();
LOG.info("TajoWorker main thread exiting");
}
@@ -220,6 +238,9 @@ public class TajoWorker extends CompositeService {
return pullService;
}
+ public String getWorkerName() {
+ return getTajoWorkerManagerService().getHostAndPort();
+ }
public void stopWorker(boolean force) {
stop();
if(force) {
@@ -371,18 +392,26 @@ public class TajoWorker extends CompositeService {
.build());
}
}
+ TajoMasterProtocol.ServerStatusProto.JvmHeap jvmHeap =
+ TajoMasterProtocol.ServerStatusProto.JvmHeap.newBuilder()
+ .setMaxHeap(Runtime.getRuntime().maxMemory())
+ .setFreeHeap(Runtime.getRuntime().freeMemory())
+ .setTotalHeap(Runtime.getRuntime().totalMemory())
+ .build();
+
TajoMasterProtocol.ServerStatusProto serverStatus = TajoMasterProtocol.ServerStatusProto.newBuilder()
.addAllDisk(diskInfos)
- .setRunningTaskNum(0) //TODO
+ .setRunningTaskNum(taskRunnerManager == null ? 1 : taskRunnerManager.getNumTasks()) //TODO
.setSystem(systemInfo)
.setDiskSlots(workerDiskSlots)
+ .setJvmHeap(jvmHeap)
.build();
-
TajoMasterProtocol.TajoHeartbeat heartbeatProto = TajoMasterProtocol.TajoHeartbeat.newBuilder()
.setTajoWorkerHost(workerContext.getTajoWorkerManagerService().getBindAddr().getHostName())
.setTajoWorkerPort(workerContext.getTajoWorkerManagerService().getBindAddr().getPort())
.setTajoWorkerClientPort(workerContext.getTajoWorkerClientService().getBindAddr().getPort())
+ .setTajoWorkerHttpPort(httpPort)
.setTajoWorkerPullServerPort(pullServerPort)
.setServerStatus(serverStatus)
.build();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index 28cc5f6..0286afa 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -128,10 +128,10 @@ public class TajoWorkerManagerService extends CompositeService
queryMasterTask.getQueryTaskContext().getResourceAllocator().makeContainerId(request.getContainerId());
if(queryMasterTask == null || queryMasterTask.isStopped()) {
- LOG.info("getTask:" + cid + ", ebId:" + ebId + ", but query is finished.");
+ LOG.debug("getTask:" + cid + ", ebId:" + ebId + ", but query is finished.");
done.run(TaskSchedulerImpl.stopTaskRunnerReq);
} else {
- LOG.info("getTask:" + cid + ", ebId:" + ebId);
+ LOG.debug("getTask:" + cid + ", ebId:" + ebId);
queryMasterTask.handleTaskRequestEvent(new TaskRequestEvent(cid, ebId, done));
}
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index e66751c..915234f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -360,7 +360,6 @@ public class Task {
}
public void run() {
-
String errorMessage = null;
try {
context.setState(TaskAttemptState.TA_RUNNING);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
index f1ca567..6d06083 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
@@ -82,6 +82,12 @@ public class TaskRunnerManager extends CompositeService {
}
}
+ public int getNumTasks() {
+ synchronized(taskRunnerMap) {
+ return taskRunnerMap.size();
+ }
+ }
+
public void startTask(final String[] params) {
//TODO change to use event dispatcher
Thread t = new Thread() {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
index 04abfa3..cdf78a6 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
@@ -42,10 +42,18 @@ message ServerStatusProto {
required int64 freeSpace = 3;
required int64 usableSpace = 4;
}
+
+ message JvmHeap {
+ required int64 maxHeap = 1;
+ required int64 totalHeap = 2;
+ required int64 freeHeap = 3;
+ }
+
required System system = 1;
required int32 diskSlots = 2;
repeated Disk disk = 3;
required int32 runningTaskNum = 4;
+ required JvmHeap jvmHeap = 5;
}
message TajoHeartbeat {
@@ -57,6 +65,9 @@ message TajoHeartbeat {
optional QueryState state = 6;
optional string statusMessage = 7;
optional int32 tajoWorkerPullServerPort = 8;
+ optional int32 tajoWorkerHttpPort = 9;
+ optional float queryProgress = 10;
+ optional int64 queryFinishTime = 11;
}
message TajoHeartbeatResponse {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/WEB-INF/web.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/WEB-INF/web.xml b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/WEB-INF/web.xml
new file mode 100644
index 0000000..5d00e3b
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/WEB-INF/web.xml
@@ -0,0 +1,10 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://java.sun.com/xml/ns/javaee" xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
+ xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
+ version="2.5">
+ <display-name>Tajo Master</display-name>
+ <welcome-file-list>
+ <welcome-file>index.jsp</welcome-file>
+ </welcome-file-list>
+</web-app>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/index.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/index.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/index.jsp
index 7cedfd4..becf417 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/index.jsp
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/index.jsp
@@ -8,6 +8,8 @@
<%@ page import="org.apache.tajo.master.*" %>
<%@ page import="org.apache.tajo.master.rm.*" %>
<%@ page import="org.apache.tajo.catalog.*" %>
+<%@ page import="org.apache.tajo.master.querymaster.QueryInProgress" %>
+<%@ page import="java.text.SimpleDateFormat" %>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
@@ -19,24 +21,32 @@
TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
CatalogService catalog = master.getCatalog();
Map<String, WorkerResource> workers = master.getContext().getResourceManager().getWorkers();
+ List<String> wokerKeys = new ArrayList<String>(workers.keySet());
+ Collections.sort(wokerKeys);
%>
</head>
<body>
<img src='img/tajo_logo.png'/>
<hr/>
+
+<a href="index.jsp">Main</a>
+<a href="query.jsp">Query</a>
+<h3>Works</h3>
+<div>Live:<%=wokerKeys.size()%></div>
<table>
- <tr><th>Worker</th><th>Ports</th><th>Slot</th></th><th>Memory(Used/Capacity)</th><th>Disk</th><th>Cpu</th><th>Status</th></tr>
+ <tr><th>Worker</th><th>Ports</th><th>Running Tasks</th><th>Slot</th></th><th>Heap(free/max)</th><th>Disk</th><th>Cpu</th><th>Status</th></tr>
<%
- List<String> wokerKeys = new ArrayList<String>(workers.keySet());
- Collections.sort(wokerKeys);
for(String eachWorker: wokerKeys) {
WorkerResource worker = workers.get(eachWorker);
+ String workerHttp = "http://" + worker.getAllocatedHost() + ":" + worker.getHttpPort();
%>
+
<tr>
- <td><%=eachWorker%></td>
+ <td><a href='<%=workerHttp%>'><%=eachWorker%></a></td>
<td><%=worker.portsToStr()%></td>
+ <td><%=worker.getNumRunningTasks()%></td>
<td><%=worker.getUsedSlots()%>/<%=worker.getSlots()%></td>
- <td><%=worker.getUsedMemoryMBSlots()%>/<%=worker.getMemoryMBSlots()%> MB</td>
+ <td><%=worker.getFreeHeap()/1024/1024%>/<%=worker.getMaxHeap()/1024/1024%> MB</td>
<td><%=worker.getUsedDiskSlots()%>/<%=worker.getDiskSlots()%></td>
<td><%=worker.getUsedCpuCoreSlots()%>/<%=worker.getCpuCoreSlots()%></td>
<td><%=worker.getWorkerStatus()%></td>
@@ -53,5 +63,54 @@
}
%>
</table>
+
+<%
+ Collection<QueryInProgress> runningQueries = master.getContext().getQueryJobManager().getRunningQueries();
+ Collection<QueryInProgress> finishedQueries = master.getContext().getQueryJobManager().getFinishedQueries();
+%>
+<hr/>
+<h3>Running Queries</h3>
+<table>
+ <tr></tr><th>QueryId</th><th>Query Master</th><th>Started</th><th>Progress</th><th>Time</th><th>sql</th></tr>
+<%
+ SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ for(QueryInProgress eachQuery: runningQueries) {
+ long time = System.currentTimeMillis() - eachQuery.getQueryInfo().getStartTime();
+%>
+ <tr>
+ <td><%=eachQuery.getQueryId()%></td>
+ <td><%=eachQuery.getQueryInfo().getQueryMasterHost()%></td>
+ <td><%=df.format(eachQuery.getQueryInfo().getStartTime())%></td>
+ <td><%=(int)(eachQuery.getQueryInfo().getProgress() * 100.0f)%>%</td>
+ <td><%=(int)(time/1000)%> sec</td>
+ <td><%=eachQuery.getQueryInfo().getSql()%></td>
+ </tr>
+<%
+ }
+%>
+</table>
+
+<hr/>
+<h3>Finished Queries</h3>
+<table>
+ <tr></tr><th>QueryId</th><th>Query Master</th><th>Started</th><th>Finished</th><th>Time</th><th>Status</th><th>sql</th></tr>
+ <%
+ for(QueryInProgress eachQuery: finishedQueries) {
+ long runTime = eachQuery.getQueryInfo().getFinishTime() == 0 ? -1 :
+ eachQuery.getQueryInfo().getFinishTime() - eachQuery.getQueryInfo().getStartTime();
+ %>
+ <tr>
+ <td><%=eachQuery.getQueryId()%></td>
+ <td><%=eachQuery.getQueryInfo().getQueryMasterHost()%></td>
+ <td><%=df.format(eachQuery.getQueryInfo().getStartTime())%></td>
+ <td><%=df.format(eachQuery.getQueryInfo().getFinishTime())%></td>
+ <td><%=runTime%> ms</td>
+ <td><%=eachQuery.getQueryInfo().getQueryState()%></td>
+ <td><%=eachQuery.getQueryInfo().getSql()%></td>
+ </tr>
+ <%
+ }
+ %>
+</table>
</body>
</html>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/query.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/query.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/query.jsp
new file mode 100644
index 0000000..10d8e13
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/query.jsp
@@ -0,0 +1,38 @@
+<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
+
+<%@ page import="java.util.*" %>
+<%@ page import="java.net.InetSocketAddress" %>
+<%@ page import="java.net.InetAddress" %>
+<%@ page import="org.apache.hadoop.conf.Configuration" %>
+<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
+<%@ page import="org.apache.tajo.master.*" %>
+<%@ page import="org.apache.tajo.master.rm.*" %>
+<%@ page import="org.apache.tajo.catalog.*" %>
+<%@ page import="org.apache.tajo.master.querymaster.QueryInProgress" %>
+
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
+<html>
+<head>
+ <link rel="stylesheet" type = "text/css" href = "./style.css" />
+ <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
+ <title>tajo main</title>
+ <%
+ TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
+ CatalogService catalog = master.getCatalog();
+ Map<String, WorkerResource> workers = master.getContext().getResourceManager().getWorkers();
+ %>
+</head>
+<body>
+<img src='img/tajo_logo.png'/>
+<hr/>
+
+<a href="index.jsp">Main</a>
+<a href="query.jsp">Query</a>
+
+<div><h3>Query</h3></div>
+<div>
+ <textarea></textarea>
+</div>
+<div>Run Query</div>
+</body>
+</html>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/WEB-INF/web.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/WEB-INF/web.xml b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/WEB-INF/web.xml
new file mode 100644
index 0000000..8badd7a
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/WEB-INF/web.xml
@@ -0,0 +1,10 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://java.sun.com/xml/ns/javaee" xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
+ xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
+ version="2.5">
+ <display-name>Tajo Worker</display-name>
+ <welcome-file-list>
+ <welcome-file>index.jsp</welcome-file>
+ </welcome-file-list>
+</web-app>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/index.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/index.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/index.jsp
new file mode 100644
index 0000000..4488598
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/index.jsp
@@ -0,0 +1,60 @@
+<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
+
+<%@ page import="java.util.*" %>
+<%@ page import="java.net.InetSocketAddress" %>
+<%@ page import="java.net.InetAddress" %>
+<%@ page import="org.apache.hadoop.conf.Configuration" %>
+<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
+<%@ page import="org.apache.tajo.worker.*" %>
+<%@ page import="org.apache.tajo.master.*" %>
+<%@ page import="org.apache.tajo.master.rm.*" %>
+<%@ page import="org.apache.tajo.catalog.*" %>
+<%@ page import="org.apache.tajo.master.querymaster.QueryInProgress" %>
+<%@ page import="java.text.SimpleDateFormat" %>
+<%@ page import="org.apache.tajo.master.querymaster.QueryMasterTask" %>
+<%@ page import="org.apache.tajo.master.querymaster.Query" %>
+
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
+
+<html>
+<head>
+ <link rel="stylesheet" type = "text/css" href = "./style.css" />
+ <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
+ <title>tajo main</title>
+<%
+ TajoWorker tajoWorker = (TajoWorker) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
+ Collection<QueryMasterTask> queryMasterTasks = tajoWorker.getWorkerContext()
+ .getTajoWorkerManagerService().getQueryMaster().getQueryMasterTasks();
+%>
+</head>
+<body>
+<img src='img/tajo_logo.png'/>
+
+<h3><%=tajoWorker.getWorkerContext().getWorkerName()%></h3>
+<hr/>
+
+<table border=0>
+ <tr><td width='100'>MaxHeap: </td><td><%=Runtime.getRuntime().maxMemory()/1024/1024%> MB</td>
+ <tr><td width='100'>TotalHeap: </td><td><%=Runtime.getRuntime().totalMemory()/1024/1024%> MB</td>
+ <tr><td width='100'>FreeHeap: </td><td><%=Runtime.getRuntime().freeMemory()/1024/1024%> MB</td>
+</table>
+
+<h3>QueryMaster</h3>
+<table>
+
+<%
+ for(QueryMasterTask eachQueryMasterTask: queryMasterTasks) {
+ Query query = eachQueryMasterTask.getQuery();
+%>
+ <tr>
+ <td><a href='querydetail.jsp?queryId=<%=query.getId()%>'><%=query.getId()%></a></td>
+ <td><%=query.getFinishTime()%></td>
+ <td><%=query.getStartTime()%></td>
+ <td><%=query.getProgress()%></td>
+ </tr>
+<%
+ }
+%>
+</table>
+</body>
+</html>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querydetail.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querydetail.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querydetail.jsp
new file mode 100644
index 0000000..eaaafe5
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querydetail.jsp
@@ -0,0 +1,60 @@
+<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
+
+<%@ page import="java.util.*" %>
+<%@ page import="java.net.InetSocketAddress" %>
+<%@ page import="java.net.InetAddress" %>
+<%@ page import="org.apache.hadoop.conf.Configuration" %>
+<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
+<%@ page import="org.apache.tajo.worker.*" %>
+<%@ page import="org.apache.tajo.master.*" %>
+<%@ page import="org.apache.tajo.master.rm.*" %>
+<%@ page import="org.apache.tajo.catalog.*" %>
+<%@ page import="java.text.SimpleDateFormat" %>
+<%@ page import="org.apache.tajo.QueryId" %>
+<%@ page import="org.apache.tajo.util.TajoIdUtils" %>
+<%@ page import="org.apache.tajo.master.querymaster.*" %>
+
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
+
+<html>
+<head>
+ <link rel="stylesheet" type = "text/css" href = "./style.css" />
+ <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
+ <title>Query Detail Info</title>
+<%
+ QueryId queryId = TajoIdUtils.parseQueryId(request.getParameter("queryId"));
+ TajoWorker tajoWorker = (TajoWorker) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
+ QueryMasterTask queryMasterTask = tajoWorker.getWorkerContext()
+ .getTajoWorkerManagerService().getQueryMaster().getQueryMasterTask(queryId);
+
+ Query query = queryMasterTask.getQuery();
+ Collection<SubQuery> subQueries = query.getSubQueries();
+
+ SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ for(SubQuery eachSubQuery: subQueries) {
+%>
+ <div><%=eachSubQuery.getId()%>(<%=eachSubQuery.getState()%>)</div>
+ <div>Started:<%=df.format(eachSubQuery.getStartTime())%>, <%=eachSubQuery.getFinishTime() == 0 ? "-" : df.format(eachSubQuery.getFinishTime())%></div>
+ <table>
+ <tr><th>Id</th><th>Status</th><th>Start Time</th><th>Running Time</th><th>Host</th></tr>
+<%
+ QueryUnit[] queryUnits = eachSubQuery.getQueryUnits();
+ for(QueryUnit eachQueryUnit: queryUnits) {
+ //QueryUnitAttempt queryUnitAttempt = eachQueryUnit.getSuccessfulAttempt();
+%>
+ <tr>
+ <td><%=eachQueryUnit.getId()%></td>
+ <td><%=eachQueryUnit.getState()%></td>
+ <td><%=eachQueryUnit.getLaunchTime() == 0 ? "-" : df.format(eachQueryUnit.getLaunchTime())%></td>
+ <td><%=eachQueryUnit.getLaunchTime() == 0 ? "-" : eachQueryUnit.getRunningTime()%> ms</td>
+ <td><%=eachQueryUnit.getSucceededHost() == null ? "-" : eachQueryUnit.getSucceededHost()%></td>
+ </tr>
+<%
+ }
+%>
+ </table>
+<%
+ }
+%>
+</head>
+<body>
\ No newline at end of file