You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/09/27 01:52:14 UTC

[8/8] git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into block_iteration

Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into block_iteration

Conflicts:
	tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
	tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
	tajo-core/src/main/java/org/apache/tajo/worker/Task.java


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/38eb0255
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/38eb0255
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/38eb0255

Branch: refs/heads/block_iteration
Commit: 38eb0255fa775ad8e85d70d2e8568176d3dc6d24
Parents: 39f7129 d384cf0
Author: Hyunsik Choi <hy...@apache.org>
Authored: Fri Sep 26 16:51:52 2014 -0700
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Fri Sep 26 16:51:52 2014 -0700

----------------------------------------------------------------------
 BUILDING                                        |   8 +-
 CHANGES                                         |  18 ++
 README                                          |   2 +-
 .../java/org/apache/tajo/client/TajoClient.java | 117 ++++++--
 .../org/apache/tajo/jdbc/FetchResultSet.java    |  91 ++++++
 .../org/apache/tajo/jdbc/TajoResultSetBase.java |   2 +-
 tajo-client/src/main/proto/ClientProtos.proto   |  29 +-
 .../main/proto/TajoMasterClientProtocol.proto   |   4 +-
 .../main/java/org/apache/tajo/SessionVars.java  |   3 +
 .../java/org/apache/tajo/conf/TajoConf.java     |   5 +-
 .../java/org/apache/tajo/benchmark/TPCH.java    |  32 ++-
 .../apache/tajo/engine/planner/PlannerUtil.java | 138 +++++++++-
 .../planner/physical/BSTIndexScanExec.java      |   4 +-
 .../planner/physical/ExternalSortExec.java      |  97 +++++--
 .../physical/RangeShuffleFileWriteExec.java     |   2 +-
 .../org/apache/tajo/master/GlobalEngine.java    |  31 ++-
 .../master/NonForwardQueryResultScanner.java    | 165 +++++++++++
 .../tajo/master/TajoMasterClientService.java    |  69 ++++-
 .../org/apache/tajo/master/session/Session.java |  54 ++++
 .../tajo/master/session/SessionManager.java     |  13 +-
 .../tajo/worker/ExecutionBlockContext.java      |   4 +-
 .../java/org/apache/tajo/worker/Fetcher.java    |  37 ++-
 .../main/java/org/apache/tajo/worker/Task.java  | 182 +++++++++++-
 .../apache/tajo/worker/TaskAttemptContext.java  |   2 -
 .../java/org/apache/tajo/worker/TaskRunner.java |  26 +-
 .../org/apache/tajo/TajoTestingCluster.java     |  45 ++-
 .../tajo/engine/planner/TestPlannerUtil.java    |  62 +++++
 .../planner/physical/TestPhysicalPlanner.java   |   2 +-
 .../tajo/engine/query/TestNullValues.java       |  32 ++-
 .../tajo/engine/query/TestSelectQuery.java      |  15 +
 .../org/apache/tajo/jdbc/TestResultSet.java     |  10 +-
 .../apache/tajo/master/TestGlobalPlanner.java   |  68 ++++-
 .../org/apache/tajo/worker/TestFetcher.java     |  25 +-
 .../tajo/worker/TestRangeRetrieverHandler.java  |   4 +-
 .../queries/TestSelectQuery/customer_ddl.sql    |   9 +
 .../TestSelectQuery/insert_into_customer.sql    |  11 +
 ...testSimpleQueryWithLimitPartitionedTable.sql |   1 +
 ...tSimpleQueryWithLimitPartitionedTable.result |  12 +
 .../TestTajoCli/testHelpSessionVars.result      |   1 +
 tajo-dist/src/main/bin/tajo                     |   1 +
 .../functions/math_func_and_operators.rst       | 276 ++++++++++++++++++-
 tajo-project/pom.xml                            |   2 +-
 .../apache/tajo/rpc/RemoteCallException.java    |   3 +
 .../storage/HashShuffleAppenderManager.java     |   3 +-
 .../java/org/apache/tajo/storage/RawFile.java   |  71 +++--
 .../tajo/pullserver/PullServerAuxService.java   |   6 +-
 .../tajo/pullserver/TajoPullServerService.java  |  15 +-
 .../tajo/pullserver/retriever/FileChunk.java    |  38 ++-
 48 files changed, 1645 insertions(+), 202 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/38eb0255/CHANGES
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/38eb0255/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/38eb0255/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index 3a12039,c4cc254..29d6602
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@@ -31,11 -37,15 +37,15 @@@ import org.apache.tajo.conf.TajoConf
  import org.apache.tajo.engine.eval.*;
  import org.apache.tajo.engine.exception.InvalidQueryException;
  import org.apache.tajo.engine.planner.logical.*;
 -import org.apache.tajo.engine.utils.SchemaUtil;
 -import org.apache.tajo.storage.TupleComparator;
 +import org.apache.tajo.catalog.SchemaUtil;
 +import org.apache.tajo.storage.BaseTupleComparator;
+ import org.apache.tajo.storage.fragment.FileFragment;
+ import org.apache.tajo.storage.fragment.FragmentConvertor;
  import org.apache.tajo.util.TUtil;
  
+ import java.io.IOException;
  import java.util.*;
+ import java.util.concurrent.atomic.AtomicInteger;
  
  public class PlannerUtil {
  

http://git-wip-us.apache.org/repos/asf/tajo/blob/38eb0255/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/38eb0255/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index e18448f,0094590..8d36e74
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@@ -44,10 -46,14 +44,11 @@@ import org.apache.tajo.util.FileUtil
  import org.apache.tajo.util.TUtil;
  import org.apache.tajo.worker.TaskAttemptContext;
  
+ import java.io.File;
  import java.io.IOException;
 -import java.util.*;
 +import java.util.List;
  import java.util.concurrent.*;
  
 -import static org.apache.tajo.storage.RawFile.RawFileAppender;
 -import static org.apache.tajo.storage.RawFile.RawFileScanner;
 -
  /**
   * This external sort algorithm can be characterized by the followings:
   *
@@@ -465,33 -512,17 +498,33 @@@ public class ExternalSortExec extends S
      return result;
    }
  
-   private Scanner getFileScanner(Path path) throws IOException {
-     String extension = FileUtil.getExtension(path.getName());
+   private Scanner getFileScanner(FileFragment frag) throws IOException {
 -    return new RawFileScanner(context.getConf(), plan.getInSchema(), meta, frag);
++    String extension = FileUtil.getExtension(frag.getPath().getName());
 +    if (extension.equals(RawFile.FILE_EXTENSION)) {
-       return new RawFile.RawFileScanner(context.getConf(), plan.getInSchema(), meta, path);
++      return new RawFile.RawFileScanner(context.getConf(), plan.getInSchema(), meta, frag);
 +    } else if (extension.equalsIgnoreCase(DirectRawFileWriter.FILE_EXTENSION)) {
-       return new DirectRawFileScanner(context.getConf(), plan.getInSchema(), meta, path);
++      return new DirectRawFileScanner(context.getConf(), plan.getInSchema(), meta, frag.getPath());
 +    } else {
-       throw new IllegalStateException("Unknown File Extension: " + path);
++      throw new IllegalStateException("Unknown File Extension: " + frag.getPath());
 +    }
    }
  
-   private Scanner createKWayMerger(List<Path> inputs, final int startChunkId, final int num) throws IOException {
+   private Scanner createKWayMerger(List<FileFragment> inputs, final int startChunkId, final int num) throws IOException {
 -    final Scanner [] sources = new Scanner[num];
 +    boolean tupleInMemory = tupleBlock.rows() > 0;
 +
 +    List<Scanner> scannerList = Lists.newArrayList();
 +
 +    // if tuples are still the in-memory block, the in-memory tuples will be included in merge phase.
 +    if (tupleInMemory) {
 +      scannerList.add(new MemTableScanner(sortedTuples, bytesOfLatestChunk));
 +    }
 +
      for (int i = 0; i < num; i++) {
 -      sources[i] = getFileScanner(inputs.get(startChunkId + i));
 +      scannerList.add(getFileScanner(inputs.get(startChunkId + i)));
      }
 +    Scanner [] sources = scannerList.toArray(new Scanner[scannerList.size()]);
  
 -    return createKWayMergerInternal(sources, 0, num);
 +    return createKWayMergerInternal(sources, 0, sources.length);
    }
  
    private Scanner createKWayMergerInternal(final Scanner [] sources, final int startIdx, final int num)

http://git-wip-us.apache.org/repos/asf/tajo/blob/38eb0255/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
index 786c726,68379d1..ec42109
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
@@@ -79,7 -79,7 +79,7 @@@ public class RangeShuffleFileWriteExec 
      FileSystem fs = new RawLocalFileSystem();
      fs.mkdirs(storeTablePath);
      this.appender = (FileAppender) StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta,
--        outSchema, new Path(storeTablePath, "output"));
++        outSchema, new Path(storeTablePath, "output" + "." + RawFile.FILE_EXTENSION));
      this.appender.enableStats();
      this.appender.init();
      this.indexWriter = bst.getIndexWriter(new Path(storeTablePath, "index"),

http://git-wip-us.apache.org/repos/asf/tajo/blob/38eb0255/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index b00a1ba,a7eaaf8..5127e90
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@@ -48,12 -49,16 +49,17 @@@ import org.apache.tajo.engine.query.Que
  import org.apache.tajo.ipc.QueryMasterProtocol;
  import org.apache.tajo.ipc.TajoWorkerProtocol.*;
  import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
+ import org.apache.tajo.pullserver.TajoPullServerService;
+ import org.apache.tajo.pullserver.retriever.FileChunk;
  import org.apache.tajo.rpc.NullCallback;
 +import org.apache.tajo.storage.BaseTupleComparator;
 +import org.apache.tajo.storage.RawFile;
+ import org.apache.tajo.storage.HashShuffleAppenderManager;
  import org.apache.tajo.storage.StorageUtil;
 -import org.apache.tajo.storage.TupleComparator;
  import org.apache.tajo.storage.fragment.FileFragment;
+ import org.apache.tajo.util.NetUtils;
  import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+ import org.jboss.netty.handler.codec.http.QueryStringDecoder;
  import org.jboss.netty.util.Timer;
  
  import java.io.File;
@@@ -680,19 -703,55 +706,55 @@@ public class Task 
        Timer timer = executionBlockContext.getRPCTimer();
        Path inputDir = executionBlockContext.getLocalDirAllocator().
            getLocalPathToRead(getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf);
-       File storeDir;
  
        int i = 0;
-       File storeFile;
+       File storeDir;
+       File defaultStoreFile;
+       FileChunk storeChunk = null;
        List<Fetcher> runnerList = Lists.newArrayList();
+ 
        for (FetchImpl f : fetches) {
+         storeDir = new File(inputDir.toString(), f.getName());
+         if (!storeDir.exists()) {
+           storeDir.mkdirs();
+         }
+ 
          for (URI uri : f.getURIs()) {
-           storeDir = new File(inputDir.toString(), f.getName());
-           if (!storeDir.exists()) {
-             storeDir.mkdirs();
 -          defaultStoreFile = new File(storeDir, "in_" + i);
++          defaultStoreFile = new File(storeDir, "in_" + i + "." + RawFile.FILE_EXTENSION);
+           InetAddress address = InetAddress.getByName(uri.getHost());
+ 
+           if (NetUtils.isLocalAddress(address)) {
+             boolean hasError = false;
+             try {
+               LOG.info("Try to get local file chunk at local host");
+               storeChunk = getLocalStoredFileChunk(uri, systemConf);
+             } catch (Throwable t) {
+               hasError = true;
+             }
+ 
+             // When a range request is out of range, storeChunk will be NULL. This case is normal state.
+             // So, we should skip and don't need to create storeChunk.
+             if (storeChunk == null && !hasError) {
+               continue;
+             }
+ 
+             if (storeChunk != null && storeChunk.getFile() != null && storeChunk.startOffset() > -1
+                 && hasError == false) {
+               storeChunk.setFromRemote(false);
+             } else {
+               storeChunk = new FileChunk(defaultStoreFile, 0, -1);
+               storeChunk.setFromRemote(true);
+             }
+           } else {
+             storeChunk = new FileChunk(defaultStoreFile, 0, -1);
+             storeChunk.setFromRemote(true);
            }
-           storeFile = new File(storeDir, "in_" + i + "." + RawFile.FILE_EXTENSION);
-           Fetcher fetcher = new Fetcher(systemConf, uri, storeFile, channelFactory, timer);
+ 
+           // If we decide that intermediate data should be really fetched from a remote host, storeChunk
+           // represents a complete file. Otherwise, storeChunk may represent a complete file or only a part of it
+           storeChunk.setEbId(f.getName());
+           Fetcher fetcher = new Fetcher(systemConf, uri, storeChunk, channelFactory, timer);
+           LOG.info("Create a new Fetcher with storeChunk:" + storeChunk.toString());
            runnerList.add(fetcher);
            i++;
          }
@@@ -704,6 -763,108 +766,108 @@@
      }
    }
  
+   private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException {
+     // Parse the URI
+     LOG.info("getLocalStoredFileChunk starts");
+     final Map<String, List<String>> params = new QueryStringDecoder(fetchURI.toString()).getParameters();
+     final List<String> types = params.get("type");
+     final List<String> qids = params.get("qid");
+     final List<String> taskIdList = params.get("ta");
+     final List<String> subQueryIds = params.get("sid");
+     final List<String> partIds = params.get("p");
+     final List<String> offsetList = params.get("offset");
+     final List<String> lengthList = params.get("length");
+ 
+     if (types == null || subQueryIds == null || qids == null || partIds == null) {
+       LOG.error("Invalid URI - Required queryId, type, subquery Id, and part id");
+       return null;
+     }
+ 
+     if (qids.size() != 1 && types.size() != 1 || subQueryIds.size() != 1) {
+       LOG.error("Invalid URI - Required qids, type, taskIds, subquery Id, and part id");
+       return null;
+     }
+ 
+     String queryId = qids.get(0);
+     String shuffleType = types.get(0);
+     String sid = subQueryIds.get(0);
+     String partId = partIds.get(0);
+ 
+     if (shuffleType.equals("r") && taskIdList == null) {
+       LOG.error("Invalid URI - For range shuffle, taskId is required");
+       return null;
+     }
+     List<String> taskIds = splitMaps(taskIdList);
+ 
+     FileChunk chunk = null;
+     long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L;
+     long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L;
+ 
+     LOG.info("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId
+ 	+ ", taskIds=" + taskIdList);
+ 
+     // The working directory of Tajo worker for each query, including subquery
+     String queryBaseDir = queryId.toString() + "/output" + "/" + sid + "/";
+ 
+     // If the subquery requires a range shuffle
+     if (shuffleType.equals("r")) {
+       String ta = taskIds.get(0);
+       if (!executionBlockContext.getLocalDirAllocator().ifExists(queryBaseDir + ta + "/output/", conf)) {
+         LOG.warn("Range shuffle - file not exist");
+         return null;
+       }
+       Path path = executionBlockContext.getLocalFS().makeQualified(
+ 	      executionBlockContext.getLocalDirAllocator().getLocalPathToRead(queryBaseDir + ta + "/output/", conf));
+       String startKey = params.get("start").get(0);
+       String endKey = params.get("end").get(0);
+       boolean last = params.get("final") != null;
+ 
+       try {
+         chunk = TajoPullServerService.getFileCunks(path, startKey, endKey, last);
+             } catch (Throwable t) {
+         LOG.error("getFileChunks() throws exception");
+         return null;
+       }
+ 
+       // If the subquery requires a hash shuffle or a scattered hash shuffle
+     } else if (shuffleType.equals("h") || shuffleType.equals("s")) {
+       int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), (TajoConf) conf);
 -      String partPath = queryBaseDir + "hash-shuffle/" + partParentId + "/" + partId;
++      String partPath = queryBaseDir + "hash-shuffle/" + partParentId + "/" + partId + "." + RawFile.FILE_EXTENSION;
+       if (!executionBlockContext.getLocalDirAllocator().ifExists(partPath, conf)) {
+         LOG.warn("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath);
+         return null;
+       }
+       Path path = executionBlockContext.getLocalFS().makeQualified(
+         executionBlockContext.getLocalDirAllocator().getLocalPathToRead(partPath, conf));
+       File file = new File(path.toUri());
+       long startPos = (offset >= 0 && length >= 0) ? offset : 0;
+       long readLen = (offset >= 0 && length >= 0) ? length : file.length();
+ 
+       if (startPos >= file.length()) {
+         LOG.error("Start pos[" + startPos + "] great than file length [" + file.length() + "]");
+         return null;
+       }
+       chunk = new FileChunk(file, startPos, readLen);
+ 
+     } else {
+       LOG.error("Unknown shuffle type");
+       return null;
+     }
+ 
+     return chunk;
+   }
+ 
+   private List<String> splitMaps(List<String> mapq) {
+     if (null == mapq) {
+       return null;
+     }
+     final List<String> ret = new ArrayList<String>();
+     for (String s : mapq) {
+       Collections.addAll(ret, s.split(","));
+     }
+     return ret;
+   }
+ 
    public static Path getTaskAttemptDir(QueryUnitAttemptId quid) {
      Path workDir =
          StorageUtil.concatPath(ExecutionBlockContext.getBaseInputDir(quid.getQueryUnitId().getExecutionBlockId()),

http://git-wip-us.apache.org/repos/asf/tajo/blob/38eb0255/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/38eb0255/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
----------------------------------------------------------------------
diff --cc tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
index 7116905,756dadc..30e2978
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
@@@ -31,9 -34,11 +34,11 @@@ import org.apache.tajo.engine.eval.*
  import org.apache.tajo.engine.function.builtin.SumInt;
  import org.apache.tajo.engine.parser.SQLAnalyzer;
  import org.apache.tajo.engine.planner.logical.*;
 +import org.apache.tajo.storage.BaseTupleComparator;
  import org.apache.tajo.storage.Tuple;
 -import org.apache.tajo.storage.TupleComparator;
  import org.apache.tajo.storage.VTuple;
+ import org.apache.tajo.storage.fragment.FileFragment;
+ import org.apache.tajo.storage.fragment.FragmentConvertor;
  import org.apache.tajo.util.CommonTestingUtil;
  import org.apache.tajo.util.KeyValueSet;
  import org.junit.AfterClass;

http://git-wip-us.apache.org/repos/asf/tajo/blob/38eb0255/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --cc tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index 0987a78,5d809f8..7a350ce
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@@ -1076,7 -1076,7 +1076,7 @@@ public class TestPhysicalPlanner 
      BSTIndex.BSTIndexReader reader = bst.getIndexReader(new Path(workDir, "output/index"),
          keySchema, comp);
      reader.open();
--    Path outputPath = StorageUtil.concatPath(workDir, "output", "output");
++    Path outputPath = StorageUtil.concatPath(workDir, "output", "output" + "." + RawFile.FILE_EXTENSION);
      TableMeta meta = CatalogUtil.newTableMeta(channel.getStoreType(), new KeyValueSet());
      SeekableScanner scanner =
          StorageManagerFactory.getSeekableScanner(conf, meta, exec.getSchema(), outputPath);

http://git-wip-us.apache.org/repos/asf/tajo/blob/38eb0255/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
----------------------------------------------------------------------
diff --cc tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
index ccd2cde,5f8efe7..fa3c43a
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
@@@ -182,7 -182,7 +182,7 @@@ public class TestRangeRetrieverHandler 
  
      TableMeta meta = CatalogUtil.newTableMeta(StoreType.RAW, new KeyValueSet());
      SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema,
--        StorageUtil.concatPath(testDir, "output", "output"));
++        StorageUtil.concatPath(testDir, "output", "output" + "." + RawFile.FILE_EXTENSION));
  
      scanner.init();
      int cnt = 0;
@@@ -305,7 -305,7 +305,7 @@@
      reader.open();
      TableMeta outputMeta = CatalogUtil.newTableMeta(StoreType.RAW, new KeyValueSet());
      SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, outputMeta, schema,
--        StorageUtil.concatPath(testDir, "output", "output"));
++        StorageUtil.concatPath(testDir, "output", "output" + "." + RawFile.FILE_EXTENSION));
      scanner.init();
      int cnt = 0;
      while(scanner.next() != null) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/38eb0255/tajo-project/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/38eb0255/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
----------------------------------------------------------------------
diff --cc tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
index f0699b7,f0699b7..fdd20c9
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
@@@ -115,7 -115,7 +115,8 @@@ public class HashShuffleAppenderManage
        //LOG.info(ebId + "'s basedir is created (" + baseDirPath + ")");
  
        // If EB has many partition, too many shuffle file are in single directory.
--      return StorageUtil.concatPath(baseDirPath, "" + getPartParentId(partId, systemConf), "" + partId);
++      return StorageUtil.concatPath(baseDirPath, "" + getPartParentId(partId, systemConf), "" + partId +
++          "." + RawFile.FILE_EXTENSION);
      } catch (Exception e) {
        LOG.error(e.getMessage(), e);
        throw new IOException(e);

http://git-wip-us.apache.org/repos/asf/tajo/blob/38eb0255/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --cc tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
index 7f729e1,edcf686..0f0dc5f
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
@@@ -31,8 -32,8 +32,9 @@@ import org.apache.tajo.common.TajoDataT
  import org.apache.tajo.datum.DatumFactory;
  import org.apache.tajo.datum.NullDatum;
  import org.apache.tajo.datum.ProtobufDatumFactory;
 +import org.apache.tajo.util.UnsafeUtil;
  import org.apache.tajo.storage.fragment.FileFragment;
+ import org.apache.tajo.unit.StorageUnit;
  import org.apache.tajo.util.BitArray;
  
  import java.io.File;

http://git-wip-us.apache.org/repos/asf/tajo/blob/38eb0255/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/38eb0255/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --cc tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index 8cdfbb4,2fb7c29..e17769e
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@@ -47,11 -47,11 +47,8 @@@ import org.apache.tajo.conf.TajoConf
  import org.apache.tajo.conf.TajoConf.ConfVars;
  import org.apache.tajo.pullserver.retriever.FileChunk;
  import org.apache.tajo.rpc.RpcChannelFactory;
- import org.apache.tajo.storage.BaseTupleComparator;
--import org.apache.tajo.storage.HashShuffleAppenderManager;
--import org.apache.tajo.storage.RowStoreUtil;
++import org.apache.tajo.storage.*;
  import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
--import org.apache.tajo.storage.Tuple;
 -import org.apache.tajo.storage.TupleComparator;
  import org.apache.tajo.storage.index.bst.BSTIndex;
  import org.jboss.netty.bootstrap.ServerBootstrap;
  import org.jboss.netty.buffer.ChannelBuffers;
@@@ -712,7 -712,7 +709,7 @@@ public class TajoPullServerService exte
      LOG.info("BSTIndex is loaded from disk (" + idxReader.getFirstKey() + ", "
          + idxReader.getLastKey());
  
--    File data = new File(URI.create(outDir.toUri() + "/output"));
++    File data = new File(URI.create(outDir.toUri() + "/output" + "." + RawFile.FILE_EXTENSION));
      byte [] startBytes = Base64.decodeBase64(startKey);
      byte [] endBytes = Base64.decodeBase64(endKey);