You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/09/27 01:52:14 UTC
[8/8] git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into block_iteration
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into block_iteration
Conflicts:
tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
tajo-core/src/main/java/org/apache/tajo/worker/Task.java
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/38eb0255
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/38eb0255
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/38eb0255
Branch: refs/heads/block_iteration
Commit: 38eb0255fa775ad8e85d70d2e8568176d3dc6d24
Parents: 39f7129 d384cf0
Author: Hyunsik Choi <hy...@apache.org>
Authored: Fri Sep 26 16:51:52 2014 -0700
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Fri Sep 26 16:51:52 2014 -0700
----------------------------------------------------------------------
BUILDING | 8 +-
CHANGES | 18 ++
README | 2 +-
.../java/org/apache/tajo/client/TajoClient.java | 117 ++++++--
.../org/apache/tajo/jdbc/FetchResultSet.java | 91 ++++++
.../org/apache/tajo/jdbc/TajoResultSetBase.java | 2 +-
tajo-client/src/main/proto/ClientProtos.proto | 29 +-
.../main/proto/TajoMasterClientProtocol.proto | 4 +-
.../main/java/org/apache/tajo/SessionVars.java | 3 +
.../java/org/apache/tajo/conf/TajoConf.java | 5 +-
.../java/org/apache/tajo/benchmark/TPCH.java | 32 ++-
.../apache/tajo/engine/planner/PlannerUtil.java | 138 +++++++++-
.../planner/physical/BSTIndexScanExec.java | 4 +-
.../planner/physical/ExternalSortExec.java | 97 +++++--
.../physical/RangeShuffleFileWriteExec.java | 2 +-
.../org/apache/tajo/master/GlobalEngine.java | 31 ++-
.../master/NonForwardQueryResultScanner.java | 165 +++++++++++
.../tajo/master/TajoMasterClientService.java | 69 ++++-
.../org/apache/tajo/master/session/Session.java | 54 ++++
.../tajo/master/session/SessionManager.java | 13 +-
.../tajo/worker/ExecutionBlockContext.java | 4 +-
.../java/org/apache/tajo/worker/Fetcher.java | 37 ++-
.../main/java/org/apache/tajo/worker/Task.java | 182 +++++++++++-
.../apache/tajo/worker/TaskAttemptContext.java | 2 -
.../java/org/apache/tajo/worker/TaskRunner.java | 26 +-
.../org/apache/tajo/TajoTestingCluster.java | 45 ++-
.../tajo/engine/planner/TestPlannerUtil.java | 62 +++++
.../planner/physical/TestPhysicalPlanner.java | 2 +-
.../tajo/engine/query/TestNullValues.java | 32 ++-
.../tajo/engine/query/TestSelectQuery.java | 15 +
.../org/apache/tajo/jdbc/TestResultSet.java | 10 +-
.../apache/tajo/master/TestGlobalPlanner.java | 68 ++++-
.../org/apache/tajo/worker/TestFetcher.java | 25 +-
.../tajo/worker/TestRangeRetrieverHandler.java | 4 +-
.../queries/TestSelectQuery/customer_ddl.sql | 9 +
.../TestSelectQuery/insert_into_customer.sql | 11 +
...testSimpleQueryWithLimitPartitionedTable.sql | 1 +
...tSimpleQueryWithLimitPartitionedTable.result | 12 +
.../TestTajoCli/testHelpSessionVars.result | 1 +
tajo-dist/src/main/bin/tajo | 1 +
.../functions/math_func_and_operators.rst | 276 ++++++++++++++++++-
tajo-project/pom.xml | 2 +-
.../apache/tajo/rpc/RemoteCallException.java | 3 +
.../storage/HashShuffleAppenderManager.java | 3 +-
.../java/org/apache/tajo/storage/RawFile.java | 71 +++--
.../tajo/pullserver/PullServerAuxService.java | 6 +-
.../tajo/pullserver/TajoPullServerService.java | 15 +-
.../tajo/pullserver/retriever/FileChunk.java | 38 ++-
48 files changed, 1645 insertions(+), 202 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/38eb0255/CHANGES
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/38eb0255/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/38eb0255/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index 3a12039,c4cc254..29d6602
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@@ -31,11 -37,15 +37,15 @@@ import org.apache.tajo.conf.TajoConf
import org.apache.tajo.engine.eval.*;
import org.apache.tajo.engine.exception.InvalidQueryException;
import org.apache.tajo.engine.planner.logical.*;
-import org.apache.tajo.engine.utils.SchemaUtil;
-import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.catalog.SchemaUtil;
+import org.apache.tajo.storage.BaseTupleComparator;
+ import org.apache.tajo.storage.fragment.FileFragment;
+ import org.apache.tajo.storage.fragment.FragmentConvertor;
import org.apache.tajo.util.TUtil;
+ import java.io.IOException;
import java.util.*;
+ import java.util.concurrent.atomic.AtomicInteger;
public class PlannerUtil {
http://git-wip-us.apache.org/repos/asf/tajo/blob/38eb0255/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/38eb0255/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index e18448f,0094590..8d36e74
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@@ -44,10 -46,14 +44,11 @@@ import org.apache.tajo.util.FileUtil
import org.apache.tajo.util.TUtil;
import org.apache.tajo.worker.TaskAttemptContext;
+ import java.io.File;
import java.io.IOException;
-import java.util.*;
+import java.util.List;
import java.util.concurrent.*;
-import static org.apache.tajo.storage.RawFile.RawFileAppender;
-import static org.apache.tajo.storage.RawFile.RawFileScanner;
-
/**
* This external sort algorithm can be characterized by the followings:
*
@@@ -465,33 -512,17 +498,33 @@@ public class ExternalSortExec extends S
return result;
}
- private Scanner getFileScanner(Path path) throws IOException {
- String extension = FileUtil.getExtension(path.getName());
+ private Scanner getFileScanner(FileFragment frag) throws IOException {
- return new RawFileScanner(context.getConf(), plan.getInSchema(), meta, frag);
++ String extension = FileUtil.getExtension(frag.getPath().getName());
+ if (extension.equals(RawFile.FILE_EXTENSION)) {
- return new RawFile.RawFileScanner(context.getConf(), plan.getInSchema(), meta, path);
++ return new RawFile.RawFileScanner(context.getConf(), plan.getInSchema(), meta, frag);
+ } else if (extension.equalsIgnoreCase(DirectRawFileWriter.FILE_EXTENSION)) {
- return new DirectRawFileScanner(context.getConf(), plan.getInSchema(), meta, path);
++ return new DirectRawFileScanner(context.getConf(), plan.getInSchema(), meta, frag.getPath());
+ } else {
- throw new IllegalStateException("Unknown File Extension: " + path);
++ throw new IllegalStateException("Unknown File Extension: " + frag.getPath());
+ }
}
- private Scanner createKWayMerger(List<Path> inputs, final int startChunkId, final int num) throws IOException {
+ private Scanner createKWayMerger(List<FileFragment> inputs, final int startChunkId, final int num) throws IOException {
- final Scanner [] sources = new Scanner[num];
+ boolean tupleInMemory = tupleBlock.rows() > 0;
+
+ List<Scanner> scannerList = Lists.newArrayList();
+
+ // if tuples are still the in-memory block, the in-memory tuples will be included in merge phase.
+ if (tupleInMemory) {
+ scannerList.add(new MemTableScanner(sortedTuples, bytesOfLatestChunk));
+ }
+
for (int i = 0; i < num; i++) {
- sources[i] = getFileScanner(inputs.get(startChunkId + i));
+ scannerList.add(getFileScanner(inputs.get(startChunkId + i)));
}
+ Scanner [] sources = scannerList.toArray(new Scanner[scannerList.size()]);
- return createKWayMergerInternal(sources, 0, num);
+ return createKWayMergerInternal(sources, 0, sources.length);
}
private Scanner createKWayMergerInternal(final Scanner [] sources, final int startIdx, final int num)
http://git-wip-us.apache.org/repos/asf/tajo/blob/38eb0255/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
index 786c726,68379d1..ec42109
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
@@@ -79,7 -79,7 +79,7 @@@ public class RangeShuffleFileWriteExec
FileSystem fs = new RawLocalFileSystem();
fs.mkdirs(storeTablePath);
this.appender = (FileAppender) StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta,
-- outSchema, new Path(storeTablePath, "output"));
++ outSchema, new Path(storeTablePath, "output" + "." + RawFile.FILE_EXTENSION));
this.appender.enableStats();
this.appender.init();
this.indexWriter = bst.getIndexWriter(new Path(storeTablePath, "index"),
http://git-wip-us.apache.org/repos/asf/tajo/blob/38eb0255/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index b00a1ba,a7eaaf8..5127e90
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@@ -48,12 -49,16 +49,17 @@@ import org.apache.tajo.engine.query.Que
import org.apache.tajo.ipc.QueryMasterProtocol;
import org.apache.tajo.ipc.TajoWorkerProtocol.*;
import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
+ import org.apache.tajo.pullserver.TajoPullServerService;
+ import org.apache.tajo.pullserver.retriever.FileChunk;
import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.storage.BaseTupleComparator;
+import org.apache.tajo.storage.RawFile;
+ import org.apache.tajo.storage.HashShuffleAppenderManager;
import org.apache.tajo.storage.StorageUtil;
-import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.fragment.FileFragment;
+ import org.apache.tajo.util.NetUtils;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+ import org.jboss.netty.handler.codec.http.QueryStringDecoder;
import org.jboss.netty.util.Timer;
import java.io.File;
@@@ -680,19 -703,55 +706,55 @@@ public class Task
Timer timer = executionBlockContext.getRPCTimer();
Path inputDir = executionBlockContext.getLocalDirAllocator().
getLocalPathToRead(getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf);
- File storeDir;
int i = 0;
- File storeFile;
+ File storeDir;
+ File defaultStoreFile;
+ FileChunk storeChunk = null;
List<Fetcher> runnerList = Lists.newArrayList();
+
for (FetchImpl f : fetches) {
+ storeDir = new File(inputDir.toString(), f.getName());
+ if (!storeDir.exists()) {
+ storeDir.mkdirs();
+ }
+
for (URI uri : f.getURIs()) {
- storeDir = new File(inputDir.toString(), f.getName());
- if (!storeDir.exists()) {
- storeDir.mkdirs();
- defaultStoreFile = new File(storeDir, "in_" + i);
++ defaultStoreFile = new File(storeDir, "in_" + i + "." + RawFile.FILE_EXTENSION);
+ InetAddress address = InetAddress.getByName(uri.getHost());
+
+ if (NetUtils.isLocalAddress(address)) {
+ boolean hasError = false;
+ try {
+ LOG.info("Try to get local file chunk at local host");
+ storeChunk = getLocalStoredFileChunk(uri, systemConf);
+ } catch (Throwable t) {
+ hasError = true;
+ }
+
+ // When a range request is out of range, storeChunk will be NULL. This case is normal state.
+ // So, we should skip and don't need to create storeChunk.
+ if (storeChunk == null && !hasError) {
+ continue;
+ }
+
+ if (storeChunk != null && storeChunk.getFile() != null && storeChunk.startOffset() > -1
+ && hasError == false) {
+ storeChunk.setFromRemote(false);
+ } else {
+ storeChunk = new FileChunk(defaultStoreFile, 0, -1);
+ storeChunk.setFromRemote(true);
+ }
+ } else {
+ storeChunk = new FileChunk(defaultStoreFile, 0, -1);
+ storeChunk.setFromRemote(true);
}
- storeFile = new File(storeDir, "in_" + i + "." + RawFile.FILE_EXTENSION);
- Fetcher fetcher = new Fetcher(systemConf, uri, storeFile, channelFactory, timer);
+
+ // If we decide that intermediate data should be really fetched from a remote host, storeChunk
+ // represents a complete file. Otherwise, storeChunk may represent a complete file or only a part of it
+ storeChunk.setEbId(f.getName());
+ Fetcher fetcher = new Fetcher(systemConf, uri, storeChunk, channelFactory, timer);
+ LOG.info("Create a new Fetcher with storeChunk:" + storeChunk.toString());
runnerList.add(fetcher);
i++;
}
@@@ -704,6 -763,108 +766,108 @@@
}
}
+ private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException {
+ // Parse the URI
+ LOG.info("getLocalStoredFileChunk starts");
+ final Map<String, List<String>> params = new QueryStringDecoder(fetchURI.toString()).getParameters();
+ final List<String> types = params.get("type");
+ final List<String> qids = params.get("qid");
+ final List<String> taskIdList = params.get("ta");
+ final List<String> subQueryIds = params.get("sid");
+ final List<String> partIds = params.get("p");
+ final List<String> offsetList = params.get("offset");
+ final List<String> lengthList = params.get("length");
+
+ if (types == null || subQueryIds == null || qids == null || partIds == null) {
+ LOG.error("Invalid URI - Required queryId, type, subquery Id, and part id");
+ return null;
+ }
+
+ if (qids.size() != 1 && types.size() != 1 || subQueryIds.size() != 1) {
+ LOG.error("Invalid URI - Required qids, type, taskIds, subquery Id, and part id");
+ return null;
+ }
+
+ String queryId = qids.get(0);
+ String shuffleType = types.get(0);
+ String sid = subQueryIds.get(0);
+ String partId = partIds.get(0);
+
+ if (shuffleType.equals("r") && taskIdList == null) {
+ LOG.error("Invalid URI - For range shuffle, taskId is required");
+ return null;
+ }
+ List<String> taskIds = splitMaps(taskIdList);
+
+ FileChunk chunk = null;
+ long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L;
+ long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L;
+
+ LOG.info("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId
+ + ", taskIds=" + taskIdList);
+
+ // The working directory of Tajo worker for each query, including subquery
+ String queryBaseDir = queryId.toString() + "/output" + "/" + sid + "/";
+
+ // If the subquery requires a range shuffle
+ if (shuffleType.equals("r")) {
+ String ta = taskIds.get(0);
+ if (!executionBlockContext.getLocalDirAllocator().ifExists(queryBaseDir + ta + "/output/", conf)) {
+ LOG.warn("Range shuffle - file not exist");
+ return null;
+ }
+ Path path = executionBlockContext.getLocalFS().makeQualified(
+ executionBlockContext.getLocalDirAllocator().getLocalPathToRead(queryBaseDir + ta + "/output/", conf));
+ String startKey = params.get("start").get(0);
+ String endKey = params.get("end").get(0);
+ boolean last = params.get("final") != null;
+
+ try {
+ chunk = TajoPullServerService.getFileCunks(path, startKey, endKey, last);
+ } catch (Throwable t) {
+ LOG.error("getFileChunks() throws exception");
+ return null;
+ }
+
+ // If the subquery requires a hash shuffle or a scattered hash shuffle
+ } else if (shuffleType.equals("h") || shuffleType.equals("s")) {
+ int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), (TajoConf) conf);
- String partPath = queryBaseDir + "hash-shuffle/" + partParentId + "/" + partId;
++ String partPath = queryBaseDir + "hash-shuffle/" + partParentId + "/" + partId + "." + RawFile.FILE_EXTENSION;
+ if (!executionBlockContext.getLocalDirAllocator().ifExists(partPath, conf)) {
+ LOG.warn("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath);
+ return null;
+ }
+ Path path = executionBlockContext.getLocalFS().makeQualified(
+ executionBlockContext.getLocalDirAllocator().getLocalPathToRead(partPath, conf));
+ File file = new File(path.toUri());
+ long startPos = (offset >= 0 && length >= 0) ? offset : 0;
+ long readLen = (offset >= 0 && length >= 0) ? length : file.length();
+
+ if (startPos >= file.length()) {
+ LOG.error("Start pos[" + startPos + "] great than file length [" + file.length() + "]");
+ return null;
+ }
+ chunk = new FileChunk(file, startPos, readLen);
+
+ } else {
+ LOG.error("Unknown shuffle type");
+ return null;
+ }
+
+ return chunk;
+ }
+
+ private List<String> splitMaps(List<String> mapq) {
+ if (null == mapq) {
+ return null;
+ }
+ final List<String> ret = new ArrayList<String>();
+ for (String s : mapq) {
+ Collections.addAll(ret, s.split(","));
+ }
+ return ret;
+ }
+
public static Path getTaskAttemptDir(QueryUnitAttemptId quid) {
Path workDir =
StorageUtil.concatPath(ExecutionBlockContext.getBaseInputDir(quid.getQueryUnitId().getExecutionBlockId()),
http://git-wip-us.apache.org/repos/asf/tajo/blob/38eb0255/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/38eb0255/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
----------------------------------------------------------------------
diff --cc tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
index 7116905,756dadc..30e2978
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
@@@ -31,9 -34,11 +34,11 @@@ import org.apache.tajo.engine.eval.*
import org.apache.tajo.engine.function.builtin.SumInt;
import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.storage.BaseTupleComparator;
import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.VTuple;
+ import org.apache.tajo.storage.fragment.FileFragment;
+ import org.apache.tajo.storage.fragment.FragmentConvertor;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.KeyValueSet;
import org.junit.AfterClass;
http://git-wip-us.apache.org/repos/asf/tajo/blob/38eb0255/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --cc tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index 0987a78,5d809f8..7a350ce
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@@ -1076,7 -1076,7 +1076,7 @@@ public class TestPhysicalPlanner
BSTIndex.BSTIndexReader reader = bst.getIndexReader(new Path(workDir, "output/index"),
keySchema, comp);
reader.open();
-- Path outputPath = StorageUtil.concatPath(workDir, "output", "output");
++ Path outputPath = StorageUtil.concatPath(workDir, "output", "output" + "." + RawFile.FILE_EXTENSION);
TableMeta meta = CatalogUtil.newTableMeta(channel.getStoreType(), new KeyValueSet());
SeekableScanner scanner =
StorageManagerFactory.getSeekableScanner(conf, meta, exec.getSchema(), outputPath);
http://git-wip-us.apache.org/repos/asf/tajo/blob/38eb0255/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
----------------------------------------------------------------------
diff --cc tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
index ccd2cde,5f8efe7..fa3c43a
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
@@@ -182,7 -182,7 +182,7 @@@ public class TestRangeRetrieverHandler
TableMeta meta = CatalogUtil.newTableMeta(StoreType.RAW, new KeyValueSet());
SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema,
-- StorageUtil.concatPath(testDir, "output", "output"));
++ StorageUtil.concatPath(testDir, "output", "output" + "." + RawFile.FILE_EXTENSION));
scanner.init();
int cnt = 0;
@@@ -305,7 -305,7 +305,7 @@@
reader.open();
TableMeta outputMeta = CatalogUtil.newTableMeta(StoreType.RAW, new KeyValueSet());
SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, outputMeta, schema,
-- StorageUtil.concatPath(testDir, "output", "output"));
++ StorageUtil.concatPath(testDir, "output", "output" + "." + RawFile.FILE_EXTENSION));
scanner.init();
int cnt = 0;
while(scanner.next() != null) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/38eb0255/tajo-project/pom.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/38eb0255/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
----------------------------------------------------------------------
diff --cc tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
index f0699b7,f0699b7..fdd20c9
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
@@@ -115,7 -115,7 +115,8 @@@ public class HashShuffleAppenderManage
//LOG.info(ebId + "'s basedir is created (" + baseDirPath + ")");
// If EB has many partition, too many shuffle file are in single directory.
-- return StorageUtil.concatPath(baseDirPath, "" + getPartParentId(partId, systemConf), "" + partId);
++ return StorageUtil.concatPath(baseDirPath, "" + getPartParentId(partId, systemConf), "" + partId +
++ "." + RawFile.FILE_EXTENSION);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
throw new IOException(e);
http://git-wip-us.apache.org/repos/asf/tajo/blob/38eb0255/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --cc tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
index 7f729e1,edcf686..0f0dc5f
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
@@@ -31,8 -32,8 +32,9 @@@ import org.apache.tajo.common.TajoDataT
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.datum.ProtobufDatumFactory;
+import org.apache.tajo.util.UnsafeUtil;
import org.apache.tajo.storage.fragment.FileFragment;
+ import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.util.BitArray;
import java.io.File;
http://git-wip-us.apache.org/repos/asf/tajo/blob/38eb0255/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/38eb0255/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --cc tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index 8cdfbb4,2fb7c29..e17769e
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@@ -47,11 -47,11 +47,8 @@@ import org.apache.tajo.conf.TajoConf
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.pullserver.retriever.FileChunk;
import org.apache.tajo.rpc.RpcChannelFactory;
- import org.apache.tajo.storage.BaseTupleComparator;
--import org.apache.tajo.storage.HashShuffleAppenderManager;
--import org.apache.tajo.storage.RowStoreUtil;
++import org.apache.tajo.storage.*;
import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
--import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.index.bst.BSTIndex;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffers;
@@@ -712,7 -712,7 +709,7 @@@ public class TajoPullServerService exte
LOG.info("BSTIndex is loaded from disk (" + idxReader.getFirstKey() + ", "
+ idxReader.getLastKey());
-- File data = new File(URI.create(outDir.toUri() + "/output"));
++ File data = new File(URI.create(outDir.toUri() + "/output" + "." + RawFile.FILE_EXTENSION));
byte [] startBytes = Base64.decodeBase64(startKey);
byte [] endBytes = Base64.decodeBase64(endKey);