You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/04/18 13:44:35 UTC
[32/57] [abbrv] [partial] TAJO-752: Escalate sub modules in tajo-core
into the top-level modules. (hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
new file mode 100644
index 0000000..c422b49
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -0,0 +1,782 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.engine.planner.logical.SortNode;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.Scanner;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.tajo.storage.RawFile.RawFileAppender;
+import static org.apache.tajo.storage.RawFile.RawFileScanner;
+
+/**
+ * This external sort algorithm has the following characteristics:
+ *
+ * <ul>
+ * <li>in-memory sort if the input data fits in the sort buffer</li>
+ * <li>k-way merge sort if the input data exceeds the size of the sort buffer</li>
+ * <li>parallel merge</li>
+ * <li>final merge avoidance</li>
+ * <li>unbalanced merge if needed</li>
+ * </ul>
+ */
+public class ExternalSortExec extends SortExec {
+ /** Class logger */
+ private static final Log LOG = LogFactory.getLog(ExternalSortExec.class);
+
+ private SortNode plan;
+ private final TableMeta meta;
+ /** the defaultFanout of external sort */
+ private final int defaultFanout;
+ /** It's the size of in-memory table. If memory consumption exceeds it, store the memory table into a disk. */
+ private long sortBufferBytesNum;
+ /** the number of available cores */
+ private final int allocatedCoreNum;
+ /** If there are available multiple cores, it tries parallel merge. */
+ private ExecutorService executorService;
+ /** used for in-memory sort of each chunk. */
+ private List<Tuple> inMemoryTable;
+ /** temporary directory under which sort chunk files are written */
+ private final Path sortTmpDir;
+ /** It enables round-robin disks allocation */
+ private final LocalDirAllocator localDirAllocator;
+ /** local file system */
+ private final RawLocalFileSystem localFS;
+ /** final output files which are used for cleaning */
+ private List<Path> finalOutputFiles = null;
+ /** for directly merging sorted inputs */
+ private List<Path> mergedInputPaths = null;
+
+ ///////////////////////////////////////////////////
+ // transient variables
+ ///////////////////////////////////////////////////
+ /** already sorted or not */
+ private boolean sorted = false;
+ /** a flag to point whether sorted data resides in memory or not */
+ private boolean memoryResident = true;
+ /** the final result */
+ private Scanner result;
+ /** total bytes of input data */
+ private long sortAndStoredBytes;
+
+ private ExternalSortExec(final TaskAttemptContext context, final AbstractStorageManager sm, final SortNode plan)
+ throws PhysicalPlanningException {
+ super(context, plan.getInSchema(), plan.getOutSchema(), null, plan.getSortKeys());
+
+ this.plan = plan;
+ this.meta = CatalogUtil.newTableMeta(StoreType.ROWFILE);
+
+ this.defaultFanout = context.getConf().getIntVar(ConfVars.EXECUTOR_EXTERNAL_SORT_FANOUT);
+ if (defaultFanout < 2) {
+ throw new PhysicalPlanningException(ConfVars.EXECUTOR_EXTERNAL_SORT_FANOUT.varname + " cannot be lower than 2");
+ }
+ // TODO - sort buffer and core num should be changed to use the allocated container resource.
+ this.sortBufferBytesNum = context.getConf().getLongVar(ConfVars.EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE) * 1048576L;
+ this.allocatedCoreNum = context.getConf().getIntVar(ConfVars.EXECUTOR_EXTERNAL_SORT_THREAD_NUM);
+ this.executorService = Executors.newFixedThreadPool(this.allocatedCoreNum);
+ this.inMemoryTable = new ArrayList<Tuple>(100000);
+
+ this.sortTmpDir = getExecutorTmpDir();
+ localDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
+ localFS = new RawLocalFileSystem();
+ }
+
+ public ExternalSortExec(final TaskAttemptContext context,
+ final AbstractStorageManager sm, final SortNode plan,
+ final CatalogProtos.FragmentProto[] fragments) throws PhysicalPlanningException {
+ this(context, sm, plan);
+
+ mergedInputPaths = TUtil.newList();
+ for (CatalogProtos.FragmentProto proto : fragments) {
+ FileFragment fragment = FragmentConvertor.convert(FileFragment.class, proto);
+ mergedInputPaths.add(fragment.getPath());
+ }
+ }
+
+ /**
+  * Creates a sort executor that sorts the tuples produced by a child operator.
+  *
+  * @param context the task attempt context
+  * @param sm the storage manager
+  * @param plan the logical sort node
+  * @param child the operator whose output is sorted
+  * @throws IOException if the common initialization fails
+  */
+ public ExternalSortExec(final TaskAttemptContext context,
+     final AbstractStorageManager sm, final SortNode plan, final PhysicalExec child)
+     throws IOException {
+   this(context, sm, plan);
+   // The shared constructor passes null as the child to the superclass; attach the real child here.
+   setChild(child);
+ }
+
+ /**
+  * Overrides the sort buffer size. Once the estimated memory consumption of the in-memory table
+  * exceeds this value, the table is sorted and spilled to a chunk file.
+  *
+  * @param sortBufferBytesNum the new sort buffer size in bytes
+  */
+ @VisibleForTesting
+ public void setSortBufferBytesNum(final int sortBufferBytesNum) {
+   this.sortBufferBytesNum = sortBufferBytesNum;
+ }
+
+ /**
+  * Installs a fresh input statistics object and then runs the superclass initialization.
+  *
+  * @throws IOException if the superclass initialization fails
+  */
+ public void init() throws IOException {
+   this.inputStats = new TableStats();
+   super.init();
+ }
+
+ /**
+  * @return the logical sort node this executor was built from; null once {@code close()} has run
+  */
+ public SortNode getPlan() {
+   return plan;
+ }
+
+ /**
+  * Sorts a tuple block in memory and writes it to a level-0 chunk file.
+  *
+  * <p>The given block is sorted in place with {@code getComparator()} and is cleared once every
+  * tuple has been appended. If writing fails, the appender is still closed so that the underlying
+  * file handle is not leaked, and the original exception is propagated.</p>
+  *
+  * @param chunkId the identifier of the chunk, used to derive the chunk file name
+  * @param tupleBlock the tuples to sort and store; emptied on success
+  * @return the path of the written chunk file
+  * @throws IOException if the chunk file cannot be created or written
+  */
+ private Path sortAndStoreChunk(int chunkId, List<Tuple> tupleBlock)
+     throws IOException {
+   TableMeta meta = CatalogUtil.newTableMeta(StoreType.RAW);
+   int rowNum = tupleBlock.size();
+
+   long sortStart = System.currentTimeMillis();
+   Collections.sort(tupleBlock, getComparator());
+   long sortEnd = System.currentTimeMillis();
+
+   long chunkWriteStart = System.currentTimeMillis();
+   Path outputPath = getChunkPathForWrite(0, chunkId);
+   final RawFileAppender appender = new RawFileAppender(context.getConf(), inSchema, meta, outputPath);
+   boolean closeAttempted = false;
+   try {
+     appender.init();
+     for (Tuple t : tupleBlock) {
+       appender.addTuple(t);
+     }
+     // Mark before closing so that a failing close() is never retried in the finally block.
+     closeAttempted = true;
+     appender.close();
+   } finally {
+     if (!closeAttempted) {
+       // Writing failed: release the appender, but let the original exception propagate.
+       try {
+         appender.close();
+       } catch (Exception closeError) {
+         LOG.warn("Failed to close the appender of chunk " + outputPath, closeError);
+       }
+     }
+   }
+   tupleBlock.clear();
+   long chunkWriteEnd = System.currentTimeMillis();
+
+   info(LOG, "Chunk #" + chunkId + " sort and written (" +
+       FileUtil.humanReadableByteCount(appender.getOffset(), false) + " bytes, " + rowNum + " rows, " +
+       "sort time: " + (sortEnd - sortStart) + " msec, " +
+       "write time: " + (chunkWriteEnd - chunkWriteStart) + " msec)");
+   return outputPath;
+ }
+
+ /**
+ * It divides all tuples into a number of chunks, then sort for each chunk.
+ *
+ * @return All paths of chunks
+ * @throws java.io.IOException
+ */
+ private List<Path> sortAndStoreAllChunks() throws IOException {
+ Tuple tuple;
+ long memoryConsumption = 0;
+ List<Path> chunkPaths = TUtil.newList();
+
+ int chunkId = 0;
+ long runStartTime = System.currentTimeMillis();
+ while ((tuple = child.next()) != null) { // partition sort start
+ Tuple vtuple = new VTuple(tuple);
+ inMemoryTable.add(vtuple);
+ memoryConsumption += MemoryUtil.calculateMemorySize(vtuple);
+
+ if (memoryConsumption > sortBufferBytesNum) {
+ long runEndTime = System.currentTimeMillis();
+ info(LOG, chunkId + " run loading time: " + (runEndTime - runStartTime) + " msec");
+ runStartTime = runEndTime;
+
+ info(LOG, "Memory consumption exceeds " + sortBufferBytesNum + " bytes");
+ memoryResident = false;
+
+ chunkPaths.add(sortAndStoreChunk(chunkId, inMemoryTable));
+
+ memoryConsumption = 0;
+ chunkId++;
+
+ // When the volume of sorting data once exceed the size of sort buffer,
+ // the total progress of this external sort is divided into two parts.
+ // In contrast, if the data fits in memory, the progress is only one part.
+ //
+ // When the progress is divided into two parts, the first part sorts tuples on memory and stores them
+ // into a chunk. The second part merges stored chunks into fewer chunks, and it continues until the number
+ // of merged chunks is fewer than the default fanout.
+ //
+ // The fact that the code reach here means that the first chunk has been just stored.
+ // That is, the progress was divided into two parts.
+ // So, it multiply the progress of the children operator and 0.5f.
+ progress = child.getProgress() * 0.5f;
+ }
+ }
+
+ if (inMemoryTable.size() > 0) { // if there are at least one or more input tuples
+ if (!memoryResident) { // check if data exceeds a sort buffer. If so, it store the remain data into a chunk.
+ if (inMemoryTable.size() > 0) {
+ long start = System.currentTimeMillis();
+ int rowNum = inMemoryTable.size();
+ chunkPaths.add(sortAndStoreChunk(chunkId, inMemoryTable));
+ long end = System.currentTimeMillis();
+ info(LOG, "Last Chunk #" + chunkId + " " + rowNum + " rows written (" + (end - start) + " msec)");
+ }
+ } else { // this case means that all data does not exceed a sort buffer
+ Collections.sort(inMemoryTable, getComparator());
+ }
+ }
+
+ // get total loaded (or stored) bytes and total row numbers
+ TableStats childTableStats = child.getInputStats();
+ if (childTableStats != null) {
+ sortAndStoredBytes = childTableStats.getNumBytes();
+ }
+ return chunkPaths;
+ }
+
+ /**
+ * Get a local path from all temporal paths in round-robin manner.
+ */
+ private synchronized Path getChunkPathForWrite(int level, int chunkId) throws IOException {
+ return localDirAllocator.getLocalPathForWrite(sortTmpDir + "/" + level +"_" + chunkId, context.getConf());
+ }
+
+ /**
+  * Returns the next tuple in sort order. The first call performs the whole sort: either the
+  * given input files are merged directly, or the child's tuples are loaded, sorted and, if they
+  * did not fit in the sort buffer, merged from the stored chunks.
+  *
+  * @return the next sorted tuple, or null when the result is exhausted
+  * @throws IOException if sorting, merging or reading the result fails
+  */
+ @Override
+ public Tuple next() throws IOException {
+
+   if (!sorted) { // if not sorted, first sort all data
+
+     if (mergedInputPaths != null) {
+       // input files are given, so merging starts directly
+       this.result = mergeOrFail(mergedInputPaths);
+     } else {
+       // Try to sort all data, and store them as multiple chunks if memory exceeds
+       long startTimeOfChunkSplit = System.currentTimeMillis();
+       List<Path> chunks = sortAndStoreAllChunks();
+       long endTimeOfChunkSplit = System.currentTimeMillis();
+       info(LOG, "Chunks creation time: " + (endTimeOfChunkSplit - startTimeOfChunkSplit) + " msec");
+
+       if (memoryResident) { // all sorted data reside in the main-memory table
+         this.result = new MemTableScanner();
+       } else { // input data exceeded main memory at least once
+         this.result = mergeOrFail(chunks);
+       }
+     }
+
+     sorted = true;
+     result.init();
+
+     // if loaded and sorted, we assume that it proceeds the half of one entire external sort operation.
+     progress = 0.5f;
+   }
+
+   return result.next();
+ }
+
+ /**
+  * Runs the external merge and converts any failure into a {@link PhysicalPlanningException}.
+  * When the merge is interrupted, the interrupt flag of the current thread is restored before
+  * the exception is thrown so that callers further up can still observe the interruption.
+  */
+ private Scanner mergeOrFail(List<Path> runs) throws PhysicalPlanningException {
+   try {
+     return externalMergeAndSort(runs);
+   } catch (InterruptedException e) {
+     Thread.currentThread().interrupt();
+     throw new PhysicalPlanningException(e);
+   } catch (Exception e) {
+     throw new PhysicalPlanningException(e);
+   }
+ }
+
+ private int calculateFanout(int remainInputChunks, int intputNum, int outputNum, int startIdx) {
+ int computedFanout = Math.min(remainInputChunks, defaultFanout);
+
+ // Why should we detect an opportunity for unbalanced merge?
+ //
+ // Assume that a fanout is given by 8 and there are 10 chunks.
+ // If we firstly merge 3 chunks into one chunk, there remain only 8 chunks.
+ // Then, we can just finish the merge phase even though we don't complete merge phase on all chunks.
+ if (checkIfCanBeUnbalancedMerged(intputNum - (startIdx + computedFanout), outputNum + 1)) {
+ int candidateFanout = computedFanout;
+ while(checkIfCanBeUnbalancedMerged(intputNum - (startIdx + candidateFanout), outputNum + 1)) {
+ candidateFanout--;
+ }
+ int beforeFanout = computedFanout;
+ if (computedFanout > candidateFanout + 1) {
+ computedFanout = candidateFanout + 1;
+ info(LOG, "Fanout reduced for unbalanced merge: " + beforeFanout + " -> " + computedFanout);
+ }
+ }
+
+ return computedFanout;
+ }
+
+ /**
+  * Merges the given sorted runs level by level until at most {@code defaultFanout} runs remain,
+  * then returns a scanner that performs the final merge lazily.
+  *
+  * <p>Within one level the mergers run in parallel on the executor service. When the remaining
+  * inputs plus the already scheduled outputs fit into one final merge (unbalanced merge), the
+  * remaining inputs are carried over to the next level unmerged.</p>
+  *
+  * @param chunks the sorted runs to merge
+  * @return the scanner over the completely sorted data; also stored in {@code result}
+  * @throws IOException if a file cannot be read, written or deleted
+  * @throws ExecutionException if one of the parallel mergers fails
+  * @throws InterruptedException if waiting for a merger is interrupted
+  */
+ private Scanner externalMergeAndSort(List<Path> chunks)
+     throws IOException, ExecutionException, InterruptedException {
+   int level = 0;
+   final List<Path> inputFiles = TUtil.newList(chunks);
+   final List<Path> outputFiles = TUtil.newList();
+   int remainRun = inputFiles.size();
+   int chunksSize = chunks.size();
+
+   long mergeStart = System.currentTimeMillis();
+
+   // keep merging while the number of remaining runs is larger than defaultFanout
+   while (remainRun > defaultFanout) {
+
+     // reset the per-level state
+     int remainInputRuns = inputFiles.size();
+     int outChunkId = 0;
+     int outputFileNum = 0;
+     List<Future<Path>> futures = TUtil.newList();
+     // the number of files being merged in threads.
+     List<Integer> numberOfMergingFiles = TUtil.newList();
+     // inputs that are carried over to the next level without being merged
+     List<Path> switched = TUtil.newList();
+
+     for (int startIdx = 0; startIdx < inputFiles.size();) {
+
+       // calculate proper fanout
+       int fanout = calculateFanout(remainInputRuns, inputFiles.size(), outputFileNum, startIdx);
+       // how many files are merged in ith thread?
+       numberOfMergingFiles.add(fanout);
+       // launch a merger runner
+       futures.add(executorService.submit(
+           new KWayMergerCaller(level, outChunkId++, inputFiles, startIdx, fanout, false)));
+       outputFileNum++;
+
+       startIdx += fanout;
+       remainInputRuns = inputFiles.size() - startIdx;
+
+       // If unbalanced merge is available, it finishes the merge phase earlier.
+       if (checkIfCanBeUnbalancedMerged(remainInputRuns, outputFileNum)) {
+         info(LOG, "Unbalanced merge possibility detected: number of remain input (" + remainInputRuns
+             + ") and output files (" + outputFileNum + ") <= " + defaultFanout);
+
+         // switch the remain inputs to the next outputs
+         for (int j = startIdx; j < inputFiles.size(); j++) {
+           switched.add(inputFiles.get(j));
+         }
+         outputFiles.addAll(switched);
+
+         break;
+       }
+     }
+
+     // wait for all sort runners
+     int finishedMerger = 0;
+     int index = 0;
+     for (Future<Path> future : futures) {
+       outputFiles.add(future.get());
+       // Getting the number of merged files
+       finishedMerger += numberOfMergingFiles.get(index++);
+       // progress = (# number of merged files / total number of files) * 0.5;
+       progress = ((float)finishedMerger/(float)chunksSize) * 0.5f;
+     }
+
+     // The merger threads read inputFiles, so the carried-over runs are removed only now that
+     // every merger has finished. They must not be deleted below.
+     inputFiles.removeAll(switched);
+
+     // delete merged intermediate files
+     for (Path path : inputFiles) {
+       localFS.delete(path, true);
+     }
+     info(LOG, inputFiles.size() + " merged intermediate files deleted");
+
+     // switch input files to output files, and then clear outputFiles
+     inputFiles.clear();
+     inputFiles.addAll(outputFiles);
+     remainRun = inputFiles.size();
+     outputFiles.clear();
+     level++;
+   }
+
+   long mergeEnd = System.currentTimeMillis();
+   info(LOG, "Total merge time: " + (mergeEnd - mergeStart) + " msec");
+
+   // final result
+   finalOutputFiles = inputFiles;
+
+   result = createFinalMerger(inputFiles);
+   return result;
+ }
+
+ /**
+  * A merge task that merges {@code mergeFanout} consecutive input files, starting at
+  * {@code startIdx}, into one output chunk file of the next level. It is submitted to the
+  * executor service so that several merges of the same level run in parallel.
+  */
+ private class KWayMergerCaller implements Callable<Path> {
+   final int level;
+   final int nextRunId;
+   final List<Path> inputFiles;
+   final int startIdx;
+   final int mergeFanout;
+   /** Stored as passed in; not read by this class. */
+   final boolean updateInputStats;
+
+   public KWayMergerCaller(final int level, final int nextRunId, final List<Path> inputFiles,
+       final int startIdx, final int mergeFanout, final boolean updateInputStats) {
+     this.level = level;
+     this.nextRunId = nextRunId;
+     this.inputFiles = inputFiles;
+     this.startIdx = startIdx;
+     this.mergeFanout = mergeFanout;
+     this.updateInputStats = updateInputStats;
+   }
+
+   /**
+    * Merges the assigned input files and writes the merged tuples to a new chunk file.
+    * The merger and the output appender are released even when the merge fails.
+    *
+    * @return the path of the written output chunk
+    * @throws Exception if reading the inputs or writing the output fails
+    */
+   @Override
+   public Path call() throws Exception {
+     final Path outputPath = getChunkPathForWrite(level + 1, nextRunId);
+     info(LOG, mergeFanout + " files are being merged to an output file " + outputPath.getName());
+     long mergeStartTime = System.currentTimeMillis();
+     final RawFileAppender output = new RawFileAppender(context.getConf(), inSchema, meta, outputPath);
+     Scanner merger = null;
+     boolean outputCloseAttempted = false;
+     try {
+       output.init();
+       merger = createKWayMerger(inputFiles, startIdx, mergeFanout);
+       merger.init();
+       Tuple mergeTuple;
+       while((mergeTuple = merger.next()) != null) {
+         output.addTuple(mergeTuple);
+       }
+
+       // Clear the reference before closing so that a failing close() is not retried below.
+       final Scanner finishedMerger = merger;
+       merger = null;
+       finishedMerger.close();
+
+       outputCloseAttempted = true;
+       output.close();
+     } finally {
+       if (merger != null) {
+         IOUtils.cleanup(LOG, merger);
+       }
+       if (!outputCloseAttempted) {
+         try {
+           output.close();
+         } catch (Exception closeError) {
+           LOG.warn("Failed to close the appender of " + outputPath, closeError);
+         }
+       }
+     }
+     long mergeEndTime = System.currentTimeMillis();
+     info(LOG, outputPath.getName() + " is written to a disk. ("
+         + FileUtil.humanReadableByteCount(output.getOffset(), false)
+         + " bytes, " + (mergeEndTime - mergeStartTime) + " msec)");
+     return outputPath;
+   }
+ }
+
+ /**
+ * It checks if unbalanced merge is possible.
+ */
+ private boolean checkIfCanBeUnbalancedMerged(int remainInputNum, int outputNum) {
+ return (remainInputNum + outputNum) <= defaultFanout;
+ }
+
+ /**
+  * Creates the scanner that produces the final sorted output: a plain file scanner for a single
+  * input, or a k-way merge scanner for several inputs. The scanner is also stored in
+  * {@code result}.
+  *
+  * <p>An empty input list yields a scanner over the in-memory table instead of failing with an
+  * {@link ArrayIndexOutOfBoundsException} in the k-way merger construction. Within this class an
+  * empty list can only arrive through the directly-merged-inputs path, where the in-memory table
+  * is never filled, so that scanner returns no tuples.</p>
+  *
+  * @param inputs the sorted runs to read
+  * @return the scanner over the merged runs
+  * @throws IOException if a file scanner cannot be created
+  */
+ private Scanner createFinalMerger(List<Path> inputs) throws IOException {
+   if (inputs.isEmpty()) {
+     this.result = new MemTableScanner();
+   } else if (inputs.size() == 1) {
+     this.result = getFileScanner(inputs.get(0));
+   } else {
+     this.result = createKWayMerger(inputs, 0, inputs.size());
+   }
+   return result;
+ }
+
+ private Scanner getFileScanner(Path path) throws IOException {
+ return new RawFileScanner(context.getConf(), plan.getInSchema(), meta, path);
+ }
+
+ /**
+  * Builds a merge scanner over {@code num} consecutive runs of the given list.
+  *
+  * @param inputs the list of sorted run files
+  * @param startChunkId the index of the first run to merge
+  * @param num the number of runs to merge
+  * @return a scanner that merges the selected runs
+  * @throws IOException if a file scanner cannot be created
+  */
+ private Scanner createKWayMerger(List<Path> inputs, final int startChunkId, final int num) throws IOException {
+   final Scanner[] runScanners = new Scanner[num];
+   int slot = 0;
+   while (slot < num) {
+     runScanners[slot] = getFileScanner(inputs.get(startChunkId + slot));
+     slot++;
+   }
+
+   return createKWayMergerInternal(runScanners, 0, num);
+ }
+
+ /**
+  * Recursively combines {@code num} scanners, starting at {@code startIdx}, into a binary tree of
+  * pair-wise mergers. The first half (rounded up) goes to the left subtree, the rest to the right.
+  *
+  * @param sources the scanners of the individual runs
+  * @param startIdx the index of the first scanner of this subtree
+  * @param num the number of scanners in this subtree
+  * @return the single scanner itself when {@code num} is not greater than 1, otherwise a merger
+  * @throws IOException if a merger cannot be created
+  */
+ private Scanner createKWayMergerInternal(final Scanner [] sources, final int startIdx, final int num)
+     throws IOException {
+   if (num <= 1) {
+     return sources[startIdx];
+   }
+
+   final int leftCount = (int) Math.ceil((float)num / 2);
+   final Scanner leftSubtree = createKWayMergerInternal(sources, startIdx, leftCount);
+   final Scanner rightSubtree = createKWayMergerInternal(sources, startIdx + leftCount, num - leftCount);
+   return new PairWiseMerger(leftSubtree, rightSubtree);
+ }
+
+ /**
+  * Scanner over the in-memory table of the enclosing executor. It iterates the table in its
+  * current order and reports progress as the fraction of tuples already returned.
+  */
+ private class MemTableScanner implements Scanner {
+   Iterator<Tuple> iterator;
+
+   // for input stats
+   float scannerProgress;
+   int numRecords;
+   int totalRecords;
+   TableStats scannerTableStats;
+
+   /** Positions the scanner at the first tuple and rebuilds the statistics. */
+   @Override
+   public void init() throws IOException {
+     numRecords = 0;
+     scannerProgress = 0.0f;
+     totalRecords = inMemoryTable.size();
+     iterator = inMemoryTable.iterator();
+
+     // it will be returned as the final stats
+     final TableStats stats = new TableStats();
+     stats.setNumBytes(sortAndStoredBytes);
+     stats.setReadBytes(sortAndStoredBytes);
+     stats.setNumRows(totalRecords);
+     scannerTableStats = stats;
+   }
+
+   /** @return the next tuple of the table, or null when all tuples have been returned */
+   @Override
+   public Tuple next() throws IOException {
+     if (!iterator.hasNext()) {
+       return null;
+     }
+     numRecords++;
+     return iterator.next();
+   }
+
+   /** Restarts the scan from the first tuple. */
+   @Override
+   public void reset() throws IOException {
+     init();
+   }
+
+   /** Drops the iterator and marks the scan as complete. */
+   @Override
+   public void close() throws IOException {
+     scannerProgress = 1.0f;
+     iterator = null;
+   }
+
+   @Override
+   public boolean isProjectable() {
+     return false;
+   }
+
+   /** No-op: this scanner is not projectable. */
+   @Override
+   public void setTarget(Column[] targets) {
+   }
+
+   @Override
+   public boolean isSelectable() {
+     return false;
+   }
+
+   /** No-op: this scanner is not selectable. */
+   @Override
+   public void setSearchCondition(Object expr) {
+   }
+
+   @Override
+   public boolean isSplittable() {
+     return false;
+   }
+
+   /** @return always null; this scanner does not expose a schema */
+   @Override
+   public Schema getSchema() {
+     return null;
+   }
+
+   /**
+    * @return the fraction of tuples returned so far while the scan is open and has returned at
+    *         least one tuple; otherwise the stored progress (0 after init, 1 after close)
+    */
+   @Override
+   public float getProgress() {
+     if (iterator == null || numRecords <= 0) { // closed, or nothing read yet (e.g. empty input)
+       return scannerProgress;
+     }
+     return (float)numRecords / (float)totalRecords;
+   }
+
+   /** @return the statistics built by the last {@code init()} call */
+   @Override
+   public TableStats getInputStats() {
+     return scannerTableStats;
+   }
+ }
+
+ /**
+  * Two-way merge scanner that reads two sorted input scanners and returns their tuples as one
+  * sorted stream.
+  *
+  * <p>NOTE(review): when two tuples compare as equal, the tuple of the right input is returned
+  * first, so ties are not resolved in favor of the left input.</p>
+  */
+ private class PairWiseMerger implements Scanner {
+   private Scanner leftScan;
+   private Scanner rightScan;
+
+   /** the next not yet returned tuple of each input; null once that input is exhausted */
+   private Tuple leftTuple;
+   private Tuple rightTuple;
+
+   private final Comparator<Tuple> comparator = getComparator();
+
+   private float mergerProgress;
+   // Created eagerly so that getInputStats() and close() also work when init() never ran or
+   // failed half-way; previously they hit a NullPointerException in that case.
+   private TableStats mergerInputStats = new TableStats();
+
+   public PairWiseMerger(Scanner leftScanner, Scanner rightScanner) throws IOException {
+     this.leftScan = leftScanner;
+     this.rightScan = rightScanner;
+   }
+
+   /** Initializes both inputs and pre-fetches the first tuple of each. */
+   @Override
+   public void init() throws IOException {
+     leftScan.init();
+     rightScan.init();
+
+     leftTuple = leftScan.next();
+     rightTuple = rightScan.next();
+
+     mergerInputStats = new TableStats();
+     mergerProgress = 0.0f;
+   }
+
+   /**
+    * @return the smaller of the two pending tuples (the right one on ties), or null when both
+    *         inputs are exhausted
+    */
+   @Override
+   public Tuple next() throws IOException {
+     if (leftTuple == null && rightTuple == null) {
+       // both inputs are exhausted; do not poll the finished scanners again
+       return null;
+     }
+
+     final Tuple outTuple;
+     if (rightTuple == null || (leftTuple != null && comparator.compare(leftTuple, rightTuple) < 0)) {
+       outTuple = leftTuple;
+       leftTuple = leftScan.next();
+     } else {
+       outTuple = rightTuple;
+       rightTuple = rightScan.next();
+     }
+     return outTuple;
+   }
+
+   /** Resets both inputs and initializes this merger again. */
+   @Override
+   public void reset() throws IOException {
+     leftScan.reset();
+     rightScan.reset();
+     init();
+   }
+
+   /**
+    * Closes both inputs, takes a last snapshot of their statistics and releases them. After
+    * this call, {@code getInputStats()} returns the snapshot and {@code getProgress()} returns 1.
+    */
+   @Override
+   public void close() throws IOException {
+     IOUtils.cleanup(LOG, leftScan, rightScan);
+     getInputStats();
+     leftScan = null;
+     rightScan = null;
+     mergerProgress = 1.0f;
+   }
+
+   @Override
+   public boolean isProjectable() {
+     return false;
+   }
+
+   /** No-op: this scanner is not projectable. */
+   @Override
+   public void setTarget(Column[] targets) {
+   }
+
+   @Override
+   public boolean isSelectable() {
+     return false;
+   }
+
+   /** No-op: this scanner is not selectable. */
+   @Override
+   public void setSearchCondition(Object expr) {
+   }
+
+   @Override
+   public boolean isSplittable() {
+     return false;
+   }
+
+   @Override
+   public Schema getSchema() {
+     return inSchema;
+   }
+
+   /** @return the average of both inputs' progress, or the stored value once closed */
+   @Override
+   public float getProgress() {
+     if (leftScan == null) {
+       return mergerProgress;
+     }
+     return leftScan.getProgress() * 0.5f + rightScan.getProgress() * 0.5f;
+   }
+
+   /**
+    * @return the sum of both inputs' statistics (bytes, read bytes, rows); once closed, the
+    *         snapshot taken by {@code close()}
+    */
+   @Override
+   public TableStats getInputStats() {
+     if (leftScan == null) {
+       return mergerInputStats;
+     }
+     TableStats leftInputStats = leftScan.getInputStats();
+     mergerInputStats.setNumBytes(0);
+     mergerInputStats.setReadBytes(0);
+     mergerInputStats.setNumRows(0);
+
+     if (leftInputStats != null) {
+       mergerInputStats.setNumBytes(leftInputStats.getNumBytes());
+       mergerInputStats.setReadBytes(leftInputStats.getReadBytes());
+       mergerInputStats.setNumRows(leftInputStats.getNumRows());
+     }
+
+     TableStats rightInputStats = rightScan.getInputStats();
+     if (rightInputStats != null) {
+       mergerInputStats.setNumBytes(mergerInputStats.getNumBytes() + rightInputStats.getNumBytes());
+       mergerInputStats.setReadBytes(mergerInputStats.getReadBytes() + rightInputStats.getReadBytes());
+       mergerInputStats.setNumRows(mergerInputStats.getNumRows() + rightInputStats.getNumRows());
+     }
+
+     return mergerInputStats;
+   }
+ }
+
+ /**
+  * Closes the result scanner, keeps a copy of its final statistics, deletes the final output
+  * files and releases the in-memory table and the merge thread pool.
+  *
+  * <p>The clean-up steps are chained with {@code finally} so that a failure while closing the
+  * result scanner or deleting a file does not leave temporary files or the thread pool behind.</p>
+  *
+  * @throws IOException if closing the scanner, deleting a file or closing the superclass fails
+  */
+ @Override
+ public void close() throws IOException {
+   try {
+     if (result != null) {
+       try {
+         result.close();
+         final TableStats finalStats = result.getInputStats();
+         if (finalStats != null) { // a scanner may not provide statistics
+           try {
+             inputStats = (TableStats) finalStats.clone();
+           } catch (CloneNotSupportedException e) {
+             LOG.warn(e.getMessage(), e);
+           }
+         }
+       } finally {
+         result = null;
+       }
+     }
+   } finally {
+     try {
+       if (finalOutputFiles != null) {
+         for (Path path : finalOutputFiles) {
+           localFS.delete(path, true);
+         }
+         finalOutputFiles = null;
+       }
+     } finally {
+       if (inMemoryTable != null) {
+         inMemoryTable.clear();
+         inMemoryTable = null;
+       }
+
+       if (executorService != null) {
+         executorService.shutdown();
+         executorService = null;
+       }
+
+       plan = null;
+       super.close();
+     }
+   }
+ }
+
+ /**
+  * Restarts reading of the sorted result. Nothing happens if no result scanner exists yet.
+  *
+  * @throws IOException if the result scanner cannot be reset
+  */
+ @Override
+ public void rescan() throws IOException {
+   if (result == null) {
+     return;
+   }
+   result.reset();
+   // sorting is already done, so the progress restarts at the half-way mark
+   progress = 0.5f;
+ }
+
+ /**
+  * @return the stored progress, plus half of the result scanner's progress when a result
+  *         scanner exists
+  */
+ @Override
+ public float getProgress() {
+   return result == null ? progress : progress + result.getProgress() * 0.5f;
+ }
+
+ /**
+  * @return the statistics of the result scanner while one exists, otherwise the statistics
+  *         kept by this executor
+  */
+ @Override
+ public TableStats getInputStats() {
+   return result == null ? inputStats : result.getInputStats();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
new file mode 100644
index 0000000..a31ad90
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.engine.function.FunctionContext;
+import org.apache.tajo.engine.planner.logical.GroupbyNode;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * This is the hash-based GroupBy Operator.
+ */
+public class HashAggregateExec extends AggregationExec {
+ private Tuple tuple = null;
+ private Map<Tuple, FunctionContext[]> hashTable;
+ private boolean computed = false;
+ private Iterator<Entry<Tuple, FunctionContext []>> iterator = null;
+
+ public HashAggregateExec(TaskAttemptContext ctx, GroupbyNode plan, PhysicalExec subOp) throws IOException {
+ super(ctx, plan, subOp);
+ hashTable = new HashMap<Tuple, FunctionContext []>(100000);
+ this.tuple = new VTuple(plan.getOutSchema().size());
+ }
+
+ private void compute() throws IOException {
+ Tuple tuple;
+ Tuple keyTuple;
+ while((tuple = child.next()) != null && !context.isStopped()) {
+ keyTuple = new VTuple(groupingKeyIds.length);
+ // build one key tuple
+ for(int i = 0; i < groupingKeyIds.length; i++) {
+ keyTuple.put(i, tuple.get(groupingKeyIds[i]));
+ }
+
+ if(hashTable.containsKey(keyTuple)) {
+ FunctionContext [] contexts = hashTable.get(keyTuple);
+ for(int i = 0; i < aggFunctions.length; i++) {
+ aggFunctions[i].merge(contexts[i], inSchema, tuple);
+ }
+ } else { // if the key occurs firstly
+ FunctionContext contexts [] = new FunctionContext[aggFunctionsNum];
+ for(int i = 0; i < aggFunctionsNum; i++) {
+ contexts[i] = aggFunctions[i].newContext();
+ aggFunctions[i].merge(contexts[i], inSchema, tuple);
+ }
+ hashTable.put(keyTuple, contexts);
+ }
+ }
+ }
+
+ @Override
+ public Tuple next() throws IOException {
+ if(!computed) {
+ compute();
+ iterator = hashTable.entrySet().iterator();
+ computed = true;
+ }
+
+ FunctionContext [] contexts;
+
+ if (iterator.hasNext()) {
+ Entry<Tuple, FunctionContext []> entry = iterator.next();
+ Tuple keyTuple = entry.getKey();
+ contexts = entry.getValue();
+
+ int tupleIdx = 0;
+ for (; tupleIdx < groupingKeyNum; tupleIdx++) {
+ tuple.put(tupleIdx, keyTuple.get(tupleIdx));
+ }
+ for (int funcIdx = 0; funcIdx < aggFunctionsNum; funcIdx++, tupleIdx++) {
+ tuple.put(tupleIdx, aggFunctions[funcIdx].terminate(contexts[funcIdx]));
+ }
+
+ return tuple;
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public void rescan() throws IOException {
+ iterator = hashTable.entrySet().iterator();
+ }
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+ hashTable.clear();
+ hashTable = null;
+ iterator = null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java
new file mode 100644
index 0000000..df32d0b
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.statistics.StatisticsUtil;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.engine.planner.logical.StoreTableNode;
+import org.apache.tajo.storage.Appender;
+import org.apache.tajo.storage.StorageManagerFactory;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class is a physical operator to store at column partitioned table.
+ */
+public class HashBasedColPartitionStoreExec extends ColPartitionStoreExec {
+ private static Log LOG = LogFactory.getLog(HashBasedColPartitionStoreExec.class);
+
+ private final Map<String, Appender> appenderMap = new HashMap<String, Appender>();
+
+ public HashBasedColPartitionStoreExec(TaskAttemptContext context, StoreTableNode plan, PhysicalExec child)
+ throws IOException {
+ super(context, plan, child);
+ }
+
+ public void init() throws IOException {
+ super.init();
+ }
+
+ private Appender getAppender(String partition) throws IOException {
+ Appender appender = appenderMap.get(partition);
+
+ if (appender == null) {
+ Path dataFile = getDataFile(partition);
+ FileSystem fs = dataFile.getFileSystem(context.getConf());
+
+ if (fs.exists(dataFile.getParent())) {
+ LOG.info("Path " + dataFile.getParent() + " already exists!");
+ } else {
+ fs.mkdirs(dataFile.getParent());
+ LOG.info("Add subpartition path directory :" + dataFile.getParent());
+ }
+
+ if (fs.exists(dataFile)) {
+ LOG.info("File " + dataFile + " already exists!");
+ FileStatus status = fs.getFileStatus(dataFile);
+ LOG.info("File size: " + status.getLen());
+ }
+
+ appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema, dataFile);
+ appender.enableStats();
+ appender.init();
+ appenderMap.put(partition, appender);
+ } else {
+ appender = appenderMap.get(partition);
+ }
+ return appender;
+ }
+
+ /* (non-Javadoc)
+ * @see PhysicalExec#next()
+ */
+ @Override
+ public Tuple next() throws IOException {
+ Tuple tuple;
+ StringBuilder sb = new StringBuilder();
+ while((tuple = child.next()) != null) {
+ // set subpartition directory name
+ sb.delete(0, sb.length());
+ if (keyIds != null) {
+ for(int i = 0; i < keyIds.length; i++) {
+ Datum datum = tuple.get(keyIds[i]);
+ if(i > 0)
+ sb.append("/");
+ sb.append(keyNames[i]).append("=");
+ sb.append(datum.asChars());
+ }
+ }
+
+ // add tuple
+ Appender appender = getAppender(sb.toString());
+ appender.addTuple(tuple);
+ }
+
+ List<TableStats> statSet = new ArrayList<TableStats>();
+ for (Map.Entry<String, Appender> entry : appenderMap.entrySet()) {
+ Appender app = entry.getValue();
+ app.flush();
+ app.close();
+ statSet.add(app.getStats());
+ }
+
+ // Collect and aggregated statistics data
+ TableStats aggregated = StatisticsUtil.aggregateTableStat(statSet);
+ context.setResultStats(aggregated);
+
+ return null;
+ }
+
+ @Override
+ public void rescan() throws IOException {
+ // nothing to do
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
new file mode 100644
index 0000000..65ebe2f
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.*;
+
+
+ /**
+  * Hash-based full outer join.
+  *
+  * The right child is loaded into an in-memory hash table keyed by the join key columns. The left
+  * child is then streamed: joined pairs are emitted, left tuples without any joinable right tuple
+  * are emitted padded with nulls on the right, and once the left child is exhausted the right
+  * tuples of every join key that never matched are emitted padded with nulls on the left.
+  */
+ public class HashFullOuterJoinExec extends BinaryPhysicalExec {
+   // from logical plan
+   protected JoinNode plan;
+   protected EvalNode joinQual;
+
+   protected List<Column[]> joinKeyPairs;
+
+   // temporary tuples and states for the hash join
+   protected boolean first = true;
+   protected FrameTuple frameTuple;
+   protected Tuple outTuple = null;
+   protected Map<Tuple, List<Tuple>> tupleSlots;
+   protected Iterator<Tuple> iterator = null;
+   protected Tuple leftTuple;
+   protected Tuple leftKeyTuple;
+
+   protected int [] leftKeyList;
+   protected int [] rightKeyList;
+
+   protected boolean finished = false;
+   protected boolean shouldGetLeftTuple = true;
+
+   // projection
+   protected final Projector projector;
+
+   private int rightNumCols;
+   private int leftNumCols;
+   private Map<Tuple, Boolean> matched;
+
+   // true once the left child has returned null; it is not polled again until rescan()
+   private boolean leftExhausted = false;
+   // true if the current left tuple has been joined with at least one right tuple of its bucket
+   private boolean leftTupleJoined = false;
+
+   public HashFullOuterJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer,
+                                PhysicalExec inner) {
+     super(context, SchemaUtil.merge(outer.getSchema(), inner.getSchema()),
+         plan.getOutSchema(), outer, inner);
+     this.plan = plan;
+     this.joinQual = plan.getJoinQual();
+     this.tupleSlots = new HashMap<Tuple, List<Tuple>>(10000);
+
+     // this hashmap mirrors the evolution of the tupleSlots, with the same keys. For each join key,
+     // we have a boolean flag, initially false (whether this join key had at least one match on the left operand)
+     this.matched = new HashMap<Tuple, Boolean>(10000);
+
+     // HashJoin only can manage equi join key pairs.
+     this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, outer.getSchema(), inner.getSchema(),
+         false);
+
+     leftKeyList = new int[joinKeyPairs.size()];
+     rightKeyList = new int[joinKeyPairs.size()];
+
+     for (int i = 0; i < joinKeyPairs.size(); i++) {
+       leftKeyList[i] = outer.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName());
+     }
+
+     for (int i = 0; i < joinKeyPairs.size(); i++) {
+       rightKeyList[i] = inner.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName());
+     }
+
+     // for projection
+     this.projector = new Projector(inSchema, outSchema, plan.getTargets());
+
+     // for join
+     frameTuple = new FrameTuple();
+     outTuple = new VTuple(outSchema.size());
+     leftKeyTuple = new VTuple(leftKeyList.length);
+
+     leftNumCols = outer.getSchema().size();
+     rightNumCols = inner.getSchema().size();
+   }
+
+   /**
+    * Copies the join key columns of a left tuple into the given key tuple.
+    *
+    * @param outerTuple the left (outer) tuple to read the key columns from
+    * @param keyTuple   the tuple that receives the key values, in join key order
+    */
+   protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) {
+     for (int i = 0; i < leftKeyList.length; i++) {
+       keyTuple.put(i, outerTuple.get(leftKeyList[i]));
+     }
+   }
+
+   /**
+    * Removes and returns one right tuple belonging to a join key that is still flagged as
+    * unmatched. When the last tuple of such a key is taken, the key is flagged as matched so
+    * that it is skipped by later calls.
+    *
+    * @return an unmatched right tuple, or {@code null} if every join key is flagged as matched
+    */
+   public Tuple getNextUnmatchedRight() {
+     for (Map.Entry<Tuple, Boolean> entry : matched.entrySet()) {
+       if (entry.getValue()) {
+         continue;
+       }
+
+       List<Tuple> bucket = tupleSlots.get(entry.getKey());
+       Tuple returnedTuple = bucket.remove(0);
+
+       // after taking the last element from the list in tupleSlots, set flag true in matched as well.
+       // Replacing the value of an existing entry does not invalidate the running iteration.
+       if (bucket.isEmpty()) {
+         entry.setValue(true);
+       }
+
+       return returnedTuple;
+     }
+     return null;
+   }
+
+   /**
+    * Projects the current left tuple combined with an all-null right side into {@code outTuple}.
+    */
+   private Tuple emitLeftWithNullRight() {
+     Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
+     frameTuple.set(leftTuple, nullPaddedTuple);
+     projector.eval(frameTuple, outTuple);
+     return outTuple;
+   }
+
+   /**
+    * Returns the next joined tuple, or {@code null} once both inputs are fully consumed.
+    *
+    * The returned object is this operator's shared {@code outTuple}; it is overwritten by the
+    * following call.
+    */
+   public Tuple next() throws IOException {
+     if (first) {
+       loadRightToHashTable();
+     }
+
+     while (!finished) {
+       if (shouldGetLeftTuple) { // initially, it is true.
+         if (!leftExhausted) {
+           // getting new outer
+           leftTuple = leftChild.next(); // it comes from a disk
+           leftExhausted = (leftTuple == null);
+         }
+
+         if (leftExhausted) { // no more left tuples, the matching phase is completed.
+           // in this stage we output the tuples from the right operand (kept in tupleSlots) whose
+           // join key never matched, null padded on the left side
+           Tuple unmatchedRightTuple = getNextUnmatchedRight();
+           if (unmatchedRightTuple == null) {
+             // outTuple is deliberately kept, so that the operator still works after rescan()
+             finished = true;
+             return null;
+           }
+           Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
+           frameTuple.set(nullPaddedTuple, unmatchedRightTuple);
+           projector.eval(frameTuple, outTuple);
+           return outTuple;
+         }
+
+         // getting corresponding right
+         getKeyLeftTuple(leftTuple, leftKeyTuple); // get a left key tuple
+         List<Tuple> rightTuples = tupleSlots.get(leftKeyTuple);
+         if (rightTuples == null) {
+           // this left tuple doesn't have a match on the right. But full outer join => we should keep it
+           // anyway and output it with a null padded right side; shouldGetLeftTuple stays true
+           return emitLeftWithNullRight();
+         }
+
+         // found right tuples on in-memory hash table.
+         iterator = rightTuples.iterator();
+         shouldGetLeftTuple = false;
+         leftTupleJoined = false;
+       }
+
+       // getting a next right tuple on in-memory hash table.
+       Tuple rightTuple = iterator.next();
+       frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples
+
+       boolean joinable = joinQual.eval(inSchema, frameTuple).isTrue();
+       if (joinable) { // if both tuples are joinable
+         projector.eval(frameTuple, outTuple);
+         leftTupleJoined = true;
+         // leftKeyTuple still holds the key of the current left tuple. The key is already present in
+         // 'matched' (it mirrors tupleSlots), so only the flag is replaced and the reused key object
+         // is not stored in the map.
+         matched.put(leftKeyTuple, true);
+       }
+
+       if (!iterator.hasNext()) { // no more right tuples for this hash key
+         shouldGetLeftTuple = true;
+         if (!leftTupleJoined) {
+           // the bucket existed, but no right tuple satisfied the join condition:
+           // the left tuple must still appear once, null padded on the right
+           return emitLeftWithNullRight();
+         }
+       }
+
+       if (joinable) {
+         return outTuple;
+       }
+     }
+
+     // already finished: keep signalling the end of the stream
+     return null;
+   }
+
+   /**
+    * Reads the whole right child and groups its tuples by join key in {@code tupleSlots}.
+    * Every newly seen key is registered in {@code matched} with the flag {@code false}.
+    */
+   protected void loadRightToHashTable() throws IOException {
+     Tuple tuple;
+     Tuple keyTuple;
+
+     while ((tuple = rightChild.next()) != null) {
+       keyTuple = new VTuple(joinKeyPairs.size());
+       for (int i = 0; i < rightKeyList.length; i++) {
+         keyTuple.put(i, tuple.get(rightKeyList[i]));
+       }
+
+       List<Tuple> newValue = tupleSlots.get(keyTuple);
+       if (newValue != null) {
+         newValue.add(tuple);
+       } else {
+         newValue = new ArrayList<Tuple>();
+         newValue.add(tuple);
+         tupleSlots.put(keyTuple, newValue);
+         matched.put(keyTuple, false);
+       }
+     }
+     first = false;
+   }
+
+   @Override
+   public void rescan() throws IOException {
+     super.rescan();
+
+     // both maps are rebuilt together by loadRightToHashTable() on the next call of next()
+     tupleSlots.clear();
+     matched.clear();
+     first = true;
+
+     finished = false;
+     leftExhausted = false;
+     leftTupleJoined = false;
+     iterator = null;
+     shouldGetLeftTuple = true;
+   }
+
+   @Override
+   public void close() throws IOException {
+     super.close();
+     tupleSlots.clear();
+     matched.clear();
+     tupleSlots = null;
+     matched = null;
+     iterator = null;
+     plan = null;
+     joinQual = null;
+   }
+
+   public JoinNode getPlan() {
+     return this.plan;
+   }
+ }
+
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
new file mode 100644
index 0000000..dea0340
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.*;
+
+ /**
+  * Hash join that loads the right child into an in-memory hash table keyed by the join key
+  * columns, streams the left child, and emits one output tuple for every (left, right) pair of
+  * the same hash bucket for which the join condition evaluates to true.
+  */
+ public class HashJoinExec extends BinaryPhysicalExec {
+   // from logical plan
+   protected JoinNode plan;
+   protected EvalNode joinQual;
+
+   protected List<Column[]> joinKeyPairs;
+
+   // temporary tuples and states for the hash join
+   protected boolean first = true;
+   protected FrameTuple frameTuple;
+   protected Tuple outTuple = null;
+   protected Map<Tuple, List<Tuple>> tupleSlots;
+   protected Iterator<Tuple> iterator = null;
+   protected Tuple leftTuple;
+   protected Tuple leftKeyTuple;
+
+   protected int [] leftKeyList;
+   protected int [] rightKeyList;
+
+   protected boolean finished = false;
+   protected boolean shouldGetLeftTuple = true;
+
+   // projection
+   protected final Projector projector;
+
+   public HashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftExec,
+       PhysicalExec rightExec) {
+     super(context, SchemaUtil.merge(leftExec.getSchema(), rightExec.getSchema()), plan.getOutSchema(),
+         leftExec, rightExec);
+     this.plan = plan;
+     this.joinQual = plan.getJoinQual();
+     this.tupleSlots = new HashMap<Tuple, List<Tuple>>(100000);
+
+     // HashJoin only can manage equi join key pairs.
+     this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, leftExec.getSchema(),
+         rightExec.getSchema(), false);
+
+     leftKeyList = new int[joinKeyPairs.size()];
+     rightKeyList = new int[joinKeyPairs.size()];
+
+     for (int i = 0; i < joinKeyPairs.size(); i++) {
+       leftKeyList[i] = leftExec.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName());
+     }
+
+     for (int i = 0; i < joinKeyPairs.size(); i++) {
+       rightKeyList[i] = rightExec.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName());
+     }
+
+     // for projection
+     this.projector = new Projector(inSchema, outSchema, plan.getTargets());
+
+     // for join
+     frameTuple = new FrameTuple();
+     outTuple = new VTuple(outSchema.size());
+     leftKeyTuple = new VTuple(leftKeyList.length);
+   }
+
+   /**
+    * Copies the join key columns of a left tuple into the given key tuple.
+    *
+    * @param outerTuple the left (outer) tuple to read the key columns from
+    * @param keyTuple   the tuple that receives the key values, in join key order
+    */
+   protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) {
+     for (int i = 0; i < leftKeyList.length; i++) {
+       keyTuple.put(i, outerTuple.get(leftKeyList[i]));
+     }
+   }
+
+   // set to the wall-clock time right after the hash table has been built; not read in this class
+   long scanStartTime = 0;
+
+   /**
+    * Returns a copy of the next joined tuple, or {@code null} when the left child is exhausted.
+    * Once the end has been reached, every further call returns {@code null} until
+    * {@link #rescan()} is invoked.
+    */
+   public Tuple next() throws IOException {
+     if (first) {
+       loadRightToHashTable();
+       scanStartTime = System.currentTimeMillis();
+     }
+
+     while (!finished) {
+       if (shouldGetLeftTuple) { // initially, it is true.
+         // getting new outer
+         leftTuple = leftChild.next(); // it comes from a disk
+         if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed.
+           finished = true;
+           return null;
+         }
+
+         // getting corresponding right
+         getKeyLeftTuple(leftTuple, leftKeyTuple); // get a left key tuple
+         List<Tuple> rightTuples = tupleSlots.get(leftKeyTuple);
+         if (rightTuples == null) {
+           // no bucket for this key: skip the left tuple and fetch the next one
+           continue;
+         }
+
+         // found right tuples on in-memory hash table.
+         iterator = rightTuples.iterator();
+         shouldGetLeftTuple = false;
+       }
+
+       // getting a next right tuple on in-memory hash table.
+       Tuple rightTuple = iterator.next();
+       frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples
+       boolean joinable = joinQual.eval(inSchema, frameTuple).isTrue();
+       if (joinable) { // if both tuples are joinable
+         projector.eval(frameTuple, outTuple);
+       }
+
+       if (!iterator.hasNext()) { // no more right tuples for this hash key
+         shouldGetLeftTuple = true;
+       }
+
+       if (joinable) {
+         // outTuple is reused by the next call, so hand out a copy
+         return new VTuple(outTuple);
+       }
+     }
+
+     // already finished: do not hand out a copy of the previous result again
+     return null;
+   }
+
+   /**
+    * Reads the whole right child and groups its tuples by join key in {@code tupleSlots}.
+    */
+   protected void loadRightToHashTable() throws IOException {
+     Tuple tuple;
+     Tuple keyTuple;
+
+     while ((tuple = rightChild.next()) != null) {
+       keyTuple = new VTuple(joinKeyPairs.size());
+       for (int i = 0; i < rightKeyList.length; i++) {
+         keyTuple.put(i, tuple.get(rightKeyList[i]));
+       }
+
+       List<Tuple> newValue = tupleSlots.get(keyTuple);
+
+       if (newValue != null) {
+         newValue.add(tuple);
+       } else {
+         newValue = new ArrayList<Tuple>();
+         newValue.add(tuple);
+         tupleSlots.put(keyTuple, newValue);
+       }
+     }
+
+     first = false;
+   }
+
+   @Override
+   public void rescan() throws IOException {
+     super.rescan();
+
+     tupleSlots.clear();
+     first = true;
+
+     finished = false;
+     iterator = null;
+     shouldGetLeftTuple = true;
+   }
+
+   @Override
+   public void close() throws IOException {
+     super.close();
+     if (tupleSlots != null) {
+       tupleSlots.clear();
+       tupleSlots = null;
+     }
+
+     iterator = null;
+     plan = null;
+     joinQual = null;
+   }
+
+   public JoinNode getPlan() {
+     return this.plan;
+   }
+
+ }
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
new file mode 100644
index 0000000..50a1438
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Prepare a hash table of the NOT IN side of the join. Scan the FROM side table.
+ * For each tuple of the FROM side table, it tries to find a matched tuple from the hash table for the NOT INT side.
+ * If not found, it returns the tuple of the FROM side table with null padding.
+ */
+public class HashLeftAntiJoinExec extends HashJoinExec {
+ private Tuple rightNullTuple;
+
+ public HashLeftAntiJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec fromSideChild,
+ PhysicalExec notInSideChild) {
+ super(context, plan, fromSideChild, notInSideChild);
+ // NUll Tuple
+ rightNullTuple = new VTuple(leftChild.outColumnNum);
+ for (int i = 0; i < leftChild.outColumnNum; i++) {
+ rightNullTuple.put(i, NullDatum.get());
+ }
+ }
+
+ /**
+ * The End of Tuple (EOT) condition is true only when no more tuple in the left relation (on disk).
+ * next() method finds the first unmatched tuple from both tables.
+ *
+ * For each left tuple, next() tries to find the right tuple from the hash table. If there is no hash bucket
+ * in the hash table. It returns a tuple. If next() find the hash bucket in the hash table, it reads tuples in
+ * the found bucket sequentially. If it cannot find tuple in the bucket, it returns a tuple.
+ *
+ * @return The tuple which is unmatched to a given join condition.
+ * @throws IOException
+ */
+ public Tuple next() throws IOException {
+ if (first) {
+ loadRightToHashTable();
+ }
+
+ Tuple rightTuple;
+ boolean notFound;
+
+ while(!finished) {
+
+ // getting new outer
+ leftTuple = leftChild.next(); // it comes from a disk
+ if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed.
+ finished = true;
+ return null;
+ }
+
+ // Try to find a hash bucket in in-memory hash table
+ getKeyLeftTuple(leftTuple, leftKeyTuple);
+ List<Tuple> rightTuples = tupleSlots.get(leftKeyTuple);
+ if (rightTuples != null) {
+ // if found, it gets a hash bucket from the hash table.
+ iterator = rightTuples.iterator();
+ } else {
+ // if not found, it returns a tuple.
+ frameTuple.set(leftTuple, rightNullTuple);
+ projector.eval(frameTuple, outTuple);
+ return outTuple;
+ }
+
+ // Reach here only when a hash bucket is found. Then, it checks all tuples in the found bucket.
+ // If it finds a matched tuple, it escapes the loop for all tuples in the hash bucket.
+ notFound = true;
+ while (notFound && iterator.hasNext()) {
+ rightTuple = iterator.next();
+ frameTuple.set(leftTuple, rightTuple);
+ if (joinQual.eval(inSchema, frameTuple).isTrue()) { // if the matched one is found
+ notFound = false;
+ }
+ }
+
+ if (notFound) { // if there is no matched tuple
+ frameTuple.set(leftTuple, rightNullTuple);
+ projector.eval(frameTuple, outTuple);
+ break;
+ }
+ }
+
+ return outTuple;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
new file mode 100644
index 0000000..849dc38
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.*;
+
+
+ /**
+  * Hash-based left outer join.
+  *
+  * The right child is loaded into an in-memory hash table keyed by the join key columns. The left
+  * child is then streamed: joined pairs are emitted, and every left tuple that has no joinable
+  * right tuple is emitted once with the right side padded with nulls.
+  */
+ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
+   // from logical plan
+   protected JoinNode plan;
+   protected EvalNode joinQual;
+
+   protected List<Column[]> joinKeyPairs;
+
+   // temporary tuples and states for the hash join
+   protected boolean first = true;
+   protected FrameTuple frameTuple;
+   protected Tuple outTuple = null;
+   protected Map<Tuple, List<Tuple>> tupleSlots;
+   protected Iterator<Tuple> iterator = null;
+   protected Tuple leftTuple;
+   protected Tuple leftKeyTuple;
+
+   protected int [] leftKeyList;
+   protected int [] rightKeyList;
+
+   protected boolean finished = false;
+   protected boolean shouldGetLeftTuple = true;
+
+   // projection
+   protected Projector projector;
+
+   private int rightNumCols;
+   private static final Log LOG = LogFactory.getLog(HashLeftOuterJoinExec.class);
+
+   // true if the current left tuple has been joined with at least one right tuple of its bucket
+   private boolean leftTupleJoined = false;
+
+   public HashLeftOuterJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftChild,
+                                PhysicalExec rightChild) {
+     super(context, SchemaUtil.merge(leftChild.getSchema(), rightChild.getSchema()),
+         plan.getOutSchema(), leftChild, rightChild);
+     this.plan = plan;
+     this.joinQual = plan.getJoinQual();
+     this.tupleSlots = new HashMap<Tuple, List<Tuple>>(10000);
+
+     // HashJoin only can manage equi join key pairs.
+     this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, leftChild.getSchema(),
+         rightChild.getSchema(), false);
+
+     leftKeyList = new int[joinKeyPairs.size()];
+     rightKeyList = new int[joinKeyPairs.size()];
+
+     for (int i = 0; i < joinKeyPairs.size(); i++) {
+       leftKeyList[i] = leftChild.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName());
+     }
+
+     for (int i = 0; i < joinKeyPairs.size(); i++) {
+       rightKeyList[i] = rightChild.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName());
+     }
+
+     // for projection
+     this.projector = new Projector(inSchema, outSchema, plan.getTargets());
+
+     // for join
+     frameTuple = new FrameTuple();
+     outTuple = new VTuple(outSchema.size());
+     leftKeyTuple = new VTuple(leftKeyList.length);
+
+     rightNumCols = rightChild.getSchema().size();
+   }
+
+   /**
+    * Copies the join key columns of a left tuple into the given key tuple.
+    *
+    * @param outerTuple the left (outer) tuple to read the key columns from
+    * @param keyTuple   the tuple that receives the key values, in join key order
+    */
+   protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) {
+     for (int i = 0; i < leftKeyList.length; i++) {
+       keyTuple.put(i, outerTuple.get(leftKeyList[i]));
+     }
+   }
+
+   /**
+    * Projects the current left tuple combined with an all-null right side into {@code outTuple}.
+    */
+   private Tuple emitLeftWithNullRight() {
+     Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
+     frameTuple.set(leftTuple, nullPaddedTuple);
+     projector.eval(frameTuple, outTuple);
+     return outTuple;
+   }
+
+   /**
+    * Returns the next joined tuple, or {@code null} when the left child is exhausted. Once the
+    * end has been reached, every further call returns {@code null} until {@link #rescan()}.
+    *
+    * The returned object is this operator's shared {@code outTuple}; it is overwritten by the
+    * following call.
+    */
+   public Tuple next() throws IOException {
+     if (first) {
+       loadRightToHashTable();
+     }
+
+     while (!finished) {
+
+       if (shouldGetLeftTuple) { // initially, it is true.
+         // getting new outer
+         leftTuple = leftChild.next(); // it comes from a disk
+         if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed.
+           finished = true;
+           return null;
+         }
+
+         // getting corresponding right
+         getKeyLeftTuple(leftTuple, leftKeyTuple); // get a left key tuple
+         List<Tuple> rightTuples = tupleSlots.get(leftKeyTuple);
+         if (rightTuples == null) {
+           // this left tuple doesn't have a match on the right, and output a tuple with the nulls padded
+           // rightTuple; shouldGetLeftTuple stays true so the next call fetches a new left tuple
+           return emitLeftWithNullRight();
+         }
+
+         // found right tuples on in-memory hash table.
+         iterator = rightTuples.iterator();
+         shouldGetLeftTuple = false;
+         leftTupleJoined = false;
+       }
+
+       // getting a next right tuple on in-memory hash table.
+       Tuple rightTuple = iterator.next();
+       frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples
+       boolean joinable = joinQual.eval(inSchema, frameTuple).isTrue();
+       if (joinable) { // if both tuples are joinable
+         projector.eval(frameTuple, outTuple);
+         leftTupleJoined = true;
+       }
+
+       if (!iterator.hasNext()) { // no more right tuples for this hash key
+         shouldGetLeftTuple = true;
+         if (!leftTupleJoined) {
+           // the bucket existed, but no right tuple satisfied the join condition:
+           // the left tuple must still appear once, null padded on the right
+           return emitLeftWithNullRight();
+         }
+       }
+
+       if (joinable) {
+         return outTuple;
+       }
+     }
+
+     // already finished: do not return the previous result again
+     return null;
+   }
+
+   /**
+    * Reads the whole right child and groups its tuples by join key in {@code tupleSlots}.
+    */
+   protected void loadRightToHashTable() throws IOException {
+     Tuple tuple;
+     Tuple keyTuple;
+
+     while ((tuple = rightChild.next()) != null) {
+       keyTuple = new VTuple(joinKeyPairs.size());
+       for (int i = 0; i < rightKeyList.length; i++) {
+         keyTuple.put(i, tuple.get(rightKeyList[i]));
+       }
+
+       List<Tuple> newValue = tupleSlots.get(keyTuple);
+       if (newValue != null) {
+         newValue.add(tuple);
+       } else {
+         newValue = new ArrayList<Tuple>();
+         newValue.add(tuple);
+         tupleSlots.put(keyTuple, newValue);
+       }
+     }
+     first = false;
+   }
+
+   @Override
+   public void rescan() throws IOException {
+     super.rescan();
+
+     tupleSlots.clear();
+     first = true;
+
+     finished = false;
+     leftTupleJoined = false;
+     iterator = null;
+     shouldGetLeftTuple = true;
+   }
+
+
+   @Override
+   public void close() throws IOException {
+     super.close();
+     tupleSlots.clear();
+     tupleSlots = null;
+     iterator = null;
+     plan = null;
+     joinQual = null;
+     projector = null;
+   }
+
+   public JoinNode getPlan() {
+     return this.plan;
+   }
+ }
+
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
new file mode 100644
index 0000000..4fbb5e4
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Hash-based left semi join.
+ *
+ * Prepare a hash table of the IN side of the join. Scan the FROM side table.
+ * For each tuple of the FROM side table, it tries to find a matched tuple from the hash table for the IN side.
+ * If found, it emits one projected output tuple for that FROM side tuple and moves on to the next one.
+ */
+public class HashLeftSemiJoinExec extends HashJoinExec {
+
+  public HashLeftSemiJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec fromSideChild,
+                              PhysicalExec inSideChild) {
+    super(context, plan, fromSideChild, inSideChild);
+  }
+
+  /**
+   * The End of Tuple (EOT) condition is true only when no more tuple in the left relation (on disk).
+   *
+   * For each left tuple on the disk, next() tries to find at least one matched tuple from the hash table.
+   *
+   * In more detail, until there is a hash bucket matched to the left tuple in the hash table, it continues to traverse
+   * the left tuples. If next() finds the matched bucket in the hash table, it looks for any tuple in the bucket that
+   * satisfies the join condition. If found, it returns the projected tuple immediately without looking for more
+   * matched tuples in the bucket. Left tuples without any match are skipped.
+   *
+   * Once the left relation is exhausted, this call and every later call return null.
+   *
+   * @return The tuple which is firstly matched to a given join condition, or null at the end of tuples.
+   * @throws java.io.IOException
+   */
+  @Override
+  public Tuple next() throws IOException {
+    if (first) {
+      loadRightToHashTable();
+    }
+
+    while (!finished) {
+      // getting new outer
+      leftTuple = leftChild.next(); // it comes from a disk
+      if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed.
+        finished = true;
+        return null;
+      }
+
+      // Try to find a hash bucket in in-memory hash table
+      getKeyLeftTuple(leftTuple, leftKeyTuple);
+      List<Tuple> rightTuples = tupleSlots.get(leftKeyTuple);
+      if (rightTuples == null) {
+        // no bucket for this key: the left tuple cannot match, so try the next one
+        continue;
+      }
+
+      // Reach here only when a hash bucket is found. Then, it checks the tuples in the found bucket
+      // and returns as soon as one of them satisfies the join condition.
+      iterator = rightTuples.iterator();
+      while (iterator.hasNext()) {
+        Tuple rightTuple = iterator.next();
+        frameTuple.set(leftTuple, rightTuple);
+        if (joinQual.eval(inSchema, frameTuple).isTrue()) { // if the matched one is found
+          projector.eval(frameTuple, outTuple);
+          return outTuple;
+        }
+      }
+    }
+
+    // Already finished: never hand out the previously returned tuple again.
+    return null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashPartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashPartitioner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashPartitioner.java
new file mode 100644
index 0000000..3ae53d9
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashPartitioner.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ *
+ */
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+
+public class HashPartitioner extends Partitioner {
+ private final Tuple keyTuple;
+
+ public HashPartitioner(final int [] keys, final int numPartitions) {
+ super(keys, numPartitions);
+ this.keyTuple = new VTuple(partitionKeyIds.length);
+ }
+
+ @Override
+ public int getPartition(Tuple tuple) {
+ // In outer join, partition number can be zero because of empty tables.
+ // So, we should return zero for this case.
+ if (numPartitions == 0)
+ return 0;
+
+ // build one key tuple
+ for (int i = 0; i < partitionKeyIds.length; i++) {
+ keyTuple.put(i, tuple.get(partitionKeyIds[i]));
+ }
+ return (keyTuple.hashCode() & Integer.MAX_VALUE) %
+ (numPartitions == 32 ? numPartitions-1 : numPartitions);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
new file mode 100644
index 0000000..678b745
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.StatisticsUtil;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.engine.planner.logical.ShuffleFileWriteNode;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * <code>HashShuffleFileWriteExec</code> is a physical executor to store intermediate data into a number of
+ * file outputs associated with shuffle keys. The file outputs are stored on local disks.
+ *
+ * Each tuple of the child is routed by a {@link HashPartitioner} over the shuffle key columns to one
+ * appender per partition; the appenders are created lazily under the "output" directory of the task
+ * working directory.
+ */
+public final class HashShuffleFileWriteExec extends UnaryPhysicalExec {
+  private static final Log LOG = LogFactory.getLog(HashShuffleFileWriteExec.class);
+
+  private ShuffleFileWriteNode plan;
+  private final TableMeta meta;
+  private Partitioner partitioner;
+  private final Path storeTablePath;
+  /** Appenders opened so far, keyed by partition id. */
+  private Map<Integer, Appender> appenderMap = new HashMap<Integer, Appender>();
+  private final int numShuffleOutputs;
+  private final int [] shuffleKeyIds;
+
+  /**
+   * @param context the task attempt context; supplies the working directory and the configuration
+   * @param sm      a storage manager; not referenced by this class
+   * @param plan    the shuffle write plan; it must have shuffle keys
+   * @param child   the child executor producing the tuples to be written
+   */
+  public HashShuffleFileWriteExec(TaskAttemptContext context, final AbstractStorageManager sm,
+                                  final ShuffleFileWriteNode plan, final PhysicalExec child) throws IOException {
+    super(context, plan.getInSchema(), plan.getOutSchema(), child);
+    Preconditions.checkArgument(plan.hasShuffleKeys());
+    this.plan = plan;
+    this.meta = plan.hasOptions()
+        ? CatalogUtil.newTableMeta(plan.getStorageType(), plan.getOptions())
+        : CatalogUtil.newTableMeta(plan.getStorageType());
+
+    // about the shuffle
+    this.numShuffleOutputs = this.plan.getNumOutputs();
+    final Column [] shuffleKeys = this.plan.getShuffleKeys();
+    this.shuffleKeyIds = new int [shuffleKeys.length];
+    for (int idx = 0; idx < shuffleKeys.length; idx++) {
+      shuffleKeyIds[idx] = inSchema.getColumnId(shuffleKeys[idx].getQualifiedName());
+    }
+    this.partitioner = new HashPartitioner(shuffleKeyIds, numShuffleOutputs);
+    this.storeTablePath = new Path(context.getWorkDir(), "output");
+  }
+
+  /** Initializes the parent and creates the output directory. */
+  @Override
+  public void init() throws IOException {
+    super.init();
+    FileSystem localFs = new RawLocalFileSystem();
+    localFs.mkdirs(storeTablePath);
+  }
+
+  /**
+   * Returns the appender of the given partition, opening and registering it on first use.
+   * An already existing data file is only reported in the log.
+   */
+  private Appender getAppender(int partId) throws IOException {
+    final Appender opened = appenderMap.get(partId);
+    if (opened != null) {
+      return opened;
+    }
+
+    final Path dataFile = getDataFile(partId);
+    final FileSystem fs = dataFile.getFileSystem(context.getConf());
+    if (fs.exists(dataFile)) {
+      LOG.info("File " + dataFile + " already exists!");
+      FileStatus status = fs.getFileStatus(dataFile);
+      LOG.info("File size: " + status.getLen());
+    }
+
+    final Appender created =
+        StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema, dataFile);
+    created.enableStats();
+    created.init();
+    appenderMap.put(partId, created);
+    return created;
+  }
+
+  /** The data file of a partition is named after the partition id. */
+  private Path getDataFile(int partId) {
+    return StorageUtil.concatPath(storeTablePath, "" + partId);
+  }
+
+  /**
+   * Consumes the whole child in a single call: every tuple is appended to the file of its partition,
+   * then all appenders are flushed and closed, non-empty partitions are registered as shuffle file
+   * outputs and the aggregated statistics are stored in the context.
+   *
+   * @return always null
+   */
+  @Override
+  public Tuple next() throws IOException {
+    for (Tuple row = child.next(); row != null; row = child.next()) {
+      final int partId = partitioner.getPartition(row);
+      getAppender(partId).addTuple(row);
+    }
+
+    final List<TableStats> statSet = new ArrayList<TableStats>();
+    for (Map.Entry<Integer, Appender> opened : appenderMap.entrySet()) {
+      final int partNum = opened.getKey();
+      final Appender writer = opened.getValue();
+      writer.flush();
+      writer.close();
+      statSet.add(writer.getStats());
+      // only partitions that actually received rows are reported
+      if (writer.getStats().getNumRows() > 0) {
+        context.addShuffleFileOutput(partNum, getDataFile(partNum).getName());
+      }
+    }
+
+    // Collect and aggregated statistics data
+    context.setResultStats(StatisticsUtil.aggregateTableStat(statSet));
+
+    return null;
+  }
+
+  @Override
+  public void rescan() throws IOException {
+    // nothing to do
+  }
+
+  /** Closes the parent, drops the appender map and the plan references, and sets the progress to 1.0. */
+  @Override
+  public void close() throws IOException{
+    super.close();
+    if (appenderMap != null) {
+      appenderMap.clear();
+      appenderMap = null;
+    }
+
+    plan = null;
+    partitioner = null;
+
+    progress = 1.0f;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
new file mode 100644
index 0000000..0418f65
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.logical.HavingNode;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * A filter executor for a HAVING clause: it passes through only the child tuples for which the
+ * qualifier of the {@link HavingNode} evaluates to true.
+ */
+public class HavingExec extends UnaryPhysicalExec {
+  /** The HAVING predicate, evaluated against the input schema. */
+  private final EvalNode qual;
+
+  public HavingExec(TaskAttemptContext context,
+                    HavingNode plan,
+                    PhysicalExec child) {
+    super(context, plan.getInSchema(), plan.getOutSchema(), child);
+    this.qual = plan.getQual();
+  }
+
+  /**
+   * @return the next child tuple satisfying the predicate, or null when the child is exhausted
+   */
+  @Override
+  public Tuple next() throws IOException {
+    for (Tuple candidate = child.next(); candidate != null; candidate = child.next()) {
+      if (qual.eval(inSchema, candidate).isTrue()) {
+        return candidate;
+      }
+    }
+    return null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/JoinTupleComparator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/JoinTupleComparator.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/JoinTupleComparator.java
new file mode 100644
index 0000000..0d4c47b
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/JoinTupleComparator.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import com.google.common.base.Preconditions;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.storage.Tuple;
+
+import java.util.Comparator;
+
+/**
+ * The Comparator class for Outer and Inner Tuples.
+ *
+ * The first argument of {@link #compare(Tuple, Tuple)} is read with the sort key columns resolved
+ * against the left schema and the second one with those resolved against the right schema.
+ * The comparator keeps no per-call state.
+ *
+ * @see org.apache.tajo.storage.Tuple
+ */
+public class JoinTupleComparator implements Comparator<Tuple> {
+  private final int numSortKey;
+  private final int[] outerSortKeyIds;
+  private final int[] innerSortKeyIds;
+
+  /**
+   * @param leftschema  the schema used to resolve the column ids of {@code sortKeys[0]}
+   * @param rightschema the schema used to resolve the column ids of {@code sortKeys[1]}
+   * @param sortKeys    exactly two sort spec arrays of the same length
+   * @throws IllegalArgumentException if {@code sortKeys} does not consist of two arrays of equal length
+   */
+  public JoinTupleComparator(Schema leftschema, Schema rightschema, SortSpec[][] sortKeys) {
+    Preconditions.checkArgument(sortKeys.length == 2,
+        "The two of the sortspecs must be given, but " + sortKeys.length + " sortkeys are given.");
+    Preconditions.checkArgument(sortKeys[0].length == sortKeys[1].length,
+        "The number of both side sortkeys must be equals, but they are different: "
+            + sortKeys[0].length + " and " + sortKeys[1].length);
+
+    this.numSortKey = sortKeys[0].length; // because it is guaranteed that the number of sortspecs are equals
+    this.outerSortKeyIds = new int[numSortKey];
+    this.innerSortKeyIds = new int[numSortKey];
+
+    for (int i = 0; i < numSortKey; i++) {
+      this.outerSortKeyIds[i] = leftschema.getColumnId(sortKeys[0][i].getSortKey().getQualifiedName());
+      this.innerSortKeyIds[i] = rightschema.getColumnId(sortKeys[1][i].getSortKey().getQualifiedName());
+    }
+  }
+
+  /**
+   * Compares the sort key columns pairwise, in order, and returns the first non-zero result.
+   *
+   * When one of the two values is a {@link NullDatum} and the values are not equal, the side holding
+   * the null is treated as the greater one.
+   *
+   * @return a negative value, zero or a positive value; zero when all sort key columns compare as equal
+   */
+  @Override
+  public int compare(Tuple outerTuple, Tuple innerTuple) {
+    for (int i = 0; i < numSortKey; i++) {
+      final Datum outer = outerTuple.get(outerSortKeyIds[i]);
+      final Datum inner = innerTuple.get(innerSortKeyIds[i]);
+      final boolean outerIsNull = outer instanceof NullDatum;
+      final boolean innerIsNull = inner instanceof NullDatum;
+
+      final int compVal;
+      if (outerIsNull || innerIsNull) {
+        if (outer.equals(inner)) {
+          compVal = 0;
+        } else {
+          // here at least one side is null; a null outer wins, otherwise the inner is the null one
+          compVal = outerIsNull ? 1 : -1;
+        }
+      } else {
+        compVal = outer.compareTo(inner);
+      }
+
+      if (compVal != 0) {
+        return compVal;
+      }
+    }
+    return 0;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/LimitExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/LimitExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/LimitExec.java
new file mode 100644
index 0000000..d736c25
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/LimitExec.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.engine.planner.logical.LimitNode;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+
+/**
+ * A physical executor that passes through at most the number of tuples given by the
+ * {@link LimitNode} and then reports the end of tuples.
+ */
+public class LimitExec extends UnaryPhysicalExec {
+  /** The maximum number of tuples to be returned. */
+  private final long fetchFirstNum;
+  /** The number of tuples returned since construction or the last rescan. */
+  private long fetchCount;
+
+  public LimitExec(TaskAttemptContext context, Schema inSchema,
+                   Schema outSchema, PhysicalExec child, LimitNode limit) {
+    super(context, inSchema, outSchema, child);
+    this.fetchFirstNum = limit.getFetchFirstNum();
+    this.fetchCount = 0;
+  }
+
+  /**
+   * @return the next child tuple, or null when the limit has been reached or the child is exhausted
+   */
+  @Override
+  public Tuple next() throws IOException {
+    // Once the limit is reached, do not pull (and throw away) further tuples from the child.
+    if (fetchCount >= fetchFirstNum) {
+      return null;
+    }
+
+    Tuple tuple = child.next();
+    if (tuple != null) {
+      fetchCount++;
+    }
+    return tuple;
+  }
+
+  /**
+   * Rescans the parent and restarts counting from zero.
+   */
+  @Override
+  public void rescan() throws IOException {
+    super.rescan();
+    fetchCount = 0;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
new file mode 100644
index 0000000..9f4f20a
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.apache.tajo.engine.planner.logical.SortNode;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * An in-memory sort executor. On the first call of {@link #next()} it copies every tuple of the
+ * child into a list, sorts the list with the comparator of the parent class and then serves the
+ * tuples from that list.
+ */
+public class MemSortExec extends SortExec {
+  private SortNode plan;
+  /** The buffered copies of the child tuples; created in {@link #init()}. */
+  private List<Tuple> tupleSlots;
+  /** True once the child has been fully read and the buffer has been sorted. */
+  private boolean sorted = false;
+  private Iterator<Tuple> iterator;
+
+  public MemSortExec(final TaskAttemptContext context,
+                     SortNode plan, PhysicalExec child) {
+    super(context, plan.getInSchema(), plan.getOutSchema(), child, plan.getSortKeys());
+    this.plan = plan;
+  }
+
+  @Override
+  public void init() throws IOException {
+    super.init();
+    this.tupleSlots = new ArrayList<Tuple>(1000);
+  }
+
+  /**
+   * @return the next tuple in sort order, or null when all buffered tuples have been returned
+   */
+  @Override
+  public Tuple next() throws IOException {
+
+    if (!sorted) {
+      Tuple tuple;
+      while ((tuple = child.next()) != null) {
+        // buffer a copy of the child's tuple
+        tupleSlots.add(new VTuple(tuple));
+      }
+
+      Collections.sort(tupleSlots, getComparator());
+      this.iterator = tupleSlots.iterator();
+      sorted = true;
+    }
+
+    if (iterator.hasNext()) {
+      return this.iterator.next();
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Restarts the output. If the buffer has already been sorted, it is simply replayed from its
+   * beginning. Otherwise nothing has been buffered yet, and the executor is left unsorted so that
+   * the next call of {@link #next()} still loads and sorts the child tuples.
+   */
+  @Override
+  public void rescan() throws IOException {
+    super.rescan();
+    if (sorted) {
+      this.iterator = tupleSlots.iterator();
+    }
+  }
+
+  /**
+   * Closes the parent and releases the buffer. The buffer is cleared only if it is present, so
+   * closing without a prior {@link #init()} or closing twice does not fail on it.
+   */
+  @Override
+  public void close() throws IOException {
+    super.close();
+    if (tupleSlots != null) {
+      tupleSlots.clear();
+      tupleSlots = null;
+    }
+    iterator = null;
+    plan = null;
+  }
+
+  public SortNode getPlan() {
+    return this.plan;
+  }
+}