Posted to commits@tajo.apache.org by hy...@apache.org on 2016/01/27 23:20:32 UTC

tajo git commit: TAJO-1940: Implement HBaseTablespace::getTableVolume() method.

Repository: tajo
Updated Branches:
  refs/heads/master 3cf7f24bc -> 73a43d8b7


TAJO-1940: Implement HBaseTablespace::getTableVolume() method.

Closes #910


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/73a43d8b
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/73a43d8b
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/73a43d8b

Branch: refs/heads/master
Commit: 73a43d8b7bdba7c3963efd9df87f6e3d3a703cfd
Parents: 3cf7f24
Author: Hyunsik Choi <hy...@apache.org>
Authored: Wed Jan 27 14:18:21 2016 -0800
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Wed Jan 27 14:18:21 2016 -0800

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../tajo/engine/query/TestHBaseTable.java       |  65 ++++-
 .../org/apache/tajo/plan/StorageService.java    |   3 +-
 .../plan/rewrite/TableStatUpdateRewriter.java   |   2 +-
 .../org/apache/tajo/storage/Tablespace.java     |   2 +-
 .../apache/tajo/storage/TablespaceManager.java  |   5 +-
 .../tajo/storage/hbase/HBaseTablespace.java     | 282 +++++++++++--------
 .../storage/hbase/RegionSizeCalculator.java     | 150 ++++++++++
 .../org/apache/tajo/storage/FileTablespace.java |   4 +-
 .../tajo/storage/jdbc/JdbcTablespace.java       |   2 +-
 10 files changed, 377 insertions(+), 140 deletions(-)
----------------------------------------------------------------------
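In short: HBaseTablespace::getTableVolume() previously threw UnsupportedException; this commit implements it by summing the sizes of the HBase regions backing the table's fragments, and widens the API from a bare URI to a full TableDesc. A minimal sketch of the resulting call pattern, assuming a catalog handle for illustration (only the get(URI) lookup and the getTableVolume(TableDesc, Optional<EvalNode>) signature appear in the diff below):

    // Sketch only: "catalog" is an assumed handle, not part of this commit.
    TableDesc table = catalog.getTableDesc("default", "external_hbase_mapped_table");
    Tablespace space = TablespaceManager.get(table.getUri());
    long volume = space.getTableVolume(table, Optional.empty()); // total bytes; regions of unknown size are skipped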


http://git-wip-us.apache.org/repos/asf/tajo/blob/73a43d8b/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 6bc9635..47beafb 100644
--- a/CHANGES
+++ b/CHANGES
@@ -8,6 +8,8 @@ Release 0.12.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1940: Implement HBaseTablespace::getTableVolume() method. (hyunsik)
+
     TAJO-2061: Add description for EXPLAIN statement. (jaehwa)
 
     TAJO-2060: Upgrade geoip-api-java library. (Byunghwa Yun via jinho)

http://git-wip-us.apache.org/repos/asf/tajo/blob/73a43d8b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
index 97feb65..d4712dc 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
@@ -210,7 +210,62 @@ public class TestHBaseTable extends QueryTestCaseBase {
     } finally {
       TablespaceManager.addTableSpaceForTest(existing.get());
     }
+  }
 
+  private void putData(HTableInterface htable, int rownum) throws IOException {
+    for (int i = 0; i < rownum; i++) {
+      Put put = new Put(String.valueOf(i).getBytes());
+      put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes());
+      put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
+      put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes());
+      put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes());
+      put.add("col3".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
+      htable.put(put);
+    }
+  }
+
+  @Test
+  public void testGetTableVolume() throws Exception {
+    final String tableName = "external_hbase_table";
+
+    Optional<Tablespace> existing = TablespaceManager.removeTablespaceForTest("cluster1");
+    assertTrue(existing.isPresent());
+
+    try {
+      HTableDescriptor hTableDesc = new HTableDescriptor(TableName.valueOf(tableName));
+      hTableDesc.addFamily(new HColumnDescriptor("col1"));
+      hTableDesc.addFamily(new HColumnDescriptor("col2"));
+      hTableDesc.addFamily(new HColumnDescriptor("col3"));
+      testingCluster.getHBaseUtil().createTable(hTableDesc);
+
+      String sql = String.format(
+          "CREATE EXTERNAL TABLE external_hbase_mapped_table (rk text, col1 text, col2 text, col3 text) " +
+              "USING hbase WITH ('table'='external_hbase_table', 'columns'=':key,col1:a,col2:,col3:b') " +
+              "LOCATION '%s/external_hbase_table'", tableSpaceUri);
+      executeString(sql).close();
+
+      assertTableExists("external_hbase_mapped_table");
+
+      HBaseTablespace tablespace = (HBaseTablespace)existing.get();
+      HConnection hconn = ((HBaseTablespace)existing.get()).getConnection();
+
+      try (HTableInterface htable = hconn.getTable(tableName)) {
+        htable.setAutoFlushTo(true);
+        putData(htable, 4000);
+      }
+      hconn.close();
+
+      Thread.sleep(3000); // wait for the region server load report to catch up; this should not be an issue on a real cluster.
+
+      TableDesc createdTable = client.getTableDesc("external_hbase_mapped_table");
+      assertNotNull(tablespace);
+      long volume = tablespace.getTableVolume(createdTable, Optional.empty());
+      assertTrue(volume > 0 || volume == -1);
+      executeString("DROP TABLE external_hbase_mapped_table PURGE").close();
+
+    } finally {
+      TablespaceManager.addTableSpaceForTest(existing.get());
+    }
   }
 
   @Test
@@ -236,15 +291,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
       HConnection hconn = ((HBaseTablespace)existing.get()).getConnection();
 
       try (HTableInterface htable = hconn.getTable("external_hbase_table")) {
-        for (int i = 0; i < 100; i++) {
-          Put put = new Put(String.valueOf(i).getBytes());
-          put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes());
-          put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
-          put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes());
-          put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes());
-          put.add("col3".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
-          htable.put(put);
-        }
+        putData(htable, 100);
 
         ResultSet res = executeString("select * from external_hbase_mapped_table where rk > '20'");
         assertResultSet(res);

http://git-wip-us.apache.org/repos/asf/tajo/blob/73a43d8b/tajo-plan/src/main/java/org/apache/tajo/plan/StorageService.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/StorageService.java b/tajo-plan/src/main/java/org/apache/tajo/plan/StorageService.java
index 10d11f0..cbb7387 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/StorageService.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/StorageService.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.plan;
 
+import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.exception.UnsupportedException;
 import org.apache.tajo.plan.expr.EvalNode;
 
@@ -40,5 +41,5 @@ public interface StorageService {
    */
   URI getTableURI(@Nullable String spaceName, String databaseName, String tableName);
 
-  long getTableVolumn(URI uri, Optional<EvalNode> filter) throws UnsupportedException;
+  long getTableVolumn(TableDesc table, Optional<EvalNode> filter) throws UnsupportedException;
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/73a43d8b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/TableStatUpdateRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/TableStatUpdateRewriter.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/TableStatUpdateRewriter.java
index 3683f60..238980b 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/TableStatUpdateRewriter.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/TableStatUpdateRewriter.java
@@ -110,7 +110,7 @@ public class TableStatUpdateRewriter implements LogicalPlanRewriteRule {
     private long getTableVolume(TableDesc table, Optional<EvalNode> filter) {
       try {
         if (table.getStats() != null) {
-          return storage.getTableVolumn(table.getUri(), filter);
+          return storage.getTableVolumn(table, filter);
         }
       } catch (UnsupportedException t) {
         LOG.warn(table.getName() + " does not support Tablespace::getTableVolume()");
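The rewriter treats the volume lookup as best-effort: a tablespace that cannot report a volume throws UnsupportedException and the rewriter keeps the stats it already has. A hedged sketch of that pattern (the fallback branch is truncated out of the hunk above, so its exact form is an assumption):

    // Sketch; the catch-and-fallback tail is assumed, not shown in the hunk.
    long volume;
    try {
      volume = storage.getTableVolumn(table, filter); // "Volumn" is the actual method name
    } catch (UnsupportedException e) {
      volume = table.getStats().getNumBytes();        // assumed: fall back to catalog stats
    }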

http://git-wip-us.apache.org/repos/asf/tajo/blob/73a43d8b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
index 6c97754..00e6d75 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
@@ -99,7 +99,7 @@ public abstract class Tablespace {
     return name + "=" + uri.toString();
   }
 
-  public abstract long getTableVolume(URI uri, Optional<EvalNode> filter) throws UnsupportedException;
+  public abstract long getTableVolume(TableDesc table, Optional<EvalNode> filter) throws UnsupportedException;
 
   /**
    * if {@link StorageProperty#isArbitraryPathAllowed} is true,

http://git-wip-us.apache.org/repos/asf/tajo/blob/73a43d8b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java
index 12e283f..88410bb 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java
@@ -31,6 +31,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.catalog.MetadataProvider;
+import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.exception.TajoRuntimeException;
 import org.apache.tajo.exception.UndefinedTablespaceException;
@@ -435,9 +436,9 @@ public class TablespaceManager implements StorageService {
   }
 
   @Override
-  public long getTableVolumn(URI tableUri, Optional<EvalNode> filter)
+  public long getTableVolumn(TableDesc table, Optional<EvalNode> filter)
       throws UnsupportedException {
-    return get(tableUri).getTableVolume(tableUri, filter);
+    return get(table.getUri()).getTableVolume(table, filter);
   }
 
   public static Iterable<Tablespace> getAllTablespaces() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/73a43d8b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
index 1b81531..132ceff 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.storage.hbase;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
 import net.minidev.json.JSONObject;
 import org.apache.commons.logging.Log;
@@ -33,7 +34,6 @@ import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.hadoop.hbase.util.RegionSizeCalculator;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
@@ -54,7 +54,10 @@ import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext;
 import org.apache.tajo.plan.verifier.SyntaxErrorUtil;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.util.*;
+import org.apache.tajo.util.Bytes;
+import org.apache.tajo.util.BytesUtils;
+import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.util.Pair;
 
 import javax.annotation.Nullable;
 import java.io.BufferedReader;
@@ -101,8 +104,19 @@ public class HBaseTablespace extends Tablespace {
   }
 
   @Override
-  public long getTableVolume(URI uri, Optional<EvalNode> filter) throws UnsupportedException {
-    throw new UnsupportedException();
+  public long getTableVolume(TableDesc table, Optional<EvalNode> filter) {
+    long totalVolume;
+    try {
+      totalVolume = getRawSplits("", table, filter.orElse(null)).stream()
+          .map(f -> f.getLength())
+          .filter(size -> size > 0) // eliminate unknown sizes (-1)
+          .reduce(0L, Long::sum);
+    } catch (TajoException e) {
+      throw new TajoRuntimeException(e);
+    } catch (Throwable ioe) {
+      throw new TajoInternalError(ioe);
+    }
+    return totalVolume;
   }
 
   @Override
@@ -143,8 +157,8 @@ public class HBaseTablespace extends Tablespace {
     ColumnMapping columnMapping = new ColumnMapping(schema, tableMeta.getPropertySet());
     int numRowKeys = 0;
     boolean[] isRowKeyMappings = columnMapping.getIsRowKeyMappings();
-    for (boolean isRowKeyMapping : isRowKeyMappings) {
-      if (isRowKeyMapping) {
+    for (int i = 0; i < isRowKeyMappings.length; i++) {
+      if (isRowKeyMappings[i]) {
         numRowKeys++;
       }
     }
@@ -412,143 +426,165 @@ public class HBaseTablespace extends Tablespace {
     return new Column[]{indexColumn};
   }
 
-  @Override
-  public List<Fragment> getSplits(String inputSourceId,
-                                  TableDesc tableDesc,
-                                  @Nullable EvalNode filterCondition)
-      throws IOException, TajoException {
+  private Pair<List<byte []>, List<byte []>> getSelectedKeyRange(
+      ColumnMapping columnMap,
+      List<IndexPredication> predicates) {
 
-    ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta().getPropertySet());
+    final List<byte[]> startRows;
+    final List<byte[]> stopRows;
 
-    List<IndexPredication> indexPredications = getIndexPredications(columnMapping, tableDesc, filterCondition);
-    HTable htable = null;
+    if (predicates != null && !predicates.isEmpty()) {
+      // the predicates form a disjunctive set
+      startRows = predicates.stream()
+          .map(x -> {
+            if (x.getStartValue() != null) {
+              return serialize(columnMap, x, x.getStartValue());
+            } else {
+              return HConstants.EMPTY_START_ROW;
+            }
+          })
+          .collect(Collectors.toList());
 
-    try {
-      htable = new HTable(hbaseConf, tableDesc.getMeta().getProperty(HBaseStorageConstants.META_TABLE_KEY));
-      RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(htable);
-
-      org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys();
-      if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
-        HRegionLocation regLoc = htable.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false);
-        if (null == regLoc) {
-          throw new IOException("Expecting at least one region.");
-        }
-        List<Fragment> fragments = new ArrayList<>(1);
-        HBaseFragment fragment = new HBaseFragment(
-            tableDesc.getUri(),
-            inputSourceId, htable.getName().getNameAsString(),
-            HConstants.EMPTY_BYTE_ARRAY,
-            HConstants.EMPTY_BYTE_ARRAY,
-            regLoc.getHostname());
-        long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName());
-        if (regionSize == 0) {
-          fragment.setLength(TajoConstants.UNKNOWN_LENGTH);
-        } else {
-          fragment.setLength(regionSize);
-        }
-        fragments.add(fragment);
-        return fragments;
-      }
+      stopRows = predicates.stream()
+          .map(x -> {
+            if (x.getStopValue() != null) {
+              return serialize(columnMap, x, x.getStopValue());
+            } else {
+              return HConstants.EMPTY_START_ROW;
+            }
+          })
+          .collect(Collectors.toList());
 
-      final List<byte[]> startRows;
-      final List<byte[]> stopRows;
+    } else {
+      startRows = EMPTY_START_ROW_KEY;
+      stopRows  = EMPTY_END_ROW_KEY;
+    }
 
-      if (indexPredications != null && !indexPredications.isEmpty()) {
-        // indexPredications is Disjunctive set
-        startRows = indexPredications.stream()
-            .map(x -> {
-                if (x.getStartValue() != null) {
-                  return serialize(columnMapping, x, x.getStartValue());
-                } else {
-                  return HConstants.EMPTY_START_ROW;
-                }
-              })
-            .collect(Collectors.toList());
-
-        stopRows = indexPredications.stream()
-            .map(x -> {
-              if (x.getStopValue() != null) {
-                return serialize(columnMapping, x, x.getStopValue());
-              } else {
-                return HConstants.EMPTY_START_ROW;
-              }
-            })
-            .collect(Collectors.toList());
+    return new Pair(startRows, stopRows);
+  }
 
-      } else {
-        startRows = EMPTY_START_ROW_KEY;
-        stopRows  = EMPTY_END_ROW_KEY;
+  private boolean isEmptyRegion(org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> tableRange) {
+    return tableRange == null || tableRange.getFirst() == null || tableRange.getFirst().length == 0;
+  }
+
+  private long getRegionSize(RegionSizeCalculator calculator, byte [] regionName) {
+    long regionSize = calculator.getRegionSize(regionName);
+    if (regionSize == 0) {
+      return TajoConstants.UNKNOWN_LENGTH;
+    } else {
+      return regionSize;
+    }
+  }
+
+  private List<HBaseFragment> createEmptyFragment(TableDesc table, String sourceId, HTable htable,
+                                                  RegionSizeCalculator sizeCalculator) throws IOException {
+    HRegionLocation regLoc = htable.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false);
+    if (null == regLoc) {
+      throw new IOException("Expecting at least one region.");
+    }
+
+    HBaseFragment fragment = new HBaseFragment(
+        table.getUri(),
+        sourceId, htable.getName().getNameAsString(),
+        HConstants.EMPTY_BYTE_ARRAY,
+        HConstants.EMPTY_BYTE_ARRAY,
+        regLoc.getHostname());
+
+    fragment.setLength(getRegionSize(sizeCalculator, regLoc.getRegionInfo().getRegionName()));
+    return ImmutableList.of(fragment);
+  }
+
+  private Collection<HBaseFragment> convertRangeToFragment(
+      TableDesc table, String inputSourceId, HTable htable, RegionSizeCalculator sizeCalculator,
+      org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> tableRange,
+      Pair<List<byte[]>, List<byte[]>> selectedRange) throws IOException {
+
+    final Map<byte[], HBaseFragment> fragmentMap = new HashMap<>();
+
+    for (int i = 0; i < tableRange.getFirst().length; i++) {
+      HRegionLocation location = htable.getRegionLocation(tableRange.getFirst()[i], false);
+      if (location == null) {
+        throw new IOException("Can't find the region of the key: " + Bytes.toStringBinary(tableRange.getFirst()[i]));
       }
 
-      // reference: org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(JobContext)
-      // region startkey -> HBaseFragment
-      Map<byte[], HBaseFragment> fragmentMap = new HashMap<>();
-      for (int i = 0; i < keys.getFirst().length; i++) {
-        HRegionLocation location = htable.getRegionLocation(keys.getFirst()[i], false);
-        if (null == location) {
-          throw new IOException("Can't find the region of the key: " +  Bytes.toStringBinary(keys.getFirst()[i]));
-        }
+      final byte[] regionStartKey = tableRange.getFirst()[i];
+      final byte[] regionStopKey = tableRange.getSecond()[i];
+
+      int startRowsSize = selectedRange.getFirst().size();
+      for (int j = 0; j < startRowsSize; j++) {
+        byte[] startRow = selectedRange.getFirst().get(j);
+        byte[] stopRow = selectedRange.getSecond().get(j);
+        // determine whether the given start and stop keys fall into the region
+        if ((startRow.length == 0 || regionStopKey.length == 0 || Bytes.compareTo(startRow, regionStopKey) < 0)
+            && (stopRow.length == 0 || Bytes.compareTo(stopRow, regionStartKey) > 0)) {
+          final byte[] fragmentStart = (startRow.length == 0 || Bytes.compareTo(regionStartKey, startRow) >= 0) ?
+              regionStartKey : startRow;
+
+          final byte[] fragmentStop = (stopRow.length == 0 || Bytes.compareTo(regionStopKey, stopRow) <= 0) &&
+              regionStopKey.length > 0 ? regionStopKey : stopRow;
+
+          if (fragmentMap.containsKey(regionStartKey)) {
+            final HBaseFragment prevFragment = fragmentMap.get(regionStartKey);
+            if (Bytes.compareTo(fragmentStart, prevFragment.getStartRow()) < 0) {
+              prevFragment.setStartRow(fragmentStart);
+            }
+            if (Bytes.compareTo(fragmentStop, prevFragment.getStopRow()) > 0) {
+              prevFragment.setStopRow(fragmentStop);
+            }
+          } else {
 
-        byte[] regionStartKey = keys.getFirst()[i];
-        byte[] regionStopKey = keys.getSecond()[i];
-
-        int startRowsSize = startRows.size();
-        for (int j = 0; j < startRowsSize; j++) {
-          byte[] startRow = startRows.get(j);
-          byte[] stopRow = stopRows.get(j);
-          // determine if the given start an stop key fall into the region
-          if ((startRow.length == 0 || regionStopKey.length == 0 || Bytes.compareTo(startRow, regionStopKey) < 0)
-              && (stopRow.length == 0 || Bytes.compareTo(stopRow, regionStartKey) > 0)) {
-            byte[] fragmentStart = (startRow.length == 0 || Bytes.compareTo(regionStartKey, startRow) >= 0) ?
-                regionStartKey : startRow;
-
-            byte[] fragmentStop = (stopRow.length == 0 || Bytes.compareTo(regionStopKey, stopRow) <= 0) &&
-                regionStopKey.length > 0 ? regionStopKey : stopRow;
-
-            if (fragmentMap.containsKey(regionStartKey)) {
-              HBaseFragment prevFragment = fragmentMap.get(regionStartKey);
-              if (Bytes.compareTo(fragmentStart, prevFragment.getStartRow()) < 0) {
-                prevFragment.setStartRow(fragmentStart);
-              }
-              if (Bytes.compareTo(fragmentStop, prevFragment.getStopRow()) > 0) {
-                prevFragment.setStopRow(fragmentStop);
-              }
-            } else {
-              byte[] regionName = location.getRegionInfo().getRegionName();
-              long regionSize = sizeCalculator.getRegionSize(regionName);
-
-              HBaseFragment fragment = new HBaseFragment(tableDesc.getUri(),
-                  inputSourceId,
-                  htable.getName().getNameAsString(),
-                  fragmentStart,
-                  fragmentStop,
-                  location.getHostname());
-              if (regionSize == 0) {
-                fragment.setLength(TajoConstants.UNKNOWN_LENGTH);
-              } else {
-                fragment.setLength(regionSize);
-              }
+            final HBaseFragment fragment = new HBaseFragment(table.getUri(),
+                inputSourceId,
+                htable.getName().getNameAsString(),
+                fragmentStart,
+                fragmentStop,
+                location.getHostname());
 
-              fragmentMap.put(regionStartKey, fragment);
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("getFragments: fragment -> " + i + " -> " + fragment);
-              }
+            fragment.setLength(getRegionSize(sizeCalculator, location.getRegionInfo().getRegionName()));
+            fragmentMap.put(regionStartKey, fragment);
+
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("getFragments: fragment -> " + i + " -> " + fragment);
             }
           }
         }
       }
+    }
+
+    return fragmentMap.values();
+  }
 
-      List<HBaseFragment> fragments = new ArrayList<>(fragmentMap.values());
+  @Override
+  public List<Fragment> getSplits(String inputSourceId,
+                                  TableDesc table,
+                                  @Nullable EvalNode filterCondition) throws IOException, TajoException {
+    return (List<Fragment>) (List) getRawSplits(inputSourceId, table, filterCondition);
+  }
+
+  private List<HBaseFragment> getRawSplits(String inputSourceId,
+                                           TableDesc table,
+                                           @Nullable EvalNode filterCondition) throws IOException, TajoException {
+    final ColumnMapping columnMapping = new ColumnMapping(table.getSchema(), table.getMeta().getPropertySet());
+
+    try (final HTable htable = new HTable(hbaseConf, table.getMeta().getProperty(HBaseStorageConstants.META_TABLE_KEY))) {
+      final RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(htable);
+      final org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> tableRange = htable.getStartEndKeys();
+      if (isEmptyRegion(tableRange)) {
+        return createEmptyFragment(table, inputSourceId, htable, sizeCalculator);
+      }
+
+      final Pair<List<byte []>, List<byte []>> selectedRange = getSelectedKeyRange(
+          columnMapping,
+          getIndexPredications(columnMapping, table, filterCondition));
+
+      // region startkey -> HBaseFragment
+      List<HBaseFragment> fragments = new ArrayList<>(convertRangeToFragment(table, inputSourceId, htable, sizeCalculator, tableRange, selectedRange));
       Collections.sort(fragments);
       if (!fragments.isEmpty()) {
         fragments.get(fragments.size() - 1).setLast(true);
       }
 
-      return (ArrayList<Fragment>) (ArrayList) fragments;
-    } finally {
-      if (htable != null) {
-        htable.close();
-      }
+      return ImmutableList.copyOf(fragments);
     }
   }
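The volume computation reuses this split machinery: each HBaseFragment carries its region's size (or UNKNOWN_LENGTH), and getTableVolume() above sums only the known sizes. A worked sketch of that reduction with illustrative values:

    // Illustrative only: three regions reporting 128 MB, unknown (-1), and 64 MB.
    List<Long> lengths = Arrays.asList(128L << 20, -1L, 64L << 20);
    long volume = lengths.stream()
        .filter(size -> size > 0)  // drop UNKNOWN_LENGTH (-1)
        .reduce(0L, Long::sum);    // 201326592 bytes = 192 MB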
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/73a43d8b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/RegionSizeCalculator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/RegionSizeCalculator.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/RegionSizeCalculator.java
new file mode 100644
index 0000000..806320d
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/RegionSizeCalculator.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLoad;
+import org.apache.hadoop.hbase.ServerLoad;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * This class is borrowed from HBase, but modified to also account for
+ * the memstore size.
+ *
+ * Computes size of each region for given table and given column families.
+ * The value is used by MapReduce for better scheduling.
+ * */
+@InterfaceStability.Evolving
+@InterfaceAudience.Private
+public class RegionSizeCalculator {
+
+  private static final Log LOG = LogFactory.getLog(RegionSizeCalculator.class);
+
+  /**
+   * Maps each region to its size in bytes.
+   * */
+  private final Map<byte[], Long> sizeMap = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+
+  static final String ENABLE_REGIONSIZECALCULATOR = "hbase.regionsizecalculator.enable";
+
+  /**
+   * Computes size of each region for table and given column families.
+   *
+   * @deprecated Use {@link #RegionSizeCalculator(RegionLocator, Admin)} instead.
+   */
+  @Deprecated
+  public RegionSizeCalculator(HTable table) throws IOException {
+    HBaseAdmin admin = new HBaseAdmin(table.getConfiguration());
+    try {
+      init(table.getRegionLocator(), admin);
+    } finally {
+      admin.close();
+    }
+  }
+
+  /**
+   * Computes size of each region for table and given column families.
+   * */
+  public RegionSizeCalculator(RegionLocator regionLocator, Admin admin) throws IOException {
+    init(regionLocator, admin);
+  }
+
+  private void init(RegionLocator regionLocator, Admin admin)
+      throws IOException {
+    if (!enabled(admin.getConfiguration())) {
+      LOG.info("Region size calculation disabled.");
+      return;
+    }
+
+    LOG.info("Calculating region sizes for table \"" + regionLocator.getName() + "\".");
+
+    //get regions for table
+    List<HRegionLocation> tableRegionInfos = regionLocator.getAllRegionLocations();
+    Set<byte[]> tableRegions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+    for (HRegionLocation regionInfo : tableRegionInfos) {
+      tableRegions.add(regionInfo.getRegionInfo().getRegionName());
+    }
+
+    ClusterStatus clusterStatus = admin.getClusterStatus();
+    Collection<ServerName> servers = clusterStatus.getServers();
+    final long megaByte = 1024L * 1024L;
+
+    //iterate all cluster regions, filter regions from our table and compute their size
+    for (ServerName serverName: servers) {
+      ServerLoad serverLoad = clusterStatus.getLoad(serverName);
+
+      for (RegionLoad regionLoad: serverLoad.getRegionsLoad().values()) {
+        byte[] regionId = regionLoad.getName();
+
+        if (tableRegions.contains(regionId)) {
+
+          long regionSizeBytes = (regionLoad.getStorefileSizeMB() + regionLoad.getMemStoreSizeMB()) * megaByte;
+          sizeMap.put(regionId, regionSizeBytes);
+
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Region " + regionLoad.getNameAsString() + " has size " + regionSizeBytes);
+          }
+        }
+      }
+    }
+    LOG.debug("Region sizes calculated");
+  }
+
+  boolean enabled(Configuration configuration) {
+    return configuration.getBoolean(ENABLE_REGIONSIZECALCULATOR, true);
+  }
+
+  /**
+   * Returns size of given region in bytes. Returns 0 if region was not found.
+   * */
+  public long getRegionSize(byte[] regionId) {
+    Long size = sizeMap.get(regionId);
+    if (size == null) {
+      LOG.debug("Unknown region:" + Arrays.toString(regionId));
+      return 0;
+    } else {
+      return size;
+    }
+  }
+
+  public Map<byte[], Long> getRegionSizeMap() {
+    return Collections.unmodifiableMap(sizeMap);
+  }
+}
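A usage sketch for the borrowed calculator, via the non-deprecated constructor (assumes an open HTable as in HBaseTablespace above; not part of this commit):

    // Sketch only. HBaseAdmin is Closeable; the region name comes from a located region.
    try (HBaseAdmin admin = new HBaseAdmin(htable.getConfiguration())) {
      RegionSizeCalculator calc = new RegionSizeCalculator(htable.getRegionLocator(), admin);
      byte[] regionName = htable.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false)
          .getRegionInfo().getRegionName();
      long bytes = calc.getRegionSize(regionName); // store files + memstore; 0 if unknown
    }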

http://git-wip-us.apache.org/repos/asf/tajo/blob/73a43d8b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
index e50d587..35504af 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
@@ -125,8 +125,8 @@ public class FileTablespace extends Tablespace {
   }
 
   @Override
-  public long getTableVolume(URI uri, Optional<EvalNode> filter) throws UnsupportedException {
-    Path path = new Path(uri);
+  public long getTableVolume(TableDesc table, Optional<EvalNode> filter) throws UnsupportedException {
+    Path path = new Path(table.getUri());
     ContentSummary summary;
     try {
       summary = fs.getContentSummary(path);
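For files the volume is just the recursive byte count under the table path. The hunk is truncated before the method returns, so as a sketch of the standard Hadoop pattern (not the verbatim tail of this method):

    // FileSystem.getContentSummary walks the directory tree and totals file sizes.
    ContentSummary summary = fs.getContentSummary(new Path(table.getUri()));
    long volume = summary.getLength(); // total bytes of all files under the path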

http://git-wip-us.apache.org/repos/asf/tajo/blob/73a43d8b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java
index 1f7f299..fa6cf48 100644
--- a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java
+++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java
@@ -110,7 +110,7 @@ public abstract class JdbcTablespace extends Tablespace {
   }
 
   @Override
-  public long getTableVolume(URI uri, Optional<EvalNode> filter) throws UnsupportedException {
+  public long getTableVolume(TableDesc table, Optional<EvalNode> filter) throws UnsupportedException {
     throw new UnsupportedException();
   }