You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/10/16 11:28:26 UTC
[2/5] TAJO-255: Cleanup exceptions of engine. (hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/exception/InvalidQueryException.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/exception/InvalidQueryException.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/exception/InvalidQueryException.java
deleted file mode 100644
index 5673c5f..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/exception/InvalidQueryException.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.query.exception;
-
-public class InvalidQueryException extends RuntimeException {
- private static final long serialVersionUID = -7085849718839416246L;
-
- public InvalidQueryException() {
- super();
- }
-
- public InvalidQueryException(String message) {
- super(message);
- }
-
- public InvalidQueryException(Throwable t) {
- super(t);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/exception/NotSupportQueryException.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/exception/NotSupportQueryException.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/exception/NotSupportQueryException.java
deleted file mode 100644
index c60aa1f..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/exception/NotSupportQueryException.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- *
- */
-package org.apache.tajo.engine.query.exception;
-
-
-public class NotSupportQueryException extends InvalidQueryException {
- private static final long serialVersionUID = 4079784008765680410L;
-
- /**
- * @param query
- */
- public NotSupportQueryException(String query) {
- super("Unsupported query: "+query);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/exception/SQLParseError.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/exception/SQLParseError.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/exception/SQLParseError.java
deleted file mode 100644
index 625981b..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/exception/SQLParseError.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.query.exception;
-
-
-import org.antlr.v4.runtime.Token;
-import org.apache.commons.lang.StringUtils;
-
-public class SQLParseError extends RuntimeException {
- private String header;
- private String errorLine;
- private int charPositionInLine;
- private int line;
- private Token offendingToken;
- private String detailedMessage;
-
- public SQLParseError(Token offendingToken,
- int line, int charPositionInLine,
- String msg,
- String errorLine) {
- super(msg);
- this.offendingToken = offendingToken;
- this.charPositionInLine = charPositionInLine;
- this.line = line;
- this.errorLine = errorLine;
- this.header = msg;
- }
-
- @Override
- public String getMessage() {
- if (detailedMessage == null) {
- if (offendingToken != null) {
- detailedMessage = getDetailedMessageWithLocation();
- } else {
- StringBuilder sb = new StringBuilder();
- sb.append("ERROR: ").append(header).append("\n");
- sb.append("LINE: ").append(errorLine);
- detailedMessage = sb.toString();
- }
- }
-
- return detailedMessage;
- }
-
- public String getMessageHeader(){
- return this.header;
- }
-
- private String getDetailedMessageWithLocation() {
- StringBuilder sb = new StringBuilder();
- int displayLimit = 80;
- String queryPrefix = "LINE " + line + ":" + charPositionInLine + " ";
- String prefixPadding = StringUtils.repeat(" ", queryPrefix.length());
- String locationString;
-
- int tokenLength = offendingToken.getStopIndex() - offendingToken.getStartIndex() + 1;
- if(tokenLength > 0){
- locationString = StringUtils.repeat(" ", charPositionInLine) + StringUtils.repeat("^", tokenLength);
- } else {
- locationString = StringUtils.repeat(" ", charPositionInLine) + "^";
- }
-
- sb.append("ERROR: ").append(header).append("\n");
- sb.append(queryPrefix);
-
- if (errorLine.length() > displayLimit) {
- int padding = (displayLimit / 2);
-
- String ellipsis = " ... ";
- int startPos = locationString.length() - padding - 1;
- if (startPos <= 0) {
- startPos = 0;
- sb.append(errorLine.substring(startPos, displayLimit)).append(ellipsis).append("\n");
- sb.append(prefixPadding).append(locationString);
- } else if (errorLine.length() - (locationString.length() + padding) <= 0) {
- startPos = errorLine.length() - displayLimit - 1;
- sb.append(ellipsis).append(errorLine.substring(startPos)).append("\n");
- sb.append(prefixPadding).append(StringUtils.repeat(" ", ellipsis.length()))
- .append(locationString.substring(startPos));
- } else {
- sb.append(ellipsis).append(errorLine.substring(startPos, startPos + displayLimit)).append(ellipsis).append("\n");
- sb.append(prefixPadding).append(StringUtils.repeat(" ", ellipsis.length()))
- .append(locationString.substring(startPos));
- }
- } else {
- sb.append(errorLine).append("\n");
- sb.append(prefixPadding).append(locationString);
- }
- return sb.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/exception/SQLSyntaxError.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/exception/SQLSyntaxError.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/exception/SQLSyntaxError.java
deleted file mode 100644
index 619b11d..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/exception/SQLSyntaxError.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.query.exception;
-
-
-public class SQLSyntaxError extends InvalidQueryException {
- private static final long serialVersionUID = 5388279335175632066L;
-
- private String errorMessage;
- private String detailedMessage;
- private SQLParseError parseError;
-
- public SQLSyntaxError(String errorMessage) {
- this.errorMessage = errorMessage;
- }
-
- public SQLSyntaxError(SQLParseError e) {
- this.errorMessage = e.getMessageHeader();
- this.parseError = e;
- }
-
- @Override
- public String getMessage() {
- if (detailedMessage == null) {
- if (parseError != null) {
- detailedMessage = parseError.getMessage();
- } else {
- detailedMessage = String.format("ERROR: %s\n", errorMessage);
- }
- }
- return detailedMessage;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/exception/UndefinedFunctionException.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/exception/UndefinedFunctionException.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/exception/UndefinedFunctionException.java
deleted file mode 100644
index c7f4e98..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/exception/UndefinedFunctionException.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- *
- */
-package org.apache.tajo.engine.query.exception;
-
-
-public class UndefinedFunctionException extends InvalidQueryException {
- private static final long serialVersionUID = 113593927391549716L;
-
- /**
- * @param signature
- */
- public UndefinedFunctionException(String signature) {
- super("Error: call to undefined function "+signature+"()");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/ipc/protocolrecords/QueryUnitRequest.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/ipc/protocolrecords/QueryUnitRequest.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/ipc/protocolrecords/QueryUnitRequest.java
deleted file mode 100644
index 971f13a..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/ipc/protocolrecords/QueryUnitRequest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- *
- */
-package org.apache.tajo.ipc.protocolrecords;
-
-import org.apache.tajo.DataChannel;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.common.ProtoObject;
-import org.apache.tajo.engine.planner.enforce.Enforcer;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.QueryContext;
-import org.apache.tajo.storage.Fragment;
-
-import java.net.URI;
-import java.util.List;
-
-public interface QueryUnitRequest extends ProtoObject<TajoWorkerProtocol.QueryUnitRequestProto> {
-
- public QueryUnitAttemptId getId();
- public List<Fragment> getFragments();
- public String getOutputTableId();
- public boolean isClusteredOutput();
- public String getSerializedData();
- public boolean isInterQuery();
- public void setInterQuery();
- public void addFetch(String name, URI uri);
- public List<TajoWorkerProtocol.Fetch> getFetches();
- public boolean shouldDie();
- public void setShouldDie();
- public QueryContext getQueryContext();
- public DataChannel getDataChannel();
- public Enforcer getEnforcer();
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlock.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlock.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlock.java
deleted file mode 100644
index 3c03e50..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlock.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.engine.planner.enforce.Enforcer;
-import org.apache.tajo.engine.planner.logical.*;
-
-import java.util.*;
-
-import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType;
-
-/**
- * A distributed execution plan (DEP) is a direct acyclic graph (DAG) of ExecutionBlocks.
- * An ExecutionBlock is a basic execution unit that could be distributed across a number of nodes.
- * An ExecutionBlock class contains input information (e.g., child execution blocks or input
- * tables), and output information (e.g., partition type, partition key, and partition number).
- * In addition, it includes a logical plan to be executed in each node.
- */
-public class ExecutionBlock {
- private ExecutionBlockId executionBlockId;
- private LogicalNode plan = null;
- private StoreTableNode store = null;
- private List<ScanNode> scanlist = new ArrayList<ScanNode>();
- private ExecutionBlock parent;
- private Map<ScanNode, ExecutionBlock> childSubQueries = new HashMap<ScanNode, ExecutionBlock>();
- private PartitionType outputType;
- private Enforcer enforcer = new Enforcer();
-
- private boolean hasJoinPlan;
- private boolean hasUnionPlan;
-
- private Set<String> broadcasted = new HashSet<String>();
-
- public ExecutionBlock(ExecutionBlockId executionBlockId) {
- this.executionBlockId = executionBlockId;
- }
-
- public ExecutionBlockId getId() {
- return executionBlockId;
- }
-
- public PartitionType getPartitionType() {
- return outputType;
- }
-
- public void setPlan(LogicalNode plan) {
- hasJoinPlan = false;
- hasUnionPlan = false;
- this.scanlist.clear();
- this.plan = plan;
-
- LogicalNode node = plan;
- ArrayList<LogicalNode> s = new ArrayList<LogicalNode>();
- s.add(node);
- while (!s.isEmpty()) {
- node = s.remove(s.size()-1);
- if (node instanceof UnaryNode) {
- UnaryNode unary = (UnaryNode) node;
- s.add(s.size(), unary.getChild());
- } else if (node instanceof BinaryNode) {
- BinaryNode binary = (BinaryNode) node;
- if (binary.getType() == NodeType.JOIN) {
- hasJoinPlan = true;
- } else if (binary.getType() == NodeType.UNION) {
- hasUnionPlan = true;
- }
- s.add(s.size(), binary.getLeftChild());
- s.add(s.size(), binary.getRightChild());
- } else if (node instanceof ScanNode) {
- scanlist.add((ScanNode)node);
- } else if (node instanceof TableSubQueryNode) {
- TableSubQueryNode subQuery = (TableSubQueryNode) node;
- s.add(s.size(), subQuery.getSubQuery());
- }
- }
- }
-
-
- public LogicalNode getPlan() {
- return plan;
- }
-
- public Enforcer getEnforcer() {
- return enforcer;
- }
-
- public boolean isRoot() {
- return !hasParentBlock() || !(getParentBlock().hasParentBlock()) && getParentBlock().hasUnion();
- }
-
- public boolean hasParentBlock() {
- return parent != null;
- }
-
- public ExecutionBlock getParentBlock() {
- return parent;
- }
-
- public Collection<ExecutionBlock> getChildBlocks() {
- return Collections.unmodifiableCollection(childSubQueries.values());
- }
-
- public boolean isLeafBlock() {
- return childSubQueries.size() == 0;
- }
-
- public StoreTableNode getStoreTableNode() {
- return store;
- }
-
- public ScanNode [] getScanNodes() {
- return this.scanlist.toArray(new ScanNode[scanlist.size()]);
- }
-
- public Schema getOutputSchema() {
- return store.getOutSchema();
- }
-
- public boolean hasJoin() {
- return hasJoinPlan;
- }
-
- public boolean hasUnion() {
- return hasUnionPlan;
- }
-
- public void addBroadcastTables(Collection<String> tableNames) {
- broadcasted.addAll(tableNames);
- }
-
- public void addBroadcastTable(String tableName) {
- broadcasted.add(tableName);
- }
-
- public boolean isBroadcastTable(String tableName) {
- return broadcasted.contains(tableName);
- }
-
- public Collection<String> getBroadcastTables() {
- return broadcasted;
- }
-
- public String toString() {
- return executionBlockId.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlockCursor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlockCursor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlockCursor.java
deleted file mode 100644
index 51c825c..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlockCursor.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import org.apache.tajo.engine.planner.global.MasterPlan;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-
-/**
- * A distributed execution plan (DEP) is a direct acyclic graph (DAG) of ExecutionBlocks.
- * This class is a pointer to an ExecutionBlock that the query engine should execute.
- * For each call of nextBlock(), it retrieves a next ExecutionBlock in a postfix order.
- */
-public class ExecutionBlockCursor {
- private MasterPlan masterPlan;
- private ArrayList<ExecutionBlock> orderedBlocks = new ArrayList<ExecutionBlock>();
- private int cursor = 0;
-
- public ExecutionBlockCursor(MasterPlan plan) {
- this.masterPlan = plan;
- buildOrder(plan.getRoot());
- }
-
- public int size() {
- return orderedBlocks.size();
- }
-
- private void buildOrder(ExecutionBlock current) {
- if (!masterPlan.isLeaf(current.getId())) {
- if (masterPlan.getChildCount(current.getId()) == 1) {
- ExecutionBlock block = masterPlan.getChild(current, 0);
- buildOrder(block);
- } else {
- for (ExecutionBlock exec : masterPlan.getChilds(current)) {
- buildOrder(exec);
- }
- }
- }
- orderedBlocks.add(current);
- }
-
- public boolean hasNext() {
- return cursor < orderedBlocks.size();
- }
-
- public ExecutionBlock nextBlock() {
- return orderedBlocks.get(cursor++);
- }
-
- public ExecutionBlock peek() {
- return orderedBlocks.get(cursor);
- }
-
- public ExecutionBlock peek(int skip) {
- return orderedBlocks.get(cursor + skip);
- }
-
- public void reset() {
- cursor = 0;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 8c4aeed..4c12981 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -38,14 +38,12 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.engine.eval.ConstEval;
import org.apache.tajo.engine.eval.FieldEval;
-import org.apache.tajo.engine.exception.EmptyClusterException;
import org.apache.tajo.engine.exception.IllegalQueryStatusException;
-import org.apache.tajo.engine.exception.NoSuchQueryIdException;
-import org.apache.tajo.engine.exception.UnknownWorkerException;
import org.apache.tajo.engine.parser.HiveConverter;
import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.planner.*;
import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.master.TajoMaster.MasterContext;
import org.apache.tajo.master.querymaster.QueryInfo;
@@ -103,9 +101,7 @@ public class GlobalEngine extends AbstractService {
}
public GetQueryStatusResponse executeQuery(String sql)
- throws InterruptedException, IOException,
- NoSuchQueryIdException, IllegalQueryStatusException,
- UnknownWorkerException, EmptyClusterException {
+ throws InterruptedException, IOException, IllegalQueryStatusException {
LOG.info("SQL: " + sql);
QueryContext queryContext = new QueryContext();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
deleted file mode 100644
index 6e4ab25..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
+++ /dev/null
@@ -1,590 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.DataChannel;
-import org.apache.tajo.algebra.JoinType;
-import org.apache.tajo.catalog.*;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
-import org.apache.tajo.engine.eval.EvalTreeUtil;
-import org.apache.tajo.engine.planner.*;
-import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.engine.planner.logical.*;
-import org.apache.tajo.storage.AbstractStorageManager;
-
-import java.io.IOException;
-import java.util.*;
-
-import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType.*;
-
-public class GlobalPlanner {
- private static Log LOG = LogFactory.getLog(GlobalPlanner.class);
-
- private TajoConf conf;
- private AbstractStorageManager sm;
-
- public GlobalPlanner(final TajoConf conf, final AbstractStorageManager sm)
- throws IOException {
- this.conf = conf;
- this.sm = sm;
- }
-
- public class GlobalPlanContext {
- MasterPlan plan;
- Set<String> broadcastTables = new HashSet<String>();
- LogicalNode topmost;
- LogicalNode lastRepartionableNode;
- ExecutionBlock topMostLeftExecBlock;
- }
-
- /**
- * Builds a master plan from the given logical plan.
- */
- public void build(MasterPlan masterPlan)
- throws IOException, PlanningException {
-
- DistributedPlannerVisitor planner = new DistributedPlannerVisitor();
- GlobalPlanContext globalPlanContext = new GlobalPlanContext();
- globalPlanContext.plan = masterPlan;
- LOG.info(masterPlan.getLogicalPlan());
-
- LogicalNode rootNode = PlannerUtil.clone(masterPlan.getLogicalPlan().getRootBlock().getRoot());
- planner.visitChild(globalPlanContext, masterPlan.getLogicalPlan(), rootNode, new Stack<LogicalNode>());
-
- ExecutionBlock terminalBlock = masterPlan.createTerminalBlock();
-
- if (globalPlanContext.lastRepartionableNode != null
- && globalPlanContext.lastRepartionableNode.getType() == NodeType.UNION) {
- UnionNode unionNode = (UnionNode) globalPlanContext.lastRepartionableNode;
- ConsecutiveUnionFinder finder = new ConsecutiveUnionFinder();
- UnionsFinderContext finderContext = new UnionsFinderContext();
- finder.visitChild(finderContext, masterPlan.getLogicalPlan(), unionNode, new Stack<LogicalNode>());
-
- for (UnionNode union : finderContext.unionList) {
- TableSubQueryNode leftSubQuery = union.getLeftChild();
- TableSubQueryNode rightSubQuery = union.getRightChild();
- if (leftSubQuery.getSubQuery().getType() != NodeType.UNION) {
- ExecutionBlock execBlock = masterPlan.newExecutionBlock();
- execBlock.setPlan(leftSubQuery);
- DataChannel dataChannel = new DataChannel(execBlock, terminalBlock, NONE_PARTITION, 1);
- masterPlan.addConnect(dataChannel);
- }
- if (rightSubQuery.getSubQuery().getType() != NodeType.UNION) {
- ExecutionBlock execBlock = masterPlan.newExecutionBlock();
- execBlock.setPlan(rightSubQuery);
- DataChannel dataChannel = new DataChannel(execBlock, terminalBlock, NONE_PARTITION, 1);
- masterPlan.addConnect(dataChannel);
- }
- }
- } else {
- DataChannel dataChannel = new DataChannel(globalPlanContext.topMostLeftExecBlock, terminalBlock, NONE_PARTITION, 1);
- dataChannel.setSchema(globalPlanContext.topmost.getOutSchema());
- masterPlan.addConnect(dataChannel);
- }
- masterPlan.setTerminal(terminalBlock);
- LOG.info(masterPlan);
- }
-
- private ExecutionBlock buildRepartitionBlocks(MasterPlan masterPlan, LogicalNode lastDistNode, LogicalNode curNode,
- LogicalNode childNode, ExecutionBlock lastChildBlock)
- throws PlanningException {
-
- ExecutionBlock currentBlock = null;
- ExecutionBlock childBlock;
- childBlock = lastChildBlock;
-
- NodeType shuffleRequiredNodeType = lastDistNode.getType();
- if (shuffleRequiredNodeType == NodeType.GROUP_BY) {
- ExecutionBlock [] blocks = buildGroupBy(masterPlan, lastDistNode, curNode, childNode, childBlock);
- currentBlock = blocks[0];
- } else if (shuffleRequiredNodeType == NodeType.SORT) {
- ExecutionBlock [] blocks = buildSortPlan(masterPlan, lastDistNode, curNode, childNode, childBlock);
- currentBlock = blocks[0];
- } else if (shuffleRequiredNodeType == NodeType.JOIN) {
- ExecutionBlock [] blocks = buildJoinPlan(masterPlan, lastDistNode, childBlock, lastChildBlock);
- currentBlock = blocks[0];
- }
-
- return currentBlock;
- }
-
- public static ScanNode buildInputExecutor(LogicalPlan plan, DataChannel channel) {
- Preconditions.checkArgument(channel.getSchema() != null,
- "Channel schema (" + channel.getSrcId().getId() +" -> "+ channel.getTargetId().getId()+") is not initialized");
- TableMeta meta = new TableMetaImpl(channel.getSchema(), channel.getStoreType(), new Options());
- TableDesc desc = new TableDescImpl(channel.getSrcId().toString(), meta, new Path("/"));
- return new ScanNode(plan.newPID(), desc);
- }
-
- private DataChannel createDataChannelFromJoin(ExecutionBlock leftBlock, ExecutionBlock rightBlock,
- ExecutionBlock parent, JoinNode join, boolean leftTable) {
- ExecutionBlock childBlock = leftTable ? leftBlock : rightBlock;
-
- DataChannel channel = new DataChannel(childBlock, parent, HASH_PARTITION, 32);
- if (join.getJoinType() != JoinType.CROSS) {
- Column [][] joinColumns = PlannerUtil.joinJoinKeyForEachTable(join.getJoinQual(),
- leftBlock.getPlan().getOutSchema(), rightBlock.getPlan().getOutSchema());
- if (leftTable) {
- channel.setPartitionKey(joinColumns[0]);
- } else {
- channel.setPartitionKey(joinColumns[1]);
- }
- }
- return channel;
- }
-
- private ExecutionBlock [] buildJoinPlan(MasterPlan masterPlan, LogicalNode lastDistNode,
- ExecutionBlock childBlock, ExecutionBlock lastChildBlock)
- throws PlanningException {
- ExecutionBlock currentBlock;
-
- JoinNode joinNode = (JoinNode) lastDistNode;
- LogicalNode leftNode = joinNode.getLeftChild();
- LogicalNode rightNode = joinNode.getRightChild();
-
- boolean leftBroadcasted = false;
- boolean rightBroadcasted = false;
-
- if (leftNode.getType() == NodeType.SCAN && rightNode.getType() == NodeType.SCAN ) {
- ScanNode leftScan = (ScanNode) leftNode;
- ScanNode rightScan = (ScanNode) rightNode;
-
- TableMeta leftMeta = leftScan.getTableDesc().getMeta();
- TableMeta rightMeta = rightScan.getTableDesc().getMeta();
- long broadcastThreshold = conf.getLongVar(TajoConf.ConfVars.BROADCAST_JOIN_THRESHOLD);
-
- if (leftMeta.getStat().getNumBytes() < broadcastThreshold) {
- leftBroadcasted = true;
- }
- if (rightMeta.getStat().getNumBytes() < broadcastThreshold) {
- rightBroadcasted = true;
- }
-
- if (leftBroadcasted || rightBroadcasted) {
- currentBlock = masterPlan.newExecutionBlock();
- currentBlock.setPlan(joinNode);
- if (leftBroadcasted) {
- currentBlock.addBroadcastTable(leftScan.getCanonicalName());
- }
- if (rightBroadcasted) {
- currentBlock.addBroadcastTable(rightScan.getCanonicalName());
- }
- return new ExecutionBlock[] { currentBlock, childBlock };
- }
- }
-
- // symmetric repartition join
-
- ExecutionBlock leftBlock;
- if (lastChildBlock == null) {
- leftBlock = masterPlan.newExecutionBlock();
- leftBlock.setPlan(leftNode);
- } else {
- leftBlock = lastChildBlock;
- }
- ExecutionBlock rightBlock = masterPlan.newExecutionBlock();
- rightBlock.setPlan(rightNode);
-
- currentBlock = masterPlan.newExecutionBlock();
-
- DataChannel leftChannel = createDataChannelFromJoin(leftBlock, rightBlock, currentBlock, joinNode, true);
- DataChannel rightChannel = createDataChannelFromJoin(leftBlock, rightBlock, currentBlock, joinNode, false);
-
- ScanNode leftScan = buildInputExecutor(masterPlan.getLogicalPlan(), leftChannel);
- ScanNode rightScan = buildInputExecutor(masterPlan.getLogicalPlan(), rightChannel);
-
- joinNode.setLeftChild(leftScan);
- joinNode.setRightChild(rightScan);
- currentBlock.setPlan(joinNode);
-
- masterPlan.addConnect(leftChannel);
- masterPlan.addConnect(rightChannel);
-
- return new ExecutionBlock[] { currentBlock, childBlock };
-
- }
-
- private ExecutionBlock [] buildGroupBy(MasterPlan masterPlan, LogicalNode lastDistNode, LogicalNode currentNode,
- LogicalNode childNode, ExecutionBlock childBlock) throws PlanningException {
- ExecutionBlock currentBlock = null;
- GroupbyNode groupByNode = (GroupbyNode) lastDistNode;
-
- if (groupByNode.isDistinct()) {
- if (childBlock == null) { // first repartition node
- childBlock = masterPlan.newExecutionBlock();
- }
- childBlock.setPlan(groupByNode.getChild());
- currentBlock = masterPlan.newExecutionBlock();
-
- LinkedHashSet<Column> columnsForDistinct = new LinkedHashSet<Column>();
-
- for (Target target : groupByNode.getTargets()) {
- List<AggregationFunctionCallEval> functions = EvalTreeUtil.findDistinctAggFunction(target.getEvalTree());
- for (AggregationFunctionCallEval function : functions) {
- if (function.isDistinct()) {
- columnsForDistinct.addAll(EvalTreeUtil.findDistinctRefColumns(function));
- }
- }
- }
-
- Set<Column> existingColumns = Sets.newHashSet(groupByNode.getGroupingColumns());
- columnsForDistinct.removeAll(existingColumns); // remove existing grouping columns
- SortSpec [] sortSpecs = PlannerUtil.columnsToSortSpec(columnsForDistinct);
- currentBlock.getEnforcer().enforceSortAggregation(groupByNode.getPID(), sortSpecs);
-
- DataChannel channel;
- channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 32);
- channel.setPartitionKey(groupByNode.getGroupingColumns());
- channel.setSchema(groupByNode.getInSchema());
-
- GroupbyNode secondGroupBy = groupByNode;
- ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
- secondGroupBy.setChild(scanNode);
-
- LogicalNode parent = PlannerUtil.findTopParentNode(currentNode, lastDistNode.getType());
- if (parent instanceof UnaryNode && parent != secondGroupBy) {
- ((UnaryNode)parent).setChild(secondGroupBy);
- }
-
- masterPlan.addConnect(channel);
- currentBlock.setPlan(currentNode);
-
- } else {
-
- GroupbyNode firstGroupBy = PlannerUtil.transformGroupbyTo2P(groupByNode);
- firstGroupBy.setHavingCondition(null);
-
- if (firstGroupBy.getChild().getType() == NodeType.TABLE_SUBQUERY &&
- ((TableSubQueryNode)firstGroupBy.getChild()).getSubQuery().getType() == NodeType.UNION) {
-
- UnionNode unionNode = PlannerUtil.findTopNode(groupByNode, NodeType.UNION);
- ConsecutiveUnionFinder finder = new ConsecutiveUnionFinder();
- UnionsFinderContext finderContext = new UnionsFinderContext();
- finder.visitChild(finderContext, masterPlan.getLogicalPlan(), unionNode, new Stack<LogicalNode>());
-
- currentBlock = masterPlan.newExecutionBlock();
- GroupbyNode secondGroupBy = groupByNode;
- for (UnionNode union : finderContext.unionList) {
- TableSubQueryNode leftSubQuery = union.getLeftChild();
- TableSubQueryNode rightSubQuery = union.getRightChild();
- DataChannel dataChannel;
- if (leftSubQuery.getSubQuery().getType() != NodeType.UNION) {
- ExecutionBlock execBlock = masterPlan.newExecutionBlock();
- GroupbyNode g1 = PlannerUtil.clone(firstGroupBy);
- g1.setChild(leftSubQuery);
- execBlock.setPlan(g1);
- dataChannel = new DataChannel(execBlock, currentBlock, HASH_PARTITION, 32);
-
- ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), dataChannel);
- secondGroupBy.setChild(scanNode);
- masterPlan.addConnect(dataChannel);
- }
- if (rightSubQuery.getSubQuery().getType() != NodeType.UNION) {
- ExecutionBlock execBlock = masterPlan.newExecutionBlock();
- GroupbyNode g1 = PlannerUtil.clone(firstGroupBy);
- g1.setChild(rightSubQuery);
- execBlock.setPlan(g1);
- dataChannel = new DataChannel(execBlock, currentBlock, HASH_PARTITION, 32);
-
- ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), dataChannel);
- secondGroupBy.setChild(scanNode);
- masterPlan.addConnect(dataChannel);
- }
- }
- LogicalNode parent = PlannerUtil.findTopParentNode(currentNode, lastDistNode.getType());
- if (parent instanceof UnaryNode && parent != secondGroupBy) {
- ((UnaryNode)parent).setChild(secondGroupBy);
- }
- currentBlock.setPlan(currentNode);
- } else {
-
- if (childBlock == null) { // first repartition node
- childBlock = masterPlan.newExecutionBlock();
- }
- childBlock.setPlan(firstGroupBy);
-
- currentBlock = masterPlan.newExecutionBlock();
-
- DataChannel channel;
- if (firstGroupBy.isEmptyGrouping()) {
- channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 1);
- channel.setPartitionKey(firstGroupBy.getGroupingColumns());
- } else {
- channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 32);
- channel.setPartitionKey(firstGroupBy.getGroupingColumns());
- }
- channel.setSchema(firstGroupBy.getOutSchema());
-
- GroupbyNode secondGroupBy = groupByNode;
- ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
- secondGroupBy.setChild(scanNode);
-
- LogicalNode parent = PlannerUtil.findTopParentNode(currentNode, lastDistNode.getType());
- if (parent instanceof UnaryNode && parent != secondGroupBy) {
- ((UnaryNode)parent).setChild(secondGroupBy);
- }
-
- masterPlan.addConnect(channel);
- currentBlock.setPlan(currentNode);
- }
- }
-
- return new ExecutionBlock [] {currentBlock, childBlock};
- }
-
- private ExecutionBlock [] buildSortPlan(MasterPlan masterPlan, LogicalNode lastDistNode, LogicalNode currentNode,
- LogicalNode childNode, ExecutionBlock childBlock) {
- ExecutionBlock currentBlock = null;
-
- SortNode firstSort = (SortNode) lastDistNode;
- if (childBlock == null) {
- childBlock = masterPlan.newExecutionBlock();
- }
- childBlock.setPlan(firstSort);
-
- currentBlock = masterPlan.newExecutionBlock();
- DataChannel channel = new DataChannel(childBlock, currentBlock, RANGE_PARTITION, 32);
- channel.setPartitionKey(PlannerUtil.sortSpecsToSchema(firstSort.getSortKeys()).toArray());
- channel.setSchema(childNode.getOutSchema());
-
- SortNode secondSort = PlannerUtil.clone(lastDistNode);
- ScanNode secondScan = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
- secondSort.setChild(secondScan);
-
- LimitNode limitAndSort;
- LimitNode limitOrNull = PlannerUtil.findTopNode(currentNode, NodeType.LIMIT);
- if (limitOrNull != null) {
- limitAndSort = PlannerUtil.clone(limitOrNull);
- limitAndSort.setChild(firstSort);
-
- if (childBlock.getPlan().getType() == NodeType.SORT) {
- childBlock.setPlan(limitAndSort);
- } else {
- LogicalNode sortParent = PlannerUtil.findTopParentNode(childBlock.getPlan(), NodeType.SORT);
- if (sortParent != null) {
- if (sortParent instanceof UnaryNode) {
- ((UnaryNode)sortParent).setChild(limitAndSort);
- }
- }
- }
- }
-
- LogicalNode parent = PlannerUtil.findTopParentNode(currentNode, lastDistNode.getType());
- if (parent instanceof UnaryNode && parent != secondSort) {
- ((UnaryNode)parent).setChild(secondSort);
- }
-
- masterPlan.addConnect(channel);
- currentBlock.setPlan(currentNode);
-
- return new ExecutionBlock[] { currentBlock, childBlock };
- }
-
- public class DistributedPlannerVisitor extends BasicLogicalPlanVisitor<GlobalPlanContext, LogicalNode> {
-
- @Override
- public LogicalNode visitRoot(GlobalPlanContext context, LogicalPlan plan, LogicalRootNode node,
- Stack<LogicalNode> stack) throws PlanningException {
- super.visitRoot(context, plan, node, stack);
-
- if (context.lastRepartionableNode != null && context.lastRepartionableNode.getType() != NodeType.UNION) {
- context.topMostLeftExecBlock = buildRepartitionBlocks(context.plan, context.lastRepartionableNode, node, context.topmost, context.topMostLeftExecBlock);
- } else if (context.lastRepartionableNode != null && context.lastRepartionableNode.getType() == NodeType.UNION) {
-
- } else {
- ExecutionBlock execBlock = context.plan.newExecutionBlock();
- execBlock.setPlan(node);
- context.topMostLeftExecBlock = execBlock;
- }
-
- context.topmost = node;
- return node;
- }
-
- @Override
- public LogicalNode visitProjection(GlobalPlanContext context, LogicalPlan plan, ProjectionNode node,
- Stack<LogicalNode> stack) throws PlanningException {
- super.visitProjection(context, plan, node, stack);
- context.topmost = node;
- return node;
- }
-
- @Override
- public LogicalNode visitLimit(GlobalPlanContext context, LogicalPlan plan, LimitNode node, Stack<LogicalNode> stack)
- throws PlanningException {
- super.visitLimit(context, plan, node, stack);
- context.topmost = node;
- return node;
- }
-
- @Override
- public LogicalNode visitSort(GlobalPlanContext context, LogicalPlan plan, SortNode node, Stack<LogicalNode> stack)
- throws PlanningException {
-
- super.visitSort(context, plan, node, stack);
-
- if (context.lastRepartionableNode != null) {
- context.topMostLeftExecBlock = buildRepartitionBlocks(context.plan, context.lastRepartionableNode, node, context.topmost,
- context.topMostLeftExecBlock);
- }
-
- context.topmost = node;
- context.lastRepartionableNode = node;
-
- return node;
- }
-
- @Override
- public LogicalNode visitGroupBy(GlobalPlanContext context, LogicalPlan plan, GroupbyNode node,
- Stack<LogicalNode> stack) throws PlanningException {
- super.visitGroupBy(context, plan, node, stack);
-
- if (context.lastRepartionableNode != null) {
- context.topMostLeftExecBlock = buildRepartitionBlocks(context.plan, context.lastRepartionableNode, node,
- context.topmost, context.topMostLeftExecBlock);
- }
-
- context.topmost = node;
- context.lastRepartionableNode = node;
- return node;
- }
-
- @Override
- public LogicalNode visitFilter(GlobalPlanContext context, LogicalPlan plan, SelectionNode node,
- Stack<LogicalNode> stack) throws PlanningException {
- super.visitFilter(context, plan, node, stack);
- context.topmost = node;
- return node;
- }
-
- @Override
- public LogicalNode visitJoin(GlobalPlanContext context, LogicalPlan plan, JoinNode node, Stack<LogicalNode> stack)
- throws PlanningException {
- super.visitJoin(context, plan, node, stack);
-
- if (context.lastRepartionableNode != null) {
- context.topMostLeftExecBlock = buildRepartitionBlocks(context.plan, context.lastRepartionableNode, node, context.topmost,
- context.topMostLeftExecBlock);
- }
-
- context.topmost = node;
- context.lastRepartionableNode = node;
-
- return node;
- }
-
- @Override
- public LogicalNode visitUnion(GlobalPlanContext context, LogicalPlan plan, UnionNode node,
- Stack<LogicalNode> stack) throws PlanningException {
- super.visitUnion(context, plan, node, stack);
-
- if (context.lastRepartionableNode != null && context.lastRepartionableNode.getType() != NodeType.UNION) {
- context.topMostLeftExecBlock = buildRepartitionBlocks(context.plan, context.lastRepartionableNode, node,
- context.topmost, context.topMostLeftExecBlock);
- }
-
- context.topmost = node;
- context.lastRepartionableNode = node;
- return node;
- }
-
- @Override
- public LogicalNode visitExcept(GlobalPlanContext context, LogicalPlan plan, ExceptNode node,
- Stack<LogicalNode> stack) throws PlanningException {
- super.visitExcept(context, plan, node, stack);
- context.topmost = node;
- return node;
- }
-
- @Override
- public LogicalNode visitIntersect(GlobalPlanContext context, LogicalPlan plan, IntersectNode node,
- Stack<LogicalNode> stack) throws PlanningException {
- super.visitIntersect(context, plan, node, stack);
- context.topmost = node;
- return node;
- }
-
- @Override
- public LogicalNode visitTableSubQuery(GlobalPlanContext context, LogicalPlan plan, TableSubQueryNode node,
- Stack<LogicalNode> stack) throws PlanningException {
- super.visitTableSubQuery(context, plan, node, stack);
- context.topmost = node;
- return node;
- }
-
- @Override
- public LogicalNode visitScan(GlobalPlanContext context, LogicalPlan plan, ScanNode node, Stack<LogicalNode> stack)
- throws PlanningException {
- context.topmost = node;
- return node;
- }
-
- @Override
- public LogicalNode visitStoreTable(GlobalPlanContext context, LogicalPlan plan, StoreTableNode node,
- Stack<LogicalNode> stack) throws PlanningException {
- super.visitStoreTable(context, plan, node, stack);
- context.topmost = node;
- return node;
- }
-
- @Override
- public LogicalNode visitInsert(GlobalPlanContext context, LogicalPlan plan, InsertNode node,
- Stack<LogicalNode> stack)
- throws PlanningException {
- super.visitInsert(context, plan, node, stack);
- context.topmost = node;
- return node;
- }
- }
-
- private class UnionsFinderContext {
- List<UnionNode> unionList = new ArrayList<UnionNode>();
- }
-
- @SuppressWarnings("unused")
- private class ConsecutiveUnionFinder extends BasicLogicalPlanVisitor<UnionsFinderContext, LogicalNode> {
- @Override
- public LogicalNode visitUnion(UnionsFinderContext context, LogicalPlan plan, UnionNode node,
- Stack<LogicalNode> stack)
- throws PlanningException {
- if (node.getType() == NodeType.UNION) {
- context.unionList.add(node);
- }
-
- stack.push(node);
- TableSubQueryNode leftSubQuery = node.getLeftChild();
- TableSubQueryNode rightSubQuery = node.getRightChild();
- if (leftSubQuery.getSubQuery().getType() == NodeType.UNION) {
- visitChild(context, plan, leftSubQuery, stack);
- }
- if (rightSubQuery.getSubQuery().getType() == NodeType.UNION) {
- visitChild(context, plan, rightSubQuery, stack);
- }
- stack.pop();
-
- return node;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryContext.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryContext.java
deleted file mode 100644
index f921a15..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryContext.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Options;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.planner.PlannerUtil;
-import org.apache.tajo.engine.planner.logical.NodeType;
-
-import static org.apache.tajo.catalog.proto.CatalogProtos.KeyValueSetProto;
-
-public class QueryContext extends Options {
-
- public static final String COMMAND_TYPE = "tajo.query.command";
-
- public static final String STAGING_DIR = "tajo.query.staging_dir";
-
- public static final String USER_NAME = "tajo.query.username";
-
- public static final String OUTPUT_TABLE_NAME = "tajo.query.output.table";
- public static final String OUTPUT_TABLE_PATH = "tajo.query.output.path";
- public static final String OUTPUT_OVERWRITE = "tajo.query.output.overwrite";
- public static final String OUTPUT_AS_DIRECTORY = "tajo.query.output.asdirectory";
-
- public static final String TRUE_VALUE = "1";
- public static final String FALSE_VALUE = "0";
-
- public QueryContext() {}
-
- public QueryContext(KeyValueSetProto proto) {
- super(proto);
- }
-
- public void put(TajoConf.ConfVars key, String value) {
- put(key.varname, value);
- }
-
- public String get(TajoConf.ConfVars key) {
- return get(key.varname);
- }
-
- public String get(String key) {
- return super.get(key);
- }
-
- public void setBool(String key, boolean val) {
- put(key, val ? TRUE_VALUE : FALSE_VALUE);
- }
-
- public boolean getBool(String key) {
- String strVal = get(key);
- return strVal != null ? strVal.equals(TRUE_VALUE) : false;
- }
-
- public void setUser(String username) {
- put(USER_NAME, username);
- }
-
- public String getUser() {
- return get(USER_NAME);
- }
-
- public void setStagingDir(Path path) {
- put(STAGING_DIR, path.toUri().toString());
- }
-
- public Path getStagingDir() {
- String strVal = get(STAGING_DIR);
- return strVal != null ? new Path(strVal) : null;
- }
-
- /**
- * The fact that QueryContext has an output table means this query has a target table.
- * In other words, this query is 'CREATE TABLE' or 'INSERT (OVERWRITE) INTO <table name>' statement.
- * This config is not set if a query has INSERT (OVERWRITE) INTO LOCATION '/path/..'.
- */
- public boolean hasOutputTable() {
- return get(OUTPUT_TABLE_NAME) != null;
- }
-
- /**
- * Set a target table name
- *
- * @param tableName The target table name
- */
- public void setOutputTable(String tableName) {
- put(OUTPUT_TABLE_NAME, PlannerUtil.normalizeTableName(tableName));
- }
-
- public String getOutputTable() {
- String strVal = get(OUTPUT_TABLE_NAME);
- return strVal != null ? strVal : null;
- }
-
- /**
- * The fact that QueryContext has an output path means this query will write the output to a specific directory.
- * In other words, this query is 'CREATE TABLE' or 'INSERT (OVERWRITE) INTO (<table name>|LOCATION)' statement.
- *
- * @return
- */
- public boolean hasOutputPath() {
- return get(OUTPUT_TABLE_PATH) != null;
- }
-
- public void setOutputPath(Path path) {
- put(OUTPUT_TABLE_PATH, path.toUri().toString());
- }
-
- public Path getOutputPath() {
- String strVal = get(OUTPUT_TABLE_PATH);
- return strVal != null ? new Path(strVal) : null;
- }
-
- public void setOutputOverwrite() {
- setBool(OUTPUT_OVERWRITE, true);
- }
-
- public boolean isOutputOverwrite() {
- return getBool(OUTPUT_OVERWRITE);
- }
-
- public void setFileOutput() {
- setBool(OUTPUT_AS_DIRECTORY, true);
- }
-
- public boolean isFileOutput() {
- return getBool(OUTPUT_AS_DIRECTORY);
- }
-
- public void setCommandType(NodeType nodeType) {
- put(COMMAND_TYPE, nodeType.name());
- }
-
- public NodeType getCommandType() {
- String strVal = get(COMMAND_TYPE);
- return strVal != null ? NodeType.valueOf(strVal) : null;
- }
-
- public void setCreateTable() {
- setCommandType(NodeType.CREATE_TABLE);
- }
-
- public boolean isCreateTable() {
- return getCommandType() == NodeType.CREATE_TABLE;
- }
-
- public void setInsert() {
- setCommandType(NodeType.INSERT);
- }
-
- public boolean isInsert() {
- return getCommandType() == NodeType.INSERT;
- }
-
- public void setHiveQueryMode() {
- setBool("hive.query.mode", true);
- }
-
- public boolean isHiveQueryMode() {
- return getBool("hive.query.mode");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
index cdbd803..e09dd69 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
@@ -32,7 +32,7 @@ import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.engine.planner.logical.ScanNode;
import org.apache.tajo.engine.query.QueryUnitRequestImpl;
import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.ipc.protocolrecords.QueryUnitRequest;
+import org.apache.tajo.engine.query.QueryUnitRequest;
import org.apache.tajo.master.event.TaskAttemptAssignedEvent;
import org.apache.tajo.master.event.TaskRequestEvent;
import org.apache.tajo.master.event.TaskScheduleEvent;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
index 4618da6..2c8b822 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
@@ -20,7 +20,7 @@ package org.apache.tajo.master.event;
import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.tajo.QueryId;
-import org.apache.tajo.master.QueryContext;
+import org.apache.tajo.engine.query.QueryContext;
/**
* This event is conveyed to QueryMaster.
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
index 38fbfe2..715f00c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.*;
-import org.apache.tajo.DataChannel;
+import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryId;
import org.apache.tajo.TajoConstants;
@@ -39,9 +39,9 @@ import org.apache.tajo.catalog.TableDescImpl;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.master.ExecutionBlock;
-import org.apache.tajo.master.ExecutionBlockCursor;
-import org.apache.tajo.master.QueryContext;
+import org.apache.tajo.engine.planner.global.ExecutionBlock;
+import org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
+import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.master.event.*;
import org.apache.tajo.storage.AbstractStorageManager;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
index 6ce0fed..d603615 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -29,7 +29,7 @@ import org.apache.tajo.QueryId;
import org.apache.tajo.TajoProtos;
import org.apache.tajo.engine.planner.logical.LogicalRootNode;
import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.QueryContext;
+import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.master.TajoAsyncDispatcher;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.master.rm.WorkerResource;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
index 75a58bc..faa5a5c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
@@ -28,7 +28,7 @@ import org.apache.tajo.QueryId;
import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.engine.planner.logical.LogicalRootNode;
import org.apache.tajo.ipc.TajoMasterProtocol;
-import org.apache.tajo.master.QueryContext;
+import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.master.rm.WorkerResource;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index acb5dc8..94f0bc2 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -18,6 +18,7 @@
package org.apache.tajo.master.querymaster;
+import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -30,8 +31,7 @@ import org.apache.hadoop.yarn.service.Service;
import org.apache.tajo.QueryId;
import org.apache.tajo.TajoProtos;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ipc.TajoMasterProtocol;
-import org.apache.tajo.master.GlobalPlanner;
+import org.apache.tajo.engine.planner.global.GlobalPlanner;
import org.apache.tajo.master.TajoAsyncDispatcher;
import org.apache.tajo.master.event.QueryStartEvent;
import org.apache.tajo.rpc.CallFuture2;
@@ -62,9 +62,9 @@ public class QueryMaster extends CompositeService implements EventHandler {
private TajoConf systemConf;
- private Map<QueryId, QueryMasterTask> queryMasterTasks = new HashMap<QueryId, QueryMasterTask>();
+ private Map<QueryId, QueryMasterTask> queryMasterTasks = Maps.newConcurrentMap();
- private Map<QueryId, QueryMasterTask> finishedQueryMasterTasks = new HashMap<QueryId, QueryMasterTask>();
+ private Map<QueryId, QueryMasterTask> finishedQueryMasterTasks = Maps.newConcurrentMap();
private ClientSessionTimeoutCheckThread clientSessionTimeoutCheckThread;
@@ -255,16 +255,17 @@ public class QueryMaster extends CompositeService implements EventHandler {
}
public void stopQuery(QueryId queryId) {
+ LOG.error(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> " + queryId + "<<>>>><<<<<<>>>>");
QueryMasterTask queryMasterTask;
- synchronized(queryMasterTasks) {
- queryMasterTask = queryMasterTasks.remove(queryId);
- }
+ queryMasterTask = queryMasterTasks.remove(queryId);
+ finishedQueryMasterTasks.put(queryId, queryMasterTask);
+
if(queryMasterTask != null) {
TajoHeartbeat queryHeartbeat = buildTajoHeartBeat(queryMasterTask);
- CallFuture2 futuer = new CallFuture2();
- workerContext.getTajoMasterRpcClient().heartbeat(null, queryHeartbeat, futuer);
+ CallFuture2 future = new CallFuture2();
+ workerContext.getTajoMasterRpcClient().heartbeat(null, queryHeartbeat, future);
try {
- futuer.get(3000, TimeUnit.SECONDS);
+ future.get(3, TimeUnit.SECONDS);
} catch (Throwable e) {
LOG.warn(e);
}
@@ -274,8 +275,6 @@ public class QueryMaster extends CompositeService implements EventHandler {
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
-
- finishedQueryMasterTasks.put(queryId, queryMasterTask);
} else {
LOG.warn("No query info:" + queryId);
}
@@ -386,7 +385,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
LOG.info("FinishedQueryMasterTaskCleanThread started: expireIntervalTime=" + expireIntervalTime);
while(!queryMasterStop.get()) {
try {
- Thread.sleep(60 * 1000 * 60); //hourly
+ Thread.sleep(60 * 1000 * 60); // hourly
} catch (InterruptedException e) {
break;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index adc50df..c386bf2 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -42,10 +42,11 @@ import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.engine.planner.logical.NodeType;
import org.apache.tajo.engine.planner.logical.ScanNode;
import org.apache.tajo.master.GlobalEngine;
-import org.apache.tajo.master.QueryContext;
+import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.master.TajoAsyncDispatcher;
import org.apache.tajo.master.event.*;
import org.apache.tajo.master.rm.TajoWorkerResourceManager;
+import org.apache.tajo.rpc.CallFuture2;
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.storage.AbstractStorageManager;
import org.apache.tajo.worker.AbstractResourceAllocator;
@@ -55,6 +56,8 @@ import org.apache.tajo.worker.YarnResourceAllocator;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -158,8 +161,14 @@ public class QueryMasterTask extends CompositeService {
LOG.info("Stopping QueryMasterTask:" + queryId);
+ CallFuture2 future = new CallFuture2();
queryMasterContext.getWorkerContext().getTajoMasterRpcClient()
- .stopQueryMaster(null, queryId.getProto(), NullCallback.get());
+ .stopQueryMaster(null, queryId.getProto(), future);
+ try {
+ future.get(3, TimeUnit.SECONDS);
+ } catch (Throwable t) {
+ LOG.warn(t);
+ }
super.stop();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 194cf09..10795ea 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -23,7 +23,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.tajo.DataChannel;
+import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.catalog.*;
@@ -38,7 +38,7 @@ import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.engine.utils.TupleUtil;
import org.apache.tajo.exception.InternalException;
-import org.apache.tajo.master.ExecutionBlock;
+import org.apache.tajo.engine.planner.global.ExecutionBlock;
import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
import org.apache.tajo.storage.AbstractStorageManager;
import org.apache.tajo.storage.Fragment;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index cc38062..cb35954 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.*;
import org.apache.hadoop.yarn.util.Records;
-import org.apache.tajo.DataChannel;
+import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.QueryUnitId;
@@ -49,7 +49,7 @@ import org.apache.tajo.engine.planner.logical.GroupbyNode;
import org.apache.tajo.engine.planner.logical.NodeType;
import org.apache.tajo.engine.planner.logical.ScanNode;
import org.apache.tajo.engine.planner.logical.StoreTableNode;
-import org.apache.tajo.master.ExecutionBlock;
+import org.apache.tajo.engine.planner.global.ExecutionBlock;
import org.apache.tajo.master.TaskRunnerGroupEvent;
import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
import org.apache.tajo.master.TaskScheduler;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/ScheduleTaskEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/ScheduleTaskEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/ScheduleTaskEvent.java
deleted file mode 100644
index e1e074d..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/ScheduleTaskEvent.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.scheduler.event;
-
-import org.apache.tajo.QueryUnitAttemptId;
-
-public class ScheduleTaskEvent extends SchedulerEvent {
- private final QueryUnitAttemptId attemptId;
-
- public ScheduleTaskEvent(QueryUnitAttemptId id) {
- super(SchedulerEventType.SCHEDULE);
- this.attemptId = id;
- }
-
- public QueryUnitAttemptId getTaskAttemptId() {
- return attemptId;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/SchedulerEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/SchedulerEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/SchedulerEvent.java
deleted file mode 100644
index 664a2d6..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/SchedulerEvent.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.scheduler.event;
-
-import org.apache.hadoop.yarn.event.AbstractEvent;
-
-public class SchedulerEvent extends AbstractEvent<SchedulerEventType> {
-
- public SchedulerEvent(SchedulerEventType schedulerEventType) {
- super(schedulerEventType);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/SchedulerEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/SchedulerEventType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/SchedulerEventType.java
deleted file mode 100644
index 2e49c47..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/SchedulerEventType.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.scheduler.event;
-
-/**
- * Event Type for Scheduler
- */
-public enum SchedulerEventType {
- SCHEDULE,
- RESCHEDULE
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java
index 5cf2e7c..1731854 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java
@@ -18,9 +18,6 @@
package org.apache.tajo.worker;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.TaskAttemptContext;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.PhysicalPlanner;
import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
index d9984b8..838811a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -18,6 +18,7 @@
package org.apache.tajo.worker;
+import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
index a77ad2a..2e73b2b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
@@ -197,16 +197,16 @@ public class TajoWorkerClientService extends AbstractService {
TajoIdProtos.QueryIdProto request) throws ServiceException {
final QueryId queryId = new QueryId(request);
LOG.info("Stop Query:" + queryId);
- Thread t = new Thread() {
- public void run() {
+// Thread t = new Thread() {
+// public void run() {
// try {
// Thread.sleep(1000); //wait tile return to rpc response
// } catch (InterruptedException e) {
// }
workerContext.getQueryMaster().getContext().stopQuery(queryId);
- }
- };
- t.start();
+// }
+// };
+// t.start();
return BOOL_TRUE;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index 29f44eb..76011dc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -31,7 +31,7 @@ import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.TajoIdProtos;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.QueryContext;
+import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.master.TaskSchedulerImpl;
import org.apache.tajo.master.event.*;
import org.apache.tajo.master.querymaster.QueryMaster;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index 0a45cfb..d845f4f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -30,16 +30,13 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.tajo.DataChannel;
import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.TajoProtos.TaskAttemptState;
-import org.apache.tajo.TaskAttemptContext;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.statistics.TableStat;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.exception.UnfinishedTaskException;
import org.apache.tajo.engine.json.CoreGsonHelper;
import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.logical.LogicalNode;
@@ -48,8 +45,8 @@ import org.apache.tajo.engine.planner.logical.StoreTableNode;
import org.apache.tajo.engine.planner.physical.PhysicalExec;
import org.apache.tajo.ipc.TajoWorkerProtocol.*;
import org.apache.tajo.ipc.TajoWorkerProtocol.TajoWorkerProtocolService.Interface;
-import org.apache.tajo.ipc.protocolrecords.QueryUnitRequest;
-import org.apache.tajo.master.QueryContext;
+import org.apache.tajo.engine.query.QueryUnitRequest;
+import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.storage.Fragment;
import org.apache.tajo.storage.StorageUtil;
@@ -300,12 +297,11 @@ public class Task {
public void cleanUp() {
// remove itself from worker
- // 끝난건지 확인
+
if (context.getState() == TaskAttemptState.TA_SUCCEEDED) {
try {
// context.getWorkDir() 지우기
localFS.delete(context.getWorkDir(), true);
- // tasks에서 자기 지우기
synchronized (taskRunnerContext.getTasks()) {
taskRunnerContext.getTasks().remove(this.getId());
}
@@ -313,8 +309,7 @@ public class Task {
e.printStackTrace();
}
} else {
- LOG.error(new UnfinishedTaskException("QueryUnitAttemptId: "
- + context.getTaskId() + " status: " + context.getState()));
+ LOG.error("QueryUnitAttemptId: " + context.getTaskId() + " status: " + context.getState());
}
}