You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2013/12/06 03:14:49 UTC

git commit: TAJO-268: Temporal files should be removed after query is finished. (jinho) TAJO-292: Too many intermediate partition files. (jinho)

Updated Branches:
  refs/heads/master 39fe4d765 -> c18a3f862


TAJO-268: Temporal files should be removed after query is finished. (jinho)
TAJO-292: Too many intermediate partition files. (jinho)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/c18a3f86
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/c18a3f86
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/c18a3f86

Branch: refs/heads/master
Commit: c18a3f862cf764205abc8d8e491dbc6719a7c730
Parents: 39fe4d7
Author: jinossy <ji...@gmail.com>
Authored: Fri Dec 6 11:13:39 2013 +0900
Committer: jinossy <ji...@gmail.com>
Committed: Fri Dec 6 11:13:39 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   4 +
 .../java/org/apache/tajo/conf/TajoConf.java     |   1 +
 .../apache/tajo/master/TajoMasterService.java   |  28 +++++
 .../tajo/master/querymaster/QueryMaster.java    |  59 ++++++++++
 .../tajo/master/querymaster/SubQuery.java       |  48 ++++++--
 .../org/apache/tajo/worker/DeletionService.java | 115 +++++++++++++++++++
 .../java/org/apache/tajo/worker/TajoWorker.java |  56 ++++++++-
 .../tajo/worker/TajoWorkerManagerService.java   |   8 ++
 .../main/java/org/apache/tajo/worker/Task.java  |   6 +-
 .../src/main/proto/TajoMasterProtocol.proto     |   8 +-
 .../src/main/proto/TajoWorkerProtocol.proto     |   2 +
 11 files changed, 320 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c18a3f86/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1ca5b42..6293ec4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -101,6 +101,10 @@ Release 0.8.0 - unreleased
 
   BUG FIXES
 
+    TAJO-268: Temporal files should be removed after query is finished. (jinho)
+
+    TAJO-292: Too many intermediate partition files. (jinho)
+
     TAJO-375: TajoClient can't get result data when different os user
     (hyoungjunkim via hyunsik)
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c18a3f86/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 1cbb3fc..0d9bbb0 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -94,6 +94,7 @@ public class TajoConf extends YarnConfiguration {
 
     // Tajo Worker Temporal Directories
     WORKER_TEMPORAL_DIR("tajo.worker.tmpdir.locations", "/tmp/tajo-${user.name}/tmpdir"),
+    WORKER_TEMPORAL_DIR_CLEANUP("tajo.worker.tmpdir.cleanup-at-startup", false),
 
     // Tajo Worker Resources
     WORKER_RESOURCE_AVAILABLE_CPU_CORES("tajo.worker.resource.cpu-cores", 1),

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c18a3f86/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
index c213dd5..aac4114 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
@@ -30,12 +30,14 @@ import org.apache.tajo.TajoIdProtos;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.querymaster.QueryJobManager;
+import org.apache.tajo.master.rm.WorkerResource;
 import org.apache.tajo.rpc.AsyncRpcServer;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
 import org.apache.tajo.util.NetUtils;
 
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.List;
 
 public class TajoMasterService extends AbstractService {
@@ -150,5 +152,31 @@ public class TajoMasterService extends AbstractService {
       context.getQueryJobManager().stopQuery(new QueryId(request));
       done.run(BOOL_TRUE);
     }
+
+    /**
+     * Replies with one {@code WorkerResourceProto} per worker currently held by the
+     * resource manager (host, peer RPC port, query master port, memory and disk slots).
+     */
+    @Override
+    public void getAllWorkerResource(RpcController controller, PrimitiveProtos.NullProto request,
+                                     RpcCallback<TajoMasterProtocol.WorkerResourcesRequest> done) {
+      TajoMasterProtocol.WorkerResourcesRequest.Builder builder =
+          TajoMasterProtocol.WorkerResourcesRequest.newBuilder();
+      // Copy the worker collection into a private list before iterating over it.
+      List<WorkerResource> workerResources =
+          new ArrayList<WorkerResource>(context.getResourceManager().getWorkers().values());
+
+      for(WorkerResource worker: workerResources) {
+        TajoMasterProtocol.WorkerResourceProto.Builder workerResource =
+            TajoMasterProtocol.WorkerResourceProto.newBuilder();
+        workerResource.setHost(worker.getAllocatedHost());
+        workerResource.setPeerRpcPort(worker.getPeerRpcPort());
+        workerResource.setQueryMasterPort(worker.getQueryMasterPort());
+        workerResource.setMemoryMB(worker.getMemoryMB());
+        workerResource.setDiskSlots(worker.getDiskSlots());
+        builder.addWorkerResources(workerResource);
+      }
+      done.run(builder.build());
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c18a3f86/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index 54d2370..41b3b5e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -33,19 +33,25 @@ import org.apache.tajo.TajoProtos;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.global.GlobalPlanner;
 import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.TajoAsyncDispatcher;
 import org.apache.tajo.master.event.QueryStartEvent;
 import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.storage.AbstractStorageManager;
 import org.apache.tajo.storage.StorageManagerFactory;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.worker.TajoWorker;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeat;
@@ -163,6 +169,55 @@ public class QueryMaster extends CompositeService implements EventHandler {
     }
   }
 
+  /**
+   * Best effort: asks each worker listed by the master to clean up the query's local files.
+   */
+  private void cleanup(QueryId queryId) {
+    LOG.info("cleanup query resources : " + queryId);
+    for (TajoMasterProtocol.WorkerResourceProto worker : getAllWorker()) {
+      if (worker.getPeerRpcPort() == 0) continue;
+      // Scoped per worker so a failure never closes/releases a previous worker's connection.
+      NettyClientBase rpc = null;
+      try {
+        rpc = connPool.getConnection(NetUtils.createSocketAddr(worker.getHost(), worker.getPeerRpcPort()),
+            TajoWorkerProtocol.class, true);
+        TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService = rpc.getStub();
+        tajoWorkerProtocolService.cleanup(null, queryId.getProto(), NullCallback.get());
+      } catch (Exception e) {
+        connPool.closeConnection(rpc);
+        rpc = null;
+        LOG.error(e.getMessage(), e);
+      } finally {
+        connPool.releaseConnection(rpc);
+      }
+    }
+  }
+
+  /** Asks the TajoMaster for all worker resources; returns an empty list if the call fails. */
+  public List<TajoMasterProtocol.WorkerResourceProto> getAllWorker() {
+    NettyClientBase rpc = null;
+    try {
+      rpc = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
+          TajoMasterProtocol.class, true);
+      TajoMasterProtocol.TajoMasterProtocolService masterService = rpc.getStub();
+
+      CallFuture<TajoMasterProtocol.WorkerResourcesRequest> callBack =
+          new CallFuture<TajoMasterProtocol.WorkerResourcesRequest>();
+      masterService.getAllWorkerResource(callBack.getController(),
+          PrimitiveProtos.NullProto.getDefaultInstance(), callBack);
+      // Wait at most 2 seconds for the master's reply.
+      TajoMasterProtocol.WorkerResourcesRequest workerResourcesRequest = callBack.get(2, TimeUnit.SECONDS);
+      return workerResourcesRequest.getWorkerResourcesList();
+    } catch (Exception e) {
+      connPool.closeConnection(rpc);
+      rpc = null;
+      LOG.error(e.getMessage(), e);
+    } finally {
+      connPool.releaseConnection(rpc);
+    }
+    return new ArrayList<TajoMasterProtocol.WorkerResourceProto>();
+  }
+
   public void reportQueryStatusToQueryMaster(QueryId queryId, TajoProtos.QueryState state) {
     LOG.info("Send QueryMaster Ready to QueryJobManager:" + queryId);
     NettyClientBase tmClient = null;
@@ -300,6 +355,10 @@ public class QueryMaster extends CompositeService implements EventHandler {
 
         try {
           queryMasterTask.stop();
+          if (!systemConf.get(CommonTestingUtil.TAJO_TEST, "FALSE").equalsIgnoreCase("TRUE")
+              && !workerContext.isYarnContainerMode()) {
+            cleanup(queryId);       // TODO We will support yarn mode
+          }
         } catch (Exception e) {
           LOG.error(e.getMessage(), e);
         }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c18a3f86/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 70bde5c..a75c200 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -47,6 +47,7 @@ import org.apache.tajo.engine.planner.logical.GroupbyNode;
 import org.apache.tajo.engine.planner.logical.NodeType;
 import org.apache.tajo.engine.planner.logical.ScanNode;
 import org.apache.tajo.engine.planner.logical.StoreTableNode;
+import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.AbstractTaskScheduler;
 import org.apache.tajo.master.TaskRunnerGroupEvent;
 import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
@@ -478,6 +479,22 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     }
 
     /**
+     * Sums the memory (in MB) reported for each worker returned by QueryMaster#getAllWorker().
+     *
+     * @param subQuery sub query whose context gives access to the query master
+     * @return total memory of the listed workers in mega bytes; 0 when no worker is listed
+     */
+    private static int getClusterTotalMemory(SubQuery subQuery) {
+      List<TajoMasterProtocol.WorkerResourceProto> workers =
+          subQuery.context.getQueryMasterContext().getQueryMaster().getAllWorker();
+
+      int totalMem = 0;
+      for (TajoMasterProtocol.WorkerResourceProto worker : workers) {
+        totalMem += worker.getMemoryMB();
+      }
+      return totalMem;
+    }
+    /**
      * Getting the desire number of partitions according to the volume of input data.
      * This method is only used to determine the partition key number of hash join or aggregation.
      *
@@ -505,16 +522,23 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
         // for inner
         ExecutionBlock inner = childs.get(1);
         long innerVolume = getInputVolume(subQuery.masterPlan, subQuery.context, inner);
-        LOG.info("Outer volume: " + Math.ceil((double)outerVolume / 1048576));
-        LOG.info("Inner volume: " + Math.ceil((double)innerVolume / 1048576));
+        LOG.info("Outer volume: " + Math.ceil((double) outerVolume / 1048576) + "MB, "
+            + "Inner volume: " + Math.ceil((double) innerVolume / 1048576) + "MB");
 
-        long smaller = Math.min(outerVolume, innerVolume);
+        long bigger = Math.max(outerVolume, innerVolume);
 
-        int mb = (int) Math.ceil((double)smaller / 1048576);
-        LOG.info("Smaller Table's volume is approximately " + mb + " MB");
-        // determine the number of task
-        int taskNum = (int) Math.ceil((double)mb /
+        int mb = (int) Math.ceil((double) bigger / 1048576);
+        LOG.info("Bigger Table's volume is approximately " + mb + " MB");
+
+        int taskNum = (int) Math.ceil((double) mb /
             conf.getIntVar(ConfVars.DIST_QUERY_JOIN_PARTITION_VOLUME));
+
+        int totalMem = getClusterTotalMemory(subQuery);
+        LOG.info("Total memory of cluster is " + totalMem + " MB");
+        int slots = Math.max(totalMem / conf.getIntVar(ConfVars.TASK_DEFAULT_MEMORY), 1);
+
+        // determine the number of task
+        taskNum = Math.min(taskNum, slots);
         LOG.info("The determined number of join partitions is " + taskNum);
         return taskNum;
 
@@ -526,11 +550,17 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
         } else {
           long volume = getInputVolume(subQuery.masterPlan, subQuery.context, subQuery.block);
 
-          int mb = (int) Math.ceil((double)volume / 1048576);
+          int mb = (int) Math.ceil((double) volume / 1048576);
           LOG.info("Table's volume is approximately " + mb + " MB");
           // determine the number of task
-          int taskNum = (int) Math.ceil((double)mb /
+          int taskNumBySize = (int) Math.ceil((double) mb /
               conf.getIntVar(ConfVars.DIST_QUERY_GROUPBY_PARTITION_VOLUME));
+
+          int totalMem = getClusterTotalMemory(subQuery);
+
+          LOG.info("Total memory of cluster is " + totalMem + " MB");
+          int slots = Math.max(totalMem / conf.getIntVar(ConfVars.TASK_DEFAULT_MEMORY), 1);
+          int taskNum = Math.min(taskNumBySize, slots); //Maximum partitions
           LOG.info("The determined number of aggregation partitions is " + taskNum);
           return taskNum;
         }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c18a3f86/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/DeletionService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/DeletionService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/DeletionService.java
new file mode 100644
index 0000000..42ea71f
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/DeletionService.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+
+import java.io.IOException;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+/** Deletes local paths asynchronously on a scheduled thread pool. */
+public class DeletionService {
+  static final Log LOG = LogFactory.getLog(DeletionService.class);
+
+  private final int debugDelay;
+  private final ScheduledThreadPoolExecutor sched;
+  private final FileContext lfs = getLfs();
+
+  static final FileContext getLfs() {
+    try {
+      return FileContext.getLocalFSFileContext();
+    } catch (UnsupportedFileSystemException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public DeletionService(int defaultThreads, int debugDelay) {
+    ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat("DeletionService #%d").build();
+    sched = new ScheduledThreadPoolExecutor(defaultThreads, tf);
+    sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+    sched.setKeepAliveTime(60L, TimeUnit.SECONDS);
+    this.debugDelay = debugDelay;
+  }
+
+  /**
+   * Schedules recursive deletion of the given path(s) after {@code debugDelay} seconds.
+   * Nothing is scheduled when {@code debugDelay} is -1.
+   *
+   * @param subDir   the sub directory name; deleted as-is when no baseDirs are given
+   * @param baseDirs the base directories which contains the subDir's
+   */
+  public void delete(Path subDir, Path... baseDirs) {
+    if (debugDelay != -1) {
+      sched.schedule(new FileDeletion(subDir, baseDirs), debugDelay, TimeUnit.SECONDS);
+    }
+  }
+
+  /** Shuts the pool down, waiting up to 10 seconds before cancelling whatever is left. */
+  public void stop() {
+    sched.shutdown();
+    boolean terminated = false;
+    try {
+      terminated = sched.awaitTermination(10, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
+    }
+    if (!terminated) {
+      sched.shutdownNow();
+    }
+  }
+
+  private class FileDeletion implements Runnable {
+    final Path subDir;
+    final Path[] baseDirs;
+    FileDeletion(Path subDir, Path[] baseDirs) {
+      this.subDir = subDir;
+      this.baseDirs = baseDirs;
+    }
+
+    @Override
+    public void run() {
+      if (baseDirs == null || baseDirs.length == 0) {
+        LOG.debug("Worker deleting absolute path : " + subDir);
+        try {
+          lfs.delete(subDir, true);
+        } catch (IOException e) {
+          LOG.warn("Failed to delete " + subDir, e);
+        }
+        return;
+      }
+      for (Path baseDir : baseDirs) {
+        Path del = subDir == null ? baseDir : new Path(baseDir, subDir);
+        LOG.debug("Worker deleting path : " + del);
+        try {
+          lfs.delete(del, true);
+        } catch (IOException e) {
+          // Report the path actually attempted; subDir may be null or relative here.
+          LOG.warn("Failed to delete " + del, e);
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c18a3f86/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 57d99c4..1f3445a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -22,7 +22,10 @@ import com.google.protobuf.ServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.shell.PathData;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.util.RackResolver;
@@ -116,6 +119,8 @@ public class TajoWorker extends CompositeService {
 
   private String[] cmdArgs;
 
+  private DeletionService deletionService;
+
   public TajoWorker() throws Exception {
     super(TajoWorker.class.getName());
   }
@@ -225,6 +230,11 @@ public class TajoWorker extends CompositeService {
           webServer.start();
           httpPort = webServer.getPort();
           LOG.info("Worker info server started:" + httpPort);
+
+          deletionService = new DeletionService(getMountPath().size(), 0);
+          if(systemConf.getBoolVar(ConfVars.WORKER_TEMPORAL_DIR_CLEANUP)){
+            getWorkerContext().cleanupTemporalDirectories();
+          }
         } catch (IOException e) {
           LOG.error(e.getMessage(), e);
         }
@@ -291,6 +301,8 @@ public class TajoWorker extends CompositeService {
       } catch (Exception e) {
       }
     }
+
+    if(deletionService != null) deletionService.stop();
     super.stop();
     LOG.info("TajoWorker main thread exiting");
   }
@@ -341,6 +353,46 @@ public class TajoWorker extends CompositeService {
       }
     }
 
+    /** Schedules deletion of every local path the temporal-dir allocator returns for strPath. */
+    protected void cleanup(String strPath) {
+      if(deletionService == null) return;
+
+      LocalDirAllocator lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
+      try {
+        Iterable<Path> iter = lDirAllocator.getAllLocalPathsToRead(strPath, systemConf);
+        FileSystem localFS = FileSystem.getLocal(systemConf);
+        for (Path path : iter){
+          deletionService.delete(localFS.makeQualified(path));
+        }
+      } catch (IOException e) {
+        LOG.error(e.getMessage(), e);
+      }
+    }
+
+    /** Schedules deletion of every "*" glob match under each local path the allocator returns for ".". */
+    protected void cleanupTemporalDirectories() {
+      if(deletionService == null) return;
+
+      LocalDirAllocator lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
+      try {
+        Iterable<Path> iter = lDirAllocator.getAllLocalPathsToRead(".", systemConf);
+        FileSystem localFS = FileSystem.getLocal(systemConf);
+        for (Path path : iter){
+          PathData[] items = PathData.expandAsGlob(localFS.makeQualified(new Path(path, "*")).toString(), systemConf);
+
+          ArrayList<Path> paths = new ArrayList<Path>();
+          for (PathData pd : items){
+            paths.add(pd.path);
+          }
+          if(paths.size() == 0) continue;
+          // subDir is null, so each collected path is passed as a base dir and deleted itself.
+          deletionService.delete(null, paths.toArray(new Path[paths.size()]));
+        }
+      } catch (IOException e) {
+        LOG.error(e.getMessage(), e);
+      }
+    }
+
     public boolean isYarnContainerMode() {
       return yarnContainerMode;
     }
@@ -628,7 +680,7 @@ public class TajoWorker extends CompositeService {
     }
   }
 
-  public static List<File> getMountPath() throws Exception {
+  public static List<File> getMountPath() throws IOException {
     BufferedReader mountOutput = null;
     try {
       Process mountProcess = Runtime.getRuntime ().exec("mount");
@@ -646,7 +698,7 @@ public class TajoWorker extends CompositeService {
         mountPaths.add(new File(line.substring (indexStart + 4, indexEnd)));
       }
       return mountPaths;
-    } catch (Exception e) {
+    } catch (IOException e) {
       e.printStackTrace();
       throw e;
     } finally {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c18a3f86/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index 6be2172..d10b53d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -24,6 +24,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.tajo.QueryId;
 import org.apache.tajo.TajoIdProtos;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
@@ -129,4 +130,11 @@ public class TajoWorkerManagerService extends CompositeService
       done.run(TajoWorker.FALSE_PROTO);
     }
   }
+
+  @Override
+  public void cleanup(RpcController controller, TajoIdProtos.QueryIdProto request,
+                      RpcCallback<PrimitiveProtos.BoolProto> done) {
+    workerContext.cleanup(new QueryId(request).toString()); // the query id string is the path to clean
+    done.run(TajoWorker.TRUE_PROTO); // always replies true once cleanup() has returned
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c18a3f86/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index 70a998b..a93e870 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -204,7 +204,7 @@ public class Task {
     if (request.getFetches().size() > 0) {
       inputTableBaseDir = localFS.makeQualified(
           lDirAllocator.getLocalPathForWrite(
-              getTaskAttemptDir(context.getTaskId()).toString() + "/in", systemConf));
+              getTaskAttemptDir(context.getTaskId()).toString(), systemConf));
       localFS.mkdirs(inputTableBaseDir);
       Path tableDir;
       for (String inputTable : context.getInputTables()) {
@@ -505,7 +505,7 @@ public class Task {
     if (fetches.size() > 0) {
       Path inputDir = lDirAllocator.
           getLocalPathToRead(
-              getTaskAttemptDir(ctx.getTaskId()).toString() + "/in", systemConf);
+              getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf);
       File storeDir;
 
       int i = 0;
@@ -615,6 +615,8 @@ public class Task {
   public static Path getTaskAttemptDir(QueryUnitAttemptId quid) {
     Path workDir =
         StorageUtil.concatPath(
+            quid.getQueryUnitId().getExecutionBlockId().getQueryId().toString(),
+            "in",
             quid.getQueryUnitId().getExecutionBlockId().toString(),
             String.valueOf(quid.getQueryUnitId().getId()),
             String.valueOf(quid.getId()));

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c18a3f86/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
index dca200e..1280ab2 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
@@ -117,9 +117,12 @@ message WorkerResourceProto {
     required string host = 1;
     required int32 peerRpcPort = 2;
     required int32 queryMasterPort = 3;
-    required ExecutionBlockIdProto executionBlockId = 4;
     required int32 memoryMB = 5 ;
-    required int32 diskSlots = 6;
+    required float diskSlots = 6;
+}
+
+message WorkerResourcesRequest {
+    repeated WorkerResourceProto workerResources = 1;
 }
 
 message WorkerResourceReleaseRequest {
@@ -149,4 +152,5 @@ service TajoMasterProtocolService {
   rpc allocateWorkerResources(WorkerResourceAllocationRequest) returns (WorkerResourceAllocationResponse);
   rpc releaseWorkerResource(WorkerResourceReleaseRequest) returns (BoolProto);
   rpc stopQueryMaster(QueryIdProto) returns (BoolProto);
+  rpc getAllWorkerResource(NullProto) returns (WorkerResourcesRequest);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c18a3f86/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
index 987af25..9cf90a5 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
@@ -159,6 +159,8 @@ service TajoWorkerProtocolService {
 
   //from QueryMaster(Worker)
   rpc executeExecutionBlock(RunExecutionBlockRequestProto) returns (BoolProto);
+
+  rpc cleanup(QueryIdProto) returns (BoolProto);
 }
 
 message EnforceProperty {