Posted to commits@tajo.apache.org by hy...@apache.org on 2014/04/18 13:44:35 UTC

[32/57] [abbrv] [partial] TAJO-752: Escalate sub modules in tajo-core into the top-level modules. (hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
new file mode 100644
index 0000000..c422b49
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -0,0 +1,782 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.engine.planner.logical.SortNode;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.Scanner;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.tajo.storage.RawFile.RawFileAppender;
+import static org.apache.tajo.storage.RawFile.RawFileScanner;
+
+/**
+ * This external sort algorithm is characterized by the following:
+ *
+ * <ul>
+ *   <li>in-memory sort if input data size fits a sort buffer</li>
+ *   <li>k-way merge sort if input data size exceeds the size of sort buffer</li>
+ *   <li>parallel merge</li>
+ *   <li>final merge avoidance</li>
+ *   <li>unbalanced merge if needed</li>
+ * </ul>
+ */
+public class ExternalSortExec extends SortExec {
+  /** Class logger */
+  private static final Log LOG = LogFactory.getLog(ExternalSortExec.class);
+
+  private SortNode plan;
+  private final TableMeta meta;
+  /** the defaultFanout of external sort */
+  private final int defaultFanout;
+  /** the size of the in-memory sort buffer. If memory consumption exceeds it, the in-memory table is spilled to disk. */
+  private long sortBufferBytesNum;
+  /** the number of available cores */
+  private final int allocatedCoreNum;
+  /** If multiple cores are available, a parallel merge is attempted. */
+  private ExecutorService executorService;
+  /** used for in-memory sort of each chunk. */
+  private List<Tuple> inMemoryTable;
+  /** temporary directory for sort chunks */
+  private final Path sortTmpDir;
+  /** enables round-robin allocation across local disks */
+  private final LocalDirAllocator localDirAllocator;
+  /** local file system */
+  private final RawLocalFileSystem localFS;
+  /** final output files which are used for cleaning */
+  private List<Path> finalOutputFiles = null;
+  /** for directly merging sorted inputs */
+  private List<Path> mergedInputPaths = null;
+
+  ///////////////////////////////////////////////////
+  // transient variables
+  ///////////////////////////////////////////////////
+  /** already sorted or not */
+  private boolean sorted = false;
+  /** a flag indicating whether the sorted data resides entirely in memory */
+  private boolean memoryResident = true;
+  /** the final result */
+  private Scanner result;
+  /** total bytes of input data */
+  private long sortAndStoredBytes;
+
+  private ExternalSortExec(final TaskAttemptContext context, final AbstractStorageManager sm, final SortNode plan)
+      throws PhysicalPlanningException {
+    super(context, plan.getInSchema(), plan.getOutSchema(), null, plan.getSortKeys());
+
+    this.plan = plan;
+    this.meta = CatalogUtil.newTableMeta(StoreType.ROWFILE);
+
+    this.defaultFanout = context.getConf().getIntVar(ConfVars.EXECUTOR_EXTERNAL_SORT_FANOUT);
+    if (defaultFanout < 2) {
+      throw new PhysicalPlanningException(ConfVars.EXECUTOR_EXTERNAL_SORT_FANOUT.varname + " cannot be lower than 2");
+    }
+    // TODO - sort buffer and core num should be changed to use the allocated container resource.
+    this.sortBufferBytesNum = context.getConf().getLongVar(ConfVars.EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE) * 1048576L;
+    this.allocatedCoreNum = context.getConf().getIntVar(ConfVars.EXECUTOR_EXTERNAL_SORT_THREAD_NUM);
+    this.executorService = Executors.newFixedThreadPool(this.allocatedCoreNum);
+    this.inMemoryTable = new ArrayList<Tuple>(100000);
+
+    this.sortTmpDir = getExecutorTmpDir();
+    localDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
+    localFS = new RawLocalFileSystem();
+  }
+
+  public ExternalSortExec(final TaskAttemptContext context,
+                          final AbstractStorageManager sm, final SortNode plan,
+                          final CatalogProtos.FragmentProto[] fragments) throws PhysicalPlanningException {
+    this(context, sm, plan);
+
+    mergedInputPaths = TUtil.newList();
+    for (CatalogProtos.FragmentProto proto : fragments) {
+      FileFragment fragment = FragmentConvertor.convert(FileFragment.class, proto);
+      mergedInputPaths.add(fragment.getPath());
+    }
+  }
+
+  public ExternalSortExec(final TaskAttemptContext context,
+                          final AbstractStorageManager sm, final SortNode plan, final PhysicalExec child)
+      throws IOException {
+    this(context, sm, plan);
+    setChild(child);
+  }
+
+  @VisibleForTesting
+  public void setSortBufferBytesNum(int sortBufferBytesNum) {
+    this.sortBufferBytesNum = sortBufferBytesNum;
+  }
+
+  public void init() throws IOException {
+    inputStats = new TableStats();
+    super.init();
+  }
+
+  public SortNode getPlan() {
+    return this.plan;
+  }
+
+  /**
+   * Sort a tuple block and store it in a chunk file
+   */
+  private Path sortAndStoreChunk(int chunkId, List<Tuple> tupleBlock)
+      throws IOException {
+    TableMeta meta = CatalogUtil.newTableMeta(StoreType.RAW);
+    int rowNum = tupleBlock.size();
+
+    long sortStart = System.currentTimeMillis();
+    Collections.sort(tupleBlock, getComparator());
+    long sortEnd = System.currentTimeMillis();
+
+    long chunkWriteStart = System.currentTimeMillis();
+    Path outputPath = getChunkPathForWrite(0, chunkId);
+    final RawFileAppender appender = new RawFileAppender(context.getConf(), inSchema, meta, outputPath);
+    appender.init();
+    for (Tuple t : tupleBlock) {
+      appender.addTuple(t);
+    }
+    appender.close();
+    tupleBlock.clear();
+    long chunkWriteEnd = System.currentTimeMillis();
+
+    info(LOG, "Chunk #" + chunkId + " sorted and written (" +
+        FileUtil.humanReadableByteCount(appender.getOffset(), false) + " bytes, " + rowNum + " rows, " +
+        "sort time: " + (sortEnd - sortStart) + " msec, " +
+        "write time: " + (chunkWriteEnd - chunkWriteStart) + " msec)");
+    return outputPath;
+  }
+
+  /**
+ * Divides all input tuples into a number of chunks, then sorts each chunk.
+   *
+   * @return All paths of chunks
+   * @throws java.io.IOException
+   */
+  private List<Path> sortAndStoreAllChunks() throws IOException {
+    Tuple tuple;
+    long memoryConsumption = 0;
+    List<Path> chunkPaths = TUtil.newList();
+
+    int chunkId = 0;
+    long runStartTime = System.currentTimeMillis();
+    while ((tuple = child.next()) != null) { // partition sort start
+      Tuple vtuple = new VTuple(tuple);
+      inMemoryTable.add(vtuple);
+      memoryConsumption += MemoryUtil.calculateMemorySize(vtuple);
+
+      if (memoryConsumption > sortBufferBytesNum) {
+        long runEndTime = System.currentTimeMillis();
+        info(LOG, chunkId + " run loading time: " + (runEndTime - runStartTime) + " msec");
+        runStartTime = runEndTime;
+
+        info(LOG, "Memory consumption exceeds " + sortBufferBytesNum + " bytes");
+        memoryResident = false;
+
+        chunkPaths.add(sortAndStoreChunk(chunkId, inMemoryTable));
+
+        memoryConsumption = 0;
+        chunkId++;
+
+        // Once the volume of data being sorted exceeds the size of the sort buffer,
+        // the total progress of this external sort is divided into two parts.
+        // In contrast, if the data fits in memory, the progress consists of only one part.
+        //
+        // When the progress is divided into two parts, the first part sorts tuples in memory and stores them
+        // into chunks. The second part merges the stored chunks into fewer chunks, continuing until the number
+        // of remaining chunks is no greater than the default fanout.
+        //
+        // Reaching this point means that the first chunk has just been stored,
+        // i.e., the progress is divided into two parts.
+        // So, the progress of the child operator is multiplied by 0.5f.
+        progress = child.getProgress() * 0.5f;
+      }
+    }
+
+    if (inMemoryTable.size() > 0) { // if there is at least one input tuple
+      if (!memoryResident) { // the data exceeded the sort buffer at least once, so store the remaining tuples into a chunk
+        long start = System.currentTimeMillis();
+        int rowNum = inMemoryTable.size();
+        chunkPaths.add(sortAndStoreChunk(chunkId, inMemoryTable));
+        long end = System.currentTimeMillis();
+        info(LOG, "Last Chunk #" + chunkId + " " + rowNum + " rows written (" + (end - start) + " msec)");
+      } else { // all data fits in the sort buffer, so an in-memory sort is sufficient
+        Collections.sort(inMemoryTable, getComparator());
+      }
+    }
+
+    // get total loaded (or stored) bytes and total row numbers
+    TableStats childTableStats = child.getInputStats();
+    if (childTableStats != null) {
+      sortAndStoredBytes = childTableStats.getNumBytes();
+    }
+    return chunkPaths;
+  }
+
+  /**
+   * Get a local path among all temporary paths in a round-robin manner.
+   */
+  private synchronized Path getChunkPathForWrite(int level, int chunkId) throws IOException {
+    return localDirAllocator.getLocalPathForWrite(sortTmpDir + "/" + level +"_" + chunkId, context.getConf());
+  }
+
+  @Override
+  public Tuple next() throws IOException {
+
+    if (!sorted) { // if not sorted, first sort all data
+
+      // if input files are given, it starts merging directly.
+      if (mergedInputPaths != null) {
+        try {
+          this.result = externalMergeAndSort(mergedInputPaths);
+        } catch (Exception e) {
+          throw new PhysicalPlanningException(e);
+        }
+      } else {
+        // Try to sort all data, storing it as multiple chunks whenever the sort buffer is exceeded
+        long startTimeOfChunkSplit = System.currentTimeMillis();
+        List<Path> chunks = sortAndStoreAllChunks();
+        long endTimeOfChunkSplit = System.currentTimeMillis();
+        info(LOG, "Chunks creation time: " + (endTimeOfChunkSplit - startTimeOfChunkSplit) + " msec");
+
+        if (memoryResident) { // if all sorted data resides in the in-memory table
+          this.result = new MemTableScanner();
+        } else { // if the input data exceeded the sort buffer at least once
+
+          try {
+            this.result = externalMergeAndSort(chunks);
+          } catch (Exception e) {
+            throw new PhysicalPlanningException(e);
+          }
+
+        }
+      }
+
+      sorted = true;
+      result.init();
+
+      // once loaded and sorted, we assume that half of the entire external sort operation has completed.
+      progress = 0.5f;
+    }
+
+    return result.next();
+  }
+
+  private int calculateFanout(int remainInputChunks, int inputNum, int outputNum, int startIdx) {
+    int computedFanout = Math.min(remainInputChunks, defaultFanout);
+
+    // Why should we detect an opportunity for an unbalanced merge?
+    //
+    // Assume that the fanout is 8 and there are 10 chunks.
+    // If we first merge 3 chunks into one, only 8 chunks remain,
+    // and the merge phase can finish with a single final merge instead of another full pass over all chunks.
+    if (checkIfCanBeUnbalancedMerged(inputNum - (startIdx + computedFanout), outputNum + 1)) {
+      int candidateFanout = computedFanout;
+      while (checkIfCanBeUnbalancedMerged(inputNum - (startIdx + candidateFanout), outputNum + 1)) {
+        candidateFanout--;
+      }
+      int beforeFanout = computedFanout;
+      if (computedFanout > candidateFanout + 1) {
+        computedFanout = candidateFanout + 1;
+        info(LOG, "Fanout reduced for unbalanced merge: " + beforeFanout + " -> " + computedFanout);
+      }
+    }
+
+    return computedFanout;
+  }
+
+  private Scanner externalMergeAndSort(List<Path> chunks)
+      throws IOException, ExecutionException, InterruptedException {
+    int level = 0;
+    final List<Path> inputFiles = TUtil.newList(chunks);
+    final List<Path> outputFiles = TUtil.newList();
+    int remainRun = inputFiles.size();
+    int chunksSize = chunks.size();
+
+    long mergeStart = System.currentTimeMillis();
+
+    // continue while the number of remaining runs is larger than defaultFanout
+    while (remainRun > defaultFanout) {
+
+      // reset outChunkId
+      int remainInputRuns = inputFiles.size();
+      int outChunkId = 0;
+      int outputFileNum = 0;
+      List<Future> futures = TUtil.newList();
+      // the number of files being merged in threads.
+      List<Integer> numberOfMergingFiles = TUtil.newList();
+
+      for (int startIdx = 0; startIdx < inputFiles.size();) {
+
+        // calculate proper fanout
+        int fanout = calculateFanout(remainInputRuns, inputFiles.size(), outputFileNum, startIdx);
+        // the number of files merged by the i-th merger thread
+        numberOfMergingFiles.add(fanout);
+        // launch a merger runner
+        futures.add(executorService.submit(
+            new KWayMergerCaller(level, outChunkId++, inputFiles, startIdx, fanout, false)));
+        outputFileNum++;
+
+        startIdx += fanout;
+        remainInputRuns = inputFiles.size() - startIdx;
+
+        // If unbalanced merge is available, it finishes the merge phase earlier.
+        if (checkIfCanBeUnbalancedMerged(remainInputRuns, outputFileNum)) {
+          info(LOG, "Unbalanced merge possibility detected: number of remain input (" + remainInputRuns
+              + ") and output files (" + outputFileNum + ") <= " + defaultFanout);
+
+          List<Path> switched = TUtil.newList();
+          // move the remaining inputs directly to the next level's outputs
+          for (int j = startIdx; j < inputFiles.size(); j++) {
+            switched.add(inputFiles.get(j));
+          }
+          inputFiles.removeAll(switched);
+          outputFiles.addAll(switched);
+
+          break;
+        }
+      }
+
+      // wait for all merge runners
+      int finishedMerger = 0;
+      int index = 0;
+      for (Future<Path> future : futures) {
+        outputFiles.add(future.get());
+        // Getting the number of merged files
+        finishedMerger += numberOfMergingFiles.get(index++);
+        // progress = (number of merged files / total number of files) * 0.5
+        progress = ((float)finishedMerger/(float)chunksSize) * 0.5f;
+      }
+
+      // delete merged intermediate files
+      for (Path path : inputFiles) {
+        localFS.delete(path, true);
+      }
+      info(LOG, inputFiles.size() + " merged intermediate files deleted");
+
+      // switch input files to output files, and then clear outputFiles
+      inputFiles.clear();
+      inputFiles.addAll(outputFiles);
+      remainRun = inputFiles.size();
+      outputFiles.clear();
+      level++;
+    }
+
+    long mergeEnd = System.currentTimeMillis();
+    info(LOG, "Total merge time: " + (mergeEnd - mergeStart) + " msec");
+
+    // final result
+    finalOutputFiles = inputFiles;
+
+    result = createFinalMerger(inputFiles);
+    return result;
+  }
+
+  /**
+   * A merge task that merges several input files into one output file in a separate thread.
+   */
+  private class KWayMergerCaller implements Callable<Path> {
+    final int level;
+    final int nextRunId;
+    final List<Path> inputFiles;
+    final int startIdx;
+    final int mergeFanout;
+    final boolean updateInputStats;
+
+    public KWayMergerCaller(final int level, final int nextRunId, final List<Path> inputFiles,
+                            final int startIdx, final int mergeFanout, final boolean updateInputStats) {
+      this.level = level;
+      this.nextRunId = nextRunId;
+      this.inputFiles = inputFiles;
+      this.startIdx = startIdx;
+      this.mergeFanout = mergeFanout;
+      this.updateInputStats = updateInputStats;
+    }
+
+    @Override
+    public Path call() throws Exception {
+      final Path outputPath = getChunkPathForWrite(level + 1, nextRunId);
+      info(LOG, mergeFanout + " files are being merged to an output file " + outputPath.getName());
+      long mergeStartTime = System.currentTimeMillis();
+      final RawFileAppender output = new RawFileAppender(context.getConf(), inSchema, meta, outputPath);
+      output.init();
+      final Scanner merger = createKWayMerger(inputFiles, startIdx, mergeFanout);
+      merger.init();
+      Tuple mergeTuple;
+      while((mergeTuple = merger.next()) != null) {
+        output.addTuple(mergeTuple);
+      }
+      merger.close();
+      output.close();
+      long mergeEndTime = System.currentTimeMillis();
+      info(LOG, outputPath.getName() + " is written to a disk. ("
+          + FileUtil.humanReadableByteCount(output.getOffset(), false)
+          + " bytes, " + (mergeEndTime - mergeStartTime) + " msec)");
+      return outputPath;
+    }
+  }
+
+  /**
+   * Checks whether an unbalanced merge is possible, i.e., whether the remaining inputs and the outputs produced so far fit into a single final merge.
+   */
+  private boolean checkIfCanBeUnbalancedMerged(int remainInputNum, int outputNum) {
+    return (remainInputNum + outputNum) <= defaultFanout;
+  }
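+  // For example, with defaultFanout = 8, two remaining inputs and six outputs produced so far
+  // satisfy 2 + 6 <= 8, so everything left can be consumed by one final merge pass.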
+
+  /**
+   * Create a merged file scanner or k-way merge scanner.
+   */
+  private Scanner createFinalMerger(List<Path> inputs) throws IOException {
+    if (inputs.size() == 1) {
+      this.result = getFileScanner(inputs.get(0));
+    } else {
+      this.result = createKWayMerger(inputs, 0, inputs.size());
+    }
+    return result;
+  }
+
+  private Scanner getFileScanner(Path path) throws IOException {
+    return new RawFileScanner(context.getConf(), plan.getInSchema(), meta, path);
+  }
+
+  private Scanner createKWayMerger(List<Path> inputs, final int startChunkId, final int num) throws IOException {
+    final Scanner [] sources = new Scanner[num];
+    for (int i = 0; i < num; i++) {
+      sources[i] = getFileScanner(inputs.get(startChunkId + i));
+    }
+
+    return createKWayMergerInternal(sources, 0, num);
+  }
+
+  private Scanner createKWayMergerInternal(final Scanner [] sources, final int startIdx, final int num)
+      throws IOException {
+    if (num > 1) {
+      final int mid = (int) Math.ceil((float)num / 2);
+      return new PairWiseMerger(
+          createKWayMergerInternal(sources, startIdx, mid),
+          createKWayMergerInternal(sources, startIdx + mid, num - mid));
+    } else {
+      return sources[startIdx];
+    }
+  }
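+  // For instance, five sources are combined through a binary tree of PairWiseMergers:
+  //
+  //            merge
+  //           /     \
+  //       merge     merge
+  //      /    \     /   \
+  //   merge   s2   s3   s4
+  //   /  \
+  //  s0  s1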
+
+  private class MemTableScanner implements Scanner {
+    Iterator<Tuple> iterator;
+
+    // for input stats
+    float scannerProgress;
+    int numRecords;
+    int totalRecords;
+    TableStats scannerTableStats;
+
+    @Override
+    public void init() throws IOException {
+      iterator = inMemoryTable.iterator();
+
+      totalRecords = inMemoryTable.size();
+      scannerProgress = 0.0f;
+      numRecords = 0;
+
+      // it will be returned as the final stats
+      scannerTableStats = new TableStats();
+      scannerTableStats.setNumBytes(sortAndStoredBytes);
+      scannerTableStats.setReadBytes(sortAndStoredBytes);
+      scannerTableStats.setNumRows(totalRecords);
+    }
+
+    @Override
+    public Tuple next() throws IOException {
+      if (iterator.hasNext()) {
+        numRecords++;
+        return iterator.next();
+      } else {
+        return null;
+      }
+    }
+
+    @Override
+    public void reset() throws IOException {
+      init();
+    }
+
+    @Override
+    public void close() throws IOException {
+      iterator = null;
+      scannerProgress = 1.0f;
+    }
+
+    @Override
+    public boolean isProjectable() {
+      return false;
+    }
+
+    @Override
+    public void setTarget(Column[] targets) {
+    }
+
+    @Override
+    public boolean isSelectable() {
+      return false;
+    }
+
+    @Override
+    public void setSearchCondition(Object expr) {
+    }
+
+    @Override
+    public boolean isSplittable() {
+      return false;
+    }
+
+    @Override
+    public Schema getSchema() {
+      return null;
+    }
+
+    @Override
+    public float getProgress() {
+      if (iterator != null && numRecords > 0) {
+        return (float)numRecords / (float)totalRecords;
+
+      } else { // if an input is empty
+        return scannerProgress;
+      }
+    }
+
+    @Override
+    public TableStats getInputStats() {
+      return scannerTableStats;
+    }
+  }
+
+  /**
+   * Two-way merge scanner that reads two sorted input sources and outputs their tuples in sorted order.
+   */
+  private class PairWiseMerger implements Scanner {
+    private Scanner leftScan;
+    private Scanner rightScan;
+
+    private Tuple leftTuple;
+    private Tuple rightTuple;
+
+    private final Comparator<Tuple> comparator = getComparator();
+
+    private float mergerProgress;
+    private TableStats mergerInputStats;
+
+    public PairWiseMerger(Scanner leftScanner, Scanner rightScanner) throws IOException {
+      this.leftScan = leftScanner;
+      this.rightScan = rightScanner;
+    }
+
+    @Override
+    public void init() throws IOException {
+      leftScan.init();
+      rightScan.init();
+
+      leftTuple = leftScan.next();
+      rightTuple = rightScan.next();
+
+      mergerInputStats = new TableStats();
+      mergerProgress = 0.0f;
+    }
+
+    public Tuple next() throws IOException {
+      Tuple outTuple;
+      if (leftTuple != null && rightTuple != null) {
+        if (comparator.compare(leftTuple, rightTuple) < 0) {
+          outTuple = leftTuple;
+          leftTuple = leftScan.next();
+        } else {
+          outTuple = rightTuple;
+          rightTuple = rightScan.next();
+        }
+        return outTuple;
+      }
+
+      if (leftTuple == null) {
+        outTuple = rightTuple;
+        rightTuple = rightScan.next();
+      } else {
+        outTuple = leftTuple;
+        leftTuple = leftScan.next();
+      }
+      return outTuple;
+    }
+
+    @Override
+    public void reset() throws IOException {
+      leftScan.reset();
+      rightScan.reset();
+      init();
+    }
+
+    public void close() throws IOException {
+      IOUtils.cleanup(LOG, leftScan, rightScan);
+      getInputStats();
+      leftScan = null;
+      rightScan = null;
+      mergerProgress = 1.0f;
+    }
+
+    @Override
+    public boolean isProjectable() {
+      return false;
+    }
+
+    @Override
+    public void setTarget(Column[] targets) {
+    }
+
+    @Override
+    public boolean isSelectable() {
+      return false;
+    }
+
+    @Override
+    public void setSearchCondition(Object expr) {
+    }
+
+    @Override
+    public boolean isSplittable() {
+      return false;
+    }
+
+    @Override
+    public Schema getSchema() {
+      return inSchema;
+    }
+
+    @Override
+    public float getProgress() {
+      if (leftScan == null) {
+        return mergerProgress;
+      }
+      return leftScan.getProgress() * 0.5f + rightScan.getProgress() * 0.5f;
+    }
+
+    @Override
+    public TableStats getInputStats() {
+      if (leftScan == null) {
+        return mergerInputStats;
+      }
+      TableStats leftInputStats = leftScan.getInputStats();
+      mergerInputStats.setNumBytes(0);
+      mergerInputStats.setReadBytes(0);
+      mergerInputStats.setNumRows(0);
+
+      if (leftInputStats != null) {
+        mergerInputStats.setNumBytes(leftInputStats.getNumBytes());
+        mergerInputStats.setReadBytes(leftInputStats.getReadBytes());
+        mergerInputStats.setNumRows(leftInputStats.getNumRows());
+      }
+
+      TableStats rightInputStats = rightScan.getInputStats();
+      if (rightInputStats != null) {
+        mergerInputStats.setNumBytes(mergerInputStats.getNumBytes() + rightInputStats.getNumBytes());
+        mergerInputStats.setReadBytes(mergerInputStats.getReadBytes() + rightInputStats.getReadBytes());
+        mergerInputStats.setNumRows(mergerInputStats.getNumRows() + rightInputStats.getNumRows());
+      }
+
+      return mergerInputStats;
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (result != null) {
+      result.close();
+      try {
+        inputStats = (TableStats)result.getInputStats().clone();
+      } catch (CloneNotSupportedException e) {
+        LOG.warn(e.getMessage());
+      }
+      result = null;
+    }
+
+    if (finalOutputFiles != null) {
+      for (Path path : finalOutputFiles) {
+        localFS.delete(path, true);
+      }
+    }
+
+    if(inMemoryTable != null){
+      inMemoryTable.clear();
+      inMemoryTable = null;
+    }
+
+    if(executorService != null){
+      executorService.shutdown();
+      executorService = null;
+    }
+
+    plan = null;
+    super.close();
+  }
+
+  @Override
+  public void rescan() throws IOException {
+    if (result != null) {
+      result.reset();
+      progress = 0.5f;
+    }
+  }
+
+  @Override
+  public float getProgress() {
+    if (result != null) {
+      return progress + result.getProgress() * 0.5f;
+    } else {
+      return progress;
+    }
+  }
+
+  @Override
+  public TableStats getInputStats() {
+    if (result != null) {
+      return result.getInputStats();
+    } else {
+      return inputStats;
+    }
+  }
+}
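
The spill policy of sortAndStoreAllChunks() above can be condensed into a minimal, self-contained sketch. The class and method names below are hypothetical, and plain JDK types stand in for Tajo's Tuple and Appender APIs: items are buffered in memory, and whenever the estimated footprint exceeds the buffer limit, the buffer is sorted and flushed as one run to be k-way merged later.

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class SpillSorterSketch<T> {
  private final long bufferLimitBytes;       // plays the role of sortBufferBytesNum
  private final Comparator<T> comparator;    // plays the role of getComparator()
  private final List<T> buffer = new ArrayList<T>();
  private long bufferedBytes = 0;
  private int spilledRuns = 0;

  public SpillSorterSketch(long bufferLimitBytes, Comparator<T> comparator) {
    this.bufferLimitBytes = bufferLimitBytes;
    this.comparator = comparator;
  }

  // estimatedBytes stands in for MemoryUtil.calculateMemorySize(tuple)
  public void add(T item, long estimatedBytes) {
    buffer.add(item);
    bufferedBytes += estimatedBytes;
    if (bufferedBytes > bufferLimitBytes) {
      Collections.sort(buffer, comparator);
      // a real implementation would append the sorted buffer to a chunk file here
      spilledRuns++;
      buffer.clear();
      bufferedBytes = 0;
    }
  }

  // true if no spill happened, i.e. the memoryResident case above
  public boolean isMemoryResident() {
    return spilledRuns == 0;
  }
}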

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
new file mode 100644
index 0000000..a31ad90
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.engine.function.FunctionContext;
+import org.apache.tajo.engine.planner.logical.GroupbyNode;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * This is the hash-based GroupBy Operator.
+ */
+public class HashAggregateExec extends AggregationExec {
+  private Tuple tuple = null;
+  private Map<Tuple, FunctionContext[]> hashTable;
+  private boolean computed = false;
+  private Iterator<Entry<Tuple, FunctionContext []>> iterator = null;
+
+  public HashAggregateExec(TaskAttemptContext ctx, GroupbyNode plan, PhysicalExec subOp) throws IOException {
+    super(ctx, plan, subOp);
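+    // a large initial capacity is chosen up front to reduce rehashing when many distinct keys arrive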
+    hashTable = new HashMap<Tuple, FunctionContext []>(100000);
+    this.tuple = new VTuple(plan.getOutSchema().size());
+  }
+
+  private void compute() throws IOException {
+    Tuple tuple;
+    Tuple keyTuple;
+    while((tuple = child.next()) != null && !context.isStopped()) {
+      keyTuple = new VTuple(groupingKeyIds.length);
+      // build one key tuple
+      for(int i = 0; i < groupingKeyIds.length; i++) {
+        keyTuple.put(i, tuple.get(groupingKeyIds[i]));
+      }
+      
+      if(hashTable.containsKey(keyTuple)) {
+        FunctionContext [] contexts = hashTable.get(keyTuple);
+        for(int i = 0; i < aggFunctions.length; i++) {
+          aggFunctions[i].merge(contexts[i], inSchema, tuple);
+        }
+      } else { // if the key occurs for the first time
+        FunctionContext [] contexts = new FunctionContext[aggFunctionsNum];
+        for(int i = 0; i < aggFunctionsNum; i++) {
+          contexts[i] = aggFunctions[i].newContext();
+          aggFunctions[i].merge(contexts[i], inSchema, tuple);
+        }
+        hashTable.put(keyTuple, contexts);
+      }
+    }
+  }
+
+  @Override
+  public Tuple next() throws IOException {
+    if(!computed) {
+      compute();
+      iterator = hashTable.entrySet().iterator();
+      computed = true;
+    }
+
+    FunctionContext [] contexts;
+
+    if (iterator.hasNext()) {
+      Entry<Tuple, FunctionContext []> entry = iterator.next();
+      Tuple keyTuple = entry.getKey();
+      contexts = entry.getValue();
+
+      int tupleIdx = 0;
+      for (; tupleIdx < groupingKeyNum; tupleIdx++) {
+        tuple.put(tupleIdx, keyTuple.get(tupleIdx));
+      }
+      for (int funcIdx = 0; funcIdx < aggFunctionsNum; funcIdx++, tupleIdx++) {
+        tuple.put(tupleIdx, aggFunctions[funcIdx].terminate(contexts[funcIdx]));
+      }
+
+      return tuple;
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public void rescan() throws IOException {    
+    iterator = hashTable.entrySet().iterator();
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+    hashTable.clear();
+    hashTable = null;
+    iterator = null;
+  }
+}
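
The control flow of this operator, accumulating per-key state in compute() and then draining the table in next(), can be illustrated with a minimal word-count sketch. This is a hypothetical example in plain JDK types, where an Integer accumulator stands in for FunctionContext[]:

import java.util.HashMap;
import java.util.Map;

public class HashAggregateSketch {
  public static Map<String, Integer> countWords(Iterable<String> words) {
    Map<String, Integer> table = new HashMap<String, Integer>();
    for (String key : words) {
      Integer context = table.get(key);
      if (context != null) {
        table.put(key, context + 1);   // merge(): fold the row into the existing accumulator
      } else {
        table.put(key, 1);             // first occurrence of the key: create a fresh accumulator
      }
    }
    return table;                      // next() then drains table.entrySet().iterator()
  }
}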

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java
new file mode 100644
index 0000000..df32d0b
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.statistics.StatisticsUtil;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.engine.planner.logical.StoreTableNode;
+import org.apache.tajo.storage.Appender;
+import org.apache.tajo.storage.StorageManagerFactory;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class is a physical operator that stores data into a column-partitioned table.
+ */
+public class HashBasedColPartitionStoreExec extends ColPartitionStoreExec {
+  private static Log LOG = LogFactory.getLog(HashBasedColPartitionStoreExec.class);
+
+  private final Map<String, Appender> appenderMap = new HashMap<String, Appender>();
+
+  public HashBasedColPartitionStoreExec(TaskAttemptContext context, StoreTableNode plan, PhysicalExec child)
+      throws IOException {
+    super(context, plan, child);
+  }
+
+  public void init() throws IOException {
+    super.init();
+  }
+
+  private Appender getAppender(String partition) throws IOException {
+    Appender appender = appenderMap.get(partition);
+
+    if (appender == null) {
+      Path dataFile = getDataFile(partition);
+      FileSystem fs = dataFile.getFileSystem(context.getConf());
+
+      if (fs.exists(dataFile.getParent())) {
+        LOG.info("Path " + dataFile.getParent() + " already exists!");
+      } else {
+        fs.mkdirs(dataFile.getParent());
+        LOG.info("Add subpartition path directory :" + dataFile.getParent());
+      }
+
+      if (fs.exists(dataFile)) {
+        LOG.info("File " + dataFile + " already exists!");
+        FileStatus status = fs.getFileStatus(dataFile);
+        LOG.info("File size: " + status.getLen());
+      }
+
+      appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema, dataFile);
+      appender.enableStats();
+      appender.init();
+      appenderMap.put(partition, appender);
+    }
+    return appender;
+  }
+
+  /* (non-Javadoc)
+   * @see PhysicalExec#next()
+   */
+  @Override
+  public Tuple next() throws IOException {
+    Tuple tuple;
+    StringBuilder sb = new StringBuilder();
+    while((tuple = child.next()) != null) {
+      // set subpartition directory name
+      sb.delete(0, sb.length());
+      if (keyIds != null) {
+        for(int i = 0; i < keyIds.length; i++) {
+          Datum datum = tuple.get(keyIds[i]);
+          if(i > 0)
+            sb.append("/");
+          sb.append(keyNames[i]).append("=");
+          sb.append(datum.asChars());
+        }
+      }
+
+      // add tuple
+      Appender appender = getAppender(sb.toString());
+      appender.addTuple(tuple);
+    }
+
+    List<TableStats> statSet = new ArrayList<TableStats>();
+    for (Map.Entry<String, Appender> entry : appenderMap.entrySet()) {
+      Appender app = entry.getValue();
+      app.flush();
+      app.close();
+      statSet.add(app.getStats());
+    }
+
+    // collect and aggregate the statistics
+    TableStats aggregated = StatisticsUtil.aggregateTableStat(statSet);
+    context.setResultStats(aggregated);
+
+    return null;
+  }
+
+  @Override
+  public void rescan() throws IOException {
+    // nothing to do
+  }
+}
\ No newline at end of file
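
The subdirectory string assembled in next() above follows the Hive-style "k1=v1/k2=v2" layout. A minimal sketch of that construction, using a hypothetical helper that is not part of this patch:

public final class PartitionPathSketch {
  // builds "year=2014/month=04" from {"year", "month"} and {"2014", "04"}
  public static String partitionPath(String[] keyNames, String[] keyValues) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < keyNames.length; i++) {
      if (i > 0) {
        sb.append('/');
      }
      sb.append(keyNames[i]).append('=').append(keyValues[i]);
    }
    return sb.toString();
  }
}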

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
new file mode 100644
index 0000000..65ebe2f
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.*;
+
+
+public class HashFullOuterJoinExec extends BinaryPhysicalExec {
+  // from logical plan
+  protected JoinNode plan;
+  protected EvalNode joinQual;
+
+  protected List<Column[]> joinKeyPairs;
+
+  // temporary tuples and states for the hash join
+  protected boolean first = true;
+  protected FrameTuple frameTuple;
+  protected Tuple outTuple = null;
+  protected Map<Tuple, List<Tuple>> tupleSlots;
+  protected Iterator<Tuple> iterator = null;
+  protected Tuple leftTuple;
+  protected Tuple leftKeyTuple;
+
+  protected int [] leftKeyList;
+  protected int [] rightKeyList;
+
+  protected boolean finished = false;
+  protected boolean shouldGetLeftTuple = true;
+
+  // projection
+  protected final Projector projector;
+
+  private int rightNumCols;
+  private int leftNumCols;
+  private Map<Tuple, Boolean> matched;
+
+  public HashFullOuterJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer,
+                               PhysicalExec inner) {
+    super(context, SchemaUtil.merge(outer.getSchema(), inner.getSchema()),
+        plan.getOutSchema(), outer, inner);
+    this.plan = plan;
+    this.joinQual = plan.getJoinQual();
+    this.tupleSlots = new HashMap<Tuple, List<Tuple>>(10000);
+
+    // this map mirrors tupleSlots with the same keys. For each join key, it keeps a boolean flag,
+    // initially false, indicating whether the key has matched at least one tuple of the left operand
+    this.matched = new HashMap<Tuple, Boolean>(10000);
+
+    // hash join can only handle equi-join key pairs.
+    this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, outer.getSchema(), inner.getSchema(),
+        false);
+
+    leftKeyList = new int[joinKeyPairs.size()];
+    rightKeyList = new int[joinKeyPairs.size()];
+
+    for (int i = 0; i < joinKeyPairs.size(); i++) {
+      leftKeyList[i] = outer.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName());
+    }
+
+    for (int i = 0; i < joinKeyPairs.size(); i++) {
+      rightKeyList[i] = inner.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName());
+    }
+
+    // for projection
+    this.projector = new Projector(inSchema, outSchema, plan.getTargets());
+
+    // for join
+    frameTuple = new FrameTuple();
+    outTuple = new VTuple(outSchema.size());
+    leftKeyTuple = new VTuple(leftKeyList.length);
+
+    leftNumCols = outer.getSchema().size();
+    rightNumCols = inner.getSchema().size();
+  }
+
+  protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) {
+    for (int i = 0; i < leftKeyList.length; i++) {
+      keyTuple.put(i, outerTuple.get(leftKeyList[i]));
+    }
+  }
+
+  public Tuple getNextUnmatchedRight() {
+
+    List<Tuple> newValue;
+    Tuple returnedTuple;
+    // find a key tuple in the matched map whose flag is still false
+    for(Tuple aKeyTuple : matched.keySet()) {
+      if(matched.get(aKeyTuple) == false) {
+        newValue = tupleSlots.get(aKeyTuple);
+        returnedTuple = newValue.remove(0);
+        tupleSlots.put(aKeyTuple, newValue);
+
+        // once the list in tupleSlots is exhausted, mark the key as matched so it is not revisited
+        if(newValue.isEmpty()){
+          matched.put(aKeyTuple, true);
+        }
+
+        return returnedTuple;
+      }
+    }
+    return null;
+  }
+
+  public Tuple next() throws IOException {
+    if (first) {
+      loadRightToHashTable();
+    }
+
+    Tuple rightTuple;
+    boolean found = false;
+
+    while(!finished) {
+      if (shouldGetLeftTuple) { // initially, it is true.
+        // getting new outer
+        leftTuple = leftChild.next(); // it comes from a disk
+        if (leftTuple == null) { // if there are no more left tuples on disk, the join is complete.
+          // at this stage we can begin outputting the remaining right-operand tuples (still in tupleSlots), null-padded on the left side
+          Tuple unmatchedRightTuple = getNextUnmatchedRight();
+          if (unmatchedRightTuple == null) {
+            finished = true;
+            outTuple = null;
+            return null;
+          } else {
+            Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
+            frameTuple.set(nullPaddedTuple, unmatchedRightTuple);
+            projector.eval(frameTuple, outTuple);
+
+            return outTuple;
+          }
+        }
+
+        // getting corresponding right
+        getKeyLeftTuple(leftTuple, leftKeyTuple); // get a left key tuple
+        List<Tuple> rightTuples = tupleSlots.get(leftKeyTuple);
+        if (rightTuples != null) { // found matching right tuples in the in-memory hash table.
+          iterator = rightTuples.iterator();
+          shouldGetLeftTuple = false;
+        } else {
+          // this left tuple has no match on the right, but a full outer join keeps it anyway:
+          // output the left tuple padded with a null right tuple
+          Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
+          frameTuple.set(leftTuple, nullPaddedTuple);
+          projector.eval(frameTuple, outTuple);
+          // treat the null-padded tuple as the match for this left tuple
+          shouldGetLeftTuple = true;
+          return outTuple;
+        }
+      }
+
+      // getting a next right tuple on in-memory hash table.
+      rightTuple = iterator.next();
+      frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples
+
+      if (joinQual.eval(inSchema, frameTuple).isTrue()) { // if both tuples are joinable
+        projector.eval(frameTuple, outTuple);
+        found = true;
+        getKeyLeftTuple(leftTuple, leftKeyTuple);
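+        // the key is already present in 'matched' (populated in loadRightToHashTable), so this put()
+        // only flips the flag; HashMap keeps the original key object rather than the reused leftKeyTuple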
+        matched.put(leftKeyTuple, true);
+      }
+
+      if (!iterator.hasNext()) { // no more right tuples for this hash key
+        shouldGetLeftTuple = true;
+      }
+
+      if (found) {
+        break;
+      }
+    }
+    return outTuple;
+  }
+
+  protected void loadRightToHashTable() throws IOException {
+    Tuple tuple;
+    Tuple keyTuple;
+
+    while ((tuple = rightChild.next()) != null) {
+      keyTuple = new VTuple(joinKeyPairs.size());
+      for (int i = 0; i < rightKeyList.length; i++) {
+        keyTuple.put(i, tuple.get(rightKeyList[i]));
+      }
+
+      List<Tuple> newValue = tupleSlots.get(keyTuple);
+      if (newValue != null) {
+        newValue.add(tuple);
+      } else {
+        newValue = new ArrayList<Tuple>();
+        newValue.add(tuple);
+        tupleSlots.put(keyTuple, newValue);
+        matched.put(keyTuple,false);
+      }
+    }
+    first = false;
+  }
+
+  @Override
+  public void rescan() throws IOException {
+    super.rescan();
+
+    tupleSlots.clear();
+    first = true;
+
+    finished = false;
+    iterator = null;
+    shouldGetLeftTuple = true;
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+    tupleSlots.clear();
+    matched.clear();
+    tupleSlots = null;
+    matched = null;
+    iterator = null;
+    plan = null;
+    joinQual = null;
+  }
+
+  public JoinNode getPlan() {
+    return this.plan;
+  }
+}
+
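
The matched-flag technique above can be condensed into a minimal sketch with hypothetical names and plain JDK types, joining on integer keys: the build side is hashed, probe-side hits flip the flag, and unflagged build rows are emitted null-padded at the end.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FullOuterJoinSketch {
  // each output line is "left, right"; NULL marks the padded side
  public static List<String> fullOuterJoin(List<Integer> left, List<Integer> right) {
    Map<Integer, List<Integer>> buildSide = new HashMap<Integer, List<Integer>>();
    Map<Integer, Boolean> matched = new HashMap<Integer, Boolean>();
    for (Integer r : right) {                       // build phase (loadRightToHashTable)
      List<Integer> bucket = buildSide.get(r);
      if (bucket == null) {
        bucket = new ArrayList<Integer>();
        buildSide.put(r, bucket);
        matched.put(r, false);                      // no left match seen yet
      }
      bucket.add(r);
    }
    List<String> out = new ArrayList<String>();
    for (Integer l : left) {                        // probe phase
      List<Integer> bucket = buildSide.get(l);
      if (bucket == null) {
        out.add(l + ", NULL");                      // left tuple without a right match
      } else {
        matched.put(l, true);                       // flip the flag for this key
        for (Integer r : bucket) {
          out.add(l + ", " + r);
        }
      }
    }
    for (Map.Entry<Integer, Boolean> e : matched.entrySet()) {
      if (!e.getValue()) {                          // right rows that never matched
        for (Integer r : buildSide.get(e.getKey())) {
          out.add("NULL, " + r);
        }
      }
    }
    return out;
  }
}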

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
new file mode 100644
index 0000000..dea0340
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.*;
+
+public class HashJoinExec extends BinaryPhysicalExec {
+  // from logical plan
+  protected JoinNode plan;
+  protected EvalNode joinQual;
+
+  protected List<Column[]> joinKeyPairs;
+
+  // temporary tuples and states for the hash join
+  protected boolean first = true;
+  protected FrameTuple frameTuple;
+  protected Tuple outTuple = null;
+  protected Map<Tuple, List<Tuple>> tupleSlots;
+  protected Iterator<Tuple> iterator = null;
+  protected Tuple leftTuple;
+  protected Tuple leftKeyTuple;
+
+  protected int [] leftKeyList;
+  protected int [] rightKeyList;
+
+  protected boolean finished = false;
+  protected boolean shouldGetLeftTuple = true;
+
+  // projection
+  protected final Projector projector;
+
+  public HashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftExec,
+      PhysicalExec rightExec) {
+    super(context, SchemaUtil.merge(leftExec.getSchema(), rightExec.getSchema()), plan.getOutSchema(),
+        leftExec, rightExec);
+    this.plan = plan;
+    this.joinQual = plan.getJoinQual();
+    this.tupleSlots = new HashMap<Tuple, List<Tuple>>(100000);
+
+    // hash join can only handle equi-join key pairs.
+    this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, leftExec.getSchema(),
+        rightExec.getSchema(), false);
+
+    leftKeyList = new int[joinKeyPairs.size()];
+    rightKeyList = new int[joinKeyPairs.size()];
+
+    for (int i = 0; i < joinKeyPairs.size(); i++) {
+      leftKeyList[i] = leftExec.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName());
+    }
+
+    for (int i = 0; i < joinKeyPairs.size(); i++) {
+      rightKeyList[i] = rightExec.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName());
+    }
+
+    // for projection
+    this.projector = new Projector(inSchema, outSchema, plan.getTargets());
+
+    // for join
+    frameTuple = new FrameTuple();
+    outTuple = new VTuple(outSchema.size());
+    leftKeyTuple = new VTuple(leftKeyList.length);
+  }
+
+  protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) {
+    for (int i = 0; i < leftKeyList.length; i++) {
+      keyTuple.put(i, outerTuple.get(leftKeyList[i]));
+    }
+  }
+
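+  /** records when the probe (left-scan) phase starts; assigned after the right side is loaded into the hash table */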
+  long scanStartTime = 0;
+  public Tuple next() throws IOException {
+    if (first) {
+      loadRightToHashTable();
+      scanStartTime = System.currentTimeMillis();
+    }
+
+    Tuple rightTuple;
+    boolean found = false;
+
+    while(!finished) {
+      if (shouldGetLeftTuple) { // initially, it is true.
+        // getting new outer
+        leftTuple = leftChild.next(); // it comes from a disk
+        if (leftTuple == null) { // if there are no more left tuples on disk, the join is complete.
+          finished = true;
+          return null;
+        }
+
+        // getting corresponding right
+        getKeyLeftTuple(leftTuple, leftKeyTuple); // get a left key tuple
+        List<Tuple> rightTuples = tupleSlots.get(leftKeyTuple);
+        if (rightTuples != null) { // found matching right tuples in the in-memory hash table.
+          iterator = rightTuples.iterator();
+          shouldGetLeftTuple = false;
+        } else {
+          shouldGetLeftTuple = true;
+          continue;
+        }
+      }
+
+      // getting a next right tuple on in-memory hash table.
+      rightTuple = iterator.next();
+      frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples
+      if (joinQual.eval(inSchema, frameTuple).isTrue()) { // if both tuples are joinable
+        projector.eval(frameTuple, outTuple);
+        found = true;
+      }
+
+      if (!iterator.hasNext()) { // no more right tuples for this hash key
+        shouldGetLeftTuple = true;
+      }
+
+      if (found) {
+        break;
+      }
+    }
+
+    return new VTuple(outTuple);
+  }
+
+  protected void loadRightToHashTable() throws IOException {
+    Tuple tuple;
+    Tuple keyTuple;
+
+    while ((tuple = rightChild.next()) != null) {
+      keyTuple = new VTuple(joinKeyPairs.size());
+      for (int i = 0; i < rightKeyList.length; i++) {
+        keyTuple.put(i, tuple.get(rightKeyList[i]));
+      }
+
+      List<Tuple> newValue = tupleSlots.get(keyTuple);
+
+      if (newValue != null) {
+        newValue.add(tuple);
+      } else {
+        newValue = new ArrayList<Tuple>();
+        newValue.add(tuple);
+        tupleSlots.put(keyTuple, newValue);
+      }
+    }
+
+    first = false;
+  }
+
+  @Override
+  public void rescan() throws IOException {
+    super.rescan();
+
+    tupleSlots.clear();
+    first = true;
+
+    finished = false;
+    iterator = null;
+    shouldGetLeftTuple = true;
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+    if (tupleSlots != null) {
+      tupleSlots.clear();
+      tupleSlots = null;
+    }
+
+    iterator = null;
+    plan = null;
+    joinQual = null;
+  }
+
+  public JoinNode getPlan() {
+    return this.plan;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
new file mode 100644
index 0000000..50a1438
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Builds a hash table from the NOT IN side of the join, then scans the FROM side table.
+ * For each tuple of the FROM side table, it tries to find a matching tuple in the hash table of the NOT IN side.
+ * If none is found, it returns the FROM side tuple with null padding.
+ */
+public class HashLeftAntiJoinExec extends HashJoinExec {
+  private Tuple rightNullTuple;
+
+  public HashLeftAntiJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec fromSideChild,
+                              PhysicalExec notInSideChild) {
+    super(context, plan, fromSideChild, notInSideChild);
+    // NULL tuple used to pad the right side
+    rightNullTuple = new VTuple(leftChild.outColumnNum);
+    for (int i = 0; i < leftChild.outColumnNum; i++) {
+      rightNullTuple.put(i, NullDatum.get());
+    }
+  }
+
+  /**
+   * The End of Tuple (EOT) condition is true only when there are no more tuples in the left relation (on disk).
+   * The next() method finds the first left tuple that has no match in the right relation.
+   *
+   * For each left tuple, next() looks up the corresponding bucket in the hash table. If there is no bucket
+   * for the key, the tuple is returned immediately. If a bucket is found, its tuples are read
+   * sequentially; if none of them matches, the left tuple is returned.
+   *
+   * @return A tuple which does not match the given join condition.
+   * @throws IOException
+   */
+  public Tuple next() throws IOException {
+    if (first) {
+      loadRightToHashTable();
+    }
+
+    Tuple rightTuple;
+    boolean notFound;
+
+    while(!finished) {
+
+      // getting new outer
+      leftTuple = leftChild.next(); // it comes from a disk
+      if (leftTuple == null) { // if there are no more left tuples on disk, the join is complete.
+        finished = true;
+        return null;
+      }
+
+      // try to find a hash bucket in the in-memory hash table
+      getKeyLeftTuple(leftTuple, leftKeyTuple);
+      List<Tuple> rightTuples = tupleSlots.get(leftKeyTuple);
+      if (rightTuples != null) {
+        // if found, it gets a hash bucket from the hash table.
+        iterator = rightTuples.iterator();
+      } else {
+        // if not found, it returns a tuple.
+        frameTuple.set(leftTuple, rightNullTuple);
+        projector.eval(frameTuple, outTuple);
+        return outTuple;
+      }
+
+      // We reach here only when a hash bucket was found. Check every tuple in the bucket and
+      // exit the loop as soon as a matching tuple is found.
+      notFound = true;
+      while (notFound && iterator.hasNext()) {
+        rightTuple = iterator.next();
+        frameTuple.set(leftTuple, rightTuple);
+        if (joinQual.eval(inSchema, frameTuple).isTrue()) { // if the matched one is found
+          notFound = false;
+        }
+      }
+
+      if (notFound) { // if there is no matched tuple
+        frameTuple.set(leftTuple, rightNullTuple);
+        projector.eval(frameTuple, outTuple);
+        break;
+      }
+    }
+
+    return outTuple;
+  }
+}
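
For reference, a minimal, standalone sketch of the anti-join probe loop above, using only
plain Java collections rather than Tajo's Tuple/VTuple classes; the relation and column
names are illustrative, and the in-bucket join-condition check is elided for brevity. A
FROM side row is emitted only when its key has no bucket in the table built from the
NOT IN side:

    import java.util.*;

    public class AntiJoinSketch {
      // Emits every FROM side row whose join key has no bucket in the hash table
      // built from the NOT IN side.
      static List<String[]> antiJoin(List<String[]> fromSide, List<String[]> notInSide,
                                     int fromKey, int notInKey) {
        Map<String, List<String[]>> table = new HashMap<>();
        for (String[] row : notInSide) {
          table.computeIfAbsent(row[notInKey], k -> new ArrayList<>()).add(row);
        }
        List<String[]> out = new ArrayList<>();
        for (String[] row : fromSide) {
          if (!table.containsKey(row[fromKey])) { // no bucket: unmatched, so emit
            out.add(row);
          }
        }
        return out;
      }

      public static void main(String[] args) {
        List<String[]> fromSide = Arrays.asList(new String[]{"1", "a"}, new String[]{"2", "b"});
        List<String[]> notInSide = Collections.singletonList(new String[]{"2", "x"});
        for (String[] row : antiJoin(fromSide, notInSide, 0, 0)) {
          System.out.println(Arrays.toString(row)); // prints [1, a]
        }
      }
    }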

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
new file mode 100644
index 0000000..849dc38
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.*;
+
+
+public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
+  // from logical plan
+  protected JoinNode plan;
+  protected EvalNode joinQual;
+
+  protected List<Column[]> joinKeyPairs;
+
+  // temporal tuples and states for nested loop join
+  protected boolean first = true;
+  protected FrameTuple frameTuple;
+  protected Tuple outTuple = null;
+  protected Map<Tuple, List<Tuple>> tupleSlots;
+  protected Iterator<Tuple> iterator = null;
+  protected Tuple leftTuple;
+  protected Tuple leftKeyTuple;
+
+  protected int [] leftKeyList;
+  protected int [] rightKeyList;
+
+  protected boolean finished = false;
+  protected boolean shouldGetLeftTuple = true;
+  // true once the current left tuple has produced at least one join result; used to decide
+  // whether a null-padded row must still be emitted to preserve left outer semantics.
+  protected boolean anyMatchForCurrentLeft = false;
+
+  // projection
+  protected Projector projector;
+
+  private int rightNumCols;
+  private static final Log LOG = LogFactory.getLog(HashLeftOuterJoinExec.class);
+
+  public HashLeftOuterJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftChild,
+                               PhysicalExec rightChild) {
+    super(context, SchemaUtil.merge(leftChild.getSchema(), rightChild.getSchema()),
+        plan.getOutSchema(), leftChild, rightChild);
+    this.plan = plan;
+    this.joinQual = plan.getJoinQual();
+    this.tupleSlots = new HashMap<Tuple, List<Tuple>>(10000);
+
+    // A hash join can only handle equi-join key pairs.
+    this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, leftChild.getSchema(),
+        rightChild.getSchema(), false);
+
+    leftKeyList = new int[joinKeyPairs.size()];
+    rightKeyList = new int[joinKeyPairs.size()];
+
+    for (int i = 0; i < joinKeyPairs.size(); i++) {
+      leftKeyList[i] = leftChild.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName());
+    }
+
+    for (int i = 0; i < joinKeyPairs.size(); i++) {
+      rightKeyList[i] = rightChild.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName());
+    }
+
+    // for projection
+    this.projector = new Projector(inSchema, outSchema, plan.getTargets());
+
+    // for join
+    frameTuple = new FrameTuple();
+    outTuple = new VTuple(outSchema.size());
+    leftKeyTuple = new VTuple(leftKeyList.length);
+
+    rightNumCols = rightChild.getSchema().size();
+  }
+
+  protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) {
+    for (int i = 0; i < leftKeyList.length; i++) {
+      keyTuple.put(i, outerTuple.get(leftKeyList[i]));
+    }
+  }
+
+  public Tuple next() throws IOException {
+    if (first) {
+      loadRightToHashTable();
+    }
+
+    Tuple rightTuple;
+    boolean found = false;
+
+    while (!finished) {
+
+      if (shouldGetLeftTuple) { // initially, it is true.
+        // get a new outer (left) tuple
+        leftTuple = leftChild.next(); // it comes from disk
+        if (leftTuple == null) { // if no more left tuples on disk, the join is completed.
+          finished = true;
+          return null;
+        }
+
+        // look up the corresponding right tuples
+        getKeyLeftTuple(leftTuple, leftKeyTuple); // build the left key tuple
+        List<Tuple> rightTuples = tupleSlots.get(leftKeyTuple);
+        if (rightTuples != null) { // found right tuples in the in-memory hash table.
+          iterator = rightTuples.iterator();
+          shouldGetLeftTuple = false;
+          anyMatchForCurrentLeft = false;
+        } else {
+          // this left tuple has no match on the right; output it padded with a null right tuple.
+          Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
+          frameTuple.set(leftTuple, nullPaddedTuple);
+          projector.eval(frameTuple, outTuple);
+          shouldGetLeftTuple = true;
+          return outTuple;
+        }
+      }
+
+      // get the next right tuple from the in-memory hash table.
+      rightTuple = iterator.next();
+      frameTuple.set(leftTuple, rightTuple); // evaluate the join condition on both tuples
+      if (joinQual.eval(inSchema, frameTuple).isTrue()) { // if both tuples are joinable
+        projector.eval(frameTuple, outTuple);
+        found = true;
+        anyMatchForCurrentLeft = true;
+      }
+
+      if (!iterator.hasNext()) { // no more right tuples for this hash key
+        shouldGetLeftTuple = true;
+        if (!anyMatchForCurrentLeft) {
+          // the bucket existed, but no right tuple satisfied the join condition;
+          // left outer semantics still require the left tuple, padded with nulls.
+          frameTuple.set(leftTuple, TupleUtil.createNullPaddedTuple(rightNumCols));
+          projector.eval(frameTuple, outTuple);
+          return outTuple;
+        }
+      }
+
+      if (found) {
+        break;
+      }
+    }
+
+    return outTuple;
+  }
+
+  protected void loadRightToHashTable() throws IOException {
+    Tuple tuple;
+    Tuple keyTuple;
+
+    while ((tuple = rightChild.next()) != null) {
+      keyTuple = new VTuple(joinKeyPairs.size());
+      for (int i = 0; i < rightKeyList.length; i++) {
+        keyTuple.put(i, tuple.get(rightKeyList[i]));
+      }
+
+      List<Tuple> newValue = tupleSlots.get(keyTuple);
+      if (newValue != null) {
+        newValue.add(tuple);
+      } else {
+        newValue = new ArrayList<Tuple>();
+        newValue.add(tuple);
+        tupleSlots.put(keyTuple, newValue);
+      }
+    }
+    first = false;
+  }
+
+  @Override
+  public void rescan() throws IOException {
+    super.rescan();
+
+    tupleSlots.clear();
+    first = true;
+
+    finished = false;
+    iterator = null;
+    shouldGetLeftTuple = true;
+  }
+
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+    tupleSlots.clear();
+    tupleSlots = null;
+    iterator = null;
+    plan = null;
+    joinQual = null;
+    projector = null;
+  }
+
+  public JoinNode getPlan() {
+    return this.plan;
+  }
+}
+
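
A minimal sketch of the left outer probe behavior implemented above, again with plain Java
collections instead of Tajo's executors; the names and values are illustrative. Left rows
with a matching key emit one output row per match, and left rows without a match emit a
single null-padded row:

    import java.util.*;

    public class LeftOuterJoinSketch {
      public static void main(String[] args) {
        // hash table built from the right side: join key -> right-side values
        Map<String, List<String>> right = new HashMap<>();
        right.computeIfAbsent("2", k -> new ArrayList<>()).add("x");
        right.computeIfAbsent("2", k -> new ArrayList<>()).add("y");

        List<String[]> left = Arrays.asList(new String[]{"1", "a"}, new String[]{"2", "b"});
        for (String[] l : left) {
          List<String> matches = right.get(l[0]);
          if (matches == null) {
            // no bucket for this key: emit the left row once, null-padded on the right
            System.out.println(Arrays.toString(l) + " | null");
          } else {
            // one output row per matching right tuple
            for (String r : matches) {
              System.out.println(Arrays.toString(l) + " | " + r);
            }
          }
        }
      }
    }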

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
new file mode 100644
index 0000000..4fbb5e4
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Builds a hash table from the IN side of the join, then scans the FROM side table.
+ * For each FROM side tuple, it probes the hash table built from the IN side for a matching tuple.
+ * If a match is found, it returns the FROM side tuple.
+ * (A standalone sketch of this probe loop follows this diff.)
+ */
+public class HashLeftSemiJoinExec extends HashJoinExec {
+  private Tuple rightNullTuple;
+
+  public HashLeftSemiJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec fromSideChild,
+                              PhysicalExec inSideChild) {
+    super(context, plan, fromSideChild, inSideChild);
+    // null tuple used to pad the right side when needed
+    rightNullTuple = new VTuple(leftChild.outColumnNum);
+    for (int i = 0; i < leftChild.outColumnNum; i++) {
+      rightNullTuple.put(i, NullDatum.get());
+    }
+  }
+
+  /**
+   * The End of Tuple (EOT) condition becomes true only when no more tuples remain in the left
+   * (on-disk) relation. The next() method returns the next left tuple that matches at least one
+   * right tuple.
+   *
+   * For each left tuple on disk, next() tries to find at least one matching tuple in the hash table.
+   *
+   * In more detail, it advances through the left tuples until one of them has a hash bucket in the
+   * hash table. Once a bucket is found, it scans the bucket for a tuple satisfying the join
+   * condition. If one is found, the composite tuple is returned immediately, without looking for
+   * further matches in the bucket.
+   *
+   * @return The first tuple that matches the given join condition.
+   * @throws java.io.IOException
+   */
+  public Tuple next() throws IOException {
+    if (first) {
+      loadRightToHashTable();
+    }
+
+    Tuple rightTuple;
+    boolean notFound;
+
+    while(!finished) {
+
+      // getting new outer
+      leftTuple = leftChild.next(); // it comes from a disk
+      if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed.
+        finished = true;
+        return null;
+      }
+
+      // Try to find a hash bucket in in-memory hash table
+      getKeyLeftTuple(leftTuple, leftKeyTuple);
+      List<Tuple> rightTuples = tupleSlots.get(leftKeyTuple);
+      if (rightTuples != null) {
+        // if found, it gets a hash bucket from the hash table.
+        iterator = rightTuples.iterator();
+      } else {
+        continue;
+      }
+
+      // Reach here only when a hash bucket is found. Then, it checks all tuples in the found bucket.
+      // If it finds any matched tuple, it returns the tuple immediately.
+      notFound = true;
+      while (notFound && iterator.hasNext()) {
+        rightTuple = iterator.next();
+        frameTuple.set(leftTuple, rightTuple);
+        if (joinQual.eval(inSchema, frameTuple).isTrue()) { // if the matched one is found
+          notFound = false;
+          projector.eval(frameTuple, outTuple);
+        }
+      }
+
+      if (!notFound) { // a matched tuple was found, so emit it
+        break;
+      }
+    }
+
+    return outTuple;
+  }
+}
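
A minimal sketch of the semi-join probe above, using a plain Java Set as the hash table;
the names and values are illustrative. Each FROM side row is emitted at most once, as soon
as one match is found:

    import java.util.*;

    public class SemiJoinSketch {
      public static void main(String[] args) {
        // hash table built from the IN side: the set of join keys present
        Set<String> inSide = new HashSet<>(Arrays.asList("2", "3"));

        List<String[]> fromSide = Arrays.asList(new String[]{"1", "a"}, new String[]{"2", "b"});
        for (String[] row : fromSide) {
          if (inSide.contains(row[0])) { // at least one match: emit the FROM side row once
            System.out.println(Arrays.toString(row)); // prints [2, b]
          }
        }
      }
    }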

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashPartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashPartitioner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashPartitioner.java
new file mode 100644
index 0000000..3ae53d9
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashPartitioner.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * 
+ */
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+
+public class HashPartitioner extends Partitioner {
+  private final Tuple keyTuple;
+  
+  public HashPartitioner(final int [] keys, final int numPartitions) {
+    super(keys, numPartitions);
+    this.keyTuple = new VTuple(partitionKeyIds.length);
+  }
+  
+  @Override
+  public int getPartition(Tuple tuple) {
+    // In outer joins, the number of partitions can be zero because of empty tables,
+    // so we simply return partition zero in that case.
+    if (numPartitions == 0) {
+      return 0;
+    }
+
+    // build one key tuple
+    for (int i = 0; i < partitionKeyIds.length; i++) {
+      keyTuple.put(i, tuple.get(partitionKeyIds[i]));
+    }
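+    // Clearing the sign bit keeps the modulus non-negative. Note that when numPartitions is
+    // exactly 32, the expression below effectively uses 31 buckets.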
+    return (keyTuple.hashCode() & Integer.MAX_VALUE) %
+        (numPartitions == 32 ? numPartitions-1 : numPartitions);
+  }
+}
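
A minimal sketch of the partition computation above (omitting the special case for 32
partitions), assuming any key object with a stable hashCode(); the key value is illustrative:

    import java.util.Objects;

    public class HashPartitionSketch {
      // Maps a key to a partition id in [0, numPartitions).
      static int partition(Object key, int numPartitions) {
        if (numPartitions == 0) { // guard for the empty-table edge case handled above
          return 0;
        }
        // clearing the sign bit keeps the modulus non-negative
        return (Objects.hashCode(key) & Integer.MAX_VALUE) % numPartitions;
      }

      public static void main(String[] args) {
        System.out.println(partition("customer_42", 8)); // always in [0, 8)
      }
    }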

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
new file mode 100644
index 0000000..678b745
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.StatisticsUtil;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.engine.planner.logical.ShuffleFileWriteNode;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * <code>HashShuffleFileWriteExec</code> is a physical executor to store intermediate data into a number of
+ * file outputs associated with shuffle keys. The file outputs are stored on local disks.
+ */
+public final class HashShuffleFileWriteExec extends UnaryPhysicalExec {
+  private static Log LOG = LogFactory.getLog(HashShuffleFileWriteExec.class);
+  private ShuffleFileWriteNode plan;
+  private final TableMeta meta;
+  private Partitioner partitioner;
+  private final Path storeTablePath;
+  private Map<Integer, Appender> appenderMap = new HashMap<Integer, Appender>();
+  private final int numShuffleOutputs;
+  private final int [] shuffleKeyIds;
+  
+  public HashShuffleFileWriteExec(TaskAttemptContext context, final AbstractStorageManager sm,
+                                  final ShuffleFileWriteNode plan, final PhysicalExec child) throws IOException {
+    super(context, plan.getInSchema(), plan.getOutSchema(), child);
+    Preconditions.checkArgument(plan.hasShuffleKeys());
+    this.plan = plan;
+    if (plan.hasOptions()) {
+      this.meta = CatalogUtil.newTableMeta(plan.getStorageType(), plan.getOptions());
+    } else {
+      this.meta = CatalogUtil.newTableMeta(plan.getStorageType());
+    }
+    // about the shuffle
+    this.numShuffleOutputs = this.plan.getNumOutputs();
+    int i = 0;
+    this.shuffleKeyIds = new int [this.plan.getShuffleKeys().length];
+    for (Column key : this.plan.getShuffleKeys()) {
+      shuffleKeyIds[i] = inSchema.getColumnId(key.getQualifiedName());
+      i++;
+    }
+    this.partitioner = new HashPartitioner(shuffleKeyIds, numShuffleOutputs);
+    storeTablePath = new Path(context.getWorkDir(), "output");
+  }
+
+  @Override
+  public void init() throws IOException {
+    super.init();
+    FileSystem fs = new RawLocalFileSystem();
+    fs.mkdirs(storeTablePath);
+  }
+  
+  private Appender getAppender(int partId) throws IOException {
+    Appender appender = appenderMap.get(partId);
+
+    if (appender == null) {
+      Path dataFile = getDataFile(partId);
+      FileSystem fs = dataFile.getFileSystem(context.getConf());
+      if (fs.exists(dataFile)) {
+        LOG.info("File " + dataFile + " already exists!");
+        FileStatus status = fs.getFileStatus(dataFile);
+        LOG.info("File size: " + status.getLen());
+      }
+      appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema, dataFile);
+      appender.enableStats();
+      appender.init();
+      appenderMap.put(partId, appender);
+    }
+
+    return appender;
+  }
+
+  private Path getDataFile(int partId) {
+    return StorageUtil.concatPath(storeTablePath, ""+partId);
+  }
+
+  @Override
+  public Tuple next() throws IOException {
+    Tuple tuple;
+    Appender appender;
+    int partId;
+    while ((tuple = child.next()) != null) {
+      partId = partitioner.getPartition(tuple);
+      appender = getAppender(partId);
+      appender.addTuple(tuple);
+    }
+    
+    List<TableStats> statSet = new ArrayList<TableStats>();
+    for (Map.Entry<Integer, Appender> entry : appenderMap.entrySet()) {
+      int partNum = entry.getKey();
+      Appender app = entry.getValue();
+      app.flush();
+      app.close();
+      statSet.add(app.getStats());
+      if (app.getStats().getNumRows() > 0) {
+        context.addShuffleFileOutput(partNum, getDataFile(partNum).getName());
+      }
+    }
+    
+    // Collect and aggregate statistics data
+    TableStats aggregated = StatisticsUtil.aggregateTableStat(statSet);
+    context.setResultStats(aggregated);
+    
+    return null;
+  }
+
+  @Override
+  public void rescan() throws IOException {
+    // nothing to do   
+  }
+
+  @Override
+  public void close() throws IOException{
+    super.close();
+    if (appenderMap != null) {
+      appenderMap.clear();
+      appenderMap = null;
+    }
+
+    partitioner = null;
+    plan = null;
+
+    progress = 1.0f;
+  }
+}
\ No newline at end of file
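
A minimal sketch of the lazily-opened, one-file-per-partition write pattern that
getAppender() and next() implement above, using plain java.nio file I/O instead of Tajo's
Appender; the directory and row values are illustrative:

    import java.io.*;
    import java.nio.file.*;
    import java.util.*;

    public class ShuffleWriteSketch {
      public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("shuffle-output");
        Map<Integer, BufferedWriter> writers = new HashMap<>();
        String[] rows = {"a", "bb", "ccc", "dd"};
        int numPartitions = 2;
        for (String row : rows) {
          int partId = (row.hashCode() & Integer.MAX_VALUE) % numPartitions;
          BufferedWriter w = writers.get(partId);
          if (w == null) { // lazily open one output file per partition, as getAppender() does
            w = Files.newBufferedWriter(dir.resolve(Integer.toString(partId)));
            writers.put(partId, w);
          }
          w.write(row);
          w.newLine();
        }
        for (BufferedWriter w : writers.values()) {
          w.close(); // flush and close every partition file at end-of-input
        }
        System.out.println("wrote " + writers.size() + " partition files under " + dir);
      }
    }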

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
new file mode 100644
index 0000000..0418f65
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.logical.HavingNode;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+
+public class HavingExec extends UnaryPhysicalExec {
+  private final EvalNode qual;
+
+  public HavingExec(TaskAttemptContext context,
+                    HavingNode plan,
+                    PhysicalExec child) {
+    super(context, plan.getInSchema(), plan.getOutSchema(), child);
+
+    this.qual = plan.getQual();
+  }
+
+  @Override
+  public Tuple next() throws IOException {
+    Tuple tuple;
+    while ((tuple = child.next()) != null) {
+      if (qual.eval(inSchema, tuple).isTrue()) {
+        return tuple;
+      }
+    }
+
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/JoinTupleComparator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/JoinTupleComparator.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/JoinTupleComparator.java
new file mode 100644
index 0000000..0d4c47b
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/JoinTupleComparator.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import com.google.common.base.Preconditions;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.storage.Tuple;
+
+import java.util.Comparator;
+
+/**
+ * The Comparator class for Outer and Inner Tuples
+ *
+ * @see org.apache.tajo.storage.Tuple
+ */
+public class JoinTupleComparator implements Comparator<Tuple> {
+  private int numSortKey;
+  private final int[] outerSortKeyIds;
+  private final int[] innerSortKeyIds;
+
+  private Datum outer;
+  private Datum inner;
+  private int compVal;
+
+  public JoinTupleComparator(Schema leftschema, Schema rightschema, SortSpec[][] sortKeys) {
+    Preconditions.checkArgument(sortKeys.length == 2,
+        "Two sort key arrays must be given, but " + sortKeys.length + " were given.");
+    Preconditions.checkArgument(sortKeys[0].length == sortKeys[1].length,
+        "The numbers of sort keys on both sides must be equal, but they differ: "
+            + sortKeys[0].length + " and " + sortKeys[1].length);
+
+    this.numSortKey = sortKeys[0].length; // both sides are guaranteed to have the same number of sort keys
+    this.outerSortKeyIds = new int[numSortKey];
+    this.innerSortKeyIds = new int[numSortKey];
+
+    for (int i = 0; i < numSortKey; i++) {
+      this.outerSortKeyIds[i] = leftschema.getColumnId(sortKeys[0][i].getSortKey().getQualifiedName());
+      this.innerSortKeyIds[i] = rightschema.getColumnId(sortKeys[1][i].getSortKey().getQualifiedName());
+    }
+  }
+
+  @Override
+  public int compare(Tuple outerTuple, Tuple innerTuple) {
+    for (int i = 0; i < numSortKey; i++) {
+      outer = outerTuple.get(outerSortKeyIds[i]);
+      inner = innerTuple.get(innerSortKeyIds[i]);
+
+      if (outer instanceof NullDatum || inner instanceof NullDatum) {
+        if (!outer.equals(inner)) {
+          if (outer instanceof NullDatum) {
+            compVal = 1;
+          } else if (inner instanceof NullDatum) {
+            compVal = -1;
+          }
+        } else {
+          compVal = 0;
+        }
+      } else {
+        compVal = outer.compareTo(inner);
+      }
+
+      if (compVal != 0) {
+        return compVal;
+      }
+    }
+    return 0;
+  }
+}
\ No newline at end of file
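
A minimal sketch of the nulls-last ordering that the NullDatum branch above implements,
using Integer in place of Datum; the values are illustrative:

    import java.util.*;

    public class NullOrderingSketch {
      public static void main(String[] args) {
        // Comparator that sorts nulls after all non-null values,
        // mirroring the NullDatum branch in JoinTupleComparator.
        Comparator<Integer> nullsLast = (a, b) -> {
          if (a == null || b == null) {
            if (Objects.equals(a, b)) {
              return 0;
            }
            return a == null ? 1 : -1;
          }
          return a.compareTo(b);
        };
        List<Integer> values = new ArrayList<>(Arrays.asList(3, null, 1, null, 2));
        values.sort(nullsLast);
        System.out.println(values); // [1, 2, 3, null, null]
      }
    }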

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/LimitExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/LimitExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/LimitExec.java
new file mode 100644
index 0000000..d736c25
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/LimitExec.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.engine.planner.logical.LimitNode;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+
+public class LimitExec extends UnaryPhysicalExec {
+  private final long fetchFirstNum;
+  private long fetchCount;
+
+  public LimitExec(TaskAttemptContext context, Schema inSchema,
+                   Schema outSchema, PhysicalExec child, LimitNode limit) {
+    super(context, inSchema, outSchema, child);
+    this.fetchFirstNum = limit.getFetchFirstNum();
+    this.fetchCount = 0;
+  }
+
+  @Override
+  public Tuple next() throws IOException {
+    Tuple tuple = child.next();
+    fetchCount++;
+
+    if (fetchCount > fetchFirstNum || tuple == null) {
+      return null;
+    }
+
+    return tuple;
+  }
+
+  @Override
+  public void rescan() throws IOException {
+    super.rescan(); // rescan the child rather than re-initializing it
+    fetchCount = 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
new file mode 100644
index 0000000..9f4f20a
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.apache.tajo.engine.planner.logical.SortNode;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+public class MemSortExec extends SortExec {
+  private SortNode plan;
+  private List<Tuple> tupleSlots;
+  private boolean sorted = false;
+  private Iterator<Tuple> iterator;
+  
+  public MemSortExec(final TaskAttemptContext context,
+                     SortNode plan, PhysicalExec child) {
+    super(context, plan.getInSchema(), plan.getOutSchema(), child, plan.getSortKeys());
+    this.plan = plan;
+  }
+
+  @Override
+  public void init() throws IOException {
+    super.init();
+    this.tupleSlots = new ArrayList<Tuple>(1000);
+  }
+
+  @Override
+  public Tuple next() throws IOException {
+
+    if (!sorted) {
+      Tuple tuple;
+      while ((tuple = child.next()) != null) {
+        tupleSlots.add(new VTuple(tuple));
+      }
+      
+      Collections.sort(tupleSlots, getComparator());
+      this.iterator = tupleSlots.iterator();
+      sorted = true;
+    }
+    
+    if (iterator.hasNext()) {
+      return this.iterator.next();
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public void rescan() throws IOException {
+    super.rescan();
+    this.iterator = tupleSlots.iterator();
+    sorted = true;
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+    tupleSlots.clear();
+    tupleSlots = null;
+    iterator = null;
+    plan = null;
+  }
+
+  public SortNode getPlan() {
+    return this.plan;
+  }
+}
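
A minimal sketch of the buffer-sort-iterate structure of MemSortExec above, using an
Iterator as a stand-in for the child executor; the values are illustrative:

    import java.util.*;

    public class MemSortSketch {
      public static void main(String[] args) {
        // Drain the "child" into an in-memory buffer, sort once, then iterate,
        // mirroring MemSortExec's next() on its first call.
        Iterator<String> child = Arrays.asList("pear", "apple", "fig").iterator();
        List<String> buffer = new ArrayList<>();
        while (child.hasNext()) {
          buffer.add(child.next());
        }
        buffer.sort(Comparator.naturalOrder());
        for (String s : buffer) {
          System.out.println(s); // apple, fig, pear
        }
      }
    }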