Posted to commits@tajo.apache.org by hy...@apache.org on 2014/09/27 01:52:11 UTC

[5/8] git commit: TAJO-983: Worker should directly read Intermediate data stored in localhost rather than fetching. (Mai Hai Thanh via hyunsik)

TAJO-983: Worker should directly read Intermediate data stored in localhost rather than fetching. (Mai Hai Thanh via hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/644b7cd9
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/644b7cd9
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/644b7cd9

Branch: refs/heads/block_iteration
Commit: 644b7cd991ea402115c6dc1d198e7f1d7f41771b
Parents: 9e026a9
Author: Hyunsik Choi <hy...@apache.org>
Authored: Wed Sep 24 01:08:43 2014 -0700
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Wed Sep 24 01:08:43 2014 -0700

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 .../engine/planner/UniformRangePartition.java   |   3 +-
 .../planner/physical/ExternalSortExec.java      |  92 +++++++---
 .../java/org/apache/tajo/worker/Fetcher.java    |  37 ++--
 .../main/java/org/apache/tajo/worker/Task.java  | 182 ++++++++++++++++++-
 .../org/apache/tajo/worker/TestFetcher.java     |  25 ++-
 .../java/org/apache/tajo/storage/RawFile.java   |  71 +++++---
 .../tajo/pullserver/PullServerAuxService.java   |   6 +-
 .../tajo/pullserver/TajoPullServerService.java  |   8 +-
 .../tajo/pullserver/retriever/FileChunk.java    |  38 +++-
 10 files changed, 371 insertions(+), 94 deletions(-)
----------------------------------------------------------------------
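
In outline, the patch makes the worker inspect each fetch URI before it opens an HTTP connection: when the URI points at the local host, the intermediate file that the pull server would have streamed is wrapped in a FileChunk marked as local, and the network round trip is skipped. The sketch below only illustrates that decision; it is not code from the patch, and chooseChunk, localFile, and defaultStoreFile are illustrative names (the real logic in Task.java also honors the offset/length range carried in the URI).

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;

import org.apache.tajo.pullserver.retriever.FileChunk;
import org.apache.tajo.util.NetUtils;

public class LocalFetchSketch {
  // Decide whether a fetch URI can be served by reading the local file directly.
  // 'localFile' stands for the intermediate file the pull server would have streamed.
  static FileChunk chooseChunk(URI fetchUri, File localFile, File defaultStoreFile)
      throws IOException {
    InetAddress address = InetAddress.getByName(fetchUri.getHost());
    if (NetUtils.isLocalAddress(address) && localFile.exists()) {
      // Pseudo fetch: point the chunk at the file that is already on this host.
      // (For simplicity the whole file is used; the patch can also take a sub-range.)
      FileChunk chunk = new FileChunk(localFile, 0, localFile.length());
      chunk.setFromRemote(false);
      return chunk;
    }
    // Real fetch: the Fetcher will stream the data into defaultStoreFile over HTTP.
    FileChunk chunk = new FileChunk(defaultStoreFile, 0, -1);
    chunk.setFromRemote(true);
    return chunk;
  }
}

In the actual patch the local branch is taken only after getLocalStoredFileChunk() succeeds, so a malformed or out-of-range request still falls back to a normal remote fetch.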


http://git-wip-us.apache.org/repos/asf/tajo/blob/644b7cd9/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 7cbd524..6a64bc9 100644
--- a/CHANGES
+++ b/CHANGES
@@ -31,6 +31,9 @@ Release 0.9.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-983: Worker should directly read Intermediate data stored in localhost
+    rather than fetching. (Mai Hai Thanh via hyunsik)
+
     TAJO-910: Simple query (non-forwarded query) should be supported against 
     partition tables. (Hyoungjun Kim)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/644b7cd9/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
index db12285..551a9d0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
@@ -20,7 +20,6 @@ package org.apache.tajo.engine.planner;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import com.google.common.primitives.UnsignedLong;
 import com.sun.tools.javac.util.Convert;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.SortSpec;
@@ -571,7 +570,7 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
             } else {
 
               if (isPureAscii[i]) {
-                lastBigInt = UnsignedLong.valueOf(new BigInteger(last.get(i).asByteArray())).bigIntegerValue();
+                lastBigInt = new BigInteger(last.get(i).asByteArray());
                 if (sortSpecs[i].isAscending()) {
                   end.put(i, DatumFactory.createText(lastBigInt.add(incs[i]).toByteArray()));
                 } else {

http://git-wip-us.apache.org/repos/asf/tajo/blob/644b7cd9/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index 700e34d..0094590 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.engine.planner.physical;
 
 import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.LocalDirAllocator;
@@ -45,6 +46,7 @@ import org.apache.tajo.util.FileUtil;
 import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.TaskAttemptContext;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.*;
@@ -66,6 +68,8 @@ import static org.apache.tajo.storage.RawFile.RawFileScanner;
 public class ExternalSortExec extends SortExec {
   /** Class logger */
   private static final Log LOG = LogFactory.getLog(ExternalSortExec.class);
+  /** The prefix of fragment name for intermediate */
+  private static final String INTERMEDIATE_FILE_PREFIX = "@interFile_";
 
   private SortNode plan;
   private final TableMeta meta;
@@ -86,9 +90,9 @@ public class ExternalSortExec extends SortExec {
   /** local file system */
   private final RawLocalFileSystem localFS;
   /** final output files which are used for cleaning */
-  private List<Path> finalOutputFiles = null;
+  private List<FileFragment> finalOutputFiles = null;
   /** for directly merging sorted inputs */
-  private List<Path> mergedInputPaths = null;
+  private List<FileFragment> mergedInputFragments = null;
 
   ///////////////////////////////////////////////////
   // transient variables
@@ -129,10 +133,10 @@ public class ExternalSortExec extends SortExec {
                           final CatalogProtos.FragmentProto[] fragments) throws PhysicalPlanningException {
     this(context, sm, plan);
 
-    mergedInputPaths = TUtil.newList();
+    mergedInputFragments = TUtil.newList();
     for (CatalogProtos.FragmentProto proto : fragments) {
       FileFragment fragment = FragmentConvertor.convert(FileFragment.class, proto);
-      mergedInputPaths.add(fragment.getPath());
+      mergedInputFragments.add(fragment);
     }
   }
 
@@ -269,9 +273,9 @@ public class ExternalSortExec extends SortExec {
     if (!sorted) { // if not sorted, first sort all data
 
       // if input files are given, it starts merging directly.
-      if (mergedInputPaths != null) {
+      if (mergedInputFragments != null) {
         try {
-          this.result = externalMergeAndSort(mergedInputPaths);
+          this.result = externalMergeAndSort(mergedInputFragments);
         } catch (Exception e) {
           throw new PhysicalPlanningException(e);
         }
@@ -287,7 +291,14 @@ public class ExternalSortExec extends SortExec {
         } else { // if input data exceeds main-memory at least once
 
           try {
-            this.result = externalMergeAndSort(chunks);
+            List<FileFragment> fragments = TUtil.newList();
+            for (Path chunk : chunks) {
+              FileFragment frag = new FileFragment("", chunk, 0,
+                  new File(localFS.makeQualified(chunk).toUri()).length());
+              fragments.add(frag);
+            }
+
+            this.result = externalMergeAndSort(fragments);
           } catch (Exception e) {
             throw new PhysicalPlanningException(e);
           }
@@ -328,11 +339,11 @@ public class ExternalSortExec extends SortExec {
     return computedFanout;
   }
 
-  private Scanner externalMergeAndSort(List<Path> chunks)
+  private Scanner externalMergeAndSort(List<FileFragment> chunks)
       throws IOException, ExecutionException, InterruptedException {
     int level = 0;
-    final List<Path> inputFiles = TUtil.newList(chunks);
-    final List<Path> outputFiles = TUtil.newList();
+    final List<FileFragment> inputFiles = TUtil.newList(chunks);
+    final List<FileFragment> outputFiles = TUtil.newList();
     int remainRun = inputFiles.size();
     int chunksSize = chunks.size();
 
@@ -368,7 +379,7 @@ public class ExternalSortExec extends SortExec {
           info(LOG, "Unbalanced merge possibility detected: number of remain input (" + remainInputRuns
               + ") and output files (" + outputFileNum + ") <= " + defaultFanout);
 
-          List<Path> switched = TUtil.newList();
+          List<FileFragment> switched = TUtil.newList();
           // switch the remain inputs to the next outputs
           for (int j = startIdx; j < inputFiles.size(); j++) {
             switched.add(inputFiles.get(j));
@@ -383,7 +394,7 @@ public class ExternalSortExec extends SortExec {
       // wait for all sort runners
       int finishedMerger = 0;
       int index = 0;
-      for (Future<Path> future : futures) {
+      for (Future<FileFragment> future : futures) {
         outputFiles.add(future.get());
         // Getting the number of merged files
         finishedMerger += numberOfMergingFiles.get(index++);
@@ -391,11 +402,32 @@ public class ExternalSortExec extends SortExec {
         progress = ((float)finishedMerger/(float)chunksSize) * 0.5f;
       }
 
-      // delete merged intermediate files
-      for (Path path : inputFiles) {
-        localFS.delete(path, true);
+      /*
+       * delete merged intermediate files
+       * 
+       * There may be 4 different types of file fragments in the list inputFiles
+       * + A: a fragment created from fetched data from a remote host. By default, this fragment represents
+       * a whole physical file (i.e., startOffset == 0 and length == length of physical file)
+       * + B1: a fragment created from a local file (pseudo-fetched data from local host) in which the fragment
+       * represents the whole physical file (i.e., startOffset == 0 AND length == length of physical file)
+       * + B2: a fragment created from a local file (pseudo-fetched data from local host) in which the fragment
+       * represents only a part of the physical file (i.e., startOffset > 0 OR length != length of physical file)
+       * + C: a fragment created from merging some fragments of the above types. When this fragment is created,
+       * its startOffset is set to 0 and its length is set to the length of the physical file, automatically
+       * 
+       * Fragments of types A, B1, and B2 are inputs of ExternalSortExec. Among them, only B2-type fragments will
+       * possibly be used by another task in the future. Thus, ideally, all fragments of types A, B1, and C can be
+       * deleted at this point. However, for the ease of future code maintenance, we delete only type-C fragments here
+       */
+      int numDeletedFiles = 0;
+      for (FileFragment frag : inputFiles) {
+        if (frag.getTableName().contains(INTERMEDIATE_FILE_PREFIX) == true) {
+          localFS.delete(frag.getPath(), true);
+          numDeletedFiles++;
+          LOG.info("Delete merged intermediate file: " + frag);
+        }
       }
-      info(LOG, inputFiles.size() + " merged intermediate files deleted");
+      info(LOG, numDeletedFiles + " merged intermediate files deleted");
 
       // switch input files to output files, and then clear outputFiles
       inputFiles.clear();
@@ -418,15 +450,15 @@ public class ExternalSortExec extends SortExec {
   /**
    * Merge Thread
    */
-  private class KWayMergerCaller implements Callable<Path> {
+  private class KWayMergerCaller implements Callable<FileFragment> {
     final int level;
     final int nextRunId;
-    final List<Path> inputFiles;
+    final List<FileFragment> inputFiles;
     final int startIdx;
     final int mergeFanout;
     final boolean updateInputStats;
 
-    public KWayMergerCaller(final int level, final int nextRunId, final List<Path> inputFiles,
+    public KWayMergerCaller(final int level, final int nextRunId, final List<FileFragment> inputFiles,
                             final int startIdx, final int mergeFanout, final boolean updateInputStats) {
       this.level = level;
       this.nextRunId = nextRunId;
@@ -437,7 +469,7 @@ public class ExternalSortExec extends SortExec {
     }
 
     @Override
-    public Path call() throws Exception {
+    public FileFragment call() throws Exception {
       final Path outputPath = getChunkPathForWrite(level + 1, nextRunId);
       info(LOG, mergeFanout + " files are being merged to an output file " + outputPath.getName());
       long mergeStartTime = System.currentTimeMillis();
@@ -455,7 +487,9 @@ public class ExternalSortExec extends SortExec {
       info(LOG, outputPath.getName() + " is written to a disk. ("
           + FileUtil.humanReadableByteCount(output.getOffset(), false)
           + " bytes, " + (mergeEndTime - mergeStartTime) + " msec)");
-      return outputPath;
+      File f = new File(localFS.makeQualified(outputPath).toUri());
+      FileFragment frag = new FileFragment(INTERMEDIATE_FILE_PREFIX + outputPath.getName(), outputPath, 0, f.length());
+      return frag;
     }
   }
 
@@ -469,7 +503,7 @@ public class ExternalSortExec extends SortExec {
   /**
    * Create a merged file scanner or k-way merge scanner.
    */
-  private Scanner createFinalMerger(List<Path> inputs) throws IOException {
+  private Scanner createFinalMerger(List<FileFragment> inputs) throws IOException {
     if (inputs.size() == 1) {
       this.result = getFileScanner(inputs.get(0));
     } else {
@@ -478,11 +512,11 @@ public class ExternalSortExec extends SortExec {
     return result;
   }
 
-  private Scanner getFileScanner(Path path) throws IOException {
-    return new RawFileScanner(context.getConf(), plan.getInSchema(), meta, path);
+  private Scanner getFileScanner(FileFragment frag) throws IOException {
+    return new RawFileScanner(context.getConf(), plan.getInSchema(), meta, frag);
   }
 
-  private Scanner createKWayMerger(List<Path> inputs, final int startChunkId, final int num) throws IOException {
+  private Scanner createKWayMerger(List<FileFragment> inputs, final int startChunkId, final int num) throws IOException {
     final Scanner [] sources = new Scanner[num];
     for (int i = 0; i < num; i++) {
       sources[i] = getFileScanner(inputs.get(startChunkId + i));
@@ -741,8 +775,12 @@ public class ExternalSortExec extends SortExec {
     }
 
     if (finalOutputFiles != null) {
-      for (Path path : finalOutputFiles) {
-        localFS.delete(path, true);
+      for (FileFragment frag : finalOutputFiles) {
+        File tmpFile = new File(localFS.makeQualified(frag.getPath()).toUri());
+        if (frag.getStartKey() == 0 && frag.getEndKey() == tmpFile.length()) {
+          localFS.delete(frag.getPath(), true);
+          LOG.info("Delete file: " + frag);
+        }
       }
     }
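
The cleanup above distinguishes fragments by whether they span the whole physical file, as described in the type A/B1/B2/C comment. The snippet below is an illustrative sketch, not part of the patch; the file path, fragment names, and offsets are made up, but it uses the same FileFragment constructor and accessors the patch relies on.

import java.io.File;

import org.apache.hadoop.fs.Path;
import org.apache.tajo.storage.fragment.FileFragment;

public class FragmentCoverageSketch {
  // True when the fragment spans the entire physical file (types A, B1, C in the
  // comment above); false for a B2-style slice that other tasks may still need.
  static boolean coversWholeFile(FileFragment frag, File physicalFile) {
    return frag.getStartKey() == 0 && frag.getEndKey() == physicalFile.length();
  }

  // Example fragments over a hypothetical local intermediate file.
  static void example() {
    File f = new File("/tmp/interFile_example");             // hypothetical path
    FileFragment whole = new FileFragment("@interFile_example",
        new Path(f.getAbsolutePath()), 0, f.length());       // covers the whole file
    FileFragment slice = new FileFragment("eb_000001",
        new Path(f.getAbsolutePath()), 1024, 4096);          // only a slice (B2-style)
    System.out.println(coversWholeFile(whole, f) + " / " + coversWholeFile(slice, f));
  }
}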
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/644b7cd9/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
index 4867fe4..742a025 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.pullserver.retriever.FileChunk;
 import org.jboss.netty.bootstrap.ClientBootstrap;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.channel.*;
@@ -51,11 +52,12 @@ public class Fetcher {
   private final static Log LOG = LogFactory.getLog(Fetcher.class);
 
   private final URI uri;
-  private final File file;
+  private final FileChunk fileChunk;
   private final TajoConf conf;
 
   private final String host;
   private int port;
+  private final boolean useLocalFile;
 
   private long startTime;
   private long finishTime;
@@ -66,9 +68,10 @@ public class Fetcher {
 
   private ClientBootstrap bootstrap;
 
-  public Fetcher(TajoConf conf, URI uri, File file, ClientSocketChannelFactory factory, Timer timer) {
+  public Fetcher(TajoConf conf, URI uri, FileChunk chunk, ClientSocketChannelFactory factory, Timer timer) {
     this.uri = uri;
-    this.file = file;
+    this.fileChunk = chunk;
+    this.useLocalFile = !chunk.fromRemote();
     this.state = TajoProtos.FetcherState.FETCH_INIT;
     this.conf = conf;
     this.timer = timer;
@@ -84,13 +87,15 @@ public class Fetcher {
       }
     }
 
-    bootstrap = new ClientBootstrap(factory);
-    bootstrap.setOption("connectTimeoutMillis", 5000L); // set 5 sec
-    bootstrap.setOption("receiveBufferSize", 1048576); // set 1M
-    bootstrap.setOption("tcpNoDelay", true);
+    if (!useLocalFile) {
+      bootstrap = new ClientBootstrap(factory);
+      bootstrap.setOption("connectTimeoutMillis", 5000L); // set 5 sec
+      bootstrap.setOption("receiveBufferSize", 1048576); // set 1M
+      bootstrap.setOption("tcpNoDelay", true);
 
-    ChannelPipelineFactory pipelineFactory = new HttpClientPipelineFactory(file);
-    bootstrap.setPipelineFactory(pipelineFactory);
+      ChannelPipelineFactory pipelineFactory = new HttpClientPipelineFactory(fileChunk.getFile());
+      bootstrap.setPipelineFactory(pipelineFactory);
+    }
   }
 
   public long getStartTime() {
@@ -113,7 +118,16 @@ public class Fetcher {
     return messageReceiveCount;
   }
 
-  public File get() throws IOException {
+  public FileChunk get() throws IOException {
+    if (useLocalFile) {
+      LOG.info("Get pseudo fetch from local host");
+      startTime = System.currentTimeMillis();
+      finishTime = System.currentTimeMillis();
+      state = TajoProtos.FetcherState.FETCH_FINISHED;
+      return fileChunk;
+    }
+
+    LOG.info("Get real fetch from remote host");
     this.startTime = System.currentTimeMillis();
     this.state = TajoProtos.FetcherState.FETCH_FETCHING;
     ChannelFuture future = null;
@@ -145,7 +159,8 @@ public class Fetcher {
 
       channelFuture.addListener(ChannelFutureListener.CLOSE);
 
-      return file;
+      fileChunk.setLength(fileChunk.getFile().length());
+      return fileChunk;
     } finally {
       if(future != null){
         // Close the channel to exit.
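
Once a FileChunk is flagged as local, Fetcher.get() returns immediately without building a Netty bootstrap or opening a connection. A minimal usage sketch, assuming conf, uri, factory, and timer are the instances the worker already holds, and that /tmp/in_0 and the length 4096 are placeholders for an intermediate file that already exists on this host:

import java.io.File;
import java.io.IOException;
import java.net.URI;

import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.pullserver.retriever.FileChunk;
import org.apache.tajo.worker.Fetcher;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.util.Timer;

public class LocalFetcherSketch {
  static FileChunk fetchLocally(TajoConf conf, URI uri,
                                ClientSocketChannelFactory factory,
                                Timer timer) throws IOException {
    // The data is already on this host, so mark the chunk as local.
    FileChunk chunk = new FileChunk(new File("/tmp/in_0"), 0, 4096); // placeholder file/length
    chunk.setFromRemote(false);

    Fetcher fetcher = new Fetcher(conf, uri, chunk, factory, timer);
    // With a local chunk, get() skips the HTTP request, flips the state to
    // FETCH_FINISHED, and hands the same chunk back.
    return fetcher.get();
  }
}

For a remote chunk the same call performs the HTTP fetch and fills in the chunk length from the downloaded file, so callers handle both cases uniformly.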

http://git-wip-us.apache.org/repos/asf/tajo/blob/644b7cd9/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index 66e0f87..a7eaaf8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -21,6 +21,7 @@ package org.apache.tajo.worker;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -48,15 +49,21 @@ import org.apache.tajo.engine.query.QueryUnitRequest;
 import org.apache.tajo.ipc.QueryMasterProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol.*;
 import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
+import org.apache.tajo.pullserver.TajoPullServerService;
+import org.apache.tajo.pullserver.retriever.FileChunk;
 import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.storage.HashShuffleAppenderManager;
 import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.NetUtils;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.handler.codec.http.QueryStringDecoder;
 import org.jboss.netty.util.Timer;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.URI;
 import java.text.NumberFormat;
 import java.util.*;
@@ -91,6 +98,7 @@ public class Task {
   private long finishTime;
 
   private final TableStats inputStats;
+  private List<FileChunk> localChunks;
 
   // TODO - to be refactored
   private ShuffleType shuffleType = null;
@@ -190,6 +198,8 @@ public class Task {
       context.setOutputPath(outFilePath);
     }
 
+    this.localChunks = Collections.synchronizedList(new ArrayList<FileChunk>());
+    
     context.setState(TaskAttemptState.TA_PENDING);
     LOG.info("==================================");
     LOG.info("* Subquery " + request.getId() + " is initialized");
@@ -586,6 +596,17 @@ public class Task {
       listTablets.add(tablet);
     }
 
+    // Special treatment for locally pseudo fetched chunks
+    synchronized (localChunks) {
+      for (FileChunk chunk : localChunks) {
+        if (name.equals(chunk.getEbId())) {
+          tablet = new FileFragment(name, new Path(chunk.getFile().getPath()), chunk.startOffset(), chunk.length());
+          listTablets.add(tablet);
+          LOG.info("One local chunk is added to listTablets");
+        }
+      }
+    }
+
     FileFragment[] tablets = new FileFragment[listTablets.size()];
     listTablets.toArray(tablets);
 
@@ -620,8 +641,13 @@ public class Task {
             LOG.warn("Retry on the fetch: " + fetcher.getURI() + " (" + retryNum + ")");
           }
           try {
-            File fetched = fetcher.get();
-            if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED && fetched != null) {
+            FileChunk fetched = fetcher.get();
+            if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED && fetched != null
+          && fetched.getFile() != null) {
+              if (fetched.fromRemote() == false) {
+          localChunks.add(fetched);
+          LOG.info("Add a new FileChunk to local chunk list");
+              }
               break;
             }
           } catch (Throwable e) {
@@ -677,19 +703,55 @@ public class Task {
       Timer timer = executionBlockContext.getRPCTimer();
       Path inputDir = executionBlockContext.getLocalDirAllocator().
           getLocalPathToRead(getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf);
-      File storeDir;
 
       int i = 0;
-      File storeFile;
+      File storeDir;
+      File defaultStoreFile;
+      FileChunk storeChunk = null;
       List<Fetcher> runnerList = Lists.newArrayList();
+
       for (FetchImpl f : fetches) {
+        storeDir = new File(inputDir.toString(), f.getName());
+        if (!storeDir.exists()) {
+          storeDir.mkdirs();
+        }
+
         for (URI uri : f.getURIs()) {
-          storeDir = new File(inputDir.toString(), f.getName());
-          if (!storeDir.exists()) {
-            storeDir.mkdirs();
+          defaultStoreFile = new File(storeDir, "in_" + i);
+          InetAddress address = InetAddress.getByName(uri.getHost());
+
+          if (NetUtils.isLocalAddress(address)) {
+            boolean hasError = false;
+            try {
+              LOG.info("Try to get local file chunk at local host");
+              storeChunk = getLocalStoredFileChunk(uri, systemConf);
+            } catch (Throwable t) {
+              hasError = true;
+            }
+
+            // When a range request is out of range, storeChunk will be NULL. This case is normal state.
+            // So, we should skip and don't need to create storeChunk.
+            if (storeChunk == null && !hasError) {
+              continue;
+            }
+
+            if (storeChunk != null && storeChunk.getFile() != null && storeChunk.startOffset() > -1
+                && hasError == false) {
+              storeChunk.setFromRemote(false);
+            } else {
+              storeChunk = new FileChunk(defaultStoreFile, 0, -1);
+              storeChunk.setFromRemote(true);
+            }
+          } else {
+            storeChunk = new FileChunk(defaultStoreFile, 0, -1);
+            storeChunk.setFromRemote(true);
           }
-          storeFile = new File(storeDir, "in_" + i);
-          Fetcher fetcher = new Fetcher(systemConf, uri, storeFile, channelFactory, timer);
+
+          // If we decide that intermediate data should be really fetched from a remote host, storeChunk
+          // represents a complete file. Otherwise, storeChunk may represent a complete file or only a part of it
+          storeChunk.setEbId(f.getName());
+          Fetcher fetcher = new Fetcher(systemConf, uri, storeChunk, channelFactory, timer);
+          LOG.info("Create a new Fetcher with storeChunk:" + storeChunk.toString());
           runnerList.add(fetcher);
           i++;
         }
@@ -701,6 +763,108 @@ public class Task {
     }
   }
 
+  private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException {
+    // Parse the URI
+    LOG.info("getLocalStoredFileChunk starts");
+    final Map<String, List<String>> params = new QueryStringDecoder(fetchURI.toString()).getParameters();
+    final List<String> types = params.get("type");
+    final List<String> qids = params.get("qid");
+    final List<String> taskIdList = params.get("ta");
+    final List<String> subQueryIds = params.get("sid");
+    final List<String> partIds = params.get("p");
+    final List<String> offsetList = params.get("offset");
+    final List<String> lengthList = params.get("length");
+
+    if (types == null || subQueryIds == null || qids == null || partIds == null) {
+      LOG.error("Invalid URI - Required queryId, type, subquery Id, and part id");
+      return null;
+    }
+
+    if (qids.size() != 1 && types.size() != 1 || subQueryIds.size() != 1) {
+      LOG.error("Invalid URI - Required qids, type, taskIds, subquery Id, and part id");
+      return null;
+    }
+
+    String queryId = qids.get(0);
+    String shuffleType = types.get(0);
+    String sid = subQueryIds.get(0);
+    String partId = partIds.get(0);
+
+    if (shuffleType.equals("r") && taskIdList == null) {
+      LOG.error("Invalid URI - For range shuffle, taskId is required");
+      return null;
+    }
+    List<String> taskIds = splitMaps(taskIdList);
+
+    FileChunk chunk = null;
+    long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L;
+    long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L;
+
+    LOG.info("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId
+	+ ", taskIds=" + taskIdList);
+
+    // The working directory of Tajo worker for each query, including subquery
+    String queryBaseDir = queryId.toString() + "/output" + "/" + sid + "/";
+
+    // If the subquery requires a range shuffle
+    if (shuffleType.equals("r")) {
+      String ta = taskIds.get(0);
+      if (!executionBlockContext.getLocalDirAllocator().ifExists(queryBaseDir + ta + "/output/", conf)) {
+        LOG.warn("Range shuffle - file not exist");
+        return null;
+      }
+      Path path = executionBlockContext.getLocalFS().makeQualified(
+	      executionBlockContext.getLocalDirAllocator().getLocalPathToRead(queryBaseDir + ta + "/output/", conf));
+      String startKey = params.get("start").get(0);
+      String endKey = params.get("end").get(0);
+      boolean last = params.get("final") != null;
+
+      try {
+        chunk = TajoPullServerService.getFileCunks(path, startKey, endKey, last);
+            } catch (Throwable t) {
+        LOG.error("getFileChunks() throws exception");
+        return null;
+      }
+
+      // If the subquery requires a hash shuffle or a scattered hash shuffle
+    } else if (shuffleType.equals("h") || shuffleType.equals("s")) {
+      int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), (TajoConf) conf);
+      String partPath = queryBaseDir + "hash-shuffle/" + partParentId + "/" + partId;
+      if (!executionBlockContext.getLocalDirAllocator().ifExists(partPath, conf)) {
+        LOG.warn("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath);
+        return null;
+      }
+      Path path = executionBlockContext.getLocalFS().makeQualified(
+        executionBlockContext.getLocalDirAllocator().getLocalPathToRead(partPath, conf));
+      File file = new File(path.toUri());
+      long startPos = (offset >= 0 && length >= 0) ? offset : 0;
+      long readLen = (offset >= 0 && length >= 0) ? length : file.length();
+
+      if (startPos >= file.length()) {
+        LOG.error("Start pos[" + startPos + "] great than file length [" + file.length() + "]");
+        return null;
+      }
+      chunk = new FileChunk(file, startPos, readLen);
+
+    } else {
+      LOG.error("Unknown shuffle type");
+      return null;
+    }
+
+    return chunk;
+  }
+
+  private List<String> splitMaps(List<String> mapq) {
+    if (null == mapq) {
+      return null;
+    }
+    final List<String> ret = new ArrayList<String>();
+    for (String s : mapq) {
+      Collections.addAll(ret, s.split(","));
+    }
+    return ret;
+  }
+
   public static Path getTaskAttemptDir(QueryUnitAttemptId quid) {
     Path workDir =
         StorageUtil.concatPath(ExecutionBlockContext.getBaseInputDir(quid.getQueryUnitId().getExecutionBlockId()),
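
getLocalStoredFileChunk() recovers everything it needs (shuffle type, subquery id, partition id, and the optional offset/length range) from the query string of the fetch URI using Netty's QueryStringDecoder, just as the pull server does. A small self-contained illustration with a made-up hash-shuffle URI:

import java.util.List;
import java.util.Map;

import org.jboss.netty.handler.codec.http.QueryStringDecoder;

public class FetchUriSketch {
  public static void main(String[] args) {
    // Hypothetical hash-shuffle fetch URI addressed to the local pull server.
    String uri = "http://127.0.0.1:12345/?qid=q_1411000000000_0001&sid=2&p=7&type=h"
        + "&offset=0&length=65536";

    Map<String, List<String>> params = new QueryStringDecoder(uri).getParameters();
    String shuffleType = params.get("type").get(0);  // "h": hash, "s": scattered hash, "r": range
    String sid = params.get("sid").get(0);           // subquery (execution block) id
    String partId = params.get("p").get(0);          // partition id
    long offset = params.containsKey("offset")
        ? Long.parseLong(params.get("offset").get(0)) : -1L;
    long length = params.containsKey("length")
        ? Long.parseLong(params.get("length").get(0)) : -1L;

    System.out.println(shuffleType + " sid=" + sid + " p=" + partId
        + " offset=" + offset + " length=" + length);
  }
}

For a range shuffle (type=r), the ta, start, end, and final parameters are read the same way before the file chunk is resolved.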

http://git-wip-us.apache.org/repos/asf/tajo/blob/644b7cd9/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
index b15d523..551610b 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
@@ -25,6 +25,7 @@ import org.apache.tajo.TajoProtos;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.pullserver.TajoPullServerService;
+import org.apache.tajo.pullserver.retriever.FileChunk;
 import org.apache.tajo.rpc.RpcChannelFactory;
 import org.apache.tajo.storage.HashShuffleAppenderManager;
 import org.apache.tajo.util.CommonTestingUtil;
@@ -98,8 +99,12 @@ public class TestFetcher {
     stream.close();
 
     URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
-    final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory, timer);
-    assertNotNull(fetcher.get());
+    FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
+    storeChunk.setFromRemote(true);
+    final Fetcher fetcher = new Fetcher(conf, uri, storeChunk, channelFactory, timer);
+    FileChunk chunk = fetcher.get();
+    assertNotNull(chunk);
+    assertNotNull(chunk.getFile());
 
     FileSystem fs = FileSystem.getLocal(new TajoConf());
     FileStatus inStatus = fs.getFileStatus(inputPath);
@@ -140,7 +145,9 @@ public class TestFetcher {
     stream.close();
 
     URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
-    final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory, timer);
+    FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
+    storeChunk.setFromRemote(true);
+    final Fetcher fetcher = new Fetcher(conf, uri, storeChunk, channelFactory, timer);
     assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
 
     fetcher.get();
@@ -168,7 +175,9 @@ public class TestFetcher {
     stream.close();
 
     URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
-    final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory, timer);
+    FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
+    storeChunk.setFromRemote(true);
+    final Fetcher fetcher = new Fetcher(conf, uri, storeChunk, channelFactory, timer);
     assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
 
     fetcher.get();
@@ -200,7 +209,9 @@ public class TestFetcher {
     stream.close();
 
     URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
-    final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory, timer);
+    FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
+    storeChunk.setFromRemote(true);
+    final Fetcher fetcher = new Fetcher(conf, uri, storeChunk, channelFactory, timer);
     assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
 
     fetcher.get();
@@ -218,7 +229,9 @@ public class TestFetcher {
     String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, "h", ta);
 
     URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
-    final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory, timer);
+    FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
+    storeChunk.setFromRemote(true);
+    final Fetcher fetcher = new Fetcher(conf, uri, storeChunk, channelFactory, timer);
     assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
 
     pullServerService.stop();

http://git-wip-us.apache.org/repos/asf/tajo/blob/644b7cd9/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
index 1f57675..edcf686 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.storage;
 
 import com.google.protobuf.Message;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -32,6 +33,7 @@ import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.datum.ProtobufDatumFactory;
 import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.BitArray;
 
 import java.io.File;
@@ -47,36 +49,31 @@ public class RawFile {
   public static class RawFileScanner extends FileScanner implements SeekableScanner {
     private FileChannel channel;
     private DataType[] columnTypes;
-    private Path path;
 
     private ByteBuffer buffer;
+    private int bufferSize;
     private Tuple tuple;
 
-    private int headerSize = 0;
+    private int headerSize = 0; // Header size of a tuple
     private BitArray nullFlags;
     private static final int RECORD_SIZE = 4;
     private boolean eof = false;
-    private long fileSize;
+    private long fileLimit; // If this.fragment represents a complete file, this value is equal to the file's size
+    private long numBytesRead;
     private FileInputStream fis;
     private long recordCount;
 
-    public RawFileScanner(Configuration conf, Schema schema, TableMeta meta, Path path) throws IOException {
-      super(conf, schema, meta, null);
-      this.path = path;
-    }
-
-    @SuppressWarnings("unused")
     public RawFileScanner(Configuration conf, Schema schema, TableMeta meta, FileFragment fragment) throws IOException {
-      this(conf, schema, meta, fragment.getPath());
+      super(conf, schema, meta, fragment);
     }
 
     public void init() throws IOException {
       File file;
       try {
-        if (path.toUri().getScheme() != null) {
-          file = new File(path.toUri());
+        if (fragment.getPath().toUri().getScheme() != null) {
+          file = new File(fragment.getPath().toUri());
         } else {
-          file = new File(path.toString());
+          file = new File(fragment.getPath().toString());
         }
       } catch (IllegalArgumentException iae) {
         throw new IOException(iae);
@@ -84,16 +81,22 @@ public class RawFile {
 
       fis = new FileInputStream(file);
       channel = fis.getChannel();
-      fileSize = channel.size();
+      fileLimit = fragment.getStartKey() + fragment.getEndKey(); // fileLimit is less than or equal to fileSize
 
       if (tableStats != null) {
-        tableStats.setNumBytes(fileSize);
+        tableStats.setNumBytes(fragment.getEndKey());
       }
       if (LOG.isDebugEnabled()) {
-        LOG.debug("RawFileScanner open:" + path + "," + channel.position() + ", size :" + channel.size());
+        LOG.debug("RawFileScanner open:" + fragment + "," + channel.position() + ", total file size :" + channel.size()
+            + ", fragment size :" + fragment.getEndKey() + ", fileLimit: " + fileLimit);
       }
 
-      buffer = ByteBuffer.allocateDirect(64 * 1024);
+      if (fragment.getEndKey() < 64 * StorageUnit.KB) {
+	      bufferSize = fragment.getEndKey().intValue();
+      } else {
+	      bufferSize = 64 * StorageUnit.KB;
+      }
+      buffer = ByteBuffer.allocateDirect(bufferSize);
 
       columnTypes = new DataType[schema.size()];
       for (int i = 0; i < schema.size(); i++) {
@@ -101,14 +104,14 @@ public class RawFile {
       }
 
       tuple = new VTuple(columnTypes.length);
+      nullFlags = new BitArray(schema.size());
+      headerSize = RECORD_SIZE + 2 + nullFlags.bytesLength(); // The middle 2 bytes is for NullFlagSize
 
       // initial read
-      channel.read(buffer);
+      channel.position(fragment.getStartKey());
+      numBytesRead = channel.read(buffer);
       buffer.flip();
 
-      nullFlags = new BitArray(schema.size());
-      headerSize = RECORD_SIZE + 2 + nullFlags.bytesLength();
-
       super.init();
     }
 
@@ -125,19 +128,31 @@ public class RawFile {
       } else {
         buffer.clear();
         channel.position(offset);
-        channel.read(buffer);
+        int bytesRead = channel.read(buffer);
+        numBytesRead = bytesRead;
         buffer.flip();
         eof = false;
       }
     }
 
     private boolean fillBuffer() throws IOException {
+      if (numBytesRead >= fragment.getEndKey()) {
+        eof = true;
+        return false;
+      }
+      int currentDataSize = buffer.remaining();
       buffer.compact();
-      if (channel.read(buffer) == -1) {
+      int bytesRead = channel.read(buffer);
+      if (bytesRead == -1) {
         eof = true;
         return false;
       } else {
         buffer.flip();
+        long realRemaining = fragment.getEndKey() - numBytesRead;
+        numBytesRead += bytesRead;
+        if (realRemaining < bufferSize) {
+          buffer.limit(currentDataSize + (int) realRemaining);
+        }
         return true;
       }
     }
@@ -356,7 +371,7 @@ public class RawFile {
         }
       }
 
-      if(!buffer.hasRemaining() && channel.position() == fileSize){
+      if(!buffer.hasRemaining() && channel.position() == fileLimit){
         eof = true;
       }
       return new VTuple(tuple);
@@ -368,7 +383,7 @@ public class RawFile {
       buffer.clear();
       // reload initial buffer
       channel.position(0);
-      channel.read(buffer);
+      numBytesRead = channel.read(buffer);
       buffer.flip();
       eof = false;
     }
@@ -376,7 +391,7 @@ public class RawFile {
     @Override
     public void close() throws IOException {
       if (tableStats != null) {
-        tableStats.setReadBytes(fileSize);
+        tableStats.setReadBytes(fragment.getEndKey());
         tableStats.setNumRows(recordCount);
       }
 
@@ -410,14 +425,14 @@ public class RawFile {
         }
 
         if(eof || channel == null) {
-          tableStats.setReadBytes(fileSize);
+          tableStats.setReadBytes(fragment.getEndKey());
           return 1.0f;
         }
 
         if (filePos == 0) {
           return 0.0f;
         } else {
-          return Math.min(1.0f, ((float)filePos / (float)fileSize));
+          return Math.min(1.0f, ((float)filePos / fragment.getEndKey().floatValue()));
         }
       } catch (IOException e) {
         LOG.error(e.getMessage(), e);
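
Because RawFileScanner is now constructed from a FileFragment rather than a bare Path, it positions the channel at the fragment's start offset and stops reading once the fragment's length has been consumed, so a task can scan just its slice of a shared local file. A minimal sketch, assuming schema and meta already describe the RAW-format intermediate data; the path, offset, and length are placeholders:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.storage.RawFile.RawFileScanner;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.fragment.FileFragment;

public class FragmentScanSketch {
  static long countTuples(Configuration conf, Schema schema, TableMeta meta,
                          long offset, long length) throws IOException {
    // Only the requested slice of the file is scanned; reads stop at offset + length.
    FileFragment fragment = new FileFragment("local-slice",
        new Path("/tmp/hash-shuffle/7/7"), offset, length);  // hypothetical shuffle file
    RawFileScanner scanner = new RawFileScanner(conf, schema, meta, fragment);
    scanner.init();
    long rows = 0;
    Tuple tuple;
    while ((tuple = scanner.next()) != null) {
      rows++;
    }
    scanner.close();
    return rows;
  }
}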

http://git-wip-us.apache.org/repos/asf/tajo/blob/644b7cd9/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
index 5f9f9e8..1c63c8a 100644
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
+++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
@@ -488,20 +488,20 @@ public class PullServerAuxService extends AuxiliaryService {
       ChannelFuture writeFuture;
       if (ch.getPipeline().get(SslHandler.class) == null) {
         final FadvisedFileRegion partition = new FadvisedFileRegion(spill,
-            file.startOffset, file.length(), manageOsCache, readaheadLength,
+            file.startOffset(), file.length(), manageOsCache, readaheadLength,
             readaheadPool, file.getFile().getAbsolutePath());
         writeFuture = ch.write(partition);
         writeFuture.addListener(new FileCloseListener(partition, null, 0, null));
       } else {
         // HTTPS cannot be done with zero copy.
         final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
-            file.startOffset, file.length, sslFileBufferSize,
+            file.startOffset(), file.length(), sslFileBufferSize,
             manageOsCache, readaheadLength, readaheadPool,
             file.getFile().getAbsolutePath());
         writeFuture = ch.write(chunk);
       }
       metrics.shuffleConnections.incr();
-      metrics.shuffleOutputBytes.incr(file.length); // optimistic
+      metrics.shuffleOutputBytes.incr(file.length()); // optimistic
       return writeFuture;
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/644b7cd9/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index 3fa67ae..2fb7c29 100644
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -644,14 +644,14 @@ public class TajoPullServerService extends AbstractService {
         spill = new RandomAccessFile(file.getFile(), "r");
         if (ch.getPipeline().get(SslHandler.class) == null) {
           final FadvisedFileRegion filePart = new FadvisedFileRegion(spill,
-              file.startOffset, file.length(), manageOsCache, readaheadLength,
+              file.startOffset(), file.length(), manageOsCache, readaheadLength,
               readaheadPool, file.getFile().getAbsolutePath());
           writeFuture = ch.write(filePart);
           writeFuture.addListener(new FileCloseListener(filePart, requestUri, startTime, TajoPullServerService.this));
         } else {
           // HTTPS cannot be done with zero copy.
           final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
-              file.startOffset, file.length, sslFileBufferSize,
+              file.startOffset(), file.length(), sslFileBufferSize,
               manageOsCache, readaheadLength, readaheadPool,
               file.getFile().getAbsolutePath());
           writeFuture = ch.write(chunk);
@@ -667,7 +667,7 @@ public class TajoPullServerService extends AbstractService {
         return null;
       }
       metrics.shuffleConnections.incr();
-      metrics.shuffleOutputBytes.incr(file.length); // optimistic
+      metrics.shuffleOutputBytes.incr(file.length()); // optimistic
       return writeFuture;
     }
 
@@ -698,7 +698,7 @@ public class TajoPullServerService extends AbstractService {
     }
   }
 
-  public FileChunk getFileCunks(Path outDir,
+  public static FileChunk getFileCunks(Path outDir,
                                       String startKey,
                                       String endKey,
                                       boolean last) throws IOException {

http://git-wip-us.apache.org/repos/asf/tajo/blob/644b7cd9/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java
index a8b424e..67cff21 100644
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java
+++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java
@@ -23,8 +23,18 @@ import java.io.FileNotFoundException;
 
 public class FileChunk {
   private final File file;
-  public final long startOffset;
-  public final long length;
+  private final long startOffset;
+  private long length;
+
+  /**
+   * TRUE if this.file is created by getting data from a remote host (e.g., by HttpRequest). FALSE otherwise.
+   */
+  private boolean fromRemote;
+
+  /**
+   * ExecutionBlockId
+   */
+  private String ebId;
 
   public FileChunk(File file, long startOffset, long length) throws FileNotFoundException {
     this.file = file;
@@ -44,8 +54,28 @@ public class FileChunk {
     return this.length;
   }
 
+  public void setLength(long newLength) {
+    this.length = newLength;
+  }
+
+  public boolean fromRemote() {
+    return this.fromRemote;
+  }
+
+  public void setFromRemote(boolean newVal) {
+    this.fromRemote = newVal;
+  }
+
+  public String getEbId() {
+    return this.ebId;
+  }
+
+  public void setEbId(String newVal) {
+    this.ebId = newVal;
+  }
+
   public String toString() {
-    return " (start=" + startOffset() + ", length=" + length + ") "
-        + file.getAbsolutePath();
+    return " (start=" + startOffset() + ", length=" + length + ", fromRemote=" + fromRemote + ", ebId=" + ebId + ") "
+	+ file.getAbsolutePath();
   }
 }
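
FileChunk now records whether its data came over the network (fromRemote) and which execution block it belongs to (ebId), which is what lets Task turn locally read chunks back into FileFragments later. A small sketch of the intended use; the path and id below are placeholders:

import java.io.File;
import java.io.FileNotFoundException;

import org.apache.tajo.pullserver.retriever.FileChunk;

public class FileChunkSketch {
  public static void main(String[] args) throws FileNotFoundException {
    // A slice of a local intermediate file that was not downloaded.
    FileChunk chunk = new FileChunk(new File("/tmp/hash-shuffle/7/7"), 1024, 4096); // placeholder
    chunk.setFromRemote(false);                      // read directly, no HTTP fetch
    chunk.setEbId("eb_1411000000000_0001_000002");   // owning execution block (placeholder id)

    System.out.println(chunk);  // toString() now reports fromRemote and ebId as well
  }
}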