You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2013/12/06 03:14:49 UTC
git commit: TAJO-268: Temporal files should be removed after query is
finished. (jinho) TAJO-292: Too many intermediate partition files. (jinho)
Updated Branches:
refs/heads/master 39fe4d765 -> c18a3f862
TAJO-268: Temporal files should be removed after query is finished. (jinho)
TAJO-292: Too many intermediate partition files. (jinho)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/c18a3f86
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/c18a3f86
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/c18a3f86
Branch: refs/heads/master
Commit: c18a3f862cf764205abc8d8e491dbc6719a7c730
Parents: 39fe4d7
Author: jinossy <ji...@gmail.com>
Authored: Fri Dec 6 11:13:39 2013 +0900
Committer: jinossy <ji...@gmail.com>
Committed: Fri Dec 6 11:13:39 2013 +0900
----------------------------------------------------------------------
CHANGES.txt | 4 +
.../java/org/apache/tajo/conf/TajoConf.java | 1 +
.../apache/tajo/master/TajoMasterService.java | 28 +++++
.../tajo/master/querymaster/QueryMaster.java | 59 ++++++++++
.../tajo/master/querymaster/SubQuery.java | 48 ++++++--
.../org/apache/tajo/worker/DeletionService.java | 115 +++++++++++++++++++
.../java/org/apache/tajo/worker/TajoWorker.java | 56 ++++++++-
.../tajo/worker/TajoWorkerManagerService.java | 8 ++
.../main/java/org/apache/tajo/worker/Task.java | 6 +-
.../src/main/proto/TajoMasterProtocol.proto | 8 +-
.../src/main/proto/TajoWorkerProtocol.proto | 2 +
11 files changed, 320 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c18a3f86/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1ca5b42..6293ec4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -101,6 +101,10 @@ Release 0.8.0 - unreleased
BUG FIXES
+ TAJO-268: Temporal files should be removed after query is finished. (jinho)
+
+ TAJO-292: Too many intermediate partition files. (jinho)
+
TAJO-375: TajoClient can't get result data when different os user
(hyoungjunkim via hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c18a3f86/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 1cbb3fc..0d9bbb0 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -94,6 +94,7 @@ public class TajoConf extends YarnConfiguration {
// Tajo Worker Temporal Directories
WORKER_TEMPORAL_DIR("tajo.worker.tmpdir.locations", "/tmp/tajo-${user.name}/tmpdir"),
+ WORKER_TEMPORAL_DIR_CLEANUP("tajo.worker.tmpdir.cleanup-at-startup", false),
// Tajo Worker Resources
WORKER_RESOURCE_AVAILABLE_CPU_CORES("tajo.worker.resource.cpu-cores", 1),
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c18a3f86/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
index c213dd5..aac4114 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
@@ -30,12 +30,14 @@ import org.apache.tajo.TajoIdProtos;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.master.querymaster.QueryJobManager;
+import org.apache.tajo.master.rm.WorkerResource;
import org.apache.tajo.rpc.AsyncRpcServer;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
import org.apache.tajo.util.NetUtils;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
import java.util.List;
public class TajoMasterService extends AbstractService {
@@ -150,5 +152,31 @@ public class TajoMasterService extends AbstractService {
context.getQueryJobManager().stopQuery(new QueryId(request));
done.run(BOOL_TRUE);
}
+
+    /**
+     * Replies with the resource description (host, ports, memory, disk slots) of every worker
+     * currently registered with the resource manager.
+     */
+    @Override
+    public void getAllWorkerResource(RpcController controller, PrimitiveProtos.NullProto request,
+                                     RpcCallback<TajoMasterProtocol.WorkerResourcesRequest> done) {
+      TajoMasterProtocol.WorkerResourcesRequest.Builder builder =
+          TajoMasterProtocol.WorkerResourcesRequest.newBuilder();
+
+      // Iterate over a copy of the current worker values rather than the live map view.
+      List<WorkerResource> workerResources =
+          new ArrayList<WorkerResource>(context.getResourceManager().getWorkers().values());
+
+      for (WorkerResource worker : workerResources) {
+        TajoMasterProtocol.WorkerResourceProto.Builder workerResource =
+            TajoMasterProtocol.WorkerResourceProto.newBuilder();
+
+        workerResource.setHost(worker.getAllocatedHost());
+        workerResource.setPeerRpcPort(worker.getPeerRpcPort());
+        workerResource.setQueryMasterPort(worker.getQueryMasterPort());
+        workerResource.setMemoryMB(worker.getMemoryMB());
+        workerResource.setDiskSlots(worker.getDiskSlots());
+
+        builder.addWorkerResources(workerResource);
+      }
+      done.run(builder.build());
+    }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c18a3f86/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index 54d2370..41b3b5e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -33,19 +33,25 @@ import org.apache.tajo.TajoProtos;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.global.GlobalPlanner;
import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.master.TajoAsyncDispatcher;
import org.apache.tajo.master.event.QueryStartEvent;
import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.storage.AbstractStorageManager;
import org.apache.tajo.storage.StorageManagerFactory;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.NetUtils;
import org.apache.tajo.worker.TajoWorker;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeat;
@@ -163,6 +169,55 @@ public class QueryMaster extends CompositeService implements EventHandler {
}
}
+ private void cleanup(QueryId queryId) {
+ LOG.info("cleanup query resources : " + queryId);
+ NettyClientBase rpc = null;
+ List<TajoMasterProtocol.WorkerResourceProto> workers = getAllWorker();
+
+ for (TajoMasterProtocol.WorkerResourceProto worker : workers) {
+ try {
+ if (worker.getPeerRpcPort() == 0) continue;
+
+ rpc = connPool.getConnection(NetUtils.createSocketAddr(worker.getHost(), worker.getPeerRpcPort()),
+ TajoWorkerProtocol.class, true);
+ TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService = rpc.getStub();
+
+ tajoWorkerProtocolService.cleanup(null, queryId.getProto(), NullCallback.get());
+ } catch (Exception e) {
+ connPool.closeConnection(rpc);
+ rpc = null;
+ LOG.error(e.getMessage());
+ } finally {
+ connPool.releaseConnection(rpc);
+ }
+ }
+ }
+
+  /**
+   * Fetches the resource descriptions of all workers from the TajoMaster.
+   *
+   * <p>Waits at most 2 seconds for the reply. On any failure the problem is logged and an empty
+   * list is returned, so callers must be prepared to see no workers at all.
+   *
+   * @return the workers reported by the TajoMaster, or an empty list if they could not be fetched
+   */
+  public List<TajoMasterProtocol.WorkerResourceProto> getAllWorker() {
+    NettyClientBase rpc = null;
+    try {
+      rpc = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
+          TajoMasterProtocol.class, true);
+      TajoMasterProtocol.TajoMasterProtocolService masterService = rpc.getStub();
+
+      CallFuture<TajoMasterProtocol.WorkerResourcesRequest> callBack =
+          new CallFuture<TajoMasterProtocol.WorkerResourcesRequest>();
+      masterService.getAllWorkerResource(callBack.getController(),
+          PrimitiveProtos.NullProto.getDefaultInstance(), callBack);
+
+      TajoMasterProtocol.WorkerResourcesRequest workerResourcesRequest = callBack.get(2, TimeUnit.SECONDS);
+      if (workerResourcesRequest == null) {
+        // NOTE(review): a failed RPC may complete the future without a response. Treat it as
+        // "no workers" instead of letting an NPE close a connection that is still healthy.
+        LOG.warn("TajoMaster returned no worker resource information");
+        return new ArrayList<TajoMasterProtocol.WorkerResourceProto>();
+      }
+      return workerResourcesRequest.getWorkerResourcesList();
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so the calling thread can still observe it.
+      Thread.currentThread().interrupt();
+      LOG.warn("Interrupted while fetching worker resources from TajoMaster", e);
+    } catch (Exception e) {
+      connPool.closeConnection(rpc);
+      rpc = null;
+      LOG.error(e.getMessage(), e);
+    } finally {
+      connPool.releaseConnection(rpc);
+    }
+    return new ArrayList<TajoMasterProtocol.WorkerResourceProto>();
+  }
+
public void reportQueryStatusToQueryMaster(QueryId queryId, TajoProtos.QueryState state) {
LOG.info("Send QueryMaster Ready to QueryJobManager:" + queryId);
NettyClientBase tmClient = null;
@@ -300,6 +355,10 @@ public class QueryMaster extends CompositeService implements EventHandler {
try {
queryMasterTask.stop();
+ if (!systemConf.get(CommonTestingUtil.TAJO_TEST, "FALSE").equalsIgnoreCase("TRUE")
+ && !workerContext.isYarnContainerMode()) {
+ cleanup(queryId); // TODO We will support yarn mode
+ }
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c18a3f86/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 70bde5c..a75c200 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -47,6 +47,7 @@ import org.apache.tajo.engine.planner.logical.GroupbyNode;
import org.apache.tajo.engine.planner.logical.NodeType;
import org.apache.tajo.engine.planner.logical.ScanNode;
import org.apache.tajo.engine.planner.logical.StoreTableNode;
+import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.master.AbstractTaskScheduler;
import org.apache.tajo.master.TaskRunnerGroupEvent;
import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
@@ -478,6 +479,22 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
}
/**
+ * Getting the total memory of cluster
+ *
+ * @param subQuery
+ * @return mega bytes
+ */
+ private static int getClusterTotalMemory(SubQuery subQuery) {
+ List<TajoMasterProtocol.WorkerResourceProto> workers =
+ subQuery.context.getQueryMasterContext().getQueryMaster().getAllWorker();
+
+ int totalMem = 0;
+ for (TajoMasterProtocol.WorkerResourceProto worker : workers) {
+ totalMem += worker.getMemoryMB();
+ }
+ return totalMem;
+ }
+ /**
* Getting the desire number of partitions according to the volume of input data.
* This method is only used to determine the partition key number of hash join or aggregation.
*
@@ -505,16 +522,23 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
// for inner
ExecutionBlock inner = childs.get(1);
long innerVolume = getInputVolume(subQuery.masterPlan, subQuery.context, inner);
- LOG.info("Outer volume: " + Math.ceil((double)outerVolume / 1048576));
- LOG.info("Inner volume: " + Math.ceil((double)innerVolume / 1048576));
+ LOG.info("Outer volume: " + Math.ceil((double) outerVolume / 1048576) + "MB, "
+ + "Inner volume: " + Math.ceil((double) innerVolume / 1048576) + "MB");
- long smaller = Math.min(outerVolume, innerVolume);
+ long bigger = Math.max(outerVolume, innerVolume);
- int mb = (int) Math.ceil((double)smaller / 1048576);
- LOG.info("Smaller Table's volume is approximately " + mb + " MB");
- // determine the number of task
- int taskNum = (int) Math.ceil((double)mb /
+ int mb = (int) Math.ceil((double) bigger / 1048576);
+ LOG.info("Bigger Table's volume is approximately " + mb + " MB");
+
+ int taskNum = (int) Math.ceil((double) mb /
conf.getIntVar(ConfVars.DIST_QUERY_JOIN_PARTITION_VOLUME));
+
+ int totalMem = getClusterTotalMemory(subQuery);
+ LOG.info("Total memory of cluster is " + totalMem + " MB");
+ int slots = Math.max(totalMem / conf.getIntVar(ConfVars.TASK_DEFAULT_MEMORY), 1);
+
+ // determine the number of task
+ taskNum = Math.min(taskNum, slots);
LOG.info("The determined number of join partitions is " + taskNum);
return taskNum;
@@ -526,11 +550,17 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
} else {
long volume = getInputVolume(subQuery.masterPlan, subQuery.context, subQuery.block);
- int mb = (int) Math.ceil((double)volume / 1048576);
+ int mb = (int) Math.ceil((double) volume / 1048576);
LOG.info("Table's volume is approximately " + mb + " MB");
// determine the number of task
- int taskNum = (int) Math.ceil((double)mb /
+ int taskNumBySize = (int) Math.ceil((double) mb /
conf.getIntVar(ConfVars.DIST_QUERY_GROUPBY_PARTITION_VOLUME));
+
+ int totalMem = getClusterTotalMemory(subQuery);
+
+ LOG.info("Total memory of cluster is " + totalMem + " MB");
+ int slots = Math.max(totalMem / conf.getIntVar(ConfVars.TASK_DEFAULT_MEMORY), 1);
+ int taskNum = Math.min(taskNumBySize, slots); //Maximum partitions
LOG.info("The determined number of aggregation partitions is " + taskNum);
return taskNum;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c18a3f86/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/DeletionService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/DeletionService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/DeletionService.java
new file mode 100644
index 0000000..42ea71f
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/DeletionService.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+
+import java.io.IOException;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Deletes local files and directories asynchronously on a small scheduled thread pool.
+ * Deletion failures are logged and never propagated to the caller.
+ */
+public class DeletionService {
+  static final Log LOG = LogFactory.getLog(DeletionService.class);
+
+  private final int debugDelay;
+  private final ScheduledThreadPoolExecutor sched;
+  private final FileContext lfs = getLfs();
+
+  /**
+   * Returns a {@link FileContext} bound to the local file system.
+   *
+   * @throws RuntimeException wrapping {@link UnsupportedFileSystemException} if the local file
+   *         system is unavailable
+   */
+  static final FileContext getLfs() {
+    try {
+      return FileContext.getLocalFSFileContext();
+    } catch (UnsupportedFileSystemException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Creates a deletion service.
+   *
+   * @param defaultThreads core size of the deletion thread pool
+   * @param debugDelay seconds to wait before a requested deletion runs; {@code -1} disables
+   *                   deletion entirely (requests passed to {@link #delete} are dropped)
+   */
+  public DeletionService(int defaultThreads, int debugDelay) {
+    ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat("DeletionService #%d").build();
+
+    sched = new ScheduledThreadPoolExecutor(defaultThreads, tf);
+    // Delayed deletions that have not started yet are discarded once the pool is shut down.
+    sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+    sched.setKeepAliveTime(60L, TimeUnit.SECONDS);
+    this.debugDelay = debugDelay;
+  }
+
+  /**
+   * Schedules recursive deletion of local paths.
+   *
+   * <p>If {@code baseDirs} is null or empty, {@code subDir} is deleted as an absolute path.
+   * Otherwise {@code subDir} is resolved against each base directory; a {@code null}
+   * {@code subDir} deletes the base directories themselves.
+   *
+   * @param subDir the sub directory name, or an absolute path when no base directories are given
+   * @param baseDirs the base directories which contain {@code subDir}
+   */
+  public void delete(Path subDir, Path... baseDirs) {
+    if (debugDelay != -1) {
+      sched.schedule(new FileDeletion(subDir, baseDirs), debugDelay, TimeUnit.SECONDS);
+    }
+  }
+
+  /**
+   * Shuts the pool down, waiting up to 10 seconds for running deletions before forcing
+   * termination.
+   */
+  public void stop() {
+    sched.shutdown();
+    boolean terminated = false;
+    try {
+      terminated = sched.awaitTermination(10, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      // Preserve the interrupt for the caller; the forced shutdown below still runs.
+      Thread.currentThread().interrupt();
+    }
+    if (!terminated) {
+      sched.shutdownNow();
+    }
+  }
+
+  /** A single deletion request; each path is deleted recursively and failures are logged. */
+  private class FileDeletion implements Runnable {
+    final Path subDir;
+    final Path[] baseDirs;
+
+    FileDeletion(Path subDir, Path[] baseDirs) {
+      this.subDir = subDir;
+      this.baseDirs = baseDirs;
+    }
+
+    @Override
+    public void run() {
+      if (baseDirs == null || baseDirs.length == 0) {
+        LOG.debug("Worker deleting absolute path : " + subDir);
+        deleteQuietly(subDir);
+        return;
+      }
+      for (Path baseDir : baseDirs) {
+        Path del = subDir == null ? baseDir : new Path(baseDir, subDir);
+        LOG.debug("Worker deleting path : " + del);
+        deleteQuietly(del);
+      }
+    }
+
+    /** Recursively deletes {@code path}, logging the actual failing path and cause on error. */
+    private void deleteQuietly(Path path) {
+      try {
+        lfs.delete(path, true);
+      } catch (IOException e) {
+        LOG.warn("Failed to delete " + path, e);
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c18a3f86/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 57d99c4..1f3445a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -22,7 +22,10 @@ import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.shell.PathData;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.util.RackResolver;
@@ -116,6 +119,8 @@ public class TajoWorker extends CompositeService {
private String[] cmdArgs;
+ private DeletionService deletionService;
+
public TajoWorker() throws Exception {
super(TajoWorker.class.getName());
}
@@ -225,6 +230,11 @@ public class TajoWorker extends CompositeService {
webServer.start();
httpPort = webServer.getPort();
LOG.info("Worker info server started:" + httpPort);
+
+ deletionService = new DeletionService(getMountPath().size(), 0);
+ if(systemConf.getBoolVar(ConfVars.WORKER_TEMPORAL_DIR_CLEANUP)){
+ getWorkerContext().cleanupTemporalDirectories();
+ }
} catch (IOException e) {
LOG.error(e.getMessage(), e);
}
@@ -291,6 +301,8 @@ public class TajoWorker extends CompositeService {
} catch (Exception e) {
}
}
+
+ if(deletionService != null) deletionService.stop();
super.stop();
LOG.info("TajoWorker main thread exiting");
}
@@ -341,6 +353,46 @@ public class TajoWorker extends CompositeService {
}
}
+    /**
+     * Schedules asynchronous removal of {@code strPath} under the worker temporary directories.
+     * The candidate locations are those returned by
+     * {@code LocalDirAllocator#getAllLocalPathsToRead}. Does nothing while the deletion
+     * service has not been created.
+     *
+     * @param strPath path relative to the worker temporary directories (e.g. a query id)
+     */
+    protected void cleanup(String strPath) {
+      if (deletionService == null) {
+        return;
+      }
+
+      LocalDirAllocator allocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
+      try {
+        Iterable<Path> existingDirs = allocator.getAllLocalPathsToRead(strPath, systemConf);
+        FileSystem localFS = FileSystem.getLocal(systemConf);
+        for (Path dir : existingDirs) {
+          Path qualified = localFS.makeQualified(dir);
+          deletionService.delete(qualified);
+        }
+      } catch (IOException e) {
+        LOG.error(e.getMessage(), e);
+      }
+    }
+
+    /**
+     * Schedules deletion of every entry matched by {@code *} directly under each configured
+     * worker temporary directory ({@code tajo.worker.tmpdir.locations}). The directories
+     * themselves are kept.
+     *
+     * <p>A failure while listing one directory is logged and does not prevent the remaining
+     * directories from being cleaned. Does nothing while the deletion service has not been
+     * created.
+     */
+    protected void cleanupTemporalDirectories() {
+      if (deletionService == null) {
+        return;
+      }
+
+      LocalDirAllocator lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
+
+      Iterable<Path> tempDirs;
+      FileSystem localFS;
+      try {
+        tempDirs = lDirAllocator.getAllLocalPathsToRead(".", systemConf);
+        localFS = FileSystem.getLocal(systemConf);
+      } catch (IOException e) {
+        LOG.error(e.getMessage(), e);
+        return;
+      }
+
+      for (Path tempDir : tempDirs) {
+        // Per-directory handling: one unreadable directory must not stop cleanup of the others.
+        try {
+          PathData[] items = PathData.expandAsGlob(
+              localFS.makeQualified(new Path(tempDir, "*")).toString(), systemConf);
+
+          ArrayList<Path> paths = new ArrayList<Path>(items.length);
+          for (PathData pd : items) {
+            paths.add(pd.path);
+          }
+          if (paths.isEmpty()) {
+            continue;
+          }
+
+          // A null subDir makes the deletion service delete each listed path itself.
+          deletionService.delete(null, paths.toArray(new Path[paths.size()]));
+        } catch (IOException e) {
+          LOG.error("Failed to list temporary directory " + tempDir, e);
+        }
+      }
+    }
+
public boolean isYarnContainerMode() {
return yarnContainerMode;
}
@@ -628,7 +680,7 @@ public class TajoWorker extends CompositeService {
}
}
- public static List<File> getMountPath() throws Exception {
+ public static List<File> getMountPath() throws IOException {
BufferedReader mountOutput = null;
try {
Process mountProcess = Runtime.getRuntime ().exec("mount");
@@ -646,7 +698,7 @@ public class TajoWorker extends CompositeService {
mountPaths.add(new File(line.substring (indexStart + 4, indexEnd)));
}
return mountPaths;
- } catch (Exception e) {
+ } catch (IOException e) {
e.printStackTrace();
throw e;
} finally {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c18a3f86/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index 6be2172..d10b53d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -24,6 +24,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.tajo.QueryId;
import org.apache.tajo.TajoIdProtos;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.TajoWorkerProtocol;
@@ -129,4 +130,11 @@ public class TajoWorkerManagerService extends CompositeService
done.run(TajoWorker.FALSE_PROTO);
}
}
+
+  /**
+   * Handles a QueryMaster request to remove this worker's temporary data for a query.
+   * The directory named after the query id is handed to the worker context for removal, and
+   * the reply is TRUE as soon as that request has been made.
+   */
+  @Override
+  public void cleanup(RpcController controller, TajoIdProtos.QueryIdProto request,
+                      RpcCallback<PrimitiveProtos.BoolProto> done) {
+    String queryIdDir = new QueryId(request).toString();
+    workerContext.cleanup(queryIdDir);
+    done.run(TajoWorker.TRUE_PROTO);
+  }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c18a3f86/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index 70a998b..a93e870 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -204,7 +204,7 @@ public class Task {
if (request.getFetches().size() > 0) {
inputTableBaseDir = localFS.makeQualified(
lDirAllocator.getLocalPathForWrite(
- getTaskAttemptDir(context.getTaskId()).toString() + "/in", systemConf));
+ getTaskAttemptDir(context.getTaskId()).toString(), systemConf));
localFS.mkdirs(inputTableBaseDir);
Path tableDir;
for (String inputTable : context.getInputTables()) {
@@ -505,7 +505,7 @@ public class Task {
if (fetches.size() > 0) {
Path inputDir = lDirAllocator.
getLocalPathToRead(
- getTaskAttemptDir(ctx.getTaskId()).toString() + "/in", systemConf);
+ getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf);
File storeDir;
int i = 0;
@@ -615,6 +615,8 @@ public class Task {
public static Path getTaskAttemptDir(QueryUnitAttemptId quid) {
Path workDir =
StorageUtil.concatPath(
+ quid.getQueryUnitId().getExecutionBlockId().getQueryId().toString(),
+ "in",
quid.getQueryUnitId().getExecutionBlockId().toString(),
String.valueOf(quid.getQueryUnitId().getId()),
String.valueOf(quid.getId()));
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c18a3f86/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
index dca200e..1280ab2 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
@@ -117,9 +117,12 @@ message WorkerResourceProto {
required string host = 1;
required int32 peerRpcPort = 2;
required int32 queryMasterPort = 3;
- required ExecutionBlockIdProto executionBlockId = 4;
required int32 memoryMB = 5 ;
- required int32 diskSlots = 6;
+ required float diskSlots = 6;
+}
+
+message WorkerResourcesRequest {
+ repeated WorkerResourceProto workerResources = 1;
}
message WorkerResourceReleaseRequest {
@@ -149,4 +152,5 @@ service TajoMasterProtocolService {
rpc allocateWorkerResources(WorkerResourceAllocationRequest) returns (WorkerResourceAllocationResponse);
rpc releaseWorkerResource(WorkerResourceReleaseRequest) returns (BoolProto);
rpc stopQueryMaster(QueryIdProto) returns (BoolProto);
+ rpc getAllWorkerResource(NullProto) returns (WorkerResourcesRequest);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c18a3f86/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
index 987af25..9cf90a5 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
@@ -159,6 +159,8 @@ service TajoWorkerProtocolService {
//from QueryMaster(Worker)
rpc executeExecutionBlock(RunExecutionBlockRequestProto) returns (BoolProto);
+
+ rpc cleanup(QueryIdProto) returns (BoolProto);
}
message EnforceProperty {