You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2016/05/16 08:57:36 UTC

tajo git commit: TAJO-2151: Fix broken CI.

Repository: tajo
Updated Branches:
  refs/heads/master 4594b6116 -> 30a46592c


TAJO-2151: Fix broken CI.

Closes #1022


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/30a46592
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/30a46592
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/30a46592

Branch: refs/heads/master
Commit: 30a46592c46a87cbbfba0486c72cc02f4a398369
Parents: 4594b61
Author: Jihoon Son <ji...@apache.org>
Authored: Mon May 16 17:56:54 2016 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Mon May 16 17:56:54 2016 +0900

----------------------------------------------------------------------
 .gitignore                                      |   3 +
 CHANGES                                         |   2 +
 .../tajo/engine/query/TestHBaseTable.java       |  10 +-
 .../exec/NonForwardQueryResultFileScanner.java  |  21 ++--
 .../apache/tajo/querymaster/Repartitioner.java  |  76 ++++---------
 .../java/org/apache/tajo/querymaster/Stage.java |  37 ++-----
 .../java/org/apache/tajo/util/SplitUtil.java    | 106 +++++++++++++++++++
 .../plan/logical/PartitionedTableScanNode.java  |  12 ++-
 .../org/apache/tajo/storage/Tablespace.java     |   3 +
 .../tajo/storage/hbase/HBaseTablespace.java     |   1 +
 .../org/apache/tajo/storage/FileTablespace.java |  15 ++-
 .../apache/tajo/storage/TestFileSystems.java    |   2 +-
 .../apache/tajo/storage/TestFileTablespace.java |   8 +-
 .../org/apache/tajo/storage/TestStorages.java   |   2 +-
 .../tajo/storage/jdbc/JdbcTablespace.java       |   1 +
 .../storage/pgsql/TestPgSQLJdbcTableSpace.java  |   2 +-
 16 files changed, 183 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/30a46592/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 5bca77c..276de0d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,3 +17,6 @@ atlassian-ide-plugin.xml
 
 # Patch files
 *.patch
+
+# Antlr files
+*.tokens

http://git-wip-us.apache.org/repos/asf/tajo/blob/30a46592/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 5e409c2..671ac4b 100644
--- a/CHANGES
+++ b/CHANGES
@@ -149,6 +149,8 @@ Release 0.12.0 - unreleased
 
   BUG FIXES
 
+    TAJO-2151: Fix broken CI. (jihoon)
+
     TAJO-2158: The concat_ws function can't support a tab separator.
     (Byunghwa Yun via jihoon)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/30a46592/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
index 69a0230..8914e3b 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
@@ -553,7 +553,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
         new ConstEval(new TextDatum("021")));
     scanNode.setQual(evalNodeEq);
     Tablespace tablespace = TablespaceManager.getByName("cluster1");
-    List<Fragment> fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode.getQual());
+    List<Fragment> fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, false, scanNode.getQual());
     assertEquals(1, fragments.size());
     assertEquals("021", new String(((HBaseFragment)fragments.get(0)).getStartRow()));
     assertEquals("021" + postFix, new String(((HBaseFragment)fragments.get(0)).getStopRow()));
@@ -566,7 +566,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
     EvalNode evalNodeA = new BinaryEval(EvalType.AND, evalNode1, evalNode2);
     scanNode.setQual(evalNodeA);
 
-    fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode.getQual());
+    fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, false, scanNode.getQual());
     assertEquals(2, fragments.size());
     HBaseFragment fragment1 = (HBaseFragment) fragments.get(0);
     assertEquals("020", new String(fragment1.getStartRow()));
@@ -581,7 +581,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
         new ConstEval(new TextDatum("075")));
     EvalNode evalNodeB = new BinaryEval(EvalType.OR, evalNodeA, evalNode3);
     scanNode.setQual(evalNodeB);
-    fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode.getQual());
+    fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, false, scanNode.getQual());
     assertEquals(3, fragments.size());
     fragment1 = (HBaseFragment) fragments.get(0);
     assertEquals("020", new String(fragment1.getStartRow()));
@@ -604,7 +604,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
     EvalNode evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5);
     EvalNode evalNodeD = new BinaryEval(EvalType.OR, evalNodeA, evalNodeC);
     scanNode.setQual(evalNodeD);
-    fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode.getQual());
+    fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, false, scanNode.getQual());
     assertEquals(3, fragments.size());
 
     fragment1 = (HBaseFragment) fragments.get(0);
@@ -627,7 +627,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
     evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5);
     evalNodeD = new BinaryEval(EvalType.OR, evalNodeA, evalNodeC);
     scanNode.setQual(evalNodeD);
-    fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode.getQual());
+    fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, false, scanNode.getQual());
     assertEquals(2, fragments.size());
 
     fragment1 = (HBaseFragment) fragments.get(0);

http://git-wip-us.apache.org/repos/asf/tajo/blob/30a46592/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
index a1728ec..871db89 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
@@ -26,31 +26,33 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoProtos.CodecType;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.TaskId;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-import org.apache.tajo.TajoProtos.CodecType;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.physical.PartitionMergeScanExec;
 import org.apache.tajo.engine.planner.physical.ScanExec;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.exception.TajoException;
 import org.apache.tajo.exception.TajoInternalError;
-import org.apache.tajo.ipc.ClientProtos.SerializedResultSet;
 import org.apache.tajo.io.AsyncTaskService;
+import org.apache.tajo.ipc.ClientProtos.SerializedResultSet;
 import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.querymaster.Repartitioner;
-import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.RowStoreUtil;
 import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
+import org.apache.tajo.storage.Tablespace;
+import org.apache.tajo.storage.TablespaceManager;
+import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.storage.fragment.FragmentConvertor;
 import org.apache.tajo.tuple.memory.MemoryBlock;
 import org.apache.tajo.tuple.memory.MemoryRowBlock;
 import org.apache.tajo.util.CompressionUtil;
-import org.apache.tajo.util.TUtil;
+import org.apache.tajo.util.SplitUtil;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
@@ -102,13 +104,8 @@ public class NonForwardQueryResultFileScanner implements NonForwardQueryResultSc
   private void initSeqScanExec() throws IOException, TajoException {
     Tablespace tablespace = TablespaceManager.get(tableDesc.getUri());
 
-    List<Fragment> fragments = Lists.newArrayList();
-    if (tableDesc.hasPartition()) {
-      FileTablespace fileTablespace = TUtil.checkTypeAndGet(tablespace, FileTablespace.class);
-      fragments.addAll(Repartitioner.getFragmentsFromPartitionedTable(fileTablespace, scanNode, tableDesc));
-    } else {
-      fragments.addAll(tablespace.getSplits(tableDesc.getName(), tableDesc, scanNode.getQual()));
-    }
+    List<Fragment> fragments = Lists.newArrayList(
+        SplitUtil.getSplits(tablespace, scanNode, scanNode.getTableDesc(), true));
 
     if (!fragments.isEmpty()) {
       FragmentProto[] fragmentProtos = FragmentConvertor.toFragmentProtoArray(fragments.toArray(new Fragment[fragments.size()]));

http://git-wip-us.apache.org/repos/asf/tajo/blob/30a46592/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
index ba051a3..c264e3e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
@@ -19,7 +19,6 @@
 package org.apache.tajo.querymaster;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -54,11 +53,15 @@ import org.apache.tajo.pullserver.PullServerConstants;
 import org.apache.tajo.pullserver.PullServerUtil.PullServerRequestURIBuilder;
 import org.apache.tajo.querymaster.Task.IntermediateEntry;
 import org.apache.tajo.querymaster.Task.PullHost;
-import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.Tablespace;
+import org.apache.tajo.storage.TablespaceManager;
+import org.apache.tajo.storage.TupleRange;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.Pair;
+import org.apache.tajo.util.SplitUtil;
 import org.apache.tajo.util.TUtil;
 import org.apache.tajo.util.TajoIdUtils;
 import org.apache.tajo.worker.FetchImpl;
@@ -120,8 +123,9 @@ public class Repartitioner {
         // if table has no data, tablespace will return empty FileFragment.
         // So, we need to handle FileFragment by its size.
         // If we don't check its size, it can cause IndexOutOfBoundsException.
-        Tablespace space = TablespaceManager.get(tableDesc.getUri());
-        List<Fragment> fileFragments = space.getSplits(scans[i].getCanonicalName(), tableDesc, null);
+        List<Fragment> fileFragments = SplitUtil.getSplits(
+            TablespaceManager.get(tableDesc.getUri()), scans[i], tableDesc, false);
+
         if (fileFragments.size() > 0) {
           fragments[i] = fileFragments.get(0);
         } else {
@@ -385,27 +389,16 @@ public class Repartitioner {
       //If there are more than one data files, that files should be added to fragments or partition path
 
       for (ScanNode eachScan: broadcastScans) {
+        // TODO: This is a workaround to broadcast partitioned tables, and should be improved to be consistent with
+        // plain tables.
+        if (eachScan.getType() != NodeType.PARTITIONS_SCAN) {
+          TableDesc tableDesc = masterContext.getTableDesc(eachScan);
 
-        Path[] partitionScanPaths = null;
-        TableDesc tableDesc = masterContext.getTableDesc(eachScan);
-        Tablespace space = TablespaceManager.get(tableDesc.getUri());
-
-        if (eachScan.getType() == NodeType.PARTITIONS_SCAN) {
-
-          PartitionedTableScanNode partitionScan = (PartitionedTableScanNode)eachScan;
-          partitionScanPaths = partitionScan.getInputPaths();
-          // set null to inputPaths in getFragmentsFromPartitionedTable()
-          getFragmentsFromPartitionedTable((FileTablespace) space, eachScan, tableDesc);
-          partitionScan.setInputPaths(partitionScanPaths);
-
-        } else {
-
-          Collection<Fragment> scanFragments =
-              space.getSplits(eachScan.getCanonicalName(), tableDesc, eachScan.getQual());
+          Collection<Fragment> scanFragments = SplitUtil.getSplits(
+              TablespaceManager.get(tableDesc.getUri()), eachScan, tableDesc, false);
           if (scanFragments != null) {
             rightFragments.addAll(scanFragments);
           }
-
         }
       }
     }
@@ -468,24 +461,6 @@ public class Repartitioner {
     return mergedHashEntries;
   }
 
-  /**
-   * It creates a number of fragments for all partitions.
-   */
-  public static List<Fragment> getFragmentsFromPartitionedTable(Tablespace tsHandler,
-                                                                          ScanNode scan,
-                                                                          TableDesc table) throws IOException {
-    Preconditions.checkArgument(tsHandler instanceof FileTablespace, "tsHandler must be FileTablespace");
-    if (!(scan instanceof PartitionedTableScanNode)) {
-      throw new IllegalArgumentException("scan should be a PartitionedTableScanNode type.");
-    }
-    List<Fragment> fragments = Lists.newArrayList();
-    PartitionedTableScanNode partitionsScan = (PartitionedTableScanNode) scan;
-    fragments.addAll(((FileTablespace) tsHandler).getSplits(
-        scan.getCanonicalName(), table.getMeta(), table.getSchema(), partitionsScan.getInputPaths()));
-    partitionsScan.setInputPaths(null);
-    return fragments;
-  }
-
   private static void scheduleLeafTasksWithBroadcastTable(TaskSchedulerContext schedulerContext, Stage stage,
                                                           int baseScanId, Fragment[] fragments)
       throws IOException, TajoException {
@@ -513,30 +488,15 @@ public class Repartitioner {
       ScanNode scan = scans[i];
       TableDesc desc = stage.getContext().getTableDesc(scan);
 
-      Collection<Fragment> scanFragments;
-      Path[] partitionScanPaths = null;
-
-
-      Tablespace space = TablespaceManager.get(desc.getUri());
-
-      if (scan.getType() == NodeType.PARTITIONS_SCAN) {
-        PartitionedTableScanNode partitionScan = (PartitionedTableScanNode) scan;
-        partitionScanPaths = partitionScan.getInputPaths();
-        // set null to inputPaths in getFragmentsFromPartitionedTable()
-        scanFragments = getFragmentsFromPartitionedTable(space, scan, desc);
-      } else {
-        scanFragments = space.getSplits(scan.getCanonicalName(), desc, scan.getQual());
-      }
+      Collection<Fragment> scanFragments = SplitUtil.getSplits(TablespaceManager.get(desc.getUri()), scan, desc, false);
 
       if (scanFragments != null) {
         if (i == baseScanId) {
           baseFragments = scanFragments;
         } else {
-          if (scan.getType() == NodeType.PARTITIONS_SCAN) {
-            PartitionedTableScanNode partitionScan = (PartitionedTableScanNode)scan;
-            // PhisicalPlanner make PartitionMergeScanExec when table is boradcast table and inputpaths is not empty
-            partitionScan.setInputPaths(partitionScanPaths);
-          } else {
+          // TODO: This is a workaround to broadcast partitioned tables, and should be improved to be consistent with
+          // plain tables.
+          if (scan.getType() != NodeType.PARTITIONS_SCAN) {
             broadcastFragments.addAll(scanFragments);
           }
         }

http://git-wip-us.apache.org/repos/asf/tajo/blob/30a46592/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index fed75cd..c055d11 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -52,7 +52,6 @@ import org.apache.tajo.master.TaskState;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext;
 import org.apache.tajo.plan.logical.*;
-import org.apache.tajo.plan.serder.PlanProto;
 import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage;
 import org.apache.tajo.plan.serder.PlanProto.EnforceProperty;
 import org.apache.tajo.plan.util.PlannerUtil;
@@ -61,13 +60,12 @@ import org.apache.tajo.rpc.AsyncRpcClient;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.rpc.RpcClientManager;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
-import org.apache.tajo.storage.FileTablespace;
-import org.apache.tajo.storage.Tablespace;
 import org.apache.tajo.storage.TablespaceManager;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.RpcParameterFactory;
+import org.apache.tajo.util.SplitUtil;
 import org.apache.tajo.util.TUtil;
 import org.apache.tajo.util.history.StageHistory;
 import org.apache.tajo.util.history.TaskHistory;
@@ -1185,34 +1183,13 @@ public class Stage implements EventHandler<StageEvent> {
       ScanNode scan = scans[0];
       TableDesc table = stage.context.getTableDesc(scan);
 
-      Collection<Fragment> fragments;
-      Tablespace tablespace = TablespaceManager.get(scan.getTableDesc().getUri());
-
-      // Depending on scanner node's type, it creates fragments. If scan is for
-      // a partitioned table, It will creates lots fragments for all partitions.
-      // Otherwise, it creates at least one fragments for a table, which may
-      // span a number of blocks or possibly consists of a number of files.
-      //
-      // Also, we can ensure FileTableSpace if the type of ScanNode is PARTITIONS_SCAN.
-      if (scan.getType() == NodeType.PARTITIONS_SCAN) {
-        // After calling this method, partition paths are removed from the physical plan.
-        fragments = Repartitioner.getFragmentsFromPartitionedTable((FileTablespace) tablespace, scan, table);
-      } else {
-        fragments = tablespace.getSplits(scan.getCanonicalName(), table, scan.getQual());
-      }
-
+      Collection<Fragment> fragments = SplitUtil.getSplits(
+          TablespaceManager.get(scan.getTableDesc().getUri()), scan, table, false);
+      SplitUtil.preparePartitionScanPlanForSchedule(scan);
       Stage.scheduleFragments(stage, fragments);
-      if (stage.getTaskScheduler() instanceof DefaultTaskScheduler) {
-        //Leaf task of DefaultTaskScheduler should be fragment size
-        // EstimatedTaskNum determined number of initial container
-        stage.schedulerContext.setEstimatedTaskNum(fragments.size());
-      } else {
-        TajoConf conf = stage.context.getConf();
-        stage.schedulerContext.setTaskSize(conf.getIntVar(ConfVars.TASK_DEFAULT_SIZE) * 1024 * 1024);
-        int estimatedTaskNum = (int) Math.ceil((double) table.getStats().getNumBytes() /
-            (double) stage.schedulerContext.getTaskSize());
-        stage.schedulerContext.setEstimatedTaskNum(estimatedTaskNum);
-      }
+
+      // The number of leaf tasks should be the number of fragments.
+      stage.schedulerContext.setEstimatedTaskNum(fragments.size());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/30a46592/tajo-core/src/main/java/org/apache/tajo/util/SplitUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/SplitUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/SplitUtil.java
new file mode 100644
index 0000000..5f66b07
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/SplitUtil.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.plan.logical.NodeType;
+import org.apache.tajo.plan.logical.PartitionedTableScanNode;
+import org.apache.tajo.plan.logical.ScanNode;
+import org.apache.tajo.querymaster.Stage;
+import org.apache.tajo.storage.FileTablespace;
+import org.apache.tajo.storage.Tablespace;
+import org.apache.tajo.storage.fragment.Fragment;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+public class SplitUtil {
+
+  /**
+   * This method creates fragments depending on the table type. If the table is
+   * a partitioned table, it will create multiple fragments for all partitions.
+   * Otherwise, it creates at least one fragment for a table, which may
+   * span a number of blocks or possibly consist of a number of files.
+   *
+   * Also, the tablespace must be a FileTablespace if the table is a partitioned table.
+   *
+   * @param tablespace tablespace handler
+   * @param scan scan node
+   * @param tableDesc table desc of scan node
+   * @param requireSort if set, the result fragments will be sorted by their paths.
+   *                    Only set this when the query is a simple query.
+   * @return a list of fragments for input table
+   * @throws IOException
+   * @throws TajoException
+   */
+  public static List<Fragment> getSplits(Tablespace tablespace,
+                                         ScanNode scan,
+                                         TableDesc tableDesc,
+                                         boolean requireSort)
+      throws IOException, TajoException {
+    List<Fragment> fragments;
+    if (tableDesc.hasPartition()) {
+      // TODO: Partition tables should also be handled by tablespace.
+      fragments = SplitUtil.getFragmentsFromPartitionedTable(tablespace, scan, tableDesc, requireSort);
+    } else {
+      fragments = tablespace.getSplits(scan.getCanonicalName(), tableDesc, requireSort, scan.getQual());
+    }
+
+    return fragments;
+  }
+
+  /**
+   * It creates a number of fragments for all partitions.
+   */
+  private static List<Fragment> getFragmentsFromPartitionedTable(Tablespace tsHandler,
+                                                                 ScanNode scan,
+                                                                 TableDesc table,
+                                                                 boolean requireSort) throws IOException {
+    Preconditions.checkArgument(tsHandler instanceof FileTablespace, "tsHandler must be FileTablespace");
+    if (!(scan instanceof PartitionedTableScanNode)) {
+      throw new IllegalArgumentException("scan should be a PartitionedTableScanNode type.");
+    }
+    List<Fragment> fragments = Lists.newArrayList();
+    PartitionedTableScanNode partitionsScan = (PartitionedTableScanNode) scan;
+    fragments.addAll(((FileTablespace) tsHandler).getSplits(
+        scan.getCanonicalName(), table.getMeta(), table.getSchema(), requireSort, partitionsScan.getInputPaths()));
+    return fragments;
+  }
+
+  /**
+   * Clear input paths of {@link PartitionedTableScanNode}.
+   * This is to avoid unnecessary transmission of a lot of partition table paths to workers.
+   * So, this method should be invoked before {@link org.apache.tajo.querymaster.Stage#scheduleFragment(Stage, Fragment)}
+   * unless the scan is broadcasted.
+   *
+   * @param scanNode scan node
+   */
+  public static void preparePartitionScanPlanForSchedule(ScanNode scanNode) {
+    if (scanNode.getType() == NodeType.PARTITIONS_SCAN) {
+      // TODO: The partition input paths don't have to be kept in a logical node at all.
+      //       This should be improved by implementing a specialized fragment for partition tables.
+      PartitionedTableScanNode partitionScan = (PartitionedTableScanNode) scanNode;
+      partitionScan.clearInputPaths();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/30a46592/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PartitionedTableScanNode.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PartitionedTableScanNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PartitionedTableScanNode.java
index 6176882..d3f7148 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PartitionedTableScanNode.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PartitionedTableScanNode.java
@@ -29,9 +29,10 @@ import org.apache.tajo.plan.expr.EvalNode;
 import org.apache.tajo.util.TUtil;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 
 public class PartitionedTableScanNode extends ScanNode {
-  @Expose Path [] inputPaths;
+  @Expose private Path [] inputPaths;
 
   public PartitionedTableScanNode(int pid) {
     super(pid, NodeType.PARTITIONS_SCAN);
@@ -43,15 +44,22 @@ public class PartitionedTableScanNode extends ScanNode {
     setOutSchema(scanNode.getOutSchema());
     this.qual = scanNode.qual;
     this.targets = scanNode.targets;
-    this.inputPaths = inputPaths;
+    setInputPaths(inputPaths);
 
     if (scanNode.hasAlias()) {
       alias = scanNode.alias;
     }
   }
 
+  public void clearInputPaths() {
+    this.inputPaths = null;
+  }
+
   public void setInputPaths(Path [] paths) {
     this.inputPaths = paths;
+    if (this.inputPaths != null) {
+      Arrays.sort(inputPaths);
+    }
   }
 
   public Path [] getInputPaths() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/30a46592/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
index 00e6d75..4afb383 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
@@ -124,14 +124,17 @@ public abstract class Tablespace {
   /**
    * Returns the splits that will serve as input for the scan tasks. The
    * number of splits matches the number of regions in a table.
+   *
    * @param inputSourceId Input source identifier, which can be either relation name or execution block id
    * @param tableDesc The table description for the target data.
+   * @param requireSort If set, the result fragments will be sorted by their paths.
    * @param filterCondition filter condition which can prune splits if possible
    * @return The list of input fragments.
    * @throws java.io.IOException
    */
   public abstract List<Fragment> getSplits(String inputSourceId,
                                            TableDesc tableDesc,
+                                           boolean requireSort,
                                            @Nullable EvalNode filterCondition) throws IOException, TajoException;
 
   /**

http://git-wip-us.apache.org/repos/asf/tajo/blob/30a46592/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
index 132ceff..0cf883e 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
@@ -557,6 +557,7 @@ public class HBaseTablespace extends Tablespace {
   @Override
   public List<Fragment> getSplits(String inputSourceId,
                                   TableDesc table,
+                                  boolean requireSorted,
                                   @Nullable EvalNode filterCondition) throws IOException, TajoException {
     return (List<Fragment>) (List) getRawSplits(inputSourceId, table, filterCondition);
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/30a46592/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
index 2aa2b91..f3cb9a5 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
@@ -304,10 +304,12 @@ public class FileTablespace extends Tablespace {
    * Subclasses may override to, e.g., select only files matching a regular
    * expression.
    *
+   * @param requireSort if set, the results will be sorted by their paths.
+   * @param dirs input dirs
    * @return array of FileStatus objects
    * @throws IOException if zero items.
    */
-  protected List<FileStatus> listStatus(Path... dirs) throws IOException {
+  protected List<FileStatus> listStatus(boolean requireSort, Path... dirs) throws IOException {
     List<FileStatus> result = new ArrayList<>();
     if (dirs.length == 0) {
       throw new IOException("No input paths specified in job");
@@ -342,6 +344,10 @@ public class FileTablespace extends Tablespace {
     if (!errors.isEmpty()) {
       throw new InvalidInputException(errors);
     }
+
+    if (requireSort) {
+      Collections.sort(result);
+    }
     LOG.info("Total input paths to process : " + result.size());
     return result;
   }
@@ -464,7 +470,7 @@ public class FileTablespace extends Tablespace {
    *
    * @throws IOException
    */
-  public List<Fragment> getSplits(String tableName, TableMeta meta, Schema schema, Path... inputs)
+  public List<Fragment> getSplits(String tableName, TableMeta meta, Schema schema, boolean requireSort, Path... inputs)
       throws IOException {
     // generate splits'
 
@@ -477,7 +483,7 @@ public class FileTablespace extends Tablespace {
       if (fs.isFile(p)) {
         files.addAll(Lists.newArrayList(fs.getFileStatus(p)));
       } else {
-        files.addAll(listStatus(p));
+        files.addAll(listStatus(requireSort, p));
       }
 
       int previousSplitSize = splits.size();
@@ -606,8 +612,9 @@ public class FileTablespace extends Tablespace {
   @Override
   public List<Fragment> getSplits(String inputSourceId,
                                   TableDesc table,
+                                  boolean requireSort,
                                   @Nullable EvalNode filterCondition) throws IOException {
-    return getSplits(inputSourceId, table.getMeta(), table.getSchema(), new Path(table.getUri()));
+    return getSplits(inputSourceId, table.getMeta(), table.getSchema(), requireSort, new Path(table.getUri()));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/30a46592/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
index c62a01c..1818330 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
@@ -127,7 +127,7 @@ public class TestFileSystems {
     appender.close();
     FileStatus fileStatus = fs.getFileStatus(path);
 
-    List<Fragment> splits = sm.getSplits("table", meta, schema, path);
+    List<Fragment> splits = sm.getSplits("table", meta, schema, false, path);
     int splitSize = (int) Math.ceil(fileStatus.getLen() / (double) fileStatus.getBlockSize());
     assertEquals(splitSize, splits.size());
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/30a46592/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
index dc8781e..ad45d18 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
@@ -156,13 +156,13 @@ public class TestFileTablespace {
 
       List<Fragment> splits = Lists.newArrayList();
       // Get FileFragments in partition batch
-      splits.addAll(space.getSplits("data", meta, schema, partitions.toArray(new Path[partitions.size()])));
+      splits.addAll(space.getSplits("data", meta, schema, false, partitions.toArray(new Path[partitions.size()])));
       assertEquals(testCount, splits.size());
       // -1 is unknown volumeId
       assertEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);
 
       splits.clear();
-      splits.addAll(space.getSplits("data", meta, schema,
+      splits.addAll(space.getSplits("data", meta, schema, false,
           partitions.subList(0, partitions.size() / 2).toArray(new Path[partitions.size() / 2])));
       assertEquals(testCount / 2, splits.size());
       assertEquals(1, splits.get(0).getHosts().length);
@@ -212,7 +212,7 @@ public class TestFileTablespace {
 
       List<Fragment> splits = Lists.newArrayList();
       // Get FileFragments in partition batch
-      splits.addAll(space.getSplits("data", meta, schema, partitions.toArray(new Path[partitions.size()])));
+      splits.addAll(space.getSplits("data", meta, schema, false, partitions.toArray(new Path[partitions.size()])));
       assertEquals(0, splits.size());
 
       fs.close();
@@ -256,7 +256,7 @@ public class TestFileTablespace {
       TableMeta meta = CatalogUtil.newTableMeta(BuiltinStorages.TEXT, conf);
 
       List<Fragment> splits = Lists.newArrayList();
-      splits.addAll(sm.getSplits("data", meta, schema, tablePath));
+      splits.addAll(sm.getSplits("data", meta, schema, false, tablePath));
 
       assertEquals(testCount, splits.size());
       assertEquals(2, splits.get(0).getHosts().length);

http://git-wip-us.apache.org/repos/asf/tajo/blob/30a46592/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
index 3e36340..c1423d7 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -269,7 +269,7 @@ public class TestStorages {
       assertEquals(0, fileStatus.getLen());
     }
 
-    List<Fragment> splits = sm.getSplits("testZeroRows", meta, schema, testDir);
+    List<Fragment> splits = sm.getSplits("testZeroRows", meta, schema, false, testDir);
     int tupleCnt = 0;
     for (Fragment fragment : splits) {
       Scanner scanner = sm.getScanner(meta, schema, fragment, schema);

http://git-wip-us.apache.org/repos/asf/tajo/blob/30a46592/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java
index fa6cf48..7a630b2 100644
--- a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java
+++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java
@@ -122,6 +122,7 @@ public abstract class JdbcTablespace extends Tablespace {
   @Override
   public List<Fragment> getSplits(String inputSourceId,
                                   TableDesc tableDesc,
+                                  boolean requireSorted,
                                   @Nullable EvalNode filterCondition) throws IOException {
     return Lists.newArrayList((Fragment)new JdbcFragment(inputSourceId, tableDesc.getUri().toASCIIString()));
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/30a46592/tajo-storage/tajo-storage-pgsql/src/test/java/org/apache/tajo/storage/pgsql/TestPgSQLJdbcTableSpace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-pgsql/src/test/java/org/apache/tajo/storage/pgsql/TestPgSQLJdbcTableSpace.java b/tajo-storage/tajo-storage-pgsql/src/test/java/org/apache/tajo/storage/pgsql/TestPgSQLJdbcTableSpace.java
index 0d0e15e..875d70e 100644
--- a/tajo-storage/tajo-storage-pgsql/src/test/java/org/apache/tajo/storage/pgsql/TestPgSQLJdbcTableSpace.java
+++ b/tajo-storage/tajo-storage-pgsql/src/test/java/org/apache/tajo/storage/pgsql/TestPgSQLJdbcTableSpace.java
@@ -74,7 +74,7 @@ public class TestPgSQLJdbcTableSpace {
     Tablespace space = TablespaceManager.getByName("pgsql_cluster");
     MetadataProvider provider = space.getMetadataProvider();
     TableDesc table = provider.getTableDesc(null, "lineitem");
-    List<Fragment> fragments = space.getSplits("lineitem", table, null);
+    List<Fragment> fragments = space.getSplits("lineitem", table, false, null);
     assertNotNull(fragments);
     assertEquals(1, fragments.size());
   }