You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this copy.)
Posted to commits@tajo.apache.org by jh...@apache.org on 2015/09/21 03:34:57 UTC
tajo git commit: TAJO-1869: Incorrect result when sorting table with small files.
Repository: tajo
Updated Branches:
refs/heads/master de1b7d42d -> 8f00bf743
TAJO-1869: Incorrect result when sorting table with small files.
Closes #769
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/8f00bf74
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/8f00bf74
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/8f00bf74
Branch: refs/heads/master
Commit: 8f00bf7433e7dd99d843e9c97894bdabd81ccb50
Parents: de1b7d4
Author: Jinho Kim <jh...@apache.org>
Authored: Mon Sep 21 10:34:06 2015 +0900
Committer: Jinho Kim <jh...@apache.org>
Committed: Mon Sep 21 10:34:06 2015 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../apache/tajo/engine/query/TestSortQuery.java | 19 ++-
.../dataset/TestSortQuery/table3/table1.tbl | 1 +
.../dataset/TestSortQuery/table3/table2.tbl | 1 +
.../dataset/TestSortQuery/table3/table3.tbl | 1 +
.../dataset/TestSortQuery/table3/table4.tbl | 1 +
.../dataset/TestSortQuery/table3/table5.tbl | 1 +
.../create_table_with_unique_small_dataset.sql | 1 +
.../queries/TestSortQuery/testOutOfScope.sql | 1 +
.../results/TestSortQuery/testOutOfScope.result | 7 ++
.../planner/physical/ExternalSortExec.java | 4 +-
.../tajo/worker/ExecutionBlockContext.java | 17 +--
.../java/org/apache/tajo/worker/Fetcher.java | 2 -
.../apache/tajo/worker/TaskAttemptContext.java | 5 +-
.../java/org/apache/tajo/worker/TaskImpl.java | 111 +++++++----------
.../tajo/pullserver/TajoPullServerService.java | 122 +++++++++++--------
.../org/apache/tajo/rpc/RpcClientManager.java | 1 -
17 files changed, 159 insertions(+), 138 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index eacfbe1..a2496c1 100644
--- a/CHANGES
+++ b/CHANGES
@@ -273,6 +273,8 @@ Release 0.11.0 - unreleased
BUG FIXES
+ TAJO-1869: Incorrect result when sorting table with small files. (jinho)
+
TAJO-1846: Python temp directory path should be selected differently based
on user platform. (Contributed by Dongkyu Hwangbo, Committed by jihoon)
http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
index 9e632ab..6d6a44c 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
@@ -18,10 +18,7 @@
package org.apache.tajo.engine.query;
-import org.apache.tajo.IntegrationTest;
-import org.apache.tajo.QueryTestCaseBase;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.*;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.conf.TajoConf.ConfVars;
@@ -374,4 +371,18 @@ public class TestSortQuery extends QueryTestCaseBase {
public final void testSubQuerySortAfterGroupMultiBlocks() throws Exception {
runSimpleTests();
}
+
+ @Test
+ public final void testOutOfScope() throws Exception {
+ executeDDL("create_table_with_unique_small_dataset.sql", "table3");
+ // table has 5 files
+ testingCluster.setAllTajoDaemonConfValue(ConfVars.$TEST_MIN_TASK_NUM.varname, "5");
+ try {
+ ResultSet res = executeQuery();
+ assertResultSet(res);
+ cleanupQuery(res);
+ } finally {
+ testingCluster.setAllTajoDaemonConfValue(ConfVars.$TEST_MIN_TASK_NUM.varname, "0");
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table1.tbl
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table1.tbl b/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table1.tbl
new file mode 100644
index 0000000..d9374de
--- /dev/null
+++ b/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table1.tbl
@@ -0,0 +1 @@
+A,1
http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table2.tbl
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table2.tbl b/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table2.tbl
new file mode 100644
index 0000000..8c6802f
--- /dev/null
+++ b/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table2.tbl
@@ -0,0 +1 @@
+C,3
http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table3.tbl
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table3.tbl b/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table3.tbl
new file mode 100644
index 0000000..8184c74
--- /dev/null
+++ b/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table3.tbl
@@ -0,0 +1 @@
+B,2
http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table4.tbl
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table4.tbl b/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table4.tbl
new file mode 100644
index 0000000..4a5aa9c
--- /dev/null
+++ b/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table4.tbl
@@ -0,0 +1 @@
+D,4
http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table5.tbl
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table5.tbl b/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table5.tbl
new file mode 100644
index 0000000..bfb50ee
--- /dev/null
+++ b/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table5.tbl
@@ -0,0 +1 @@
+E,5
http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-core-tests/src/test/resources/queries/TestSortQuery/create_table_with_unique_small_dataset.sql
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/queries/TestSortQuery/create_table_with_unique_small_dataset.sql b/tajo-core-tests/src/test/resources/queries/TestSortQuery/create_table_with_unique_small_dataset.sql
new file mode 100644
index 0000000..bf1fc0f
--- /dev/null
+++ b/tajo-core-tests/src/test/resources/queries/TestSortQuery/create_table_with_unique_small_dataset.sql
@@ -0,0 +1 @@
+create external table testOutOfScope (col1 text, col2 int4) using text with ('text.delimiter'=',') location ${table.path};
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-core-tests/src/test/resources/queries/TestSortQuery/testOutOfScope.sql
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testOutOfScope.sql b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testOutOfScope.sql
new file mode 100644
index 0000000..3ea7cab
--- /dev/null
+++ b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testOutOfScope.sql
@@ -0,0 +1 @@
+select col1, col2 from testOutOfScope order by col1 desc, col2 desc;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-core-tests/src/test/resources/results/TestSortQuery/testOutOfScope.result
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/results/TestSortQuery/testOutOfScope.result b/tajo-core-tests/src/test/resources/results/TestSortQuery/testOutOfScope.result
new file mode 100644
index 0000000..a07de56
--- /dev/null
+++ b/tajo-core-tests/src/test/resources/results/TestSortQuery/testOutOfScope.result
@@ -0,0 +1,7 @@
+col1,col2
+-------------------------------
+E,5
+D,4
+C,3
+B,2
+A,1
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index 4a9b491..42d99bb 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -18,7 +18,6 @@
package org.apache.tajo.engine.planner.physical;
-import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.LocalDirAllocator;
@@ -410,7 +409,8 @@ public class ExternalSortExec extends SortExec {
if (frag.getTableName().contains(INTERMEDIATE_FILE_PREFIX)) {
localFS.delete(frag.getPath(), true);
numDeletedFiles++;
- LOG.info("Delete merged intermediate file: " + frag);
+
+ if(LOG.isDebugEnabled()) LOG.debug("Delete merged intermediate file: " + frag);
}
}
info(LOG, numDeletedFiles + " merged intermediate files deleted");
http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index 6f92344..94bf785 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -35,10 +35,10 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.QueryMasterProtocol;
import org.apache.tajo.plan.serder.PlanProto;
+import org.apache.tajo.pullserver.TajoPullServerService;
import org.apache.tajo.rpc.*;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.storage.HashShuffleAppenderManager;
-import org.apache.tajo.storage.StorageUtil;
import org.apache.tajo.util.Pair;
import org.apache.tajo.worker.event.ExecutionBlockErrorEvent;
@@ -213,21 +213,12 @@ public class ExecutionBlockContext {
}
public static Path getBaseOutputDir(ExecutionBlockId executionBlockId) {
- Path workDir =
- StorageUtil.concatPath(
- executionBlockId.getQueryId().toString(),
- "output",
- String.valueOf(executionBlockId.getId()));
- return workDir;
+ return TajoPullServerService.getBaseOutputDir(
+ executionBlockId.getQueryId().toString(), String.valueOf(executionBlockId.getId()));
}
public static Path getBaseInputDir(ExecutionBlockId executionBlockId) {
- Path workDir =
- StorageUtil.concatPath(
- executionBlockId.getQueryId().toString(),
- "in",
- executionBlockId.toString());
- return workDir;
+ return TajoPullServerService.getBaseInputDir(executionBlockId.getQueryId().toString(), executionBlockId.toString());
}
public ExecutionBlockId getExecutionBlockId() {
http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
index 762278b..71d30cd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
@@ -125,14 +125,12 @@ public class Fetcher {
public FileChunk get() throws IOException {
if (useLocalFile) {
- LOG.info("Get pseudo fetch from local host");
startTime = System.currentTimeMillis();
finishTime = System.currentTimeMillis();
state = TajoProtos.FetcherState.FETCH_FINISHED;
return fileChunk;
}
- LOG.info("Get real fetch from remote host");
this.startTime = System.currentTimeMillis();
this.state = TajoProtos.FetcherState.FETCH_FETCHING;
ChannelFuture future = null;
http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index a2f8c06..228c32a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -139,7 +139,10 @@ public class TaskAttemptContext {
public void setState(TaskAttemptState state) {
this.state = state;
- LOG.info("Query status of " + getTaskId() + " is changed to " + state);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Query status of " + getTaskId() + " is changed to " + state);
+ }
}
public void setDataChannel(DataChannel dataChannel) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
index 7b8d06f..4e3a8bf 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
@@ -21,7 +21,6 @@ package org.apache.tajo.worker;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import io.netty.handler.codec.http.QueryStringDecoder;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -45,12 +44,15 @@ import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.engine.query.TaskRequest;
import org.apache.tajo.ipc.QueryMasterProtocol;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
-import org.apache.tajo.plan.serder.PlanProto.ShuffleType;
-import org.apache.tajo.plan.serder.PlanProto.EnforceProperty;
-import org.apache.tajo.plan.serder.PlanProto.EnforceProperty.EnforceType;
import org.apache.tajo.plan.function.python.TajoScriptEngine;
-import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.plan.logical.NodeType;
+import org.apache.tajo.plan.logical.ScanNode;
+import org.apache.tajo.plan.logical.SortNode;
import org.apache.tajo.plan.serder.LogicalNodeDeserializer;
+import org.apache.tajo.plan.serder.PlanProto.EnforceProperty;
+import org.apache.tajo.plan.serder.PlanProto.EnforceProperty.EnforceType;
+import org.apache.tajo.plan.serder.PlanProto.ShuffleType;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.pullserver.TajoPullServerService;
import org.apache.tajo.pullserver.retriever.FileChunk;
@@ -569,7 +571,6 @@ public class TaskImpl implements Task {
if (name.equals(chunk.getEbId())) {
tablet = new FileFragment(name, new Path(chunk.getFile().getPath()), chunk.startOffset(), chunk.length());
listTablets.add(tablet);
- LOG.info("One local chunk is added to listTablets");
}
}
}
@@ -670,6 +671,7 @@ public class TaskImpl implements Task {
getLocalPathToRead(getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf);
int i = 0;
+ int localStoreChunkCount = 0;
File storeDir;
File defaultStoreFile;
FileChunk storeChunk = null;
@@ -687,23 +689,18 @@ public class TaskImpl implements Task {
WorkerConnectionInfo conn = executionBlockContext.getWorkerContext().getConnectionInfo();
if (NetUtils.isLocalAddress(address) && conn.getPullServerPort() == uri.getPort()) {
- boolean hasError = false;
- try {
- LOG.info("Try to get local file chunk at local host");
- storeChunk = getLocalStoredFileChunk(uri, systemConf);
- } catch (Throwable t) {
- hasError = true;
- }
+
+ storeChunk = getLocalStoredFileChunk(uri, systemConf);
// When a range request is out of range, storeChunk will be NULL. This case is normal state.
// So, we should skip and don't need to create storeChunk.
- if (storeChunk == null && !hasError) {
+ if (storeChunk == null || storeChunk.length() == 0) {
continue;
}
- if (storeChunk != null && storeChunk.getFile() != null && storeChunk.startOffset() > -1
- && hasError == false) {
+ if (storeChunk.getFile() != null && storeChunk.startOffset() > -1) {
storeChunk.setFromRemote(false);
+ localStoreChunkCount++;
} else {
storeChunk = new FileChunk(defaultStoreFile, 0, -1);
storeChunk.setFromRemote(true);
@@ -717,12 +714,16 @@ public class TaskImpl implements Task {
// represents a complete file. Otherwise, storeChunk may represent a complete file or only a part of it
storeChunk.setEbId(f.getName());
Fetcher fetcher = new Fetcher(systemConf, uri, storeChunk);
- LOG.info("Create a new Fetcher with storeChunk:" + storeChunk.toString());
runnerList.add(fetcher);
i++;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Create a new Fetcher with storeChunk:" + storeChunk.toString());
+ }
}
}
ctx.addFetchPhase(runnerList.size(), new File(inputDir.toString()));
+ LOG.info("Create shuffle Fetchers local:" + localStoreChunkCount +
+ ", remote:" + (runnerList.size() - localStoreChunkCount));
return runnerList;
} else {
return Lists.newArrayList();
@@ -731,56 +732,42 @@ public class TaskImpl implements Task {
private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException {
// Parse the URI
- LOG.info("getLocalStoredFileChunk starts");
- final Map<String, List<String>> params = new QueryStringDecoder(fetchURI.toString()).parameters();
- final List<String> types = params.get("type");
- final List<String> qids = params.get("qid");
- final List<String> taskIdList = params.get("ta");
- final List<String> stageIds = params.get("sid");
- final List<String> partIds = params.get("p");
- final List<String> offsetList = params.get("offset");
- final List<String> lengthList = params.get("length");
-
- if (types == null || stageIds == null || qids == null || partIds == null) {
- LOG.error("Invalid URI - Required queryId, type, stage Id, and part id");
- return null;
- }
- if (qids.size() != 1 && types.size() != 1 || stageIds.size() != 1) {
- LOG.error("Invalid URI - Required qids, type, taskIds, stage Id, and part id");
- return null;
- }
+ // Parsing the URL into key-values
+ final Map<String, List<String>> params = TajoPullServerService.decodeParams(fetchURI.toString());
- String queryId = qids.get(0);
- String shuffleType = types.get(0);
- String sid = stageIds.get(0);
- String partId = partIds.get(0);
+ String partId = params.get("p").get(0);
+ String queryId = params.get("qid").get(0);
+ String shuffleType = params.get("type").get(0);
+ String sid = params.get("sid").get(0);
- if (shuffleType.equals("r") && taskIdList == null) {
- LOG.error("Invalid URI - For range shuffle, taskId is required");
- return null;
- }
- List<String> taskIds = splitMaps(taskIdList);
+ final List<String> taskIdList = params.get("ta");
+ final List<String> offsetList = params.get("offset");
+ final List<String> lengthList = params.get("length");
- FileChunk chunk;
long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L;
long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L;
- LOG.info("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId
- + ", taskIds=" + taskIdList);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId
+ + ", taskIds=" + taskIdList);
+ }
// The working directory of Tajo worker for each query, including stage
- String queryBaseDir = queryId.toString() + "/output" + "/" + sid + "/";
+ Path queryBaseDir = TajoPullServerService.getBaseOutputDir(queryId, sid);
+ List<String> taskIds = TajoPullServerService.splitMaps(taskIdList);
+ FileChunk chunk;
// If the stage requires a range shuffle
if (shuffleType.equals("r")) {
- String ta = taskIds.get(0);
- if (!executionBlockContext.getLocalDirAllocator().ifExists(queryBaseDir + ta + "/output/", conf)) {
- LOG.warn("Range shuffle - file not exist");
+
+ Path outputPath = StorageUtil.concatPath(queryBaseDir, taskIds.get(0), "output");
+ if (!executionBlockContext.getLocalDirAllocator().ifExists(outputPath.toString(), conf)) {
+ LOG.warn("Range shuffle - file not exist. " + outputPath);
return null;
}
Path path = executionBlockContext.getLocalFS().makeQualified(
- executionBlockContext.getLocalDirAllocator().getLocalPathToRead(queryBaseDir + ta + "/output/", conf));
+ executionBlockContext.getLocalDirAllocator().getLocalPathToRead(outputPath.toString(), conf));
String startKey = params.get("start").get(0);
String endKey = params.get("end").get(0);
boolean last = params.get("final") != null;
@@ -794,14 +781,15 @@ public class TaskImpl implements Task {
// If the stage requires a hash shuffle or a scattered hash shuffle
} else if (shuffleType.equals("h") || shuffleType.equals("s")) {
- int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), (TajoConf) conf);
- String partPath = queryBaseDir + "hash-shuffle/" + partParentId + "/" + partId;
- if (!executionBlockContext.getLocalDirAllocator().ifExists(partPath, conf)) {
+ int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
+ Path partPath = StorageUtil.concatPath(queryBaseDir, "hash-shuffle", String.valueOf(partParentId), partId);
+
+ if (!executionBlockContext.getLocalDirAllocator().ifExists(partPath.toString(), conf)) {
LOG.warn("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath);
return null;
}
Path path = executionBlockContext.getLocalFS().makeQualified(
- executionBlockContext.getLocalDirAllocator().getLocalPathToRead(partPath, conf));
+ executionBlockContext.getLocalDirAllocator().getLocalPathToRead(partPath.toString(), conf));
File file = new File(path.toUri());
long startPos = (offset >= 0 && length >= 0) ? offset : 0;
long readLen = (offset >= 0 && length >= 0) ? length : file.length();
@@ -820,17 +808,6 @@ public class TaskImpl implements Task {
return chunk;
}
- private List<String> splitMaps(List<String> mapq) {
- if (null == mapq) {
- return null;
- }
- final List<String> ret = new ArrayList<String>();
- for (String s : mapq) {
- Collections.addAll(ret, s.split(","));
- }
- return ret;
- }
-
public static Path getTaskAttemptDir(TaskAttemptId quid) {
Path workDir =
StorageUtil.concatPath(ExecutionBlockContext.getBaseInputDir(quid.getTaskId().getExecutionBlockId()),
http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index 59a758f..d6f5a90 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -59,11 +59,8 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.pullserver.retriever.FileChunk;
import org.apache.tajo.rpc.NettyUtils;
-import org.apache.tajo.storage.HashShuffleAppenderManager;
-import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.*;
import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.index.bst.BSTIndex;
import java.io.*;
@@ -432,17 +429,6 @@ public class TajoPullServerService extends AbstractService {
lDirAlloc.getAllLocalPathsToRead(".", conf);
}
- private List<String> splitMaps(List<String> mapq) {
- if (null == mapq) {
- return null;
- }
- final List<String> ret = new ArrayList<String>();
- for (String s : mapq) {
- Collections.addAll(ret, s.split(","));
- }
- return ret;
- }
-
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
accepted.add(ctx.channel());
@@ -461,37 +447,30 @@ public class TajoPullServerService extends AbstractService {
ProcessingStatus processingStatus = new ProcessingStatus(request.getUri().toString());
processingStatusMap.put(request.getUri().toString(), processingStatus);
- // Parsing the URL into key-values
- final Map<String, List<String>> params = new QueryStringDecoder(request.getUri()).parameters();
- final List<String> types = params.get("type");
- final List<String> qids = params.get("qid");
- final List<String> taskIdList = params.get("ta");
- final List<String> subQueryIds = params.get("sid");
- final List<String> partIds = params.get("p");
- final List<String> offsetList = params.get("offset");
- final List<String> lengthList = params.get("length");
- if (types == null || subQueryIds == null || qids == null || partIds == null) {
- sendError(ctx, "Required queryId, type, subquery Id, and part id", HttpResponseStatus.BAD_REQUEST);
- return;
+ // Parsing the URL into key-values
+ Map<String, List<String>> params = null;
+ try {
+ params = decodeParams(request.getUri());
+ } catch (Throwable e) {
+ sendError(ctx, e.getMessage(), HttpResponseStatus.BAD_REQUEST);
}
- if (qids.size() != 1 && types.size() != 1 || subQueryIds.size() != 1) {
- sendError(ctx, "Required qids, type, taskIds, subquery Id, and part id", HttpResponseStatus.BAD_REQUEST);
- return;
- }
+ String partId = params.get("p").get(0);
+ String queryId = params.get("qid").get(0);
+ String shuffleType = params.get("type").get(0);
+ String sid = params.get("sid").get(0);
- String partId = partIds.get(0);
- String queryId = qids.get(0);
- String shuffleType = types.get(0);
- String sid = subQueryIds.get(0);
+ final List<String> taskIdList = params.get("ta");
+ final List<String> offsetList = params.get("offset");
+ final List<String> lengthList = params.get("length");
long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L;
long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L;
List<String> taskIds = splitMaps(taskIdList);
- String queryBaseDir = queryId.toString() + "/output";
+ Path queryBaseDir = getBaseOutputDir(queryId, sid);
if (LOG.isDebugEnabled()) {
LOG.debug("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId
@@ -505,15 +484,13 @@ public class TajoPullServerService extends AbstractService {
// if a stage requires a range shuffle
if (shuffleType.equals("r")) {
- String ta = taskIds.get(0);
- String pathString = queryBaseDir + "/" + sid + "/" + ta + "/output/";
- if (!lDirAlloc.ifExists(pathString, conf)) {
- LOG.warn(pathString + "does not exist.");
+ Path outputPath = StorageUtil.concatPath(queryBaseDir, taskIds.get(0), "output");
+ if (!lDirAlloc.ifExists(outputPath.toString(), conf)) {
+ LOG.warn(outputPath + "does not exist.");
sendError(ctx, HttpResponseStatus.NO_CONTENT);
return;
}
- Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" + ta
- + "/output/", conf));
+ Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(outputPath.toString(), conf));
String startKey = params.get("start").get(0);
String endKey = params.get("end").get(0);
boolean last = params.get("final") != null;
@@ -533,14 +510,14 @@ public class TajoPullServerService extends AbstractService {
// if a stage requires a hash shuffle or a scattered hash shuffle
} else if (shuffleType.equals("h") || shuffleType.equals("s")) {
int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
- String partPath = queryBaseDir + "/" + sid + "/hash-shuffle/" + partParentId + "/" + partId;
- if (!lDirAlloc.ifExists(partPath, conf)) {
+ Path partPath = StorageUtil.concatPath(queryBaseDir, "hash-shuffle", String.valueOf(partParentId), partId);
+ if (!lDirAlloc.ifExists(partPath.toString(), conf)) {
LOG.warn("Partition shuffle file not exists: " + partPath);
sendError(ctx, HttpResponseStatus.NO_CONTENT);
return;
}
- Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(partPath, conf));
+ Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(partPath.toString(), conf));
File file = new File(path.toUri());
long startPos = (offset >= 0 && length >= 0) ? offset : 0;
@@ -683,8 +660,9 @@ public class TajoPullServerService extends AbstractService {
Schema keySchema = idxReader.getKeySchema();
TupleComparator comparator = idxReader.getComparator();
- LOG.info("BSTIndex is loaded from disk (" + idxReader.getFirstKey() + ", "
- + idxReader.getLastKey());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("BSTIndex is loaded from disk (" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() + ")");
+ }
File data = new File(URI.create(outDir.toUri() + "/output"));
byte [] startBytes = Base64.decodeBase64(startKey);
@@ -776,7 +754,55 @@ public class TajoPullServerService extends AbstractService {
idxReader.close();
FileChunk chunk = new FileChunk(data, startOffset, endOffset - startOffset);
- LOG.info("Retrieve File Chunk: " + chunk);
+
+ if(LOG.isDebugEnabled()) LOG.debug("Retrieve File Chunk: " + chunk);
return chunk;
}
+
+ public static List<String> splitMaps(List<String> mapq) {
+ if (null == mapq) {
+ return null;
+ }
+ final List<String> ret = new ArrayList<String>();
+ for (String s : mapq) {
+ Collections.addAll(ret, s.split(","));
+ }
+ return ret;
+ }
+
+ public static Map<String, List<String>> decodeParams(String uri) {
+ final Map<String, List<String>> params = new QueryStringDecoder(uri).parameters();
+ final List<String> types = params.get("type");
+ final List<String> qids = params.get("qid");
+ final List<String> ebIds = params.get("sid");
+ final List<String> partIds = params.get("p");
+
+ if (types == null || ebIds == null || qids == null || partIds == null) {
+ throw new IllegalArgumentException("invalid params. required :" + params);
+ }
+
+ if (qids.size() != 1 && types.size() != 1 || ebIds.size() != 1) {
+ throw new IllegalArgumentException("invalid params. required :" + params);
+ }
+
+ return params;
+ }
+
+ public static Path getBaseOutputDir(String queryId, String executionBlockSequenceId) {
+ Path workDir =
+ StorageUtil.concatPath(
+ queryId,
+ "output",
+ executionBlockSequenceId);
+ return workDir;
+ }
+
+ public static Path getBaseInputDir(String queryId, String executionBlockId) {
+ Path workDir =
+ StorageUtil.concatPath(
+ queryId,
+ "in",
+ executionBlockId);
+ return workDir;
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
index 598c8e8..c801b8a 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
@@ -199,7 +199,6 @@ public class RpcClientManager {
* After it is shutdown it is not possible to reuse it again.
*/
public static void shutdown() {
- close();
NettyUtils.shutdownGracefully();
}