You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/04/18 11:19:53 UTC
[31/51] [partial] TAJO-752: Escalate sub modules in tajo-core into
the top-level modules. (hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
new file mode 100644
index 0000000..ff1f7b3
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
@@ -0,0 +1,335 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import com.google.common.base.Preconditions;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
+ // from logical plan
+ private JoinNode joinNode;
+ private EvalNode joinQual;
+
+ // temporal tuples and states for nested loop join
+ private FrameTuple frameTuple;
+ private Tuple leftTuple = null;
+ private Tuple rightTuple = null;
+ private Tuple outTuple = null;
+ private Tuple leftNext = null;
+
+ private List<Tuple> leftTupleSlots;
+ private List<Tuple> rightTupleSlots;
+
+ private JoinTupleComparator joincomparator = null;
+ private TupleComparator[] tupleComparator = null;
+
+ private final static int INITIAL_TUPLE_SLOT = 10000;
+
+ private boolean end = false;
+
+ // projection
+ private Projector projector;
+
+ private int rightNumCols;
+ private int leftNumCols;
+ private int posRightTupleSlots = -1;
+ private int posLeftTupleSlots = -1;
+ boolean endInPopulationStage = false;
+ private boolean initRightDone = false;
+
+ public MergeFullOuterJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftChild,
+ PhysicalExec rightChild, SortSpec[] leftSortKey, SortSpec[] rightSortKey) {
+ super(context, plan.getInSchema(), plan.getOutSchema(), leftChild, rightChild);
+ Preconditions.checkArgument(plan.hasJoinQual(), "Sort-merge join is only used for the equi-join, " +
+ "but there is no join condition");
+ this.joinNode = plan;
+ this.joinQual = plan.getJoinQual();
+
+ this.leftTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
+ this.rightTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
+ SortSpec[][] sortSpecs = new SortSpec[2][];
+ sortSpecs[0] = leftSortKey;
+ sortSpecs[1] = rightSortKey;
+
+ this.joincomparator = new JoinTupleComparator(leftChild.getSchema(),
+ rightChild.getSchema(), sortSpecs);
+ this.tupleComparator = PlannerUtil.getComparatorsFromJoinQual(
+ plan.getJoinQual(), leftChild.getSchema(), rightChild.getSchema());
+
+ // for projection
+ this.projector = new Projector(inSchema, outSchema, plan.getTargets());
+
+ // for join
+ frameTuple = new FrameTuple();
+ outTuple = new VTuple(outSchema.size());
+
+ leftNumCols = leftChild.getSchema().size();
+ rightNumCols = rightChild.getSchema().size();
+ }
+
+ public JoinNode getPlan(){
+ return this.joinNode;
+ }
+
+ public Tuple next() throws IOException {
+ Tuple previous;
+
+ for (;;) {
+ boolean newRound = false;
+ if((posRightTupleSlots == -1) && (posLeftTupleSlots == -1)) {
+ newRound = true;
+ }
+ if ((posRightTupleSlots == rightTupleSlots.size()) && (posLeftTupleSlots == leftTupleSlots.size())) {
+ newRound = true;
+ }
+
+ if(newRound == true){
+
+ if (end) {
+
+ ////////////////////////////////////////////////////////////////////////
+ // FINALIZING STAGE
+ ////////////////////////////////////////////////////////////////////////
+ // the finalizing stage, where remaining tuples on the right are
+ // transformed into left-padded results while tuples on the left
+ // are transformed into right-padded results
+
+ // before exit, a left-padded tuple should be built for all remaining
+ // right side and a right-padded tuple should be built for all remaining
+ // left side
+
+ if (initRightDone == false) {
+ // maybe the left operand was empty => the right one didn't have the chance to initialize
+ rightTuple = rightChild.next();
+ initRightDone = true;
+ }
+
+ if((leftTuple == null) && (rightTuple == null)) {
+ return null;
+ }
+
+ if((leftTuple == null) && (rightTuple != null)){
+ // output a tuple with the nulls padded leftTuple
+ Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
+ frameTuple.set(nullPaddedTuple, rightTuple);
+ projector.eval(frameTuple, outTuple);
+ // we simulate we found a match, which is exactly the null padded one
+ rightTuple = rightChild.next();
+ return outTuple;
+ }
+
+ if((leftTuple != null) && (rightTuple == null)){
+ // output a tuple with the nulls padded leftTuple
+ Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
+ frameTuple.set(leftTuple, nullPaddedTuple);
+ projector.eval(frameTuple, outTuple);
+ // we simulate we found a match, which is exactly the null padded one
+ leftTuple = leftChild.next();
+ return outTuple;
+ }
+ } // if end
+
+ ////////////////////////////////////////////////////////////////////////
+ // INITIALIZING STAGE
+ ////////////////////////////////////////////////////////////////////////
+ // initializing stage, reading the first tuple on each side
+ if (leftTuple == null) {
+ leftTuple = leftChild.next();
+ if( leftTuple == null){
+ end = true;
+ continue;
+ }
+ }
+ if (rightTuple == null) {
+ rightTuple = rightChild.next();
+ initRightDone = true;
+ if (rightTuple == null) {
+ end = true;
+ continue;
+ }
+ }
+
+ // reset tuple slots for a new round
+ leftTupleSlots.clear();
+ rightTupleSlots.clear();
+ posRightTupleSlots = -1;
+ posLeftTupleSlots = -1;
+
+ ////////////////////////////////////////////////////////////////////////
+ // Comparison and Move Forward Stage
+ ////////////////////////////////////////////////////////////////////////
+ // advance alternatively on each side until a match is found
+ int cmp;
+ while (!end && ((cmp = joincomparator.compare(leftTuple, rightTuple)) != 0)) {
+
+ if (cmp > 0) {
+
+ //before getting a new tuple from the right, a leftnullpadded tuple should be built
+ //output a tuple with the nulls padded leftTuple
+ Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
+ frameTuple.set(nullPaddedTuple, rightTuple);
+ projector.eval(frameTuple, outTuple);
+ // BEFORE RETURN, MOVE FORWARD
+ rightTuple = rightChild.next();
+ if(rightTuple == null) {
+ end = true;
+ }
+
+ return outTuple;
+
+ } else if (cmp < 0) {
+ // before getting a new tuple from the left, a rightnullpadded tuple should be built
+ // output a tuple with the nulls padded rightTuple
+ Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
+ frameTuple.set(leftTuple, nullPaddedTuple);
+ projector.eval(frameTuple, outTuple);
+ // we simulate we found a match, which is exactly the null padded one
+ // BEFORE RETURN, MOVE FORWARD
+ leftTuple = leftChild.next();
+ if(leftTuple == null) {
+ end = true;
+ }
+
+ return outTuple;
+
+ } // if (cmp < 0)
+ } //while
+
+
+ ////////////////////////////////////////////////////////////////////////
+ // SLOTS POPULATION STAGE
+ ////////////////////////////////////////////////////////////////////////
+ // once a match is found, retain all tuples with this key in tuple slots
+ // on each side
+ if(!end) {
+ endInPopulationStage = false;
+
+ boolean endLeft = false;
+ boolean endRight = false;
+
+ previous = new VTuple(leftTuple);
+ do {
+ leftTupleSlots.add(new VTuple(leftTuple));
+ leftTuple = leftChild.next();
+ if(leftTuple == null) {
+ endLeft = true;
+ }
+
+
+ } while ((endLeft != true) && (tupleComparator[0].compare(previous, leftTuple) == 0));
+ posLeftTupleSlots = 0;
+
+
+ previous = new VTuple(rightTuple);
+ do {
+ rightTupleSlots.add(new VTuple(rightTuple));
+ rightTuple = rightChild.next();
+ if(rightTuple == null) {
+ endRight = true;
+ }
+
+ } while ((endRight != true) && (tupleComparator[1].compare(previous, rightTuple) == 0) );
+ posRightTupleSlots = 0;
+
+ if ((endLeft == true) || (endRight == true)) {
+ end = true;
+ endInPopulationStage = true;
+ }
+
+ } // if end false
+ } // if newRound
+
+
+ ////////////////////////////////////////////////////////////////////////
+ // RESULTS STAGE
+ ////////////////////////////////////////////////////////////////////////
+ // now output result matching tuples from the slots
+ // if either we haven't reached end on neither side, or we did reach end
+ // on one(or both) sides but that happened in the slots population step
+ // (i.e. refers to next round)
+ if(!end || (end && endInPopulationStage)){
+ if(posLeftTupleSlots == 0){
+ leftNext = new VTuple (leftTupleSlots.get(posLeftTupleSlots));
+ posLeftTupleSlots = posLeftTupleSlots + 1;
+ }
+
+ if(posRightTupleSlots <= (rightTupleSlots.size() -1)) {
+ Tuple aTuple = new VTuple(rightTupleSlots.get(posRightTupleSlots));
+ posRightTupleSlots = posRightTupleSlots + 1;
+ frameTuple.set(leftNext, aTuple);
+ joinQual.eval(inSchema, frameTuple);
+ projector.eval(frameTuple, outTuple);
+ return outTuple;
+ } else {
+ // right (inner) slots reached end and should be rewind if there are still tuples in the outer slots
+ if(posLeftTupleSlots <= (leftTupleSlots.size()-1)) {
+ //rewind the right slots position
+ posRightTupleSlots = 0;
+ Tuple aTuple = new VTuple(rightTupleSlots.get(posRightTupleSlots));
+ posRightTupleSlots = posRightTupleSlots + 1;
+ leftNext = new VTuple (leftTupleSlots.get(posLeftTupleSlots));
+ posLeftTupleSlots = posLeftTupleSlots + 1;
+
+ frameTuple.set(leftNext, aTuple);
+ joinQual.eval(inSchema, frameTuple);
+ projector.eval(frameTuple, outTuple);
+ return outTuple;
+ }
+ }
+ } // the second if end false
+ } // for
+ }
+
+
+ @Override
+ public void rescan() throws IOException {
+ super.rescan();
+ leftTupleSlots.clear();
+ rightTupleSlots.clear();
+ posRightTupleSlots = -1;
+ posLeftTupleSlots = -1;
+ }
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+ leftTupleSlots.clear();
+ rightTupleSlots.clear();
+ leftTupleSlots = null;
+ rightTupleSlots = null;
+ joinNode = null;
+ joinQual = null;
+ projector = null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
new file mode 100644
index 0000000..470e1c9
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import com.google.common.base.Preconditions;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Sort-merge inner (equi-)join.
+ *
+ * <p>Expects both children to deliver tuples ordered by the join keys. The side with the smaller
+ * key is advanced until both keys are equal. All tuples sharing that key are then buffered on
+ * both sides, and every buffered pair satisfying the join qual is emitted.
+ */
+public class MergeJoinExec extends BinaryPhysicalExec {
+  // from logical plan
+  private JoinNode joinNode;
+  private EvalNode joinQual;
+
+  // temporal tuples and states for the merge join
+  private FrameTuple frameTuple;
+  private Tuple outerTuple = null;  // current, not yet buffered, tuple of the outer (left) child
+  private Tuple innerTuple = null;  // current, not yet buffered, tuple of the inner (right) child
+  private Tuple outTuple = null;    // output tuple reused for every returned row
+  private Tuple outerNext = null;   // outer slot tuple currently being paired with inner slots
+
+  // buffered tuples sharing the current join key and iterators over them
+  private List<Tuple> outerTupleSlots;
+  private List<Tuple> innerTupleSlots;
+  private Iterator<Tuple> outerIterator;
+  private Iterator<Tuple> innerIterator;
+
+  private JoinTupleComparator joincomparator = null;
+  private TupleComparator[] tupleComparator = null;
+
+  private final static int INITIAL_TUPLE_SLOT = 10000;
+
+  // set once either child is exhausted; an inner join cannot produce further groups afterwards
+  private boolean end = false;
+
+  // projection
+  private Projector projector;
+
+  public MergeJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer,
+      PhysicalExec inner, SortSpec[] outerSortKey, SortSpec[] innerSortKey) {
+    super(context, plan.getInSchema(), plan.getOutSchema(), outer, inner);
+    Preconditions.checkArgument(plan.hasJoinQual(), "Sort-merge join is only used for the equi-join, " +
+        "but there is no join condition");
+    this.joinNode = plan;
+    this.joinQual = plan.getJoinQual();
+
+    this.outerTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
+    this.innerTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
+    SortSpec[][] sortSpecs = new SortSpec[2][];
+    sortSpecs[0] = outerSortKey;
+    sortSpecs[1] = innerSortKey;
+
+    this.joincomparator = new JoinTupleComparator(outer.getSchema(),
+        inner.getSchema(), sortSpecs);
+    this.tupleComparator = PlannerUtil.getComparatorsFromJoinQual(
+        plan.getJoinQual(), outer.getSchema(), inner.getSchema());
+    this.outerIterator = outerTupleSlots.iterator();
+    this.innerIterator = innerTupleSlots.iterator();
+
+    // for projection
+    this.projector = new Projector(inSchema, outSchema, plan.getTargets());
+
+    // for join
+    frameTuple = new FrameTuple();
+    outTuple = new VTuple(outSchema.size());
+  }
+
+  public JoinNode getPlan(){
+    return this.joinNode;
+  }
+
+  /**
+   * Returns the next joined tuple, or {@code null} once no further matches are possible.
+   * The returned tuple object is reused across calls.
+   */
+  public Tuple next() throws IOException {
+    Tuple previous;
+
+    for (;;) {
+      if (!outerIterator.hasNext() && !innerIterator.hasNext()) {
+        // the previous group of equal keys is fully consumed: start a new round
+        if (end) {
+          return null;
+        }
+
+        if (outerTuple == null) {
+          outerTuple = leftChild.next();
+        }
+        if (innerTuple == null) {
+          innerTuple = rightChild.next();
+        }
+        // an inner join has no more results once either side is exhausted; this also keeps
+        // null tuples away from the comparator when an input is empty
+        if (outerTuple == null || innerTuple == null) {
+          end = true;
+          return null;
+        }
+
+        outerTupleSlots.clear();
+        innerTupleSlots.clear();
+
+        // advance the side with the smaller key until both sides share the same key
+        int cmp;
+        while ((cmp = joincomparator.compare(outerTuple, innerTuple)) != 0) {
+          if (cmp > 0) {
+            innerTuple = rightChild.next();
+          } else {
+            outerTuple = leftChild.next();
+          }
+          if (innerTuple == null || outerTuple == null) {
+            end = true;
+            return null;
+          }
+        }
+
+        try {
+          // buffer every outer tuple having the current key
+          previous = outerTuple.clone();
+          do {
+            outerTupleSlots.add(outerTuple.clone());
+            outerTuple = leftChild.next();
+            if (outerTuple == null) {
+              end = true;
+              break;
+            }
+          } while (tupleComparator[0].compare(previous, outerTuple) == 0);
+          outerIterator = outerTupleSlots.iterator();
+          outerNext = outerIterator.next();
+
+          // buffer every inner tuple having the current key
+          previous = innerTuple.clone();
+          do {
+            innerTupleSlots.add(innerTuple.clone());
+            innerTuple = rightChild.next();
+            if (innerTuple == null) {
+              end = true;
+              break;
+            }
+          } while (tupleComparator[1].compare(previous, innerTuple) == 0);
+          innerIterator = innerTupleSlots.iterator();
+        } catch (CloneNotSupportedException e) {
+          // continuing with half-populated slots would silently produce wrong results
+          throw new IllegalStateException("Failed to clone a tuple during the merge join", e);
+        }
+      }
+
+      // cross product of the buffered groups: rewind the inner slots for each outer tuple
+      if (!innerIterator.hasNext()) {
+        outerNext = outerIterator.next();
+        innerIterator = innerTupleSlots.iterator();
+      }
+
+      frameTuple.set(outerNext, innerIterator.next());
+
+      if (joinQual.eval(inSchema, frameTuple).isTrue()) {
+        projector.eval(frameTuple, outTuple);
+        return outTuple;
+      }
+    }
+  }
+
+  /** Restarts the join from the beginning of both inputs, discarding all buffered state. */
+  @Override
+  public void rescan() throws IOException {
+    super.rescan();
+    outerTupleSlots.clear();
+    innerTupleSlots.clear();
+    outerIterator = outerTupleSlots.iterator();
+    innerIterator = innerTupleSlots.iterator();
+    outerTuple = null;
+    innerTuple = null;
+    outerNext = null;
+    end = false;
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+
+    outerTupleSlots.clear();
+    innerTupleSlots.clear();
+    outerTupleSlots = null;
+    innerTupleSlots = null;
+    outerIterator = null;
+    innerIterator = null;
+    joinQual = null;
+    projector = null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
new file mode 100644
index 0000000..6e5900e
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+
+public class NLJoinExec extends BinaryPhysicalExec {
+ // from logical plan
+ private JoinNode plan;
+ private EvalNode joinQual;
+
+
+ // temporal tuples and states for nested loop join
+ private boolean needNewOuter;
+ private FrameTuple frameTuple;
+ private Tuple outerTuple = null;
+ private Tuple innerTuple = null;
+ private Tuple outTuple = null;
+
+ // projection
+ private final Projector projector;
+
+ public NLJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer,
+ PhysicalExec inner) {
+ super(context, plan.getInSchema(), plan.getOutSchema(), outer, inner);
+ this.plan = plan;
+
+ if (plan.hasJoinQual()) {
+ this.joinQual = plan.getJoinQual();
+ }
+
+ // for projection
+ projector = new Projector(inSchema, outSchema, plan.getTargets());
+
+ // for join
+ needNewOuter = true;
+ frameTuple = new FrameTuple();
+ outTuple = new VTuple(outSchema.size());
+ }
+
+ public JoinNode getPlan() {
+ return this.plan;
+ }
+
+ public Tuple next() throws IOException {
+ for (;;) {
+ if (needNewOuter) {
+ outerTuple = leftChild.next();
+ if (outerTuple == null) {
+ return null;
+ }
+ needNewOuter = false;
+ }
+
+ innerTuple = rightChild.next();
+ if (innerTuple == null) {
+ needNewOuter = true;
+ rightChild.rescan();
+ continue;
+ }
+
+ frameTuple.set(outerTuple, innerTuple);
+ if (joinQual != null) {
+ if (joinQual.eval(inSchema, frameTuple).isTrue()) {
+ projector.eval(frameTuple, outTuple);
+ return outTuple;
+ }
+ } else {
+ projector.eval(frameTuple, outTuple);
+ return outTuple;
+ }
+ }
+ }
+
+ @Override
+ public void rescan() throws IOException {
+ super.rescan();
+ needNewOuter = true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
new file mode 100644
index 0000000..5c17c40
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Nested-loop left outer join. For every left tuple the right child is scanned completely.
+ * Matching pairs are emitted, and a left tuple without any match is emitted once with a
+ * null-padded right side. Without a join qual every pair matches (cross join semantics).
+ */
+public class NLLeftOuterJoinExec extends BinaryPhysicalExec {
+  // from logical plan
+  private JoinNode plan;
+  private EvalNode joinQual;  // null when the plan has no join condition
+
+  // temporal tuples and states for nested loop join
+  private boolean needNextRightTuple;  // true when a new left tuple has to be fetched
+  private FrameTuple frameTuple;
+  private Tuple leftTuple = null;
+  private Tuple rightTuple = null;
+  private Tuple outTuple = null;
+
+  // projection
+  private final Projector projector;
+
+  // whether the current left tuple has produced at least one output row
+  private boolean foundAtLeastOneMatch;
+  private int rightNumCols;
+
+  public NLLeftOuterJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftChild,
+      PhysicalExec rightChild) {
+    super(context, plan.getInSchema(), plan.getOutSchema(), leftChild, rightChild);
+    this.plan = plan;
+
+    if (plan.hasJoinQual()) {
+      this.joinQual = plan.getJoinQual();
+    }
+
+    // for projection
+    projector = new Projector(inSchema, outSchema, plan.getTargets());
+
+    // for join
+    needNextRightTuple = true;
+    frameTuple = new FrameTuple();
+    outTuple = new VTuple(outSchema.size());
+
+    foundAtLeastOneMatch = false;
+    rightNumCols = rightChild.getSchema().size();
+  }
+
+  public JoinNode getPlan() {
+    return this.plan;
+  }
+
+  /**
+   * Returns the next joined or right-null-padded tuple, or {@code null} when the left input is
+   * exhausted. The returned tuple object is reused across calls.
+   */
+  public Tuple next() throws IOException {
+    for (;;) {
+      if (needNextRightTuple) {
+        leftTuple = leftChild.next();
+        if (leftTuple == null) {
+          return null;
+        }
+        needNextRightTuple = false;
+        // a new tuple from the left child has initially no matches on the right operand
+        foundAtLeastOneMatch = false;
+      }
+      rightTuple = rightChild.next();
+
+      if (rightTuple == null) {
+        // the scan of the right operand is finished for the current left tuple
+        if (!foundAtLeastOneMatch) {
+          // no match was found: output the left tuple with a null-padded right side
+          Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
+          frameTuple.set(leftTuple, nullPaddedTuple);
+          projector.eval(frameTuple, outTuple);
+          // the null-padded row counts as the match of this left tuple
+          foundAtLeastOneMatch = true;
+          needNextRightTuple = true;
+          rightChild.rescan();
+          return outTuple;
+        } else {
+          needNextRightTuple = true;
+          rightChild.rescan();
+          continue;
+        }
+      }
+
+      frameTuple.set(leftTuple, rightTuple);
+      // without a join condition every pair matches, consistent with NLJoinExec
+      if (joinQual == null || joinQual.eval(inSchema, frameTuple).isTrue()) {
+        projector.eval(frameTuple, outTuple);
+        foundAtLeastOneMatch = true;
+        return outTuple;
+      }
+    }
+  }
+
+  @Override
+  public void rescan() throws IOException {
+    super.rescan();
+    needNextRightTuple = true;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
new file mode 100644
index 0000000..7f86ba2
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import com.google.common.collect.Lists;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+
+/**
+ * A scanner that reads multiple partitions sequentially. One {@link SeqScanExec} is created
+ * per fragment; the scanners are consumed one after another, and their input statistics are
+ * merged on close.
+ */
+public class PartitionMergeScanExec extends PhysicalExec {
+  private final ScanNode plan;
+  private SeqScanExec currentScanner = null;
+
+  private CatalogProtos.FragmentProto [] fragments;
+
+  private List<SeqScanExec> scanners = Lists.newArrayList();
+  private Iterator<SeqScanExec> iterator;
+
+  private AbstractStorageManager sm;
+
+  // progress reported when no iteration is active (before init / after close)
+  private float progress;
+  protected TableStats inputStats;
+
+  public PartitionMergeScanExec(TaskAttemptContext context, AbstractStorageManager sm,
+      ScanNode plan, CatalogProtos.FragmentProto[] fragments) throws IOException {
+    super(context, plan.getInSchema(), plan.getOutSchema());
+
+    this.plan = plan;
+    this.fragments = fragments;
+    this.sm = sm;
+
+    inputStats = new TableStats();
+  }
+
+  /** Creates one sequential scanner per fragment and opens the first one. */
+  @Override
+  public void init() throws IOException {
+    for (CatalogProtos.FragmentProto fragment : fragments) {
+      ScanNode clonedPlan = (ScanNode) PlannerUtil.clone(null, plan);
+      CatalogProtos.FragmentProto[] single = new CatalogProtos.FragmentProto[] {fragment};
+      scanners.add(new SeqScanExec(context, sm, clonedPlan, single));
+    }
+    progress = 0.0f;
+    rescan();
+  }
+
+  @Override
+  public Tuple next() throws IOException {
+    while (currentScanner != null) {
+      Tuple tuple = currentScanner.next();
+      if (tuple != null) {
+        return tuple;
+      }
+      if (!iterator.hasNext()) {
+        break;
+      }
+      // the current partition is exhausted: release it and move to the next one
+      currentScanner.close();
+      currentScanner = iterator.next();
+      currentScanner.init();
+    }
+    return null;
+  }
+
+  /** Restarts iteration at the first scanner, if there is any. */
+  @Override
+  public void rescan() throws IOException {
+    if (!scanners.isEmpty()) {
+      iterator = scanners.iterator();
+      currentScanner = iterator.next();
+      currentScanner.init();
+    }
+  }
+
+  /** Closes all scanners and accumulates their input statistics. */
+  @Override
+  public void close() throws IOException {
+    for (SeqScanExec scanner : scanners) {
+      scanner.close();
+      TableStats stats = scanner.getInputStats();
+      if (stats != null) {
+        inputStats.merge(stats);
+      }
+    }
+    iterator = null;
+    progress = 1.0f;
+  }
+
+  public String getTableName() {
+    return plan.getTableName();
+  }
+
+  /** Returns the average progress of all scanners while iterating, otherwise the stored progress. */
+  @Override
+  public float getProgress() {
+    if (iterator == null) {
+      return progress;
+    }
+    float sum = 0.0f;
+    for (SeqScanExec scanner : scanners) {
+      sum += scanner.getProgress();
+    }
+    return sum > 0 ? sum / (float) scanners.size() : 0.0f;
+  }
+
+  @Override
+  public TableStats getInputStats() {
+    return inputStats;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/Partitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/Partitioner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/Partitioner.java
new file mode 100644
index 0000000..6d544a7
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/Partitioner.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ *
+ */
+package org.apache.tajo.engine.planner.physical;
+
+import com.google.common.base.Preconditions;
+import org.apache.tajo.storage.Tuple;
+
+/**
+ * Base class for mapping a tuple to a partition number via {@link #getPartition(Tuple)}.
+ */
+public abstract class Partitioner {
+  // ids of the partition key columns (presumably column positions in the input schema)
+  protected final int [] partitionKeyIds;
+  protected final int numPartitions;
+
+  /**
+   * @param keyList ids of the partition key columns; must not be null (an empty array is accepted)
+   * @param numPartitions the number of partitions; must not be negative
+   * @throws IllegalArgumentException if {@code keyList} is null or {@code numPartitions} is negative
+   */
+  public Partitioner(final int [] keyList, final int numPartitions) {
+    Preconditions.checkArgument(keyList != null,
+        "Partition keys must be given");
+    // In outer join, zero can be passed into this value because of empty tables.
+    // So, we should allow zero.
+    Preconditions.checkArgument(numPartitions >= 0,
+        "The number of partitions must not be negative: %s", numPartitions);
+    this.partitionKeyIds = keyList;
+    this.numPartitions = numPartitions;
+  }
+
+  /**
+   * Returns the partition number for the given tuple
+   * (presumably within {@code [0, numPartitions)} — confirm in the implementations).
+   */
+  public abstract int getPartition(Tuple tuple);
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
new file mode 100644
index 0000000..e30a10b
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SchemaObject;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.UUID;
+
+public abstract class PhysicalExec implements SchemaObject {
+ protected final TaskAttemptContext context;
+ protected Schema inSchema;
+ protected Schema outSchema;
+ protected int outColumnNum;
+
+ public PhysicalExec(final TaskAttemptContext context, final Schema inSchema,
+ final Schema outSchema) {
+ this.context = context;
+ this.inSchema = inSchema;
+ this.outSchema = outSchema;
+ this.outColumnNum = outSchema.size();
+ }
+
+ public final Schema getSchema() {
+ return outSchema;
+ }
+
+ public abstract void init() throws IOException;
+
+ public abstract Tuple next() throws IOException;
+
+ public abstract void rescan() throws IOException;
+
+ public abstract void close() throws IOException;
+
+ public abstract float getProgress();
+
+ protected void info(Log log, String message) {
+ log.info("["+ context.getTaskId() + "] " + message);
+ }
+
+ protected void warn(Log log, String message) {
+ log.warn("[" + context.getTaskId() + "] " + message);
+ }
+
+ protected void fatal(Log log, String message) {
+ log.fatal("[" + context.getTaskId() + "] " + message);
+ }
+
+ protected Path getExecutorTmpDir() {
+ return new Path(UUID.randomUUID().toString());
+ }
+
+ public TableStats getInputStats() {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java
new file mode 100644
index 0000000..738db62
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import java.util.Stack;
+
+public interface PhysicalExecutorVisitor<CONTEXT, RESULT> {
+
+ RESULT visitBNLJoin(CONTEXT context, BNLJoinExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitBSTIndexScan(CONTEXT context, BSTIndexScanExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitEvalExpr(CONTEXT context, EvalExprExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitExternalSort(CONTEXT context, ExternalSortExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitHashAggregate(CONTEXT context, HashAggregateExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitHashBasedColPartitionStore(CONTEXT context, HashBasedColPartitionStoreExec exec,
+ Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitHashFullOuterJoin(CONTEXT context, HashFullOuterJoinExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitHashJoin(CONTEXT context, HashJoinExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitHashLeftAntiJoin(CONTEXT context, HashLeftAntiJoinExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitHashLeftOuterJoin(CONTEXT context, HashLeftOuterJoinExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitLeftHashSemiJoin(CONTEXT context, HashLeftSemiJoinExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitHashShuffleFileWrite(CONTEXT context, HashShuffleFileWriteExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitHaving(CONTEXT context, HavingExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitLimit(CONTEXT context, LimitExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitMemSort(CONTEXT context, MemSortExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitMergeFullOuterJoin(CONTEXT context, MergeFullOuterJoinExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitMergeJoin(CONTEXT context, MergeJoinExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitNLJoin(CONTEXT context, NLJoinExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitNLLeftOuterJoin(CONTEXT context, NLLeftOuterJoinExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitProjection(CONTEXT context, ProjectionExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitRangeShuffleFileWrite(CONTEXT context, RangeShuffleFileWriteExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitRightOuterMergeJoin(CONTEXT context, RightOuterMergeJoinExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitSelection(CONTEXT context, SelectionExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitSeqScan(CONTEXT context, SeqScanExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitSortAggregate(CONTEXT context, SortAggregateExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitSortBasedColPartitionStore(CONTEXT context, SortBasedColPartitionStoreExec exec,
+ Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+
+ RESULT visitStoreTable(CONTEXT context, StoreTableExec exec, Stack<PhysicalExec> stack)
+ throws PhysicalPlanningException;
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
new file mode 100644
index 0000000..fdd1839
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import java.util.Stack;
+
+public class PhysicalPlanUtil {
+ public static <T extends PhysicalExec> T findExecutor(PhysicalExec plan, Class<? extends PhysicalExec> clazz)
+ throws PhysicalPlanningException {
+ return (T) new FindVisitor().visit(plan, new Stack<PhysicalExec>(), clazz);
+ }
+
+ private static class FindVisitor extends BasicPhysicalExecutorVisitor<Class<? extends PhysicalExec>, PhysicalExec> {
+ public PhysicalExec visit(PhysicalExec exec, Stack<PhysicalExec> stack, Class<? extends PhysicalExec> target)
+ throws PhysicalPlanningException {
+ if (target.isAssignableFrom(exec.getClass())) {
+ return exec;
+ } else {
+ return super.visit(exec, stack, target);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanningException.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanningException.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanningException.java
new file mode 100644
index 0000000..62add1e
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanningException.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import java.io.IOException;
+
+public class PhysicalPlanningException extends IOException {
+ public PhysicalPlanningException(String message) {
+ super(message);
+ }
+
+ public PhysicalPlanningException(Exception ioe) {
+ super(ioe);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
new file mode 100644
index 0000000..ee6ef1d
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ *
+ */
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.Projectable;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+
+public class ProjectionExec extends UnaryPhysicalExec {
+ private Projectable plan;
+
+ // for projection
+ private Tuple outTuple;
+ private Projector projector;
+
+ public ProjectionExec(TaskAttemptContext context, Projectable plan,
+ PhysicalExec child) {
+ super(context, plan.getInSchema(), plan.getOutSchema(), child);
+ this.plan = plan;
+ }
+
+ public void init() throws IOException {
+ super.init();
+
+ this.outTuple = new VTuple(outSchema.size());
+ this.projector = new Projector(inSchema, outSchema, this.plan.getTargets());
+ }
+
+ @Override
+ public Tuple next() throws IOException {
+ Tuple tuple = child.next();
+
+ if (tuple == null) {
+ return null;
+ }
+
+ projector.eval(tuple, outTuple);
+ return outTuple;
+ }
+
+ @Override
+ public void close() throws IOException{
+ super.close();
+ plan = null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
new file mode 100644
index 0000000..68379d1
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.index.bst.BSTIndex;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * <code>RangeShuffleFileWriteExec</code> is a physical executor to store intermediate data into a number of
+ * file outputs associated with shuffle key ranges. The file outputs are stored with index files on local disks.
+ * <code>RangeShuffleFileWriteExec</code> is implemented with an assumption that input tuples are sorted in an
+ * specified order of shuffle keys.
+ */
+public class RangeShuffleFileWriteExec extends UnaryPhysicalExec {
+ private static Log LOG = LogFactory.getLog(RangeShuffleFileWriteExec.class);
+ private final SortSpec[] sortSpecs;
+ private int [] indexKeys = null;
+ private Schema keySchema;
+
+ private BSTIndex.BSTIndexWriter indexWriter;
+ private TupleComparator comp;
+ private FileAppender appender;
+ private TableMeta meta;
+
+ public RangeShuffleFileWriteExec(final TaskAttemptContext context, final AbstractStorageManager sm,
+ final PhysicalExec child, final Schema inSchema, final Schema outSchema,
+ final SortSpec[] sortSpecs) throws IOException {
+ super(context, inSchema, outSchema, child);
+ this.sortSpecs = sortSpecs;
+ }
+
+ public void init() throws IOException {
+ super.init();
+
+ indexKeys = new int[sortSpecs.length];
+ keySchema = PlannerUtil.sortSpecsToSchema(sortSpecs);
+
+ Column col;
+ for (int i = 0 ; i < sortSpecs.length; i++) {
+ col = sortSpecs[i].getSortKey();
+ indexKeys[i] = inSchema.getColumnId(col.getQualifiedName());
+ }
+
+ BSTIndex bst = new BSTIndex(new TajoConf());
+ this.comp = new TupleComparator(keySchema, sortSpecs);
+ Path storeTablePath = new Path(context.getWorkDir(), "output");
+ LOG.info("Output data directory: " + storeTablePath);
+ this.meta = CatalogUtil.newTableMeta(context.getDataChannel() != null ?
+ context.getDataChannel().getStoreType() : CatalogProtos.StoreType.RAW);
+ FileSystem fs = new RawLocalFileSystem();
+ fs.mkdirs(storeTablePath);
+ this.appender = (FileAppender) StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta,
+ outSchema, new Path(storeTablePath, "output"));
+ this.appender.enableStats();
+ this.appender.init();
+ this.indexWriter = bst.getIndexWriter(new Path(storeTablePath, "index"),
+ BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
+ this.indexWriter.setLoadNum(100);
+ this.indexWriter.open();
+ }
+
+ @Override
+ public Tuple next() throws IOException {
+ Tuple tuple;
+ Tuple keyTuple;
+ Tuple prevKeyTuple = null;
+ long offset;
+
+
+ while((tuple = child.next()) != null) {
+ offset = appender.getOffset();
+ appender.addTuple(tuple);
+ keyTuple = new VTuple(keySchema.size());
+ RowStoreUtil.project(tuple, keyTuple, indexKeys);
+ if (prevKeyTuple == null || !prevKeyTuple.equals(keyTuple)) {
+ indexWriter.write(keyTuple, offset);
+ prevKeyTuple = keyTuple;
+ }
+ }
+
+ return null;
+ }
+
+ @Override
+ public void rescan() throws IOException {
+ }
+
+ public void close() throws IOException {
+ super.close();
+
+ appender.flush();
+ IOUtils.cleanup(LOG, appender);
+ indexWriter.flush();
+ IOUtils.cleanup(LOG, indexWriter);
+
+ // Collect statistics data
+ context.setResultStats(appender.getStats());
+ context.addShuffleFileOutput(0, context.getTaskId().toString());
+ appender = null;
+ indexWriter = null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
new file mode 100644
index 0000000..c70174a
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
@@ -0,0 +1,346 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import com.google.common.base.Preconditions;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
+ // from logical plan
+ private JoinNode joinNode;
+ private EvalNode joinQual;
+
+ // temporal tuples and states for nested loop join
+ private FrameTuple frameTuple;
+ private Tuple leftTuple = null;
+ private Tuple rightTuple = null;
+ private Tuple outTuple = null;
+ private Tuple nextLeft = null;
+
+ private List<Tuple> leftTupleSlots;
+ private List<Tuple> innerTupleSlots;
+
+ private JoinTupleComparator joinComparator = null;
+ private TupleComparator[] tupleComparator = null;
+
+ private final static int INITIAL_TUPLE_SLOT = 10000;
+
+ private boolean end = false;
+
+ // projection
+ private Projector projector;
+
+ private int rightNumCols;
+ private int leftNumCols;
+ private int posRightTupleSlots = -1;
+ private int posLeftTupleSlots = -1;
+ private boolean endInPopulationStage = false;
+ private boolean initRightDone = false;
+
+ public RightOuterMergeJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer,
+ PhysicalExec inner, SortSpec[] outerSortKey, SortSpec[] innerSortKey) {
+ super(context, plan.getInSchema(), plan.getOutSchema(), outer, inner);
+ Preconditions.checkArgument(plan.hasJoinQual(), "Sort-merge join is only used for the equi-join, " +
+ "but there is no join condition");
+ this.joinNode = plan;
+ this.joinQual = plan.getJoinQual();
+
+ this.leftTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
+ this.innerTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
+ SortSpec[][] sortSpecs = new SortSpec[2][];
+ sortSpecs[0] = outerSortKey;
+ sortSpecs[1] = innerSortKey;
+
+ this.joinComparator = new JoinTupleComparator(outer.getSchema(), inner.getSchema(), sortSpecs);
+ this.tupleComparator = PlannerUtil.getComparatorsFromJoinQual(
+ plan.getJoinQual(), outer.getSchema(), inner.getSchema());
+
+ // for projection
+ this.projector = new Projector(inSchema, outSchema, plan.getTargets());
+
+ // for join
+ frameTuple = new FrameTuple();
+ outTuple = new VTuple(outSchema.size());
+
+ leftNumCols = outer.getSchema().size();
+ }
+
+ public JoinNode getPlan() {
+ return this.joinNode;
+ }
+
+ /**
+ * creates a tuple of a given size filled with NULL values in all fields
+ */
+ private Tuple createNullPaddedTuple(int columnNum){
+ VTuple tuple = new VTuple(columnNum);
+ for (int i = 0; i < columnNum; i++) {
+ tuple.put(i, DatumFactory.createNullDatum());
+ }
+ return tuple;
+ }
+
+ /**
+ *
+ * Right outer merge join consists of four stages
+ * <ul>
+ * <li>initialization stage: </li>
+ * <li>finalizing stage: </li>
+ * </ul>
+ *
+ * @return
+ * @throws IOException
+ */
+ public Tuple next() throws IOException {
+ Tuple previous;
+
+ for (;;) {
+ boolean newRound = false;
+ if((posRightTupleSlots == -1) && (posLeftTupleSlots == -1)) {
+ newRound = true;
+ }
+ if ((posRightTupleSlots == innerTupleSlots.size()) && (posLeftTupleSlots == leftTupleSlots.size())) {
+ newRound = true;
+ }
+
+ if (newRound) {
+
+ //////////////////////////////////////////////////////////////////////
+ // BEGIN FINALIZING STAGE
+ //////////////////////////////////////////////////////////////////////
+
+ // The finalizing stage, where remaining tuples on the only right are transformed into left-padded results
+ if (end) {
+ if (initRightDone == false) {
+ // maybe the left operand was empty => the right one didn't have the chance to initialize
+ rightTuple = rightChild.next();
+ initRightDone = true;
+ }
+
+ if(rightTuple == null) {
+ return null;
+ } else {
+ // output a tuple with the nulls padded leftTuple
+ Tuple nullPaddedTuple = createNullPaddedTuple(leftNumCols);
+ frameTuple.set(nullPaddedTuple, rightTuple);
+ projector.eval(frameTuple, outTuple);
+
+ // we simulate we found a match, which is exactly the null padded one
+ rightTuple = rightChild.next();
+
+ return outTuple;
+ }
+ }
+ //////////////////////////////////////////////////////////////////////
+ // END FINALIZING STAGE
+ //////////////////////////////////////////////////////////////////////
+
+
+ //////////////////////////////////////////////////////////////////////
+ // BEGIN INITIALIZATION STAGE
+ //////////////////////////////////////////////////////////////////////
+
+ // This stage reads the first tuple on each side
+ if (leftTuple == null) {
+ leftTuple = leftChild.next();
+
+ if (leftTuple == null) {
+ end = true;
+ continue;
+ }
+ }
+
+ if(rightTuple == null){
+ rightTuple = rightChild.next();
+
+ if(rightTuple != null){
+ initRightDone = true;
+ }
+ else {
+ initRightDone = true;
+ end = true;
+ continue;
+ }
+ }
+ //////////////////////////////////////////////////////////////////////
+ // END INITIALIZATION STAGE
+ //////////////////////////////////////////////////////////////////////
+
+ // reset tuple slots for a new round
+ leftTupleSlots.clear();
+ innerTupleSlots.clear();
+ posRightTupleSlots = -1;
+ posLeftTupleSlots = -1;
+
+
+ //////////////////////////////////////////////////////////////////////
+ // BEGIN MOVE FORWARDING STAGE
+ //////////////////////////////////////////////////////////////////////
+
+ // This stage moves forward a tuple cursor on each side relation until a match
+ // is found
+ int cmp;
+ while ((end != true) && ((cmp = joinComparator.compare(leftTuple, rightTuple)) != 0)) {
+
+ // if right is lower than the left tuple, it means that all right tuples s.t. cmp <= 0 are
+ // matched tuples.
+ if (cmp > 0) {
+ // before getting a new tuple from the right, a left null padded tuple should be built
+ // output a tuple with the nulls padded left tuple
+ Tuple nullPaddedTuple = createNullPaddedTuple(leftNumCols);
+ frameTuple.set(nullPaddedTuple, rightTuple);
+ projector.eval(frameTuple, outTuple);
+
+ // we simulate we found a match, which is exactly the null padded one
+ // BEFORE RETURN, MOVE FORWARD
+ rightTuple = rightChild.next();
+ if(rightTuple == null) {
+ end = true;
+ }
+ return outTuple;
+
+ } else if (cmp < 0) {
+ // If the left tuple is lower than the right tuple, just move forward the left tuple cursor.
+ leftTuple = leftChild.next();
+ if(leftTuple == null) {
+ end = true;
+ // in original algorithm we had return null ,
+ // but now we need to continue the end processing phase for remaining unprocessed right tuples
+ }
+ } // if (cmp<0)
+ } // while
+ //////////////////////////////////////////////////////////////////////
+ // END MOVE FORWARDING STAGE
+ //////////////////////////////////////////////////////////////////////
+
+ // once a match is found, retain all tuples with this key in tuple slots on each side
+ if(!end) {
+ endInPopulationStage = false;
+
+ boolean endOuter = false;
+ boolean endInner = false;
+
+ previous = new VTuple(leftTuple);
+ do {
+ leftTupleSlots.add(new VTuple(leftTuple));
+ leftTuple = leftChild.next();
+ if( leftTuple == null) {
+ endOuter = true;
+ }
+ } while ((endOuter != true) && (tupleComparator[0].compare(previous, leftTuple) == 0));
+ posLeftTupleSlots = 0;
+
+ previous = new VTuple(rightTuple);
+
+ do {
+ innerTupleSlots.add(new VTuple(rightTuple));
+ rightTuple = rightChild.next();
+ if(rightTuple == null) {
+ endInner = true;
+ }
+
+ } while ((endInner != true) && (tupleComparator[1].compare(previous, rightTuple) == 0) );
+ posRightTupleSlots = 0;
+
+ if ((endOuter == true) || (endInner == true)) {
+ end = true;
+ endInPopulationStage = true;
+ }
+ } // if end false
+ } // if newRound
+
+
+ // Now output result matching tuples from the slots
+ // if either we haven't reached end on neither side, or we did reach end on one(or both) sides
+ // but that happened in the slots population step (i.e. refers to next round)
+
+ if ((end == false) || ((end == true) && (endInPopulationStage == true))){
+
+ if(posLeftTupleSlots == 0){
+ nextLeft = new VTuple (leftTupleSlots.get(posLeftTupleSlots));
+ posLeftTupleSlots = posLeftTupleSlots + 1;
+ }
+
+
+ if(posRightTupleSlots <= (innerTupleSlots.size() -1)) {
+
+ Tuple aTuple = new VTuple(innerTupleSlots.get(posRightTupleSlots));
+ posRightTupleSlots = posRightTupleSlots + 1;
+
+ frameTuple.set(nextLeft, aTuple);
+ joinQual.eval(inSchema, frameTuple);
+ projector.eval(frameTuple, outTuple);
+ return outTuple;
+
+ } else {
+ // right (inner) slots reached end and should be rewind if there are still tuples in the outer slots
+ if(posLeftTupleSlots <= (leftTupleSlots.size() - 1)) {
+ //rewind the right slots position
+ posRightTupleSlots = 0;
+ Tuple aTuple = new VTuple(innerTupleSlots.get(posRightTupleSlots));
+ posRightTupleSlots = posRightTupleSlots + 1;
+ nextLeft = new VTuple (leftTupleSlots.get(posLeftTupleSlots));
+ posLeftTupleSlots = posLeftTupleSlots + 1;
+
+ frameTuple.set(nextLeft, aTuple);
+ joinQual.eval(inSchema, frameTuple);
+ projector.eval(frameTuple, outTuple);
+ return outTuple;
+ }
+ }
+ } // the second if end false
+ } // for
+ }
+
+ @Override
+ public void rescan() throws IOException {
+ super.rescan();
+ leftTupleSlots.clear();
+ innerTupleSlots.clear();
+ posRightTupleSlots = -1;
+ posLeftTupleSlots = -1;
+ }
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+ leftTupleSlots.clear();
+ innerTupleSlots.clear();
+ leftTupleSlots = null;
+ innerTupleSlots = null;
+ joinNode = null;
+ joinQual = null;
+ projector = null;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
new file mode 100644
index 0000000..2e676e9
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.logical.SelectionNode;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+
+public class SelectionExec extends UnaryPhysicalExec {
+ private final EvalNode qual;
+
+ public SelectionExec(TaskAttemptContext context,
+ SelectionNode plan,
+ PhysicalExec child) {
+ super(context, plan.getInSchema(), plan.getOutSchema(), child);
+ this.qual = plan.getQual();
+ }
+
+ @Override
+ public Tuple next() throws IOException {
+ Tuple tuple;
+ while ((tuple = child.next()) != null) {
+ if (qual.eval(inSchema, tuple).isTrue()) {
+ return tuple;
+ }
+ }
+
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
new file mode 100644
index 0000000..6dbcc3f
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.io.IOUtils;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.engine.eval.ConstEval;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.eval.EvalTreeUtil;
+import org.apache.tajo.engine.eval.FieldEval;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.Target;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.engine.utils.*;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.Scanner;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.*;
+
+
+/**
+ * Physical operator that sequentially scans the fragments of a table.
+ *
+ * <p>The scan applies the plan's optional qualifier and projects each accepted tuple through
+ * the plan's targets. For broadcast tables, the projected result is cached in {@link TupleCache},
+ * keyed by execution block and table name. Only one task fills the cache; the others read
+ * from it, or fall back to scanning the fragments themselves.
+ */
+public class SeqScanExec extends PhysicalExec {
+  /** Upper bound (ms) a task waits on the TupleCache lock monitor for another task's broadcast scan. */
+  private static final long BROADCAST_CACHE_WAIT_MS = 20 * 1000L;
+
+  private ScanNode plan;
+
+  private Scanner scanner = null;
+
+  private EvalNode qual = null;
+
+  private CatalogProtos.FragmentProto [] fragments;
+
+  private Projector projector;
+
+  private TableStats inputStats;
+
+  private TupleCacheKey cacheKey;
+
+  /** True when {@link #scanner} reads already filtered/projected tuples from the broadcast cache. */
+  private boolean cacheRead = false;
+
+  public SeqScanExec(TaskAttemptContext context, AbstractStorageManager sm,
+                     ScanNode plan, CatalogProtos.FragmentProto [] fragments) throws IOException {
+    super(context, plan.getInSchema(), plan.getOutSchema());
+
+    this.plan = plan;
+    this.qual = plan.getQual();
+    this.fragments = fragments;
+
+    if (plan.isBroadcastTable()) {
+      cacheKey = new TupleCacheKey(
+          context.getTaskId().getQueryUnitId().getExecutionBlockId().toString(), plan.getTableName());
+    }
+  }
+
+  /**
+   * Rewrites the input schema of a column-partitioned table.
+   *
+   * <p>In a column-partitioned table, the data files contain no values for the partition key
+   * columns, so this method removes those columns from the input schema. It then replaces every
+   * reference to a partition column in the targets and the qualifier with the constant value
+   * decoded from the fragment path.
+   *
+   * <p>TODO - This implementation assumes that a fragment is always a FileFragment, because in a
+   * column-partitioned table the path is what carries the partition key values. That holds today,
+   * but it must be revisited later.
+   */
+  private void rewriteColumnPartitionedTableSchema() throws IOException {
+    PartitionMethodDesc partitionDesc = plan.getTableDesc().getPartitionMethod();
+    Schema columnPartitionSchema = SchemaUtil.clone(partitionDesc.getExpressionSchema());
+    String qualifier = inSchema.getColumn(0).getQualifier();
+    columnPartitionSchema.setQualifier(qualifier);
+
+    // Remove partition key columns from the input schema.
+    this.inSchema = plan.getTableDesc().getSchema();
+
+    List<FileFragment> fileFragments = FragmentConvertor.convert(FileFragment.class, fragments);
+
+    // Decode the partition key values from the path of the first fragment.
+    Tuple partitionRow =
+        TupleUtil.buildTupleFromPartitionPath(columnPartitionSchema, fileFragments.get(0).getPath(), false);
+
+    // Targets or search conditions may reference partition columns, but the tuples do not
+    // contain their values. So, replace every such column reference with a constant datum.
+    for (Column column : columnPartitionSchema.toArray()) {
+      FieldEval targetExpr = new FieldEval(column);
+      Datum datum = targetExpr.eval(columnPartitionSchema, partitionRow);
+      ConstEval constExpr = new ConstEval(datum);
+
+      for (Target target : plan.getTargets()) {
+        if (target.getEvalTree().equals(targetExpr)) {
+          if (!target.hasAlias()) {
+            target.setAlias(target.getEvalTree().getName());
+          }
+          target.setExpr(constExpr);
+        } else {
+          EvalTreeUtil.replace(target.getEvalTree(), targetExpr, constExpr);
+        }
+      }
+
+      if (plan.hasQual()) {
+        EvalTreeUtil.replace(plan.getQual(), targetExpr, constExpr);
+      }
+    }
+  }
+
+  /**
+   * Prepares the scan by opening either a storage scanner or a broadcast-cache scanner.
+   *
+   * @throws IOException if the underlying scanner cannot be created or initialized
+   */
+  public void init() throws IOException {
+    if (fragments != null
+        && plan.getTableDesc().hasPartition()
+        && plan.getTableDesc().getPartitionMethod().getPartitionType() == CatalogProtos.PartitionType.COLUMN) {
+      rewriteColumnPartitionedTableSchema();
+    }
+
+    Schema projected = buildProjectedSchema();
+
+    if (cacheKey == null) {
+      initScanner(projected);
+      return;
+    }
+
+    TupleCache tupleCache = TupleCache.getInstance();
+    if (tupleCache.isBroadcastCacheReady(cacheKey)) {
+      openCacheScannerOrScan(projected);
+    } else if (tupleCache.lockBroadcastScan(cacheKey)) {
+      // This task won the lock: scan the table once and publish the result to the cache.
+      scanAndAddCache(projected);
+      openCacheScannerOrScan(projected);
+    } else {
+      waitForBroadcastCache(tupleCache);
+      if (tupleCache.isBroadcastCacheReady(cacheKey)) {
+        openCacheScannerOrScan(projected);
+      } else {
+        initScanner(projected);
+      }
+    }
+  }
+
+  /** Computes the columns the scanner must read: those used by the qualifier and the targets. */
+  private Schema buildProjectedSchema() {
+    if (!plan.hasTargets()) {
+      return outSchema;
+    }
+
+    Set<Column> columnSet = new HashSet<Column>();
+    if (plan.hasQual()) {
+      columnSet.addAll(EvalTreeUtil.findUniqueColumns(qual));
+    }
+    for (Target t : plan.getTargets()) {
+      columnSet.addAll(EvalTreeUtil.findUniqueColumns(t.getEvalTree()));
+    }
+
+    // Preserve the column order of the input schema.
+    Schema projected = new Schema();
+    for (Column column : inSchema.getColumns()) {
+      if (columnSet.contains(column)) {
+        projected.addColumn(column);
+      }
+    }
+    return projected;
+  }
+
+  /** Waits (bounded) on the TupleCache lock monitor, preserving the thread's interrupt status. */
+  private void waitForBroadcastCache(TupleCache tupleCache) {
+    Object lockMonitor = tupleCache.getLockMonitor();
+    synchronized (lockMonitor) {
+      try {
+        // A single bounded wait: init() re-checks readiness afterwards and falls back to a
+        // direct scan, so an early or spurious wake-up only costs a redundant scan.
+        lockMonitor.wait(BROADCAST_CACHE_WAIT_MS);
+      } catch (InterruptedException e) {
+        // Do not swallow the interrupt; let the task framework observe it.
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  /** Creates the projector and a storage scanner over the fragments (no scanner if there are none). */
+  private void initScanner(Schema projected) throws IOException {
+    this.projector = new Projector(inSchema, outSchema, plan.getTargets());
+
+    if (fragments != null) {
+      if (fragments.length > 1) {
+        this.scanner = new MergeScanner(context.getConf(), plan.getPhysicalSchema(), plan.getTableDesc().getMeta(),
+            FragmentConvertor.<FileFragment>convert(context.getConf(), plan.getTableDesc().getMeta().getStoreType(),
+                fragments), projected
+        );
+      } else {
+        this.scanner = StorageManagerFactory.getStorageManager(
+            context.getConf()).getScanner(plan.getTableDesc().getMeta(), plan.getPhysicalSchema(), fragments[0],
+            projected);
+      }
+      scanner.init();
+    }
+  }
+
+  /**
+   * Tries to open a scanner over the broadcast cache.
+   *
+   * @return true if a cache scanner was opened and installed
+   */
+  private boolean openCacheScanner() throws IOException {
+    Scanner cacheScanner = TupleCache.getInstance().openCacheScanner(cacheKey, plan.getPhysicalSchema());
+    if (cacheScanner != null) {
+      scanner = cacheScanner;
+      cacheRead = true;
+      return true;
+    }
+    return false;
+  }
+
+  /** Opens the cache scanner, or scans the fragments directly if the cache entry cannot be opened. */
+  private void openCacheScannerOrScan(Schema projected) throws IOException {
+    if (!openCacheScanner()) {
+      // Without this fallback, scanner would stay null and next() would fail.
+      initScanner(projected);
+    }
+  }
+
+  /** Scans the fragments once and publishes all produced (filtered, projected) tuples to the cache. */
+  private void scanAndAddCache(Schema projected) throws IOException {
+    initScanner(projected);
+
+    List<Tuple> broadcastTupleCacheList = new ArrayList<Tuple>();
+    try {
+      Tuple tuple;
+      while ((tuple = next()) != null) {
+        broadcastTupleCacheList.add(tuple);
+      }
+    } finally {
+      // Always release the storage scanner; it is null when there are no fragments.
+      Scanner toClose = scanner;
+      scanner = null;
+      if (toClose != null) {
+        toClose.close();
+      }
+    }
+
+    TupleCache.getInstance().addBroadcastCache(cacheKey, broadcastTupleCacheList);
+  }
+
+  /**
+   * Returns the next tuple of the scan.
+   *
+   * <p>Tuples from the broadcast cache are returned as-is. Otherwise, the tuple is filtered by the
+   * qualifier (if any) and projected into a new output tuple that carries the source offset.
+   *
+   * @return the next output tuple, or {@code null} when the scan is exhausted or there are no fragments
+   */
+  @Override
+  public Tuple next() throws IOException {
+    if (fragments == null) {
+      return null;
+    }
+
+    Tuple tuple;
+    while ((tuple = scanner.next()) != null) {
+      if (cacheRead) {
+        // Cached tuples were produced by next() when the cache was built.
+        return tuple;
+      }
+      if (!plan.hasQual() || qual.eval(inSchema, tuple).isTrue()) {
+        Tuple outTuple = new VTuple(outColumnNum);
+        projector.eval(tuple, outTuple);
+        outTuple.setOffset(tuple.getOffset());
+        return outTuple;
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public void rescan() throws IOException {
+    // No scanner exists when there are no fragments, so there is nothing to rewind.
+    if (scanner != null) {
+      scanner.reset();
+    }
+  }
+
+  /** Closes the scanner and keeps a copy of its input statistics for {@link #getInputStats()}. */
+  @Override
+  public void close() throws IOException {
+    IOUtils.cleanup(null, scanner);
+    if (scanner != null) {
+      try {
+        TableStats stat = scanner.getInputStats();
+        if (stat != null) {
+          inputStats = (TableStats)(stat.clone());
+        }
+      } catch (CloneNotSupportedException e) {
+        e.printStackTrace();
+      }
+    }
+    scanner = null;
+    plan = null;
+    qual = null;
+    projector = null;
+  }
+
+  public String getTableName() {
+    return plan.getTableName();
+  }
+
+  @Override
+  public float getProgress() {
+    if (scanner == null) {
+      return 1.0f;
+    } else {
+      return scanner.getProgress();
+    }
+  }
+
+  @Override
+  public TableStats getInputStats() {
+    if (scanner != null) {
+      return scanner.getInputStats();
+    } else {
+      return inputStats;
+    }
+  }
+
+  @Override
+  public String toString() {
+    if (scanner != null) {
+      return "SeqScanExec:" + plan.getTableName() + "," + scanner.getClass().getName();
+    } else {
+      return "SeqScanExec:" + plan.getTableName();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java
new file mode 100644
index 0000000..629889d
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.engine.function.FunctionContext;
+import org.apache.tajo.engine.planner.logical.GroupbyNode;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * This is the sort-based aggregation operator.
+ *
+ * <h3>Implementation</h3>
+ * Groups are detected by comparing each tuple's grouping key with the previous key only. Tuples
+ * with equal keys must therefore arrive contiguously, i.e. the input must be sorted on the
+ * grouping keys. Sort aggregation has two states while running.
+ *
+ * <h4>Aggregate state</h4>
+ * If lastKey is null or lastKey is equal to the current key, sort aggregation is in this state.
+ * In this state, this operator accumulates measure values via aggregation functions.
+ *
+ * <h4>Finalize state</h4>
+ * If the current key differs from the last key, the operator computes the final aggregation
+ * results for the last group and emits them as an output tuple.
+ *
+ * <p>On empty input, no tuple is emitted when there are grouping keys. Without grouping keys
+ * (global aggregation), exactly one tuple is emitted, built from freshly created aggregation contexts.
+ */
+public class SortAggregateExec extends AggregationExec {
+  private Tuple lastKey = null;
+  private boolean finished = false;
+  private FunctionContext[] contexts;
+
+  public SortAggregateExec(TaskAttemptContext context, GroupbyNode plan, PhysicalExec child) throws IOException {
+    super(context, plan, child);
+    contexts = new FunctionContext[plan.getAggFunctions().length];
+  }
+
+  /**
+   * Returns the aggregated tuple of the next group.
+   *
+   * @return the next group's output tuple, or {@code null} when all groups have been emitted
+   * @throws IOException if the child operator fails
+   */
+  @Override
+  public Tuple next() throws IOException {
+    Tuple tuple;
+
+    while (!context.isStopped() && (tuple = child.next()) != null) {
+      Tuple currentKey = buildGroupingKey(tuple);
+
+      if (lastKey == null) {
+        // First tuple: open the first group.
+        resetContexts();
+        mergeTuple(tuple);
+        lastKey = currentKey;
+      } else if (lastKey.equals(currentKey)) {
+        // Same group: keep accumulating.
+        mergeTuple(tuple);
+      } else {
+        // Key changed: emit the finished group, then start the next group with this tuple.
+        Tuple outputTuple = buildOutputTuple(lastKey);
+        resetContexts();
+        mergeTuple(tuple);
+        lastKey = currentKey;
+        return outputTuple;
+      }
+    }
+
+    if (finished) {
+      return null;
+    }
+    finished = true;
+
+    if (lastKey == null) {
+      // No input tuple was consumed, so no group was ever opened.
+      if (groupingKeyNum > 0) {
+        return null;
+      }
+      // Global aggregation yields one row even for empty input; the contexts were never created.
+      resetContexts();
+    }
+    return buildOutputTuple(lastKey);
+  }
+
+  /** Extracts the grouping key columns of {@code tuple} into a new key tuple. */
+  private Tuple buildGroupingKey(Tuple tuple) {
+    Tuple key = new VTuple(groupingKeyIds.length);
+    for (int i = 0; i < groupingKeyIds.length; i++) {
+      key.put(i, tuple.get(groupingKeyIds[i]));
+    }
+    return key;
+  }
+
+  /** Creates a fresh aggregation context for every aggregation function. */
+  private void resetContexts() {
+    for (int i = 0; i < aggFunctionsNum; i++) {
+      contexts[i] = aggFunctions[i].newContext();
+    }
+  }
+
+  /** Merges {@code tuple} into the current aggregation contexts. */
+  private void mergeTuple(Tuple tuple) {
+    for (int i = 0; i < aggFunctionsNum; i++) {
+      aggFunctions[i].merge(contexts[i], inSchema, tuple);
+    }
+  }
+
+  /**
+   * Builds an output tuple: the grouping key values followed by the terminated aggregate values.
+   * {@code key} is only read when there are grouping keys, so it may be null for a global aggregation.
+   */
+  private Tuple buildOutputTuple(Tuple key) {
+    Tuple outputTuple = new VTuple(outSchema.size());
+    int tupleIdx = 0;
+    for (; tupleIdx < groupingKeyNum; tupleIdx++) {
+      outputTuple.put(tupleIdx, key.get(tupleIdx));
+    }
+    for (int aggFuncIdx = 0; aggFuncIdx < aggFunctionsNum; tupleIdx++, aggFuncIdx++) {
+      outputTuple.put(tupleIdx, aggFunctions[aggFuncIdx].terminate(contexts[aggFuncIdx]));
+    }
+    return outputTuple;
+  }
+
+  @Override
+  public void rescan() throws IOException {
+    super.rescan();
+
+    lastKey = null;
+    finished = false;
+  }
+}