You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2015/01/23 02:26:12 UTC
tajo git commit: TAJO-1309: Add missing break point in physical operator. (jinho)
Repository: tajo
Updated Branches:
refs/heads/master 0024c75e9 -> 17c6dff4e
TAJO-1309: Add missing break point in physical operator. (jinho)
Closes #355
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/17c6dff4
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/17c6dff4
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/17c6dff4
Branch: refs/heads/master
Commit: 17c6dff4e258d93e0ffaa1cc07368b2b5d8b8aa4
Parents: 0024c75
Author: jhkim <jh...@apache.org>
Authored: Fri Jan 23 10:24:59 2015 +0900
Committer: jhkim <jh...@apache.org>
Committed: Fri Jan 23 10:24:59 2015 +0900
----------------------------------------------------------------------
CHANGES | 2 ++
.../tajo/cli/tsql/commands/HelpCommand.java | 6 +++---
.../engine/planner/physical/BNLJoinExec.java | 3 ++-
.../planner/physical/ExternalSortExec.java | 2 +-
.../planner/physical/HashAggregateExec.java | 2 +-
.../HashBasedColPartitionStoreExec.java | 2 +-
.../planner/physical/HashFullOuterJoinExec.java | 4 ++--
.../engine/planner/physical/HashJoinExec.java | 4 ++--
.../planner/physical/HashLeftAntiJoinExec.java | 4 ++--
.../planner/physical/HashLeftOuterJoinExec.java | 4 ++--
.../planner/physical/HashLeftSemiJoinExec.java | 2 +-
.../physical/HashShuffleFileWriteExec.java | 2 +-
.../engine/planner/physical/HavingExec.java | 2 +-
.../engine/planner/physical/MemSortExec.java | 2 +-
.../physical/MergeFullOuterJoinExec.java | 3 ++-
.../engine/planner/physical/MergeJoinExec.java | 3 ++-
.../engine/planner/physical/NLJoinExec.java | 3 ++-
.../planner/physical/NLLeftOuterJoinExec.java | 3 ++-
.../physical/PartitionMergeScanExec.java | 5 ++---
.../physical/RangeShuffleFileWriteExec.java | 2 +-
.../physical/RightOuterMergeJoinExec.java | 3 ++-
.../engine/planner/physical/SelectionExec.java | 2 +-
.../engine/planner/physical/SeqScanExec.java | 2 +-
.../SortBasedColPartitionStoreExec.java | 2 +-
.../engine/planner/physical/StoreTableExec.java | 2 +-
.../org/apache/tajo/master/QueryInProgress.java | 10 ++++++---
.../apache/tajo/master/TajoContainerProxy.java | 10 +++++----
.../tajo/worker/TajoResourceAllocator.java | 6 +++---
.../main/java/org/apache/tajo/worker/Task.java | 22 ++++++++------------
.../apache/tajo/worker/TaskAttemptContext.java | 2 +-
30 files changed, 65 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 6ba73fe..4e14c93 100644
--- a/CHANGES
+++ b/CHANGES
@@ -27,6 +27,8 @@ Release 0.10.0 - unreleased
IMPROVEMENT
+ TAJO-1309: Add missing break point in physical operator. (jinho)
+
TAJO-1307: HBaseStorageManager need to support for users to use
hbase-site.xml file. (jaehwa)
http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-client/src/main/java/org/apache/tajo/cli/tsql/commands/HelpCommand.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/tsql/commands/HelpCommand.java b/tajo-client/src/main/java/org/apache/tajo/cli/tsql/commands/HelpCommand.java
index 5d41e41..ce56d12 100644
--- a/tajo-client/src/main/java/org/apache/tajo/cli/tsql/commands/HelpCommand.java
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/tsql/commands/HelpCommand.java
@@ -18,11 +18,11 @@
package org.apache.tajo.cli.tsql.commands;
-import java.io.PrintWriter;
-
import org.apache.tajo.cli.tsql.TajoCli;
import org.apache.tajo.util.VersionInfo;
+import java.io.PrintWriter;
+
public class HelpCommand extends TajoShellCommand {
private String targetDocVersion = "";
@@ -79,7 +79,7 @@ public class HelpCommand extends TajoShellCommand {
sout.println();
sout.println("Variables");
- sout.println(" \\set [[NAME] [VALUE] set session variable or list session variables");
+ sout.println(" \\set [NAME] [VALUE] set session variable or list session variables");
sout.println(" \\unset NAME unset session variable");
sout.println();
sout.println();
http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
index 117b04c..14cf567 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
@@ -128,7 +128,7 @@ public class BNLJoinExec extends BinaryPhysicalExec {
rightEnd = true;
}
- while (true) {
+ while (!context.isStopped()) {
if (!rightIterator.hasNext()) { // if leftIterator ended
if (leftIterator.hasNext()) { // if rightTupleslot remains
leftTuple = leftIterator.next();
@@ -201,6 +201,7 @@ public class BNLJoinExec extends BinaryPhysicalExec {
return outputTuple;
}
}
+ return null;
}
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index 4e19114..c3f9d3d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -203,7 +203,7 @@ public class ExternalSortExec extends SortExec {
int chunkId = 0;
long runStartTime = System.currentTimeMillis();
- while ((tuple = child.next()) != null) { // partition sort start
+ while (!context.isStopped() && (tuple = child.next()) != null) { // partition sort start
Tuple vtuple = new VTuple(tuple);
inMemoryTable.add(vtuple);
memoryConsumption += MemoryUtil.calculateMemorySize(vtuple);
http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
index 80bba2b..0d1bf3d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
@@ -48,7 +48,7 @@ public class HashAggregateExec extends AggregationExec {
private void compute() throws IOException {
Tuple tuple;
Tuple keyTuple;
- while((tuple = child.next()) != null && !context.isStopped()) {
+ while(!context.isStopped() && (tuple = child.next()) != null) {
keyTuple = new VTuple(groupingKeyIds.length);
// build one key tuple
for(int i = 0; i < groupingKeyIds.length; i++) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java
index c28a5cd..e94bc26 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java
@@ -67,7 +67,7 @@ public class HashBasedColPartitionStoreExec extends ColPartitionStoreExec {
public Tuple next() throws IOException {
Tuple tuple;
StringBuilder sb = new StringBuilder();
- while((tuple = child.next()) != null) {
+ while(!context.isStopped() && (tuple = child.next()) != null) {
// set subpartition directory name
sb.delete(0, sb.length());
if (keyIds != null) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
index 28d9a3e..9cd13fb 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
@@ -144,7 +144,7 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec {
Tuple rightTuple;
boolean found = false;
- while(!finished) {
+ while(!context.isStopped() && !finished) {
if (shouldGetLeftTuple) { // initially, it is true.
// getting new outer
leftTuple = leftChild.next(); // it comes from a disk
@@ -208,7 +208,7 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec {
Tuple tuple;
Tuple keyTuple;
- while ((tuple = rightChild.next()) != null) {
+ while (!context.isStopped() && (tuple = rightChild.next()) != null) {
keyTuple = new VTuple(joinKeyPairs.size());
for (int i = 0; i < rightKeyList.length; i++) {
keyTuple.put(i, tuple.get(rightKeyList[i]));
http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
index 701297f..38728b5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
@@ -111,7 +111,7 @@ public class HashJoinExec extends BinaryPhysicalExec {
Tuple rightTuple;
boolean found = false;
- while(!finished) {
+ while(!context.isStopped() && !finished) {
if (shouldGetLeftTuple) { // initially, it is true.
// getting new outer
leftTuple = leftChild.next(); // it comes from a disk
@@ -156,7 +156,7 @@ public class HashJoinExec extends BinaryPhysicalExec {
Tuple tuple;
Tuple keyTuple;
- while ((tuple = rightChild.next()) != null) {
+ while (!context.isStopped() && (tuple = rightChild.next()) != null) {
keyTuple = new VTuple(joinKeyPairs.size());
for (int i = 0; i < rightKeyList.length; i++) {
keyTuple.put(i, tuple.get(rightKeyList[i]));
http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
index 236f5e3..cceed3e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
@@ -64,7 +64,7 @@ public class HashLeftAntiJoinExec extends HashJoinExec {
Tuple rightTuple;
boolean notFound;
- while(!finished) {
+ while(!context.isStopped() && !finished) {
// getting new outer
leftTuple = leftChild.next(); // it comes from a disk
@@ -89,7 +89,7 @@ public class HashLeftAntiJoinExec extends HashJoinExec {
// Reach here only when a hash bucket is found. Then, it checks all tuples in the found bucket.
// If it finds a matched tuple, it escapes the loop for all tuples in the hash bucket.
notFound = true;
- while (notFound && iterator.hasNext()) {
+ while (!context.isStopped() && notFound && iterator.hasNext()) {
rightTuple = iterator.next();
frameTuple.set(leftTuple, rightTuple);
if (joinQual.eval(inSchema, frameTuple).isTrue()) { // if the matched one is found
http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
index c1b6522..233ef92 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
@@ -138,7 +138,7 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
Tuple rightTuple;
boolean found = false;
- while(!finished) {
+ while(!context.isStopped() && !finished) {
if (shouldGetLeftTuple) { // initially, it is true.
// getting new outer
@@ -204,7 +204,7 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
Tuple tuple;
Tuple keyTuple;
- while ((tuple = rightChild.next()) != null) {
+ while (!context.isStopped() && (tuple = rightChild.next()) != null) {
keyTuple = new VTuple(joinKeyPairs.size());
for (int i = 0; i < rightKeyList.length; i++) {
keyTuple.put(i, tuple.get(rightKeyList[i]));
http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
index 5196a63..37c6d0e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
@@ -70,7 +70,7 @@ public class HashLeftSemiJoinExec extends HashJoinExec {
Tuple rightTuple;
boolean notFound;
- while(!finished) {
+ while(!context.isStopped() && !finished) {
// getting new outer
leftTuple = leftChild.next(); // it comes from a disk
http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
index 3c4949f..28974f9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
@@ -103,7 +103,7 @@ public final class HashShuffleFileWriteExec extends UnaryPhysicalExec {
int partId;
int tupleCount = 0;
long numRows = 0;
- while ((tuple = child.next()) != null) {
+ while (!context.isStopped() && (tuple = child.next()) != null) {
tupleCount++;
numRows++;
http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
index f9f4351..e9a7c03 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
@@ -39,7 +39,7 @@ public class HavingExec extends UnaryPhysicalExec {
@Override
public Tuple next() throws IOException {
Tuple tuple;
- while ((tuple = child.next()) != null) {
+ while (!context.isStopped() && (tuple = child.next()) != null) {
if (qual.eval(inSchema, tuple).isTrue()) {
return tuple;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
index 13fec7b..c77313e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
@@ -51,7 +51,7 @@ public class MemSortExec extends SortExec {
if (!sorted) {
Tuple tuple;
- while ((tuple = child.next()) != null) {
+ while (!context.isStopped() && (tuple = child.next()) != null) {
tupleSlots.add(new VTuple(tuple));
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
index cb2552b..3f2e431 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
@@ -109,7 +109,7 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
public Tuple next() throws IOException {
Tuple previous;
- for (;;) {
+ while (!context.isStopped()) {
boolean newRound = false;
if((posRightTupleSlots == -1) && (posLeftTupleSlots == -1)) {
newRound = true;
@@ -313,6 +313,7 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
}
} // the second if end false
} // for
+ return null;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
index 13104ee..63f48ac 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
@@ -102,7 +102,7 @@ public class MergeJoinExec extends BinaryPhysicalExec {
public Tuple next() throws IOException {
Tuple previous;
- for (;;) {
+ while (!context.isStopped()) {
if (!outerIterator.hasNext() && !innerIterator.hasNext()) {
if(end){
return null;
@@ -170,6 +170,7 @@ public class MergeJoinExec extends BinaryPhysicalExec {
return outTuple;
}
}
+ return null;
}
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
index b5c6244..5e7ab98 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
@@ -67,7 +67,7 @@ public class NLJoinExec extends BinaryPhysicalExec {
}
public Tuple next() throws IOException {
- for (;;) {
+ while (!context.isStopped()) {
if (needNewOuter) {
outerTuple = leftChild.next();
if (outerTuple == null) {
@@ -94,6 +94,7 @@ public class NLJoinExec extends BinaryPhysicalExec {
return outTuple;
}
}
+ return null;
}
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
index 8ff7570..7959d47 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
@@ -73,7 +73,7 @@ public class NLLeftOuterJoinExec extends BinaryPhysicalExec {
}
public Tuple next() throws IOException {
- for (;;) {
+ while (!context.isStopped()) {
if (needNextRightTuple) {
leftTuple = leftChild.next();
if (leftTuple == null) {
@@ -112,6 +112,7 @@ public class NLLeftOuterJoinExec extends BinaryPhysicalExec {
return outTuple;
}
}
+ return null;
}
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
index 5297e2c..5692308 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
@@ -21,9 +21,8 @@ package org.apache.tajo.engine.planner.physical;
import com.google.common.collect.Lists;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.worker.TaskAttemptContext;
@@ -70,7 +69,7 @@ public class PartitionMergeScanExec extends PhysicalExec {
@Override
public Tuple next() throws IOException {
Tuple tuple;
- while (currentScanner != null) {
+ while (!context.isStopped() && currentScanner != null) {
tuple = currentScanner.next();
if (tuple != null) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
index 119f053..8da1a03 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
@@ -96,7 +96,7 @@ public class RangeShuffleFileWriteExec extends UnaryPhysicalExec {
long offset;
- while((tuple = child.next()) != null) {
+ while(!context.isStopped() && (tuple = child.next()) != null) {
offset = appender.getOffset();
appender.addTuple(tuple);
keyTuple = new VTuple(keySchema.size());
http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
index a02d00b..5e80b8f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
@@ -129,7 +129,7 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
public Tuple next() throws IOException {
Tuple previous;
- for (;;) {
+ while (!context.isStopped()) {
boolean newRound = false;
if((posRightTupleSlots == -1) && (posLeftTupleSlots == -1)) {
newRound = true;
@@ -339,6 +339,7 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
}
} // the second if end false
} // for
+ return null;
}
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
index 9e84462..b9273fa 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
@@ -44,7 +44,7 @@ public class SelectionExec extends UnaryPhysicalExec {
@Override
public Tuple next() throws IOException {
Tuple tuple;
- while ((tuple = child.next()) != null) {
+ while (!context.isStopped() && (tuple = child.next()) != null) {
if (qual.eval(inSchema, tuple).isTrue()) {
return tuple;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index 94cd4ed..15f17fd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -251,7 +251,7 @@ public class SeqScanExec extends PhysicalExec {
initScanner(projected);
List<Tuple> broadcastTupleCacheList = new ArrayList<Tuple>();
- while (true) {
+ while (!context.isStopped()) {
Tuple tuple = next();
if (tuple != null) {
broadcastTupleCacheList.add(tuple);
http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
index f7c20fc..ca90b0e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
@@ -73,7 +73,7 @@ public class SortBasedColPartitionStoreExec extends ColPartitionStoreExec {
@Override
public Tuple next() throws IOException {
Tuple tuple;
- while((tuple = child.next()) != null) {
+ while(!context.isStopped() && (tuple = child.next()) != null) {
fillKeyTuple(tuple, currentKey);
http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
index 3d3da5c..5622699 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
@@ -115,7 +115,7 @@ public class StoreTableExec extends UnaryPhysicalExec {
*/
@Override
public Tuple next() throws IOException {
- while((tuple = child.next()) != null) {
+ while(!context.isStopped() && (tuple = child.next()) != null) {
appender.addTuple(tuple);
if (maxPerFileSize > 0 && maxPerFileSize <= appender.getEstimatedOutputSize()) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
index 352ec46..df461c8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
@@ -80,8 +80,12 @@ public class QueryInProgress {
public synchronized void kill() {
getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED);
- if(queryMasterRpcClient != null){
- queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get());
+ if (queryMasterRpcClient != null) {
+ try {
+ queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get());
+ } catch (Throwable e) {
+ catchException(e);
+ }
}
}
@@ -165,7 +169,7 @@ public class QueryInProgress {
}
}
- public void catchException(Exception e) {
+ public void catchException(Throwable e) {
LOG.error(e.getMessage(), e);
queryInfo.setQueryState(TajoProtos.QueryState.QUERY_FAILED);
queryInfo.setLastMessage(StringUtils.stringifyException(e));
http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index 588b7ee..42ffd87 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -31,6 +31,7 @@ import org.apache.tajo.ipc.QueryCoordinatorProtocol;
import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.master.container.TajoContainer;
import org.apache.tajo.master.container.TajoContainerId;
+import org.apache.tajo.master.event.TaskFatalErrorEvent;
import org.apache.tajo.master.rm.TajoWorkerContainer;
import org.apache.tajo.master.rm.TajoWorkerContainerId;
import org.apache.tajo.querymaster.QueryMasterTask;
@@ -82,8 +83,9 @@ public class TajoContainerProxy extends ContainerProxy {
tajoWorkerRpc = RpcConnectionPool.getPool().getConnection(addr, TajoWorkerProtocol.class, true);
TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub();
tajoWorkerRpcClient.killTaskAttempt(null, taskAttemptId.getProto(), NullCallback.get());
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
+ } catch (Throwable e) {
+ /* Worker RPC failure */
+ context.getEventHandler().handle(new TaskFatalErrorEvent(taskAttemptId, e.getMessage()));
} finally {
RpcConnectionPool.getPool().releaseConnection(tajoWorkerRpc);
}
@@ -111,7 +113,7 @@ public class TajoContainerProxy extends ContainerProxy {
.build();
tajoWorkerRpcClient.startExecutionBlock(null, request, NullCallback.get());
- } catch (Exception e) {
+ } catch (Throwable e) {
LOG.error(e.getMessage(), e);
} finally {
RpcConnectionPool.getPool().releaseConnection(tajoWorkerRpc);
@@ -198,7 +200,7 @@ public class TajoContainerProxy extends ContainerProxy {
.addAllContainerIds(containerIdProtos)
.build(),
NullCallback.get());
- } catch (Exception e) {
+ } catch (Throwable e) {
LOG.error(e.getMessage(), e);
} finally {
connPool.releaseConnection(tmClient);
http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index 7278317..dd408c9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -128,7 +128,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
for(ContainerProxy eachProxy: list) {
try {
eachProxy.stopContainer();
- } catch (Exception e) {
+ } catch (Throwable e) {
LOG.warn(e.getMessage());
}
}
@@ -301,7 +301,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
masterClientService.allocateWorkerResources(null, request, callBack);
- } catch (Exception e) {
+ } catch (Throwable e) {
LOG.error(e.getMessage(), e);
} finally {
connPool.releaseConnection(tmClient);
@@ -363,7 +363,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
containerIds.add(eachContainer.getId());
}
TajoContainerProxy.releaseWorkerResource(queryTaskContext, executionBlockId, containerIds);
- } catch (Exception e) {
+ } catch (Throwable e) {
LOG.error(e.getMessage(), e);
}
return;
http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index 5f9c6ac..e9ad838 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -88,8 +88,6 @@ public class Task {
private final Map<String, TableDesc> descs = Maps.newHashMap();
private PhysicalExec executor;
private boolean interQuery;
- private boolean killed = false;
- private boolean aborted = false;
private Path inputTableBaseDir;
private long startTime;
@@ -254,13 +252,11 @@ public class Task {
}
public void kill() {
- killed = true;
- context.stop();
context.setState(TaskAttemptState.TA_KILLED);
+ context.stop();
}
public void abort() {
- aborted = true;
context.stop();
}
@@ -299,7 +295,7 @@ public class Task {
}
public void updateProgress() {
- if(killed || aborted){
+ if(context != null && context.isStopped()){
return;
}
@@ -403,12 +399,12 @@ public class Task {
createPlan(context, plan);
this.executor.init();
- while(!killed && !aborted && executor.next() != null) {
+ while(!context.isStopped() && executor.next() != null) {
}
} catch (Throwable e) {
error = e ;
LOG.error(e.getMessage(), e);
- aborted = true;
+ context.stop();
} finally {
if (executor != null) {
try {
@@ -423,10 +419,10 @@ public class Task {
executionBlockContext.completedTasksNum.incrementAndGet();
context.getHashShuffleAppenderManager().finalizeTask(taskId);
QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = executionBlockContext.getQueryMasterStub();
- if (killed || aborted) {
+ if (context.isStopped()) {
context.setExecutorProgress(0.0f);
- if(killed) {
- context.setState(TaskAttemptState.TA_KILLED);
+
+ if(context.getState() == TaskAttemptState.TA_KILLED) {
queryMasterStub.statusUpdate(null, getReport(), NullCallback.get());
executionBlockContext.killedTasksNum.incrementAndGet();
} else {
@@ -593,7 +589,7 @@ public class Task {
int retryWaitTime = 1000; //sec
try { // for releasing fetch latch
- while(!killed && retryNum < maxRetryNum) {
+ while(!context.isStopped() && retryNum < maxRetryNum) {
if (retryNum > 0) {
try {
Thread.sleep(retryWaitTime);
@@ -625,7 +621,7 @@ public class Task {
if (retryNum == maxRetryNum) {
LOG.error("ERROR: the maximum retry (" + retryNum + ") on the fetch exceeded (" + fetcher.getURI() + ")");
}
- aborted = true; // retry task
+ context.stop(); // retry task
ctx.getFetchLatch().countDown();
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index 3092c47..1f2c325 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -70,7 +70,7 @@ public class TaskAttemptContext {
/** a map of shuffled file outputs */
private Map<Integer, String> shuffleFileOutputs;
private File fetchIn;
- private boolean stopped = false;
+ private volatile boolean stopped = false;
private boolean interQuery = false;
private Path outputPath;
private DataChannel dataChannel;