You are viewing a plain-text version of this content. The link to the canonical version ("here") was not preserved in this text copy.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/03/23 03:49:18 UTC
tajo git commit: TAJO-1403: Improve 'Simple Query' with only partition columns and constant values.
Repository: tajo
Updated Branches:
refs/heads/master 3aaff387c -> 8d0146b8d
TAJO-1403: Improve 'Simple Query' with only partition columns and constant values.
Closes #434
Signed-off-by: Jihoon Son <ji...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/8d0146b8
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/8d0146b8
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/8d0146b8
Branch: refs/heads/master
Commit: 8d0146b8d8f5eeac37fe3f531ce1362af3b20c2f
Parents: 3aaff38
Author: Jihoon Son <ji...@apache.org>
Authored: Mon Mar 23 11:48:18 2015 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Mon Mar 23 11:48:18 2015 +0900
----------------------------------------------------------------------
CHANGES | 3 +
.../exec/NonForwardQueryResultFileScanner.java | 37 +++++++++++-
.../apache/tajo/master/exec/QueryExecutor.java | 5 ++
.../apache/tajo/master/TestGlobalPlanner.java | 3 +-
.../org/apache/tajo/plan/expr/EvalTreeUtil.java | 61 ++++++++++++++++++++
.../org/apache/tajo/plan/util/PlannerUtil.java | 26 ++++++++-
.../apache/tajo/storage/FileStorageManager.java | 27 ++++++++-
7 files changed, 155 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/8d0146b8/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index e3636e6..ad3a6bd 100644
--- a/CHANGES
+++ b/CHANGES
@@ -9,6 +9,9 @@ Release 0.11.0 - unreleased
IMPROVEMENT
+ TAJO-1403: Improve 'Simple Query' with only partition columns and constant
+ values. (Contributed by Dongjoon Hyun, Committed by jihoon)
+
TAJO-1418: Comment on TAJO_PULLSERVER_STANDALONE in tajo-env.sh
is not consistent. (Contributed by navis, Committed by hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/8d0146b8/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
index dc0c44a..6c02aa9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
@@ -24,19 +24,23 @@ import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryId;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.TaskId;
+import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.plan.expr.EvalTreeUtil;
import org.apache.tajo.plan.logical.ScanNode;
import org.apache.tajo.engine.planner.physical.SeqScanExec;
import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.storage.FileStorageManager;
import org.apache.tajo.storage.RowStoreUtil;
import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
import org.apache.tajo.storage.StorageManager;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.storage.fragment.FragmentConvertor;
+import org.apache.tajo.util.StringUtils;
import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
@@ -74,10 +78,37 @@ public class NonForwardQueryResultFileScanner implements NonForwardQueryResultSc
initSeqScanExec();
}
+ /**
+ * Set partition path and depth if ScanNode's qualification exists
+ *
+ * @param storageManager target storage manager to be set with partition info
+ */
+ private void setPartition(StorageManager storageManager) {
+ if (tableDesc.isExternal() && tableDesc.hasPartition() && scanNode.getQual() != null &&
+ storageManager instanceof FileStorageManager) {
+ StringBuffer path = new StringBuffer();
+ int depth = 0;
+ if (tableDesc.hasPartition()) {
+ for (Column c : tableDesc.getPartitionMethod().getExpressionSchema().getColumns()) {
+ String partitionValue = EvalTreeUtil.getPartitionValue(scanNode.getQual(), c.getSimpleName());
+ if (partitionValue == null)
+ break;
+ path.append(String.format("/%s=%s", c.getSimpleName(), StringUtils.escapePathName(partitionValue)));
+ depth++;
+ }
+ }
+ ((FileStorageManager)storageManager).setPartitionPath(path.toString());
+ ((FileStorageManager)storageManager).setCurrentDepth(depth);
+ scanNode.setQual(null);
+ }
+ }
+
private void initSeqScanExec() throws IOException {
- List<Fragment> fragments = StorageManager.getStorageManager(tajoConf, tableDesc.getMeta().getStoreType())
- .getNonForwardSplit(tableDesc, currentFragmentIndex, MAX_FRAGMENT_NUM_PER_SCAN);
-
+ StorageManager storageManager = StorageManager.getStorageManager(tajoConf, tableDesc.getMeta().getStoreType());
+ List<Fragment> fragments = null;
+ setPartition(storageManager);
+ fragments = storageManager.getNonForwardSplit(tableDesc, currentFragmentIndex, MAX_FRAGMENT_NUM_PER_SCAN);
+
if (fragments != null && !fragments.isEmpty()) {
FragmentProto[] fragmentProtos = FragmentConvertor.toFragmentProtoArray(fragments.toArray(new Fragment[] {}));
this.taskContext = new TaskAttemptContext(
http://git-wip-us.apache.org/repos/asf/tajo/blob/8d0146b8/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
index db82fca..aa8b228 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
@@ -212,6 +212,11 @@ public class QueryExecutor {
scanNode = plan.getRootBlock().getNode(NodeType.PARTITIONS_SCAN);
}
TableDesc desc = scanNode.getTableDesc();
+ // Keep info for partition-column-only queries
+ SelectionNode selectionNode = plan.getRootBlock().getNode(NodeType.SELECTION);
+ if (desc.isExternal() && desc.hasPartition() && selectionNode != null) {
+ scanNode.setQual(selectionNode.getQual());
+ }
int maxRow = Integer.MAX_VALUE;
if (plan.getRootBlock().hasNode(NodeType.LIMIT)) {
LimitNode limitNode = plan.getRootBlock().getNode(NodeType.LIMIT);
http://git-wip-us.apache.org/repos/asf/tajo/blob/8d0146b8/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
index d0f7cf4..45c94a3 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
@@ -317,8 +317,9 @@ public class TestGlobalPlanner {
plan = buildPlan("select * from customer where c_nationkey = 1");
assertFalse(PlannerUtil.checkIfSimpleQuery(plan.getLogicalPlan()));
+ // c_nationkey is partition column
plan = buildPlan("select * from customer_parts where c_nationkey = 1");
- assertFalse(PlannerUtil.checkIfSimpleQuery(plan.getLogicalPlan()));
+ assertTrue(PlannerUtil.checkIfSimpleQuery(plan.getLogicalPlan()));
// same column order
plan = buildPlan("select c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment" +
http://git-wip-us.apache.org/repos/asf/tajo/blob/8d0146b8/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalTreeUtil.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalTreeUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalTreeUtil.java
index 23b4659..0d0ea88 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalTreeUtil.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalTreeUtil.java
@@ -573,4 +573,65 @@ public class EvalTreeUtil {
public static Datum evaluateImmediately(EvalNode evalNode) {
return evalNode.eval(null, null);
}
+
+ /**
+ * Checks whether EvalNode consists of only partition columns and const values.
+ * The partition based simple query can be defined as 'select * from tb_name where col_name1="X" and col_name2="Y" [LIMIT Z]',
+ * whose WHERE clause consists of only partition-columns with constant values.
+ * Partition columns must be able to form a prefix of HDFS path like '/tb_name1/col_name1=X/col_name2=Y'.
+ *
+ * @param node The qualification node of a SELECTION node
+ * @param partSchema Partition expression schema
+ * @return True if the query is partition-column based simple query.
+ */
+ public static boolean checkIfPartitionSelection(EvalNode node, Schema partSchema) {
+ if (node != null && node instanceof BinaryEval) {
+ BinaryEval eval = (BinaryEval)node;
+ EvalNode left = eval.getLeftExpr();
+ EvalNode right = eval.getRightExpr();
+ EvalType type = eval.getType();
+
+ if (type == EvalType.EQUAL) {
+ if (left instanceof FieldEval && right instanceof ConstEval && partSchema.contains(((FieldEval) left).getColumnName())) {
+ return true;
+ } else if (left instanceof ConstEval && right instanceof FieldEval && partSchema.contains(((FieldEval) right).getColumnName())) {
+ return true;
+ }
+ } else if (type == EvalType.AND && left instanceof BinaryEval && right instanceof BinaryEval) {
+ return checkIfPartitionSelection(left, partSchema) && checkIfPartitionSelection(right, partSchema);
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Get partition constant value associated with `columnName`.
+ *
+ * @param node EvalNode having query predicates
+ * @param columnName Column name to be looked up
+ * @return String The value associated with `columnName` in the predicates
+ */
+ public static String getPartitionValue(EvalNode node, String columnName) {
+ if (node != null && node instanceof BinaryEval) {
+ BinaryEval eval = (BinaryEval)node;
+ EvalNode left = eval.getLeftExpr();
+ EvalNode right = eval.getRightExpr();
+ EvalType type = eval.getType();
+
+ if (type == EvalType.EQUAL) {
+ if (left instanceof FieldEval && right instanceof ConstEval && columnName.equals(((FieldEval) left).getColumnName())) {
+ return ((ConstEval)right).getValue().toString();
+ } else if (left instanceof ConstEval && right instanceof FieldEval && columnName.equals(((FieldEval) right).getColumnName())) {
+ return ((ConstEval)left).getValue().toString();
+ }
+ } else if (type == EvalType.AND && left instanceof BinaryEval && right instanceof BinaryEval) {
+ String value = getPartitionValue(left, columnName);
+ if (value == null) {
+ value = getPartitionValue(right, columnName);
+ }
+ return value;
+ }
+ }
+ return null;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/8d0146b8/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
index 0fbd359..b09fc9e 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
@@ -100,6 +100,7 @@ public class PlannerUtil {
PlannerUtil.getRelationLineage(plan.getRootBlock().getRoot()).length == 1;
boolean noComplexComputation = false;
+ boolean prefixPartitionWhere = false;
if (singleRelation) {
ScanNode scanNode = plan.getRootBlock().getNode(NodeType.SCAN);
if (scanNode == null) {
@@ -133,11 +134,34 @@ public class PlannerUtil {
}
}
}
+
+ /**
+ * TODO: Remove isExternal check after resolving the following issues
+ * - TAJO-1416: INSERT INTO EXTERNAL PARTITIONED TABLE
+ * - TAJO-1441: INSERT INTO MANAGED PARTITIONED TABLE
+ */
+ if (!noWhere && scanNode.getTableDesc().isExternal() && scanNode.getTableDesc().getPartitionMethod() != null) {
+ EvalNode node = ((SelectionNode) plan.getRootBlock().getNode(NodeType.SELECTION)).getQual();
+ Schema partSchema = scanNode.getTableDesc().getPartitionMethod().getExpressionSchema();
+ if (EvalTreeUtil.checkIfPartitionSelection(node, partSchema)) {
+ prefixPartitionWhere = true;
+ boolean isPrefix = true;
+ for (Column c : partSchema.getColumns()) {
+ String value = EvalTreeUtil.getPartitionValue(node, c.getSimpleName());
+ if (isPrefix && value == null)
+ isPrefix = false;
+ else if (!isPrefix && value != null) {
+ prefixPartitionWhere = false;
+ break;
+ }
+ }
+ }
+ }
}
return !checkIfDDLPlan(rootNode) &&
(simpleOperator && noComplexComputation && isOneQueryBlock &&
- noOrderBy && noGroupBy && noWhere && noJoin && singleRelation);
+ noOrderBy && noGroupBy && (noWhere || prefixPartitionWhere) && noJoin && singleRelation);
}
/**
http://git-wip-us.apache.org/repos/asf/tajo/blob/8d0146b8/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java
index c427940..8d425b4 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java
@@ -149,6 +149,21 @@ public class FileStorageManager extends StorageManager {
return new Path(tableBaseDir, tableName);
}
+ private String partitionPath = "";
+ private int currentDepth = 0;
+
+ /**
+ * Set a specific partition path for partition-column only queries
+ * @param path The partition prefix path
+ */
+ public void setPartitionPath(String path) { partitionPath = path; }
+
+ /**
+ * Set a depth of partition path for partition-column only queries
+ * @param depth Depth of partitions
+ */
+ public void setCurrentDepth(int depth) { currentDepth = depth; }
+
@VisibleForTesting
public Appender getAppender(TableMeta meta, Schema schema, Path filePath)
throws IOException {
@@ -722,8 +737,16 @@ public class FileStorageManager extends StorageManager {
List<FileStatus> nonZeroLengthFiles = new ArrayList<FileStatus>();
if (fs.exists(tablePath)) {
- getNonZeroLengthDataFiles(fs, tablePath, nonZeroLengthFiles, currentPage, numResultFragments,
- new AtomicInteger(0), tableDesc.hasPartition(), 0, partitionDepth);
+ if (!partitionPath.isEmpty()) {
+ Path partPath = new Path(tableDesc.getPath() + partitionPath);
+ if (fs.exists(partPath)) {
+ getNonZeroLengthDataFiles(fs, partPath, nonZeroLengthFiles, currentPage, numResultFragments,
+ new AtomicInteger(0), tableDesc.hasPartition(), this.currentDepth, partitionDepth);
+ }
+ } else {
+ getNonZeroLengthDataFiles(fs, tablePath, nonZeroLengthFiles, currentPage, numResultFragments,
+ new AtomicInteger(0), tableDesc.hasPartition(), 0, partitionDepth);
+ }
}
List<Fragment> fragments = new ArrayList<Fragment>();