You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@tajo.apache.org by jh...@apache.org on 2015/09/21 03:34:57 UTC

tajo git commit: TAJO-1869: Incorrect result when sorting table with small files.

Repository: tajo
Updated Branches:
  refs/heads/master de1b7d42d -> 8f00bf743


TAJO-1869: Incorrect result when sorting table with small files.

Closes #769


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/8f00bf74
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/8f00bf74
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/8f00bf74

Branch: refs/heads/master
Commit: 8f00bf7433e7dd99d843e9c97894bdabd81ccb50
Parents: de1b7d4
Author: Jinho Kim <jh...@apache.org>
Authored: Mon Sep 21 10:34:06 2015 +0900
Committer: Jinho Kim <jh...@apache.org>
Committed: Mon Sep 21 10:34:06 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../apache/tajo/engine/query/TestSortQuery.java |  19 ++-
 .../dataset/TestSortQuery/table3/table1.tbl     |   1 +
 .../dataset/TestSortQuery/table3/table2.tbl     |   1 +
 .../dataset/TestSortQuery/table3/table3.tbl     |   1 +
 .../dataset/TestSortQuery/table3/table4.tbl     |   1 +
 .../dataset/TestSortQuery/table3/table5.tbl     |   1 +
 .../create_table_with_unique_small_dataset.sql  |   1 +
 .../queries/TestSortQuery/testOutOfScope.sql    |   1 +
 .../results/TestSortQuery/testOutOfScope.result |   7 ++
 .../planner/physical/ExternalSortExec.java      |   4 +-
 .../tajo/worker/ExecutionBlockContext.java      |  17 +--
 .../java/org/apache/tajo/worker/Fetcher.java    |   2 -
 .../apache/tajo/worker/TaskAttemptContext.java  |   5 +-
 .../java/org/apache/tajo/worker/TaskImpl.java   | 111 +++++++----------
 .../tajo/pullserver/TajoPullServerService.java  | 122 +++++++++++--------
 .../org/apache/tajo/rpc/RpcClientManager.java   |   1 -
 17 files changed, 159 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index eacfbe1..a2496c1 100644
--- a/CHANGES
+++ b/CHANGES
@@ -273,6 +273,8 @@ Release 0.11.0 - unreleased
 
   BUG FIXES
 
+    TAJO-1869: Incorrect result when sorting table with small files. (jinho)
+
     TAJO-1846: Python temp directory path should be selected differently based 
     on user platform. (Contributed by Dongkyu Hwangbo, Committed by jihoon)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
index 9e632ab..6d6a44c 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
@@ -18,10 +18,7 @@
 
 package org.apache.tajo.engine.query;
 
-import org.apache.tajo.IntegrationTest;
-import org.apache.tajo.QueryTestCaseBase;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.*;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.conf.TajoConf.ConfVars;
@@ -374,4 +371,18 @@ public class TestSortQuery extends QueryTestCaseBase {
   public final void testSubQuerySortAfterGroupMultiBlocks() throws Exception {
     runSimpleTests();
   }
+
+  @Test
+  public final void testOutOfScope() throws Exception {
+    executeDDL("create_table_with_unique_small_dataset.sql", "table3");
+    // table has 5 files
+    testingCluster.setAllTajoDaemonConfValue(ConfVars.$TEST_MIN_TASK_NUM.varname, "5");
+    try {
+      ResultSet res = executeQuery();
+      assertResultSet(res);
+      cleanupQuery(res);
+    } finally {
+      testingCluster.setAllTajoDaemonConfValue(ConfVars.$TEST_MIN_TASK_NUM.varname, "0");
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table1.tbl
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table1.tbl b/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table1.tbl
new file mode 100644
index 0000000..d9374de
--- /dev/null
+++ b/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table1.tbl
@@ -0,0 +1 @@
+A,1

http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table2.tbl
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table2.tbl b/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table2.tbl
new file mode 100644
index 0000000..8c6802f
--- /dev/null
+++ b/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table2.tbl
@@ -0,0 +1 @@
+C,3

http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table3.tbl
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table3.tbl b/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table3.tbl
new file mode 100644
index 0000000..8184c74
--- /dev/null
+++ b/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table3.tbl
@@ -0,0 +1 @@
+B,2

http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table4.tbl
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table4.tbl b/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table4.tbl
new file mode 100644
index 0000000..4a5aa9c
--- /dev/null
+++ b/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table4.tbl
@@ -0,0 +1 @@
+D,4

http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table5.tbl
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table5.tbl b/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table5.tbl
new file mode 100644
index 0000000..bfb50ee
--- /dev/null
+++ b/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table5.tbl
@@ -0,0 +1 @@
+E,5

http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-core-tests/src/test/resources/queries/TestSortQuery/create_table_with_unique_small_dataset.sql
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/queries/TestSortQuery/create_table_with_unique_small_dataset.sql b/tajo-core-tests/src/test/resources/queries/TestSortQuery/create_table_with_unique_small_dataset.sql
new file mode 100644
index 0000000..bf1fc0f
--- /dev/null
+++ b/tajo-core-tests/src/test/resources/queries/TestSortQuery/create_table_with_unique_small_dataset.sql
@@ -0,0 +1 @@
+create external table testOutOfScope (col1 text, col2 int4) using text with ('text.delimiter'=',') location ${table.path};
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-core-tests/src/test/resources/queries/TestSortQuery/testOutOfScope.sql
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testOutOfScope.sql b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testOutOfScope.sql
new file mode 100644
index 0000000..3ea7cab
--- /dev/null
+++ b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testOutOfScope.sql
@@ -0,0 +1 @@
+select col1, col2 from testOutOfScope order by col1 desc, col2 desc;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-core-tests/src/test/resources/results/TestSortQuery/testOutOfScope.result
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/results/TestSortQuery/testOutOfScope.result b/tajo-core-tests/src/test/resources/results/TestSortQuery/testOutOfScope.result
new file mode 100644
index 0000000..a07de56
--- /dev/null
+++ b/tajo-core-tests/src/test/resources/results/TestSortQuery/testOutOfScope.result
@@ -0,0 +1,7 @@
+col1,col2
+-------------------------------
+E,5
+D,4
+C,3
+B,2
+A,1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index 4a9b491..42d99bb 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -18,7 +18,6 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.LocalDirAllocator;
@@ -410,7 +409,8 @@ public class ExternalSortExec extends SortExec {
         if (frag.getTableName().contains(INTERMEDIATE_FILE_PREFIX)) {
           localFS.delete(frag.getPath(), true);
           numDeletedFiles++;
-          LOG.info("Delete merged intermediate file: " + frag);
+
+          if(LOG.isDebugEnabled()) LOG.debug("Delete merged intermediate file: " + frag);
         }
       }
       info(LOG, numDeletedFiles + " merged intermediate files deleted");

http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index 6f92344..94bf785 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -35,10 +35,10 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.QueryMasterProtocol;
 import org.apache.tajo.plan.serder.PlanProto;
+import org.apache.tajo.pullserver.TajoPullServerService;
 import org.apache.tajo.rpc.*;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.storage.HashShuffleAppenderManager;
-import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.util.Pair;
 import org.apache.tajo.worker.event.ExecutionBlockErrorEvent;
 
@@ -213,21 +213,12 @@ public class ExecutionBlockContext {
   }
 
   public static Path getBaseOutputDir(ExecutionBlockId executionBlockId) {
-    Path workDir =
-        StorageUtil.concatPath(
-            executionBlockId.getQueryId().toString(),
-            "output",
-            String.valueOf(executionBlockId.getId()));
-    return workDir;
+    return TajoPullServerService.getBaseOutputDir(
+        executionBlockId.getQueryId().toString(), String.valueOf(executionBlockId.getId()));
   }
 
   public static Path getBaseInputDir(ExecutionBlockId executionBlockId) {
-    Path workDir =
-        StorageUtil.concatPath(
-            executionBlockId.getQueryId().toString(),
-            "in",
-            executionBlockId.toString());
-    return workDir;
+    return TajoPullServerService.getBaseInputDir(executionBlockId.getQueryId().toString(), executionBlockId.toString());
   }
 
   public ExecutionBlockId getExecutionBlockId() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
index 762278b..71d30cd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
@@ -125,14 +125,12 @@ public class Fetcher {
 
   public FileChunk get() throws IOException {
     if (useLocalFile) {
-      LOG.info("Get pseudo fetch from local host");
       startTime = System.currentTimeMillis();
       finishTime = System.currentTimeMillis();
       state = TajoProtos.FetcherState.FETCH_FINISHED;
       return fileChunk;
     }
 
-    LOG.info("Get real fetch from remote host");
     this.startTime = System.currentTimeMillis();
     this.state = TajoProtos.FetcherState.FETCH_FETCHING;
     ChannelFuture future = null;

http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index a2f8c06..228c32a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -139,7 +139,10 @@ public class TaskAttemptContext {
   
   public void setState(TaskAttemptState state) {
     this.state = state;
-    LOG.info("Query status of " + getTaskId() + " is changed to " + state);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Query status of " + getTaskId() + " is changed to " + state);
+    }
   }
 
   public void setDataChannel(DataChannel dataChannel) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
index 7b8d06f..4e3a8bf 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
@@ -21,7 +21,6 @@ package org.apache.tajo.worker;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import io.netty.handler.codec.http.QueryStringDecoder;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -45,12 +44,15 @@ import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.engine.query.TaskRequest;
 import org.apache.tajo.ipc.QueryMasterProtocol;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
-import org.apache.tajo.plan.serder.PlanProto.ShuffleType;
-import org.apache.tajo.plan.serder.PlanProto.EnforceProperty;
-import org.apache.tajo.plan.serder.PlanProto.EnforceProperty.EnforceType;
 import org.apache.tajo.plan.function.python.TajoScriptEngine;
-import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.plan.logical.NodeType;
+import org.apache.tajo.plan.logical.ScanNode;
+import org.apache.tajo.plan.logical.SortNode;
 import org.apache.tajo.plan.serder.LogicalNodeDeserializer;
+import org.apache.tajo.plan.serder.PlanProto.EnforceProperty;
+import org.apache.tajo.plan.serder.PlanProto.EnforceProperty.EnforceType;
+import org.apache.tajo.plan.serder.PlanProto.ShuffleType;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.pullserver.TajoPullServerService;
 import org.apache.tajo.pullserver.retriever.FileChunk;
@@ -569,7 +571,6 @@ public class TaskImpl implements Task {
         if (name.equals(chunk.getEbId())) {
           tablet = new FileFragment(name, new Path(chunk.getFile().getPath()), chunk.startOffset(), chunk.length());
           listTablets.add(tablet);
-          LOG.info("One local chunk is added to listTablets");
         }
       }
     }
@@ -670,6 +671,7 @@ public class TaskImpl implements Task {
           getLocalPathToRead(getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf);
 
       int i = 0;
+      int localStoreChunkCount = 0;
       File storeDir;
       File defaultStoreFile;
       FileChunk storeChunk = null;
@@ -687,23 +689,18 @@ public class TaskImpl implements Task {
 
           WorkerConnectionInfo conn = executionBlockContext.getWorkerContext().getConnectionInfo();
           if (NetUtils.isLocalAddress(address) && conn.getPullServerPort() == uri.getPort()) {
-            boolean hasError = false;
-            try {
-              LOG.info("Try to get local file chunk at local host");
-              storeChunk = getLocalStoredFileChunk(uri, systemConf);
-            } catch (Throwable t) {
-              hasError = true;
-            }
+
+            storeChunk = getLocalStoredFileChunk(uri, systemConf);
 
             // When a range request is out of range, storeChunk will be NULL. This case is normal state.
             // So, we should skip and don't need to create storeChunk.
-            if (storeChunk == null && !hasError) {
+            if (storeChunk == null || storeChunk.length() == 0) {
               continue;
             }
 
-            if (storeChunk != null && storeChunk.getFile() != null && storeChunk.startOffset() > -1
-                && hasError == false) {
+            if (storeChunk.getFile() != null && storeChunk.startOffset() > -1) {
               storeChunk.setFromRemote(false);
+              localStoreChunkCount++;
             } else {
               storeChunk = new FileChunk(defaultStoreFile, 0, -1);
               storeChunk.setFromRemote(true);
@@ -717,12 +714,16 @@ public class TaskImpl implements Task {
           // represents a complete file. Otherwise, storeChunk may represent a complete file or only a part of it
           storeChunk.setEbId(f.getName());
           Fetcher fetcher = new Fetcher(systemConf, uri, storeChunk);
-          LOG.info("Create a new Fetcher with storeChunk:" + storeChunk.toString());
           runnerList.add(fetcher);
           i++;
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Create a new Fetcher with storeChunk:" + storeChunk.toString());
+          }
         }
       }
       ctx.addFetchPhase(runnerList.size(), new File(inputDir.toString()));
+      LOG.info("Create shuffle Fetchers local:" + localStoreChunkCount +
+          ", remote:" + (runnerList.size() - localStoreChunkCount));
       return runnerList;
     } else {
       return Lists.newArrayList();
@@ -731,56 +732,42 @@ public class TaskImpl implements Task {
 
   private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException {
     // Parse the URI
-    LOG.info("getLocalStoredFileChunk starts");
-    final Map<String, List<String>> params = new QueryStringDecoder(fetchURI.toString()).parameters();
-    final List<String> types = params.get("type");
-    final List<String> qids = params.get("qid");
-    final List<String> taskIdList = params.get("ta");
-    final List<String> stageIds = params.get("sid");
-    final List<String> partIds = params.get("p");
-    final List<String> offsetList = params.get("offset");
-    final List<String> lengthList = params.get("length");
-
-    if (types == null || stageIds == null || qids == null || partIds == null) {
-      LOG.error("Invalid URI - Required queryId, type, stage Id, and part id");
-      return null;
-    }
 
-    if (qids.size() != 1 && types.size() != 1 || stageIds.size() != 1) {
-      LOG.error("Invalid URI - Required qids, type, taskIds, stage Id, and part id");
-      return null;
-    }
+    // Parsing the URL into key-values
+    final Map<String, List<String>> params = TajoPullServerService.decodeParams(fetchURI.toString());
 
-    String queryId = qids.get(0);
-    String shuffleType = types.get(0);
-    String sid = stageIds.get(0);
-    String partId = partIds.get(0);
+    String partId = params.get("p").get(0);
+    String queryId = params.get("qid").get(0);
+    String shuffleType = params.get("type").get(0);
+    String sid =  params.get("sid").get(0);
 
-    if (shuffleType.equals("r") && taskIdList == null) {
-      LOG.error("Invalid URI - For range shuffle, taskId is required");
-      return null;
-    }
-    List<String> taskIds = splitMaps(taskIdList);
+    final List<String> taskIdList = params.get("ta");
+    final List<String> offsetList = params.get("offset");
+    final List<String> lengthList = params.get("length");
 
-    FileChunk chunk;
     long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L;
     long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L;
 
-    LOG.info("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId
-	+ ", taskIds=" + taskIdList);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId
+          + ", taskIds=" + taskIdList);
+    }
 
     // The working directory of Tajo worker for each query, including stage
-    String queryBaseDir = queryId.toString() + "/output" + "/" + sid + "/";
+    Path queryBaseDir = TajoPullServerService.getBaseOutputDir(queryId, sid);
+    List<String> taskIds = TajoPullServerService.splitMaps(taskIdList);
 
+    FileChunk chunk;
     // If the stage requires a range shuffle
     if (shuffleType.equals("r")) {
-      String ta = taskIds.get(0);
-      if (!executionBlockContext.getLocalDirAllocator().ifExists(queryBaseDir + ta + "/output/", conf)) {
-        LOG.warn("Range shuffle - file not exist");
+
+      Path outputPath = StorageUtil.concatPath(queryBaseDir, taskIds.get(0), "output");
+      if (!executionBlockContext.getLocalDirAllocator().ifExists(outputPath.toString(), conf)) {
+        LOG.warn("Range shuffle - file not exist. " + outputPath);
         return null;
       }
       Path path = executionBlockContext.getLocalFS().makeQualified(
-	      executionBlockContext.getLocalDirAllocator().getLocalPathToRead(queryBaseDir + ta + "/output/", conf));
+	      executionBlockContext.getLocalDirAllocator().getLocalPathToRead(outputPath.toString(), conf));
       String startKey = params.get("start").get(0);
       String endKey = params.get("end").get(0);
       boolean last = params.get("final") != null;
@@ -794,14 +781,15 @@ public class TaskImpl implements Task {
 
       // If the stage requires a hash shuffle or a scattered hash shuffle
     } else if (shuffleType.equals("h") || shuffleType.equals("s")) {
-      int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), (TajoConf) conf);
-      String partPath = queryBaseDir + "hash-shuffle/" + partParentId + "/" + partId;
-      if (!executionBlockContext.getLocalDirAllocator().ifExists(partPath, conf)) {
+      int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
+      Path partPath = StorageUtil.concatPath(queryBaseDir, "hash-shuffle", String.valueOf(partParentId), partId);
+
+      if (!executionBlockContext.getLocalDirAllocator().ifExists(partPath.toString(), conf)) {
         LOG.warn("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath);
         return null;
       }
       Path path = executionBlockContext.getLocalFS().makeQualified(
-        executionBlockContext.getLocalDirAllocator().getLocalPathToRead(partPath, conf));
+        executionBlockContext.getLocalDirAllocator().getLocalPathToRead(partPath.toString(), conf));
       File file = new File(path.toUri());
       long startPos = (offset >= 0 && length >= 0) ? offset : 0;
       long readLen = (offset >= 0 && length >= 0) ? length : file.length();
@@ -820,17 +808,6 @@ public class TaskImpl implements Task {
     return chunk;
   }
 
-  private List<String> splitMaps(List<String> mapq) {
-    if (null == mapq) {
-      return null;
-    }
-    final List<String> ret = new ArrayList<String>();
-    for (String s : mapq) {
-      Collections.addAll(ret, s.split(","));
-    }
-    return ret;
-  }
-
   public static Path getTaskAttemptDir(TaskAttemptId quid) {
     Path workDir =
         StorageUtil.concatPath(ExecutionBlockContext.getBaseInputDir(quid.getTaskId().getExecutionBlockId()),

http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index 59a758f..d6f5a90 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -59,11 +59,8 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.pullserver.retriever.FileChunk;
 import org.apache.tajo.rpc.NettyUtils;
-import org.apache.tajo.storage.HashShuffleAppenderManager;
-import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.index.bst.BSTIndex;
 
 import java.io.*;
@@ -432,17 +429,6 @@ public class TajoPullServerService extends AbstractService {
       lDirAlloc.getAllLocalPathsToRead(".", conf);
     }
 
-    private List<String> splitMaps(List<String> mapq) {
-      if (null == mapq) {
-        return null;
-      }
-      final List<String> ret = new ArrayList<String>();
-      for (String s : mapq) {
-        Collections.addAll(ret, s.split(","));
-      }
-      return ret;
-    }
-
     @Override
     public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
       accepted.add(ctx.channel());
@@ -461,37 +447,31 @@ public class TajoPullServerService extends AbstractService {
 
       ProcessingStatus processingStatus = new ProcessingStatus(request.getUri().toString());
       processingStatusMap.put(request.getUri().toString(), processingStatus);
-      // Parsing the URL into key-values
-      final Map<String, List<String>> params = new QueryStringDecoder(request.getUri()).parameters();
-      final List<String> types = params.get("type");
-      final List<String> qids = params.get("qid");
-      final List<String> taskIdList = params.get("ta");
-      final List<String> subQueryIds = params.get("sid");
-      final List<String> partIds = params.get("p");
-      final List<String> offsetList = params.get("offset");
-      final List<String> lengthList = params.get("length");
 
-      if (types == null || subQueryIds == null || qids == null || partIds == null) {
-        sendError(ctx, "Required queryId, type, subquery Id, and part id", HttpResponseStatus.BAD_REQUEST);
-        return;
+      // Parsing the URL into key-values; a malformed request is rejected and must not be processed further
+      final Map<String, List<String>> params;
+      try {
+        params = decodeParams(request.getUri());
+      } catch (IllegalArgumentException e) {
+        sendError(ctx, e.getMessage(), HttpResponseStatus.BAD_REQUEST);
+        return;
       }
 
-      if (qids.size() != 1 && types.size() != 1 || subQueryIds.size() != 1) {
-        sendError(ctx, "Required qids, type, taskIds, subquery Id, and part id", HttpResponseStatus.BAD_REQUEST);
-        return;
-      }
+      String partId = params.get("p").get(0);
+      String queryId = params.get("qid").get(0);
+      String shuffleType = params.get("type").get(0);
+      String sid = params.get("sid").get(0);
 
-      String partId = partIds.get(0);
-      String queryId = qids.get(0);
-      String shuffleType = types.get(0);
-      String sid = subQueryIds.get(0);
+      final List<String> taskIdList = params.get("ta");
+      final List<String> offsetList = params.get("offset");
+      final List<String> lengthList = params.get("length");
 
       long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L;
       long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L;
 
       List<String> taskIds = splitMaps(taskIdList);
 
-      String queryBaseDir = queryId.toString() + "/output";
+      Path queryBaseDir = getBaseOutputDir(queryId, sid);
 
       if (LOG.isDebugEnabled()) {
         LOG.debug("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId
@@ -505,15 +485,13 @@ public class TajoPullServerService extends AbstractService {
 
       // if a stage requires a range shuffle
       if (shuffleType.equals("r")) {
-        String ta = taskIds.get(0);
-        String pathString = queryBaseDir + "/" + sid + "/" + ta + "/output/";
-        if (!lDirAlloc.ifExists(pathString, conf)) {
-          LOG.warn(pathString + "does not exist.");
+        Path outputPath = StorageUtil.concatPath(queryBaseDir, taskIds.get(0), "output");
+        if (!lDirAlloc.ifExists(outputPath.toString(), conf)) {
+          LOG.warn(outputPath + " does not exist.");
           sendError(ctx, HttpResponseStatus.NO_CONTENT);
           return;
         }
-        Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" + ta
-            + "/output/", conf));
+        Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(outputPath.toString(), conf));
         String startKey = params.get("start").get(0);
         String endKey = params.get("end").get(0);
         boolean last = params.get("final") != null;
@@ -533,14 +511,14 @@ public class TajoPullServerService extends AbstractService {
         // if a stage requires a hash shuffle or a scattered hash shuffle
       } else if (shuffleType.equals("h") || shuffleType.equals("s")) {
         int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
-        String partPath = queryBaseDir + "/" + sid + "/hash-shuffle/" + partParentId + "/" + partId;
-        if (!lDirAlloc.ifExists(partPath, conf)) {
+        Path partPath = StorageUtil.concatPath(queryBaseDir, "hash-shuffle", String.valueOf(partParentId), partId);
+        if (!lDirAlloc.ifExists(partPath.toString(), conf)) {
           LOG.warn("Partition shuffle file not exists: " + partPath);
           sendError(ctx, HttpResponseStatus.NO_CONTENT);
           return;
         }
 
-        Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(partPath, conf));
+        Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(partPath.toString(), conf));
 
         File file = new File(path.toUri());
         long startPos = (offset >= 0 && length >= 0) ? offset : 0;
@@ -683,8 +661,9 @@ public class TajoPullServerService extends AbstractService {
     Schema keySchema = idxReader.getKeySchema();
     TupleComparator comparator = idxReader.getComparator();
 
-    LOG.info("BSTIndex is loaded from disk (" + idxReader.getFirstKey() + ", "
-        + idxReader.getLastKey());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("BSTIndex is loaded from disk (" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() + ")");
+    }
 
     File data = new File(URI.create(outDir.toUri() + "/output"));
     byte [] startBytes = Base64.decodeBase64(startKey);
@@ -776,7 +755,84 @@ public class TajoPullServerService extends AbstractService {
     idxReader.close();
 
     FileChunk chunk = new FileChunk(data, startOffset, endOffset - startOffset);
-    LOG.info("Retrieve File Chunk: " + chunk);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Retrieve File Chunk: " + chunk);
+    }
     return chunk;
   }
+
+  /**
+   * Splits the values of a comma-separated list parameter (e.g. {@code ta}) into individual items.
+   *
+   * @param mapq raw parameter values, each possibly holding several comma-separated items; may be null
+   * @return the flattened items in order, or null if {@code mapq} is null
+   */
+  public static List<String> splitMaps(List<String> mapq) {
+    if (null == mapq) {
+      return null;
+    }
+    final List<String> ret = new ArrayList<String>();
+    for (String s : mapq) {
+      Collections.addAll(ret, s.split(","));
+    }
+    return ret;
+  }
+
+  /**
+   * Decodes the query string of a shuffle fetch URI and validates its mandatory parameters.
+   *
+   * @param uri the fetch request URI
+   * @return the decoded parameters; qid, type and sid hold exactly one value each and p is present.
+   *         For a range shuffle (type=r), ta, start and end are present as well.
+   * @throws IllegalArgumentException if a mandatory parameter is missing or repeated
+   */
+  public static Map<String, List<String>> decodeParams(String uri) {
+    final Map<String, List<String>> params = new QueryStringDecoder(uri).parameters();
+    final List<String> types = params.get("type");
+    final List<String> qids = params.get("qid");
+    final List<String> ebIds = params.get("sid");
+    final List<String> partIds = params.get("p");
+
+    if (types == null || ebIds == null || qids == null || partIds == null) {
+      throw new IllegalArgumentException("invalid params. required: qid, type, sid and p, but got: " + params);
+    }
+
+    // qid, type and sid must each be given exactly once.
+    if (qids.size() != 1 || types.size() != 1 || ebIds.size() != 1) {
+      throw new IllegalArgumentException("invalid params. qid, type and sid must be single-valued: " + params);
+    }
+
+    // Range shuffle handlers read ta, start and end unconditionally, so reject requests lacking them here.
+    if ("r".equals(types.get(0))
+        && (params.get("ta") == null || params.get("start") == null || params.get("end") == null)) {
+      throw new IllegalArgumentException("invalid params. range shuffle requires ta, start and end: " + params);
+    }
+
+    return params;
+  }
+
+  /**
+   * Returns {@code <queryId>/output/<executionBlockSequenceId>}, the output directory of an execution block.
+   */
+  public static Path getBaseOutputDir(String queryId, String executionBlockSequenceId) {
+    Path workDir =
+        StorageUtil.concatPath(
+            queryId,
+            "output",
+            executionBlockSequenceId);
+    return workDir;
+  }
+
+  /**
+   * Returns {@code <queryId>/in/<executionBlockId>}, the input directory of an execution block.
+   */
+  public static Path getBaseInputDir(String queryId, String executionBlockId) {
+    Path workDir =
+        StorageUtil.concatPath(
+            queryId,
+            "in",
+            executionBlockId);
+    return workDir;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
index 598c8e8..c801b8a 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
@@ -199,7 +199,6 @@ public class RpcClientManager {
    * After it is shutdown it is not possible to reuse it again.
    */
   public static void shutdown() {
-    close();
     NettyUtils.shutdownGracefully();
   }