You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/08/11 10:11:35 UTC
[06/17] git commit: TAJO-989: Cleanup of child blocks after parent
execution block is complete. (jinho)
TAJO-989: Cleanup of child blocks after parent execution block is complete. (jinho)
Closes #103
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/0f3412a7
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/0f3412a7
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/0f3412a7
Branch: refs/heads/index_support
Commit: 0f3412a74bb3c565df1259b19630bc17e1bc69e0
Parents: ae38468
Author: jinossy <ji...@gmail.com>
Authored: Tue Aug 5 11:58:16 2014 +0900
Committer: jinossy <ji...@gmail.com>
Committed: Tue Aug 5 11:58:16 2014 +0900
----------------------------------------------------------------------
CHANGES | 3 +
.../tajo/master/querymaster/QueryMaster.java | 30 +++++++++-
.../tajo/master/querymaster/SubQuery.java | 20 ++++++-
.../tajo/worker/TajoWorkerManagerService.java | 13 +++++
.../main/java/org/apache/tajo/worker/Task.java | 6 +-
.../java/org/apache/tajo/worker/TaskRunner.java | 21 ++++++-
.../src/main/proto/TajoWorkerProtocol.proto | 5 ++
.../apache/tajo/worker/TestDeletionService.java | 61 ++++++++++++++++++++
8 files changed, 151 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/0f3412a7/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 61593a2..be71bf4 100644
--- a/CHANGES
+++ b/CHANGES
@@ -29,6 +29,9 @@ Release 0.9.0 - unreleased
IMPROVEMENT
+ TAJO-989: Cleanup of child blocks after parent execution block is complete
+ (jinho)
+
TAJO-966: Range partition should support split of multiple characters.
(hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/0f3412a7/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index f173c24..25af82f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -18,6 +18,7 @@
package org.apache.tajo.master.querymaster;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -28,6 +29,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoIdProtos;
import org.apache.tajo.TajoProtos;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.global.GlobalPlanner;
@@ -162,6 +164,30 @@ public class QueryMaster extends CompositeService implements EventHandler {
}
}
+ /**
+ * Asks every worker that exposes a peer RPC port to clean up the given execution blocks.
+ * The request is sent with a {@code NullCallback}, so the worker's reply is not inspected.
+ * A failure on one worker is logged and does not stop the loop over the remaining workers.
+ *
+ * @param executionBlockIds protobuf ids of the execution blocks to clean up
+ */
+ protected void cleanupExecutionBlock(List<TajoIdProtos.ExecutionBlockIdProto> executionBlockIds) {
+ LOG.info("cleanup executionBlocks : " + executionBlockIds);
+ List<TajoMasterProtocol.WorkerResourceProto> workers = getAllWorker();
+ TajoWorkerProtocol.ExecutionBlockListProto.Builder builder = TajoWorkerProtocol.ExecutionBlockListProto.newBuilder();
+ builder.addAllExecutionBlockId(Lists.newArrayList(executionBlockIds));
+ TajoWorkerProtocol.ExecutionBlockListProto executionBlockListProto = builder.build();
+ for (TajoMasterProtocol.WorkerResourceProto worker : workers) {
+ // Skip before entering the try block so the finally clause never runs for a worker we did not connect to.
+ if (worker.getPeerRpcPort() == 0) continue;
+
+ // Scoped per worker: a reference shared across iterations would be released a second time
+ // whenever a later worker is skipped or its connection attempt fails.
+ NettyClientBase rpc = null;
+ try {
+ rpc = connPool.getConnection(NetUtils.createSocketAddr(worker.getHost(), worker.getPeerRpcPort()),
+ TajoWorkerProtocol.class, true);
+ TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService = rpc.getStub();
+
+ tajoWorkerProtocolService.cleanupExecutionBlocks(null, executionBlockListProto, NullCallback.get());
+ } catch (Exception e) {
+ // Pass the exception itself so the stack trace is logged, not only the message.
+ LOG.error(e.getMessage(), e);
+ } finally {
+ if (rpc != null) {
+ connPool.releaseConnection(rpc);
+ }
+ }
+ }
+ }
+
private void cleanup(QueryId queryId) {
LOG.info("cleanup query resources : " + queryId);
NettyClientBase rpc = null;
@@ -338,7 +364,9 @@ public class QueryMaster extends CompositeService implements EventHandler {
queryMasterTask.stop();
//if (!systemConf.get(CommonTestingUtil.TAJO_TEST, "FALSE").equalsIgnoreCase("TRUE")
// && !workerContext.isYarnContainerMode()) {
- cleanup(queryId); // TODO We will support yarn mode
+ if (!getContext().getConf().getBoolVar(TajoConf.ConfVars.TAJO_DEBUG)) {
+ cleanup(queryId);
+ }
//}
} catch (Exception e) {
LOG.error(e.getMessage(), e);
http://git-wip-us.apache.org/repos/asf/tajo/blob/0f3412a7/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index f2e9dd5..17efa21 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.util.Records;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.QueryUnitId;
+import org.apache.tajo.TajoIdProtos;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableDesc;
@@ -1004,7 +1005,8 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
LOG.info("SubQuery (" + subQuery.getId() + ") has " + subQuery.containers.size() + " containers!");
subQuery.eventHandler.handle(
new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_LAUNCH,
- subQuery.getId(), allocationEvent.getAllocatedContainer()));
+ subQuery.getId(), allocationEvent.getAllocatedContainer())
+ );
subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_START));
} catch (Throwable t) {
@@ -1107,6 +1109,20 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
+ /**
+ * Calls stopScheduler() and releaseContainers(), then, unless TAJO_DEBUG is enabled,
+ * collects the ids of this subquery's child execution blocks from the master plan and
+ * hands them to the query master's cleanupExecutionBlock().
+ */
private void cleanup() {
stopScheduler();
releaseContainers();
+
+ // With TAJO_DEBUG set, the child execution blocks are not cleaned up.
+ if (!getContext().getConf().getBoolVar(TajoConf.ConfVars.TAJO_DEBUG)) {
+ List<ExecutionBlock> childs = getMasterPlan().getChilds(getId());
+ List<TajoIdProtos.ExecutionBlockIdProto> ebIds = Lists.newArrayList();
+ for (ExecutionBlock executionBlock : childs){
+ ebIds.add(executionBlock.getId().getProto());
+ }
+
+ try {
+ getContext().getQueryMasterContext().getQueryMaster().cleanupExecutionBlock(ebIds);
+ } catch (Throwable e) {
+ // A cleanup failure must not propagate out of this method; pass the throwable
+ // as the second argument so the stack trace is logged along with the message.
+ LOG.error(e.getMessage(), e);
+ }
+ }
}
private static class SubQueryCompleteTransition
@@ -1114,7 +1130,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
@Override
public SubQueryState transition(SubQuery subQuery, SubQueryEvent subQueryEvent) {
- // TODO - Commit subQuery & do cleanup
+ // TODO - Commit subQuery
// TODO - records succeeded, failed, killed completed task
// TODO - records metrics
try {
http://git-wip-us.apache.org/repos/asf/tajo/blob/0f3412a7/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index 13ef15d..e77da70 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -25,6 +25,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
+import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryId;
import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.TajoIdProtos;
@@ -151,4 +152,16 @@ public class TajoWorkerManagerService extends CompositeService
workerContext.cleanup(new QueryId(request).toString());
done.run(TajoWorker.TRUE_PROTO);
}
+
+ /**
+ * RPC handler for cleanupExecutionBlocks. For each execution block id in the request it
+ * passes the block's base input directory, then its base output directory (both computed
+ * by TaskRunner), to the worker context's cleanup, and finally answers TRUE via the callback.
+ */
+ @Override
+ public void cleanupExecutionBlocks(RpcController controller, TajoWorkerProtocol.ExecutionBlockListProto request,
+ RpcCallback<PrimitiveProtos.BoolProto> done) {
+ for (TajoIdProtos.ExecutionBlockIdProto idProto : request.getExecutionBlockIdList()) {
+ ExecutionBlockId ebId = new ExecutionBlockId(idProto);
+ workerContext.cleanup(TaskRunner.getBaseInputDir(ebId).toString());
+ workerContext.cleanup(TaskRunner.getBaseOutputDir(ebId).toString());
+ }
+ done.run(TajoWorker.TRUE_PROTO);
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/0f3412a7/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index 230c63a..3a4536a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -794,12 +794,10 @@ public class Task {
}
}
}
+
public static Path getTaskAttemptDir(QueryUnitAttemptId quid) {
Path workDir =
- StorageUtil.concatPath(
- quid.getQueryUnitId().getExecutionBlockId().getQueryId().toString(),
- "in",
- quid.getQueryUnitId().getExecutionBlockId().toString(),
+ StorageUtil.concatPath(TaskRunner.getBaseInputDir(quid.getQueryUnitId().getExecutionBlockId()),
String.valueOf(quid.getQueryUnitId().getId()),
String.valueOf(quid.getId()));
return workDir;
http://git-wip-us.apache.org/repos/asf/tajo/blob/0f3412a7/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
index 3fcee06..9676192 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -45,6 +45,7 @@ import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.storage.StorageUtil;
import org.apache.tajo.util.TajoIdUtils;
import java.net.InetSocketAddress;
@@ -172,6 +173,24 @@ public class TaskRunner extends AbstractService {
return executionBlockId + "," + containerId;
}
+ /**
+ * Returns the base output directory of an execution block, built by
+ * StorageUtil.concatPath from three components: the query id, the literal "output",
+ * and the value of {@code executionBlockId.getId()}.
+ */
+ public static Path getBaseOutputDir(ExecutionBlockId executionBlockId){
+ String queryDir = executionBlockId.getQueryId().toString();
+ String blockDir = String.valueOf(executionBlockId.getId());
+ return StorageUtil.concatPath(queryDir, "output", blockDir);
+ }
+
+ /**
+ * Returns the base input directory of an execution block, built by
+ * StorageUtil.concatPath from three components: the query id, the literal "in",
+ * and the string form of the whole execution block id. (The output directory uses
+ * {@code getId()} for its last component; this one uses {@code toString()}.)
+ */
+ public static Path getBaseInputDir(ExecutionBlockId executionBlockId) {
+ String queryDir = executionBlockId.getQueryId().toString();
+ String blockDir = executionBlockId.toString();
+ return StorageUtil.concatPath(queryDir, "in", blockDir);
+ }
+
@Override
public void init(Configuration conf) {
this.systemConf = (TajoConf)conf;
@@ -182,7 +201,7 @@ public class TaskRunner extends AbstractService {
localFS = FileSystem.getLocal(conf);
// the base dir for an output dir
- baseDir = queryId.toString() + "/output" + "/" + executionBlockId.getId();
+ baseDir = getBaseOutputDir(executionBlockId).toString();
// initialize LocalDirAllocator
lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
http://git-wip-us.apache.org/repos/asf/tajo/blob/0f3412a7/tajo-core/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
index ce8ce86..dc2b1d7 100644
--- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
@@ -179,6 +179,10 @@ message RunExecutionBlockRequestProto {
optional string queryOutputPath = 6;
}
+// Request message of the cleanupExecutionBlocks rpc: a list of execution block ids.
+message ExecutionBlockListProto {
+ repeated ExecutionBlockIdProto executionBlockId = 1;
+}
+
service TajoWorkerProtocolService {
rpc ping (QueryUnitAttemptIdProto) returns (BoolProto);
@@ -186,6 +190,7 @@ service TajoWorkerProtocolService {
rpc executeExecutionBlock(RunExecutionBlockRequestProto) returns (BoolProto);
rpc killTaskAttempt(QueryUnitAttemptIdProto) returns (BoolProto);
rpc cleanup(QueryIdProto) returns (BoolProto);
+ rpc cleanupExecutionBlocks(ExecutionBlockListProto) returns (BoolProto);
}
message EnforceProperty {
http://git-wip-us.apache.org/repos/asf/tajo/blob/0f3412a7/tajo-core/src/test/java/org/apache/tajo/worker/TestDeletionService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestDeletionService.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestDeletionService.java
new file mode 100644
index 0000000..98251c1
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestDeletionService.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests that a path handed to DeletionService still exists right after delete() is called
+ * and is gone after the test has waited for twice the configured delay.
+ */
+public class TestDeletionService {
+ // Created inside the test method; stopped in tearDown() if it was created.
+ DeletionService deletionService;
+
+ // Intentionally empty: the test builds its own DeletionService.
+ @Before
+ public void setUp() throws Exception {
+ }
+
+ // Stops the service so it does not outlive the test.
+ @After
+ public void tearDown() {
+ if(deletionService != null){
+ deletionService.stop();
+ }
+ }
+
+ @Test
+ public final void testTemporalDirectory() throws IOException, InterruptedException {
+ // NOTE(review): the sleep below treats the delay as seconds; the first constructor
+ // argument is presumably a thread count. Confirm both against DeletionService.
+ int delay = 1;
+ deletionService = new DeletionService(1, delay);
+ FileSystem fs = FileSystem.getLocal(new Configuration());
+ Path tempPath = CommonTestingUtil.getTestDir();
+ assertTrue(fs.exists(tempPath));
+ deletionService.delete(tempPath);
+ // The directory is expected to survive the delete() call itself.
+ assertTrue(fs.exists(tempPath));
+
+ // Wait twice the delay (converted to milliseconds), then expect the directory to be gone.
+ Thread.sleep(delay * 2 * 1000);
+ assertFalse(fs.exists(tempPath));
+ }
+}