You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2016/05/16 08:57:36 UTC
tajo git commit: TAJO-2151: Fix broken CI.
Repository: tajo
Updated Branches:
refs/heads/master 4594b6116 -> 30a46592c
TAJO-2151: Fix broken CI.
Closes #1022
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/30a46592
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/30a46592
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/30a46592
Branch: refs/heads/master
Commit: 30a46592c46a87cbbfba0486c72cc02f4a398369
Parents: 4594b61
Author: Jihoon Son <ji...@apache.org>
Authored: Mon May 16 17:56:54 2016 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Mon May 16 17:56:54 2016 +0900
----------------------------------------------------------------------
.gitignore | 3 +
CHANGES | 2 +
.../tajo/engine/query/TestHBaseTable.java | 10 +-
.../exec/NonForwardQueryResultFileScanner.java | 21 ++--
.../apache/tajo/querymaster/Repartitioner.java | 76 ++++---------
.../java/org/apache/tajo/querymaster/Stage.java | 37 ++-----
.../java/org/apache/tajo/util/SplitUtil.java | 106 +++++++++++++++++++
.../plan/logical/PartitionedTableScanNode.java | 12 ++-
.../org/apache/tajo/storage/Tablespace.java | 3 +
.../tajo/storage/hbase/HBaseTablespace.java | 1 +
.../org/apache/tajo/storage/FileTablespace.java | 15 ++-
.../apache/tajo/storage/TestFileSystems.java | 2 +-
.../apache/tajo/storage/TestFileTablespace.java | 8 +-
.../org/apache/tajo/storage/TestStorages.java | 2 +-
.../tajo/storage/jdbc/JdbcTablespace.java | 1 +
.../storage/pgsql/TestPgSQLJdbcTableSpace.java | 2 +-
16 files changed, 183 insertions(+), 118 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/30a46592/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 5bca77c..276de0d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,3 +17,6 @@ atlassian-ide-plugin.xml
# Patch files
*.patch
+
+# Antlr files
+*.tokens
http://git-wip-us.apache.org/repos/asf/tajo/blob/30a46592/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 5e409c2..671ac4b 100644
--- a/CHANGES
+++ b/CHANGES
@@ -149,6 +149,8 @@ Release 0.12.0 - unreleased
BUG FIXES
+ TAJO-2151: Fix broken CI. (jihoon)
+
TAJO-2158: The concat_ws function can't support a tab separator.
(Byunghwa Yun via jihoon)
http://git-wip-us.apache.org/repos/asf/tajo/blob/30a46592/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
index 69a0230..8914e3b 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
@@ -553,7 +553,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
new ConstEval(new TextDatum("021")));
scanNode.setQual(evalNodeEq);
Tablespace tablespace = TablespaceManager.getByName("cluster1");
- List<Fragment> fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode.getQual());
+ List<Fragment> fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, false, scanNode.getQual());
assertEquals(1, fragments.size());
assertEquals("021", new String(((HBaseFragment)fragments.get(0)).getStartRow()));
assertEquals("021" + postFix, new String(((HBaseFragment)fragments.get(0)).getStopRow()));
@@ -566,7 +566,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
EvalNode evalNodeA = new BinaryEval(EvalType.AND, evalNode1, evalNode2);
scanNode.setQual(evalNodeA);
- fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode.getQual());
+ fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, false, scanNode.getQual());
assertEquals(2, fragments.size());
HBaseFragment fragment1 = (HBaseFragment) fragments.get(0);
assertEquals("020", new String(fragment1.getStartRow()));
@@ -581,7 +581,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
new ConstEval(new TextDatum("075")));
EvalNode evalNodeB = new BinaryEval(EvalType.OR, evalNodeA, evalNode3);
scanNode.setQual(evalNodeB);
- fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode.getQual());
+ fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, false, scanNode.getQual());
assertEquals(3, fragments.size());
fragment1 = (HBaseFragment) fragments.get(0);
assertEquals("020", new String(fragment1.getStartRow()));
@@ -604,7 +604,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
EvalNode evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5);
EvalNode evalNodeD = new BinaryEval(EvalType.OR, evalNodeA, evalNodeC);
scanNode.setQual(evalNodeD);
- fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode.getQual());
+ fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, false, scanNode.getQual());
assertEquals(3, fragments.size());
fragment1 = (HBaseFragment) fragments.get(0);
@@ -627,7 +627,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5);
evalNodeD = new BinaryEval(EvalType.OR, evalNodeA, evalNodeC);
scanNode.setQual(evalNodeD);
- fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode.getQual());
+ fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, false, scanNode.getQual());
assertEquals(2, fragments.size());
fragment1 = (HBaseFragment) fragments.get(0);
http://git-wip-us.apache.org/repos/asf/tajo/blob/30a46592/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
index a1728ec..871db89 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
@@ -26,31 +26,33 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoProtos.CodecType;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.TaskId;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.SchemaUtil;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-import org.apache.tajo.TajoProtos.CodecType;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.physical.PartitionMergeScanExec;
import org.apache.tajo.engine.planner.physical.ScanExec;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.exception.TajoException;
import org.apache.tajo.exception.TajoInternalError;
-import org.apache.tajo.ipc.ClientProtos.SerializedResultSet;
import org.apache.tajo.io.AsyncTaskService;
+import org.apache.tajo.ipc.ClientProtos.SerializedResultSet;
import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.querymaster.Repartitioner;
-import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.RowStoreUtil;
import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
+import org.apache.tajo.storage.Tablespace;
+import org.apache.tajo.storage.TablespaceManager;
+import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.storage.fragment.FragmentConvertor;
import org.apache.tajo.tuple.memory.MemoryBlock;
import org.apache.tajo.tuple.memory.MemoryRowBlock;
import org.apache.tajo.util.CompressionUtil;
-import org.apache.tajo.util.TUtil;
+import org.apache.tajo.util.SplitUtil;
import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
@@ -102,13 +104,8 @@ public class NonForwardQueryResultFileScanner implements NonForwardQueryResultSc
private void initSeqScanExec() throws IOException, TajoException {
Tablespace tablespace = TablespaceManager.get(tableDesc.getUri());
- List<Fragment> fragments = Lists.newArrayList();
- if (tableDesc.hasPartition()) {
- FileTablespace fileTablespace = TUtil.checkTypeAndGet(tablespace, FileTablespace.class);
- fragments.addAll(Repartitioner.getFragmentsFromPartitionedTable(fileTablespace, scanNode, tableDesc));
- } else {
- fragments.addAll(tablespace.getSplits(tableDesc.getName(), tableDesc, scanNode.getQual()));
- }
+ List<Fragment> fragments = Lists.newArrayList(
+ SplitUtil.getSplits(tablespace, scanNode, scanNode.getTableDesc(), true));
if (!fragments.isEmpty()) {
FragmentProto[] fragmentProtos = FragmentConvertor.toFragmentProtoArray(fragments.toArray(new Fragment[fragments.size()]));
http://git-wip-us.apache.org/repos/asf/tajo/blob/30a46592/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
index ba051a3..c264e3e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
@@ -19,7 +19,6 @@
package org.apache.tajo.querymaster;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -54,11 +53,15 @@ import org.apache.tajo.pullserver.PullServerConstants;
import org.apache.tajo.pullserver.PullServerUtil.PullServerRequestURIBuilder;
import org.apache.tajo.querymaster.Task.IntermediateEntry;
import org.apache.tajo.querymaster.Task.PullHost;
-import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.Tablespace;
+import org.apache.tajo.storage.TablespaceManager;
+import org.apache.tajo.storage.TupleRange;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.util.Pair;
+import org.apache.tajo.util.SplitUtil;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.util.TajoIdUtils;
import org.apache.tajo.worker.FetchImpl;
@@ -120,8 +123,9 @@ public class Repartitioner {
// if table has no data, tablespace will return empty FileFragment.
// So, we need to handle FileFragment by its size.
// If we don't check its size, it can cause IndexOutOfBoundsException.
- Tablespace space = TablespaceManager.get(tableDesc.getUri());
- List<Fragment> fileFragments = space.getSplits(scans[i].getCanonicalName(), tableDesc, null);
+ List<Fragment> fileFragments = SplitUtil.getSplits(
+ TablespaceManager.get(tableDesc.getUri()), scans[i], tableDesc, false);
+
if (fileFragments.size() > 0) {
fragments[i] = fileFragments.get(0);
} else {
@@ -385,27 +389,16 @@ public class Repartitioner {
//If there are more than one data files, that files should be added to fragments or partition path
for (ScanNode eachScan: broadcastScans) {
+ // TODO: This is a workaround to broadcast partitioned tables, and should be improved to be consistent with
+ // plain tables.
+ if (eachScan.getType() != NodeType.PARTITIONS_SCAN) {
+ TableDesc tableDesc = masterContext.getTableDesc(eachScan);
- Path[] partitionScanPaths = null;
- TableDesc tableDesc = masterContext.getTableDesc(eachScan);
- Tablespace space = TablespaceManager.get(tableDesc.getUri());
-
- if (eachScan.getType() == NodeType.PARTITIONS_SCAN) {
-
- PartitionedTableScanNode partitionScan = (PartitionedTableScanNode)eachScan;
- partitionScanPaths = partitionScan.getInputPaths();
- // set null to inputPaths in getFragmentsFromPartitionedTable()
- getFragmentsFromPartitionedTable((FileTablespace) space, eachScan, tableDesc);
- partitionScan.setInputPaths(partitionScanPaths);
-
- } else {
-
- Collection<Fragment> scanFragments =
- space.getSplits(eachScan.getCanonicalName(), tableDesc, eachScan.getQual());
+ Collection<Fragment> scanFragments = SplitUtil.getSplits(
+ TablespaceManager.get(tableDesc.getUri()), eachScan, tableDesc, false);
if (scanFragments != null) {
rightFragments.addAll(scanFragments);
}
-
}
}
}
@@ -468,24 +461,6 @@ public class Repartitioner {
return mergedHashEntries;
}
- /**
- * It creates a number of fragments for all partitions.
- */
- public static List<Fragment> getFragmentsFromPartitionedTable(Tablespace tsHandler,
- ScanNode scan,
- TableDesc table) throws IOException {
- Preconditions.checkArgument(tsHandler instanceof FileTablespace, "tsHandler must be FileTablespace");
- if (!(scan instanceof PartitionedTableScanNode)) {
- throw new IllegalArgumentException("scan should be a PartitionedTableScanNode type.");
- }
- List<Fragment> fragments = Lists.newArrayList();
- PartitionedTableScanNode partitionsScan = (PartitionedTableScanNode) scan;
- fragments.addAll(((FileTablespace) tsHandler).getSplits(
- scan.getCanonicalName(), table.getMeta(), table.getSchema(), partitionsScan.getInputPaths()));
- partitionsScan.setInputPaths(null);
- return fragments;
- }
-
private static void scheduleLeafTasksWithBroadcastTable(TaskSchedulerContext schedulerContext, Stage stage,
int baseScanId, Fragment[] fragments)
throws IOException, TajoException {
@@ -513,30 +488,15 @@ public class Repartitioner {
ScanNode scan = scans[i];
TableDesc desc = stage.getContext().getTableDesc(scan);
- Collection<Fragment> scanFragments;
- Path[] partitionScanPaths = null;
-
-
- Tablespace space = TablespaceManager.get(desc.getUri());
-
- if (scan.getType() == NodeType.PARTITIONS_SCAN) {
- PartitionedTableScanNode partitionScan = (PartitionedTableScanNode) scan;
- partitionScanPaths = partitionScan.getInputPaths();
- // set null to inputPaths in getFragmentsFromPartitionedTable()
- scanFragments = getFragmentsFromPartitionedTable(space, scan, desc);
- } else {
- scanFragments = space.getSplits(scan.getCanonicalName(), desc, scan.getQual());
- }
+ Collection<Fragment> scanFragments = SplitUtil.getSplits(TablespaceManager.get(desc.getUri()), scan, desc, false);
if (scanFragments != null) {
if (i == baseScanId) {
baseFragments = scanFragments;
} else {
- if (scan.getType() == NodeType.PARTITIONS_SCAN) {
- PartitionedTableScanNode partitionScan = (PartitionedTableScanNode)scan;
- // PhisicalPlanner make PartitionMergeScanExec when table is boradcast table and inputpaths is not empty
- partitionScan.setInputPaths(partitionScanPaths);
- } else {
+ // TODO: This is a workaround to broadcast partitioned tables, and should be improved to be consistent with
+ // plain tables.
+ if (scan.getType() != NodeType.PARTITIONS_SCAN) {
broadcastFragments.addAll(scanFragments);
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/30a46592/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index fed75cd..c055d11 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -52,7 +52,6 @@ import org.apache.tajo.master.TaskState;
import org.apache.tajo.master.event.*;
import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext;
import org.apache.tajo.plan.logical.*;
-import org.apache.tajo.plan.serder.PlanProto;
import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage;
import org.apache.tajo.plan.serder.PlanProto.EnforceProperty;
import org.apache.tajo.plan.util.PlannerUtil;
@@ -61,13 +60,12 @@ import org.apache.tajo.rpc.AsyncRpcClient;
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.rpc.RpcClientManager;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
-import org.apache.tajo.storage.FileTablespace;
-import org.apache.tajo.storage.Tablespace;
import org.apache.tajo.storage.TablespaceManager;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.util.KeyValueSet;
import org.apache.tajo.util.RpcParameterFactory;
+import org.apache.tajo.util.SplitUtil;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.util.history.StageHistory;
import org.apache.tajo.util.history.TaskHistory;
@@ -1185,34 +1183,13 @@ public class Stage implements EventHandler<StageEvent> {
ScanNode scan = scans[0];
TableDesc table = stage.context.getTableDesc(scan);
- Collection<Fragment> fragments;
- Tablespace tablespace = TablespaceManager.get(scan.getTableDesc().getUri());
-
- // Depending on scanner node's type, it creates fragments. If scan is for
- // a partitioned table, It will creates lots fragments for all partitions.
- // Otherwise, it creates at least one fragments for a table, which may
- // span a number of blocks or possibly consists of a number of files.
- //
- // Also, we can ensure FileTableSpace if the type of ScanNode is PARTITIONS_SCAN.
- if (scan.getType() == NodeType.PARTITIONS_SCAN) {
- // After calling this method, partition paths are removed from the physical plan.
- fragments = Repartitioner.getFragmentsFromPartitionedTable((FileTablespace) tablespace, scan, table);
- } else {
- fragments = tablespace.getSplits(scan.getCanonicalName(), table, scan.getQual());
- }
-
+ Collection<Fragment> fragments = SplitUtil.getSplits(
+ TablespaceManager.get(scan.getTableDesc().getUri()), scan, table, false);
+ SplitUtil.preparePartitionScanPlanForSchedule(scan);
Stage.scheduleFragments(stage, fragments);
- if (stage.getTaskScheduler() instanceof DefaultTaskScheduler) {
- //Leaf task of DefaultTaskScheduler should be fragment size
- // EstimatedTaskNum determined number of initial container
- stage.schedulerContext.setEstimatedTaskNum(fragments.size());
- } else {
- TajoConf conf = stage.context.getConf();
- stage.schedulerContext.setTaskSize(conf.getIntVar(ConfVars.TASK_DEFAULT_SIZE) * 1024 * 1024);
- int estimatedTaskNum = (int) Math.ceil((double) table.getStats().getNumBytes() /
- (double) stage.schedulerContext.getTaskSize());
- stage.schedulerContext.setEstimatedTaskNum(estimatedTaskNum);
- }
+
+ // The number of leaf tasks should be the number of fragments.
+ stage.schedulerContext.setEstimatedTaskNum(fragments.size());
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/30a46592/tajo-core/src/main/java/org/apache/tajo/util/SplitUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/SplitUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/SplitUtil.java
new file mode 100644
index 0000000..5f66b07
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/SplitUtil.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.plan.logical.NodeType;
+import org.apache.tajo.plan.logical.PartitionedTableScanNode;
+import org.apache.tajo.plan.logical.ScanNode;
+import org.apache.tajo.querymaster.Stage;
+import org.apache.tajo.storage.FileTablespace;
+import org.apache.tajo.storage.Tablespace;
+import org.apache.tajo.storage.fragment.Fragment;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+public class SplitUtil {
+
+ /**
+ * This method creates fragments depending on the table type. If the table is
+ * a partitioned table, it will create multiple fragments for all partitions.
+ * Otherwise, it creates at least one fragment for a table, which may
+ * span a number of blocks or possibly consist of a number of files.
+ *
+ * Also, the tablespace must be a FileTablespace if the table is a partitioned table.
+ *
+ * @param tablespace tablespace handler
+ * @param scan scan node
+ * @param tableDesc table desc of scan node
+ * @param requireSort if set, the result fragments will be sorted by their paths.
+ * Only set when the query is a simple query.
+ * @return a list of fragments for input table
+ * @throws IOException
+ * @throws TajoException
+ */
+ public static List<Fragment> getSplits(Tablespace tablespace,
+ ScanNode scan,
+ TableDesc tableDesc,
+ boolean requireSort)
+ throws IOException, TajoException {
+ List<Fragment> fragments;
+ if (tableDesc.hasPartition()) {
+ // TODO: Partition tables should also be handled by tablespace.
+ fragments = SplitUtil.getFragmentsFromPartitionedTable(tablespace, scan, tableDesc, requireSort);
+ } else {
+ fragments = tablespace.getSplits(scan.getCanonicalName(), tableDesc, requireSort, scan.getQual());
+ }
+
+ return fragments;
+ }
+
+ /**
+ * It creates a number of fragments for all partitions.
+ */
+ private static List<Fragment> getFragmentsFromPartitionedTable(Tablespace tsHandler,
+ ScanNode scan,
+ TableDesc table,
+ boolean requireSort) throws IOException {
+ Preconditions.checkArgument(tsHandler instanceof FileTablespace, "tsHandler must be FileTablespace");
+ if (!(scan instanceof PartitionedTableScanNode)) {
+ throw new IllegalArgumentException("scan should be a PartitionedTableScanNode type.");
+ }
+ List<Fragment> fragments = Lists.newArrayList();
+ PartitionedTableScanNode partitionsScan = (PartitionedTableScanNode) scan;
+ fragments.addAll(((FileTablespace) tsHandler).getSplits(
+ scan.getCanonicalName(), table.getMeta(), table.getSchema(), requireSort, partitionsScan.getInputPaths()));
+ return fragments;
+ }
+
+ /**
+ * Clear input paths of {@link PartitionedTableScanNode}.
+ * This is to avoid unnecessary transmission of a lot of partition table paths to workers.
+ * So, this method should be invoked before {@link org.apache.tajo.querymaster.Stage#scheduleFragment(Stage, Fragment)}
+ * unless the scan is broadcast.
+ *
+ * @param scanNode scan node
+ */
+ public static void preparePartitionScanPlanForSchedule(ScanNode scanNode) {
+ if (scanNode.getType() == NodeType.PARTITIONS_SCAN) {
+ // TODO: The partition input paths don't have to be kept in a logical node at all.
+ // This should be improved by implementing a specialized fragment for partition tables.
+ PartitionedTableScanNode partitionScan = (PartitionedTableScanNode) scanNode;
+ partitionScan.clearInputPaths();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/30a46592/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PartitionedTableScanNode.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PartitionedTableScanNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PartitionedTableScanNode.java
index 6176882..d3f7148 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PartitionedTableScanNode.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PartitionedTableScanNode.java
@@ -29,9 +29,10 @@ import org.apache.tajo.plan.expr.EvalNode;
import org.apache.tajo.util.TUtil;
import java.util.ArrayList;
+import java.util.Arrays;
public class PartitionedTableScanNode extends ScanNode {
- @Expose Path [] inputPaths;
+ @Expose private Path [] inputPaths;
public PartitionedTableScanNode(int pid) {
super(pid, NodeType.PARTITIONS_SCAN);
@@ -43,15 +44,22 @@ public class PartitionedTableScanNode extends ScanNode {
setOutSchema(scanNode.getOutSchema());
this.qual = scanNode.qual;
this.targets = scanNode.targets;
- this.inputPaths = inputPaths;
+ setInputPaths(inputPaths);
if (scanNode.hasAlias()) {
alias = scanNode.alias;
}
}
+ public void clearInputPaths() {
+ this.inputPaths = null;
+ }
+
public void setInputPaths(Path [] paths) {
this.inputPaths = paths;
+ if (this.inputPaths != null) {
+ Arrays.sort(inputPaths);
+ }
}
public Path [] getInputPaths() {
http://git-wip-us.apache.org/repos/asf/tajo/blob/30a46592/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
index 00e6d75..4afb383 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
@@ -124,14 +124,17 @@ public abstract class Tablespace {
/**
* Returns the splits that will serve as input for the scan tasks. The
* number of splits matches the number of regions in a table.
+ *
* @param inputSourceId Input source identifier, which can be either relation name or execution block id
* @param tableDesc The table description for the target data.
+ * @param requireSort If set, the result fragments will be sorted by their paths.
* @param filterCondition filter condition which can prune splits if possible
* @return The list of input fragments.
* @throws java.io.IOException
*/
public abstract List<Fragment> getSplits(String inputSourceId,
TableDesc tableDesc,
+ boolean requireSort,
@Nullable EvalNode filterCondition) throws IOException, TajoException;
/**
http://git-wip-us.apache.org/repos/asf/tajo/blob/30a46592/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
index 132ceff..0cf883e 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
@@ -557,6 +557,7 @@ public class HBaseTablespace extends Tablespace {
@Override
public List<Fragment> getSplits(String inputSourceId,
TableDesc table,
+ boolean requireSorted,
@Nullable EvalNode filterCondition) throws IOException, TajoException {
return (List<Fragment>) (List) getRawSplits(inputSourceId, table, filterCondition);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/30a46592/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
index 2aa2b91..f3cb9a5 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
@@ -304,10 +304,12 @@ public class FileTablespace extends Tablespace {
* Subclasses may override to, e.g., select only files matching a regular
* expression.
*
+ * @param requireSort if set, result will be sorted by their paths.
+ * @param dirs input dirs
* @return array of FileStatus objects
* @throws IOException if zero items.
*/
- protected List<FileStatus> listStatus(Path... dirs) throws IOException {
+ protected List<FileStatus> listStatus(boolean requireSort, Path... dirs) throws IOException {
List<FileStatus> result = new ArrayList<>();
if (dirs.length == 0) {
throw new IOException("No input paths specified in job");
@@ -342,6 +344,10 @@ public class FileTablespace extends Tablespace {
if (!errors.isEmpty()) {
throw new InvalidInputException(errors);
}
+
+ if (requireSort) {
+ Collections.sort(result);
+ }
LOG.info("Total input paths to process : " + result.size());
return result;
}
@@ -464,7 +470,7 @@ public class FileTablespace extends Tablespace {
*
* @throws IOException
*/
- public List<Fragment> getSplits(String tableName, TableMeta meta, Schema schema, Path... inputs)
+ public List<Fragment> getSplits(String tableName, TableMeta meta, Schema schema, boolean requireSort, Path... inputs)
throws IOException {
// generate splits'
@@ -477,7 +483,7 @@ public class FileTablespace extends Tablespace {
if (fs.isFile(p)) {
files.addAll(Lists.newArrayList(fs.getFileStatus(p)));
} else {
- files.addAll(listStatus(p));
+ files.addAll(listStatus(requireSort, p));
}
int previousSplitSize = splits.size();
@@ -606,8 +612,9 @@ public class FileTablespace extends Tablespace {
@Override
public List<Fragment> getSplits(String inputSourceId,
TableDesc table,
+ boolean requireSort,
@Nullable EvalNode filterCondition) throws IOException {
- return getSplits(inputSourceId, table.getMeta(), table.getSchema(), new Path(table.getUri()));
+ return getSplits(inputSourceId, table.getMeta(), table.getSchema(), requireSort, new Path(table.getUri()));
}
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/30a46592/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
index c62a01c..1818330 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
@@ -127,7 +127,7 @@ public class TestFileSystems {
appender.close();
FileStatus fileStatus = fs.getFileStatus(path);
- List<Fragment> splits = sm.getSplits("table", meta, schema, path);
+ List<Fragment> splits = sm.getSplits("table", meta, schema, false, path);
int splitSize = (int) Math.ceil(fileStatus.getLen() / (double) fileStatus.getBlockSize());
assertEquals(splitSize, splits.size());
http://git-wip-us.apache.org/repos/asf/tajo/blob/30a46592/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
index dc8781e..ad45d18 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
@@ -156,13 +156,13 @@ public class TestFileTablespace {
List<Fragment> splits = Lists.newArrayList();
// Get FileFragments in partition batch
- splits.addAll(space.getSplits("data", meta, schema, partitions.toArray(new Path[partitions.size()])));
+ splits.addAll(space.getSplits("data", meta, schema, false, partitions.toArray(new Path[partitions.size()])));
assertEquals(testCount, splits.size());
// -1 is unknown volumeId
assertEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);
splits.clear();
- splits.addAll(space.getSplits("data", meta, schema,
+ splits.addAll(space.getSplits("data", meta, schema, false,
partitions.subList(0, partitions.size() / 2).toArray(new Path[partitions.size() / 2])));
assertEquals(testCount / 2, splits.size());
assertEquals(1, splits.get(0).getHosts().length);
@@ -212,7 +212,7 @@ public class TestFileTablespace {
List<Fragment> splits = Lists.newArrayList();
// Get FileFragments in partition batch
- splits.addAll(space.getSplits("data", meta, schema, partitions.toArray(new Path[partitions.size()])));
+ splits.addAll(space.getSplits("data", meta, schema, false, partitions.toArray(new Path[partitions.size()])));
assertEquals(0, splits.size());
fs.close();
@@ -256,7 +256,7 @@ public class TestFileTablespace {
TableMeta meta = CatalogUtil.newTableMeta(BuiltinStorages.TEXT, conf);
List<Fragment> splits = Lists.newArrayList();
- splits.addAll(sm.getSplits("data", meta, schema, tablePath));
+ splits.addAll(sm.getSplits("data", meta, schema, false, tablePath));
assertEquals(testCount, splits.size());
assertEquals(2, splits.get(0).getHosts().length);
http://git-wip-us.apache.org/repos/asf/tajo/blob/30a46592/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
index 3e36340..c1423d7 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -269,7 +269,7 @@ public class TestStorages {
assertEquals(0, fileStatus.getLen());
}
- List<Fragment> splits = sm.getSplits("testZeroRows", meta, schema, testDir);
+ List<Fragment> splits = sm.getSplits("testZeroRows", meta, schema, false, testDir);
int tupleCnt = 0;
for (Fragment fragment : splits) {
Scanner scanner = sm.getScanner(meta, schema, fragment, schema);
http://git-wip-us.apache.org/repos/asf/tajo/blob/30a46592/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java
index fa6cf48..7a630b2 100644
--- a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java
+++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java
@@ -122,6 +122,7 @@ public abstract class JdbcTablespace extends Tablespace {
@Override
public List<Fragment> getSplits(String inputSourceId,
TableDesc tableDesc,
+ boolean requireSorted,
@Nullable EvalNode filterCondition) throws IOException {
return Lists.newArrayList((Fragment)new JdbcFragment(inputSourceId, tableDesc.getUri().toASCIIString()));
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/30a46592/tajo-storage/tajo-storage-pgsql/src/test/java/org/apache/tajo/storage/pgsql/TestPgSQLJdbcTableSpace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-pgsql/src/test/java/org/apache/tajo/storage/pgsql/TestPgSQLJdbcTableSpace.java b/tajo-storage/tajo-storage-pgsql/src/test/java/org/apache/tajo/storage/pgsql/TestPgSQLJdbcTableSpace.java
index 0d0e15e..875d70e 100644
--- a/tajo-storage/tajo-storage-pgsql/src/test/java/org/apache/tajo/storage/pgsql/TestPgSQLJdbcTableSpace.java
+++ b/tajo-storage/tajo-storage-pgsql/src/test/java/org/apache/tajo/storage/pgsql/TestPgSQLJdbcTableSpace.java
@@ -74,7 +74,7 @@ public class TestPgSQLJdbcTableSpace {
Tablespace space = TablespaceManager.getByName("pgsql_cluster");
MetadataProvider provider = space.getMetadataProvider();
TableDesc table = provider.getTableDesc(null, "lineitem");
- List<Fragment> fragments = space.getSplits("lineitem", table, null);
+ List<Fragment> fragments = space.getSplits("lineitem", table, false, null);
assertNotNull(fragments);
assertEquals(1, fragments.size());
}