You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2016/01/27 23:20:32 UTC
tajo git commit: TAJO-1940: Implement HBaseTablespace::getTableVolume() method.
Repository: tajo
Updated Branches:
refs/heads/master 3cf7f24bc -> 73a43d8b7
TAJO-1940: Implement HBaseTablespace::getTableVolume() method.
Closes #910
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/73a43d8b
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/73a43d8b
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/73a43d8b
Branch: refs/heads/master
Commit: 73a43d8b7bdba7c3963efd9df87f6e3d3a703cfd
Parents: 3cf7f24
Author: Hyunsik Choi <hy...@apache.org>
Authored: Wed Jan 27 14:18:21 2016 -0800
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Wed Jan 27 14:18:21 2016 -0800
----------------------------------------------------------------------
CHANGES | 2 +
.../tajo/engine/query/TestHBaseTable.java | 65 ++++-
.../org/apache/tajo/plan/StorageService.java | 3 +-
.../plan/rewrite/TableStatUpdateRewriter.java | 2 +-
.../org/apache/tajo/storage/Tablespace.java | 2 +-
.../apache/tajo/storage/TablespaceManager.java | 5 +-
.../tajo/storage/hbase/HBaseTablespace.java | 282 +++++++++++--------
.../storage/hbase/RegionSizeCalculator.java | 150 ++++++++++
.../org/apache/tajo/storage/FileTablespace.java | 4 +-
.../tajo/storage/jdbc/JdbcTablespace.java | 2 +-
10 files changed, 377 insertions(+), 140 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/73a43d8b/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 6bc9635..47beafb 100644
--- a/CHANGES
+++ b/CHANGES
@@ -8,6 +8,8 @@ Release 0.12.0 - unreleased
IMPROVEMENT
+ TAJO-1940: Implement HBaseTablespace::getTableVolume() method. (hyunsik)
+
TAJO-2061: Add description for EXPLAIN statement. (jaehwa)
TAJO-2060: Upgrade geoip-api-java library. (Byunghwa Yun via jinho)
http://git-wip-us.apache.org/repos/asf/tajo/blob/73a43d8b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
index 97feb65..d4712dc 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
@@ -210,7 +210,62 @@ public class TestHBaseTable extends QueryTestCaseBase {
} finally {
TablespaceManager.addTableSpaceForTest(existing.get());
}
+ }
+ private void putData(HTableInterface htable, int rownum) throws IOException {
+ for (int i = 0; i < rownum; i++) {
+ Put put = new Put(String.valueOf(i).getBytes());
+ put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes());
+ put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
+ put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes());
+ put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes());
+ put.add("col3".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
+ htable.put(put);
+ }
+ }
+
+ @Test
+ public void testGetTableVolume() throws Exception {
+ final String tableName = "external_hbase_table";
+
+ Optional<Tablespace> existing = TablespaceManager.removeTablespaceForTest("cluster1");
+ assertTrue(existing.isPresent());
+
+ try {
+ // backing HBase table with the three column families referenced by the mapping below
+ HTableDescriptor hTableDesc = new HTableDescriptor(TableName.valueOf(tableName));
+ for (String family : new String[] {"col1", "col2", "col3"}) {
+ hTableDesc.addFamily(new HColumnDescriptor(family));
+ }
+ testingCluster.getHBaseUtil().createTable(hTableDesc);
+
+ String ddl = String.format(
+ "CREATE EXTERNAL TABLE external_hbase_mapped_table (rk text, col1 text, col2 text, col3 text) " +
+ "USING hbase WITH ('table'='external_hbase_table', 'columns'=':key,col1:a,col2:,col3:b') " +
+ "LOCATION '%s/external_hbase_table'", tableSpaceUri);
+ executeString(ddl).close();
+
+ assertTableExists("external_hbase_mapped_table");
+
+ final HBaseTablespace tablespace = (HBaseTablespace) existing.get();
+ assertNotNull(tablespace);
+ final HConnection hconn = tablespace.getConnection();
+
+ try (HTableInterface htable = hconn.getTable(tableName)) {
+ htable.setAutoFlushTo(true);
+ putData(htable, 4000);
+ }
+ hconn.close();
+
+ // wait so the region servers publish an up-to-date region load; a real cluster may not need this
+ Thread.sleep(3000);
+
+ TableDesc createdTable = client.getTableDesc("external_hbase_mapped_table");
+ long volume = tablespace.getTableVolume(createdTable, Optional.empty());
+ // -1 is tolerated as the "unknown length" marker
+ assertTrue(volume > 0 || volume == -1);
+ executeString("DROP TABLE external_hbase_mapped_table PURGE").close();
+
+ } finally {
+ TablespaceManager.addTableSpaceForTest(existing.get());
+ }
}
@Test
@@ -236,15 +291,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
HConnection hconn = ((HBaseTablespace)existing.get()).getConnection();
try (HTableInterface htable = hconn.getTable("external_hbase_table")) {
- for (int i = 0; i < 100; i++) {
- Put put = new Put(String.valueOf(i).getBytes());
- put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes());
- put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
- put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes());
- put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes());
- put.add("col3".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
- htable.put(put);
- }
+ putData(htable, 100);
ResultSet res = executeString("select * from external_hbase_mapped_table where rk > '20'");
assertResultSet(res);
http://git-wip-us.apache.org/repos/asf/tajo/blob/73a43d8b/tajo-plan/src/main/java/org/apache/tajo/plan/StorageService.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/StorageService.java b/tajo-plan/src/main/java/org/apache/tajo/plan/StorageService.java
index 10d11f0..cbb7387 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/StorageService.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/StorageService.java
@@ -18,6 +18,7 @@
package org.apache.tajo.plan;
+import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.exception.UnsupportedException;
import org.apache.tajo.plan.expr.EvalNode;
@@ -40,5 +41,5 @@ public interface StorageService {
*/
URI getTableURI(@Nullable String spaceName, String databaseName, String tableName);
- long getTableVolumn(URI uri, Optional<EvalNode> filter) throws UnsupportedException;
+ /**
+ * Estimates the volume of the given table.
+ * (The misspelled method name is part of the existing interface.)
+ *
+ * @param table the table to estimate
+ * @param filter an optional filter that the storage may use to narrow the estimate
+ * @return the estimated table volume
+ * @throws UnsupportedException if the underlying storage cannot estimate table volumes
+ */
+ long getTableVolumn(TableDesc table, Optional<EvalNode> filter) throws UnsupportedException;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/73a43d8b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/TableStatUpdateRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/TableStatUpdateRewriter.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/TableStatUpdateRewriter.java
index 3683f60..238980b 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/TableStatUpdateRewriter.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/TableStatUpdateRewriter.java
@@ -110,7 +110,7 @@ public class TableStatUpdateRewriter implements LogicalPlanRewriteRule {
private long getTableVolume(TableDesc table, Optional<EvalNode> filter) {
try {
if (table.getStats() != null) {
- return storage.getTableVolumn(table.getUri(), filter);
+ return storage.getTableVolumn(table, filter);
}
} catch (UnsupportedException t) {
LOG.warn(table.getName() + " does not support Tablespace::getTableVolume()");
http://git-wip-us.apache.org/repos/asf/tajo/blob/73a43d8b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
index 6c97754..00e6d75 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
@@ -99,7 +99,7 @@ public abstract class Tablespace {
return name + "=" + uri.toString();
}
- public abstract long getTableVolume(URI uri, Optional<EvalNode> filter) throws UnsupportedException;
+ /**
+ * Estimates the volume of the given table stored in this tablespace.
+ *
+ * @param table the table to estimate
+ * @param filter an optional filter condition that an implementation may use to narrow the estimate
+ * @return the estimated table volume
+ * @throws UnsupportedException if this tablespace does not support volume estimation
+ */
+ public abstract long getTableVolume(TableDesc table, Optional<EvalNode> filter) throws UnsupportedException;
/**
* if {@link StorageProperty#isArbitraryPathAllowed} is true,
http://git-wip-us.apache.org/repos/asf/tajo/blob/73a43d8b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java
index 12e283f..88410bb 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java
@@ -31,6 +31,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.catalog.MetadataProvider;
+import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.exception.TajoRuntimeException;
import org.apache.tajo.exception.UndefinedTablespaceException;
@@ -435,9 +436,9 @@ public class TablespaceManager implements StorageService {
}
+ /**
+ * Estimates the volume of {@code table} by delegating to the tablespace that owns its URI.
+ */
@Override
- public long getTableVolumn(URI tableUri, Optional<EvalNode> filter)
+ public long getTableVolumn(TableDesc table, Optional<EvalNode> filter)
throws UnsupportedException {
- return get(tableUri).getTableVolume(tableUri, filter);
+ final Tablespace owner = get(table.getUri());
+ return owner.getTableVolume(table, filter);
}
public static Iterable<Tablespace> getAllTablespaces() {
http://git-wip-us.apache.org/repos/asf/tajo/blob/73a43d8b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
index 1b81531..132ceff 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
@@ -18,6 +18,7 @@
package org.apache.tajo.storage.hbase;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import net.minidev.json.JSONObject;
import org.apache.commons.logging.Log;
@@ -33,7 +34,6 @@ import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.hadoop.hbase.util.RegionSizeCalculator;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
@@ -54,7 +54,10 @@ import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext;
import org.apache.tajo.plan.verifier.SyntaxErrorUtil;
import org.apache.tajo.storage.*;
import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.util.*;
+import org.apache.tajo.util.Bytes;
+import org.apache.tajo.util.BytesUtils;
+import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.util.Pair;
import javax.annotation.Nullable;
import java.io.BufferedReader;
@@ -101,8 +104,19 @@ public class HBaseTablespace extends Tablespace {
}
@Override
- public long getTableVolume(URI uri, Optional<EvalNode> filter) throws UnsupportedException {
- throw new UnsupportedException();
+ public long getTableVolume(TableDesc table, Optional<EvalNode> filter) {
+ long totalVolume;
+ try {
+ totalVolume = getRawSplits("", table, filter.orElse(null)).stream()
+ .map(f -> f.getLength())
+ .filter(size -> size > 0) // eliminate unknown sizes (-1)
+ .reduce(0L, Long::sum);
+ } catch (TajoException e) {
+ throw new TajoRuntimeException(e);
+ } catch (Throwable ioe) {
+ throw new TajoInternalError(ioe);
+ }
+ return totalVolume;
}
@Override
@@ -143,8 +157,8 @@ public class HBaseTablespace extends Tablespace {
ColumnMapping columnMapping = new ColumnMapping(schema, tableMeta.getPropertySet());
int numRowKeys = 0;
boolean[] isRowKeyMappings = columnMapping.getIsRowKeyMappings();
- for (boolean isRowKeyMapping : isRowKeyMappings) {
- if (isRowKeyMapping) {
+ for (int i = 0; i < isRowKeyMappings.length; i++) {
+ if (isRowKeyMappings[i]) {
numRowKeys++;
}
}
@@ -412,143 +426,165 @@ public class HBaseTablespace extends Tablespace {
return new Column[]{indexColumn};
}
- @Override
- public List<Fragment> getSplits(String inputSourceId,
- TableDesc tableDesc,
- @Nullable EvalNode filterCondition)
- throws IOException, TajoException {
+ /**
+ * Converts the disjunctive index predicates into two parallel lists of row keys: entry {@code j}
+ * of the start list and entry {@code j} of the stop list together describe one row-key range.
+ * An empty key means the range is unbounded on that side. Without predicates, the
+ * EMPTY_START_ROW_KEY / EMPTY_END_ROW_KEY defaults are returned (presumably a single unbounded
+ * range — confirm against their definitions).
+ *
+ * @param columnMap the column mapping used to serialize predicate values into row keys
+ * @param predicates the index predicates; may be {@code null} or empty
+ * @return a pair of (start rows, stop rows)
+ */
+ private Pair<List<byte []>, List<byte []>> getSelectedKeyRange(
+ ColumnMapping columnMap,
+ List<IndexPredication> predicates) {
- ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta().getPropertySet());
+ final List<byte[]> startRows;
+ final List<byte[]> stopRows;
- List<IndexPredication> indexPredications = getIndexPredications(columnMapping, tableDesc, filterCondition);
- HTable htable = null;
+ if (predicates != null && !predicates.isEmpty()) {
+ // the predicates form a disjunctive set: each one contributes an independent key range
+ startRows = predicates.stream()
+ .map(x -> {
+ if (x.getStartValue() != null) {
+ return serialize(columnMap, x, x.getStartValue());
+ } else {
+ return HConstants.EMPTY_START_ROW;
+ }
+ })
+ .collect(Collectors.toList());
- try {
- htable = new HTable(hbaseConf, tableDesc.getMeta().getProperty(HBaseStorageConstants.META_TABLE_KEY));
- RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(htable);
-
- org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys();
- if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
- HRegionLocation regLoc = htable.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false);
- if (null == regLoc) {
- throw new IOException("Expecting at least one region.");
- }
- List<Fragment> fragments = new ArrayList<>(1);
- HBaseFragment fragment = new HBaseFragment(
- tableDesc.getUri(),
- inputSourceId, htable.getName().getNameAsString(),
- HConstants.EMPTY_BYTE_ARRAY,
- HConstants.EMPTY_BYTE_ARRAY,
- regLoc.getHostname());
- long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName());
- if (regionSize == 0) {
- fragment.setLength(TajoConstants.UNKNOWN_LENGTH);
- } else {
- fragment.setLength(regionSize);
- }
- fragments.add(fragment);
- return fragments;
- }
+ stopRows = predicates.stream()
+ .map(x -> {
+ if (x.getStopValue() != null) {
+ return serialize(columnMap, x, x.getStopValue());
+ } else {
+ // an open upper bound; EMPTY_END_ROW states that intent explicitly
+ return HConstants.EMPTY_END_ROW;
+ }
+ })
+ .collect(Collectors.toList());
- final List<byte[]> startRows;
- final List<byte[]> stopRows;
+ } else {
+ startRows = EMPTY_START_ROW_KEY;
+ stopRows = EMPTY_END_ROW_KEY;
+ }
- if (indexPredications != null && !indexPredications.isEmpty()) {
- // indexPredications is Disjunctive set
- startRows = indexPredications.stream()
- .map(x -> {
- if (x.getStartValue() != null) {
- return serialize(columnMapping, x, x.getStartValue());
- } else {
- return HConstants.EMPTY_START_ROW;
- }
- })
- .collect(Collectors.toList());
-
- stopRows = indexPredications.stream()
- .map(x -> {
- if (x.getStopValue() != null) {
- return serialize(columnMapping, x, x.getStopValue());
- } else {
- return HConstants.EMPTY_START_ROW;
- }
- })
- .collect(Collectors.toList());
+ return new Pair<>(startRows, stopRows);
+ }
- } else {
- startRows = EMPTY_START_ROW_KEY;
- stopRows = EMPTY_END_ROW_KEY;
+ private boolean isEmptyRegion(org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> tableRange) {
+ return tableRange == null || tableRange.getFirst() == null || tableRange.getFirst().length == 0;
+ }
+
+ private long getRegionSize(RegionSizeCalculator calculator, byte [] regionName) {
+ long regionSize = calculator.getRegionSize(regionName);
+ if (regionSize == 0) {
+ return TajoConstants.UNKNOWN_LENGTH;
+ } else {
+ return regionSize;
+ }
+ }
+
+ private List<HBaseFragment> createEmptyFragment(TableDesc table, String sourceId, HTable htable,
+ RegionSizeCalculator sizeCalculator) throws IOException {
+ HRegionLocation regLoc = htable.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false);
+ if (null == regLoc) {
+ throw new IOException("Expecting at least one region.");
+ }
+
+ HBaseFragment fragment = new HBaseFragment(
+ table.getUri(),
+ sourceId, htable.getName().getNameAsString(),
+ HConstants.EMPTY_BYTE_ARRAY,
+ HConstants.EMPTY_BYTE_ARRAY,
+ regLoc.getHostname());
+
+ fragment.setLength(getRegionSize(sizeCalculator, regLoc.getRegionInfo().getRegionName()));
+ return ImmutableList.of(fragment);
+ }
+
+ /**
+ * Intersects the table's regions with the selected row-key ranges and returns one fragment per
+ * region that overlaps at least one range. When several ranges overlap the same region, that
+ * region's fragment is widened to cover all of them. An empty start or stop key means the range
+ * (or region) is unbounded on that side.
+ *
+ * @param table the table being split; its URI is stored in each fragment
+ * @param inputSourceId the source id stored in each fragment
+ * @param htable the open HBase table, used to locate regions
+ * @param sizeCalculator provides the size recorded as each fragment's length
+ * @param tableRange region (start keys, stop keys) as returned by {@code HTable#getStartEndKeys()}
+ * @param selectedRange parallel lists of (start rows, stop rows) to select
+ * @return the fragments, in no particular order
+ * @throws IOException if a region cannot be located
+ */
+ private Collection<HBaseFragment> convertRangeToFragment(
+ TableDesc table, String inputSourceId, HTable htable, RegionSizeCalculator sizeCalculator,
+ org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> tableRange,
+ Pair<List<byte[]>, List<byte[]>> selectedRange) throws IOException {
+
+ // Keyed by the region's start-key array. byte[] uses identity equality, which suffices here
+ // because every lookup for a region reuses the same array instance taken from tableRange.
+ final Map<byte[], HBaseFragment> fragmentMap = new HashMap<>();
+
+ for (int i = 0; i < tableRange.getFirst().length; i++) {
+ HRegionLocation location = htable.getRegionLocation(tableRange.getFirst()[i], false);
+ if (location == null) {
+ throw new IOException("Can't find the region of the key: " + Bytes.toStringBinary(tableRange.getFirst()[i]));
}
- // reference: org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(JobContext)
- // region startkey -> HBaseFragment
- Map<byte[], HBaseFragment> fragmentMap = new HashMap<>();
- for (int i = 0; i < keys.getFirst().length; i++) {
- HRegionLocation location = htable.getRegionLocation(keys.getFirst()[i], false);
- if (null == location) {
- throw new IOException("Can't find the region of the key: " + Bytes.toStringBinary(keys.getFirst()[i]));
- }
+ final byte[] regionStartKey = tableRange.getFirst()[i];
+ final byte[] regionStopKey = tableRange.getSecond()[i];
+
+ int startRowsSize = selectedRange.getFirst().size();
+ for (int j = 0; j < startRowsSize; j++) {
+ byte[] startRow = selectedRange.getFirst().get(j);
+ byte[] stopRow = selectedRange.getSecond().get(j);
+ // determine if the given start and stop keys fall into the region
+ if ((startRow.length == 0 || regionStopKey.length == 0 || Bytes.compareTo(startRow, regionStopKey) < 0)
+ && (stopRow.length == 0 || Bytes.compareTo(stopRow, regionStartKey) > 0)) {
+ final byte[] fragmentStart = (startRow.length == 0 || Bytes.compareTo(regionStartKey, startRow) >= 0) ?
+ regionStartKey : startRow;
+
+ final byte[] fragmentStop = (stopRow.length == 0 || Bytes.compareTo(regionStopKey, stopRow) <= 0) &&
+ regionStopKey.length > 0 ? regionStopKey : stopRow;
+
+ if (fragmentMap.containsKey(regionStartKey)) {
+ // widen this region's existing fragment so it also covers the current range
+ final HBaseFragment prevFragment = fragmentMap.get(regionStartKey);
+ // an empty start row already sorts first, so a plain comparison picks the lower bound
+ if (Bytes.compareTo(fragmentStart, prevFragment.getStartRow()) < 0) {
+ prevFragment.setStartRow(fragmentStart);
+ }
+ // An empty stop row means "unbounded" and must win over any concrete stop row, but a plain
+ // byte comparison would rank it as the smallest key, so handle it explicitly.
+ final byte[] prevStop = prevFragment.getStopRow();
+ if (prevStop.length > 0
+ && (fragmentStop.length == 0 || Bytes.compareTo(fragmentStop, prevStop) > 0)) {
+ prevFragment.setStopRow(fragmentStop);
+ }
+ } else {
- byte[] regionStartKey = keys.getFirst()[i];
- byte[] regionStopKey = keys.getSecond()[i];
-
- int startRowsSize = startRows.size();
- for (int j = 0; j < startRowsSize; j++) {
- byte[] startRow = startRows.get(j);
- byte[] stopRow = stopRows.get(j);
- // determine if the given start an stop key fall into the region
- if ((startRow.length == 0 || regionStopKey.length == 0 || Bytes.compareTo(startRow, regionStopKey) < 0)
- && (stopRow.length == 0 || Bytes.compareTo(stopRow, regionStartKey) > 0)) {
- byte[] fragmentStart = (startRow.length == 0 || Bytes.compareTo(regionStartKey, startRow) >= 0) ?
- regionStartKey : startRow;
-
- byte[] fragmentStop = (stopRow.length == 0 || Bytes.compareTo(regionStopKey, stopRow) <= 0) &&
- regionStopKey.length > 0 ? regionStopKey : stopRow;
-
- if (fragmentMap.containsKey(regionStartKey)) {
- HBaseFragment prevFragment = fragmentMap.get(regionStartKey);
- if (Bytes.compareTo(fragmentStart, prevFragment.getStartRow()) < 0) {
- prevFragment.setStartRow(fragmentStart);
- }
- if (Bytes.compareTo(fragmentStop, prevFragment.getStopRow()) > 0) {
- prevFragment.setStopRow(fragmentStop);
- }
- } else {
- byte[] regionName = location.getRegionInfo().getRegionName();
- long regionSize = sizeCalculator.getRegionSize(regionName);
-
- HBaseFragment fragment = new HBaseFragment(tableDesc.getUri(),
- inputSourceId,
- htable.getName().getNameAsString(),
- fragmentStart,
- fragmentStop,
- location.getHostname());
- if (regionSize == 0) {
- fragment.setLength(TajoConstants.UNKNOWN_LENGTH);
- } else {
- fragment.setLength(regionSize);
- }
+ final HBaseFragment fragment = new HBaseFragment(table.getUri(),
+ inputSourceId,
+ htable.getName().getNameAsString(),
+ fragmentStart,
+ fragmentStop,
+ location.getHostname());
- fragmentMap.put(regionStartKey, fragment);
- if (LOG.isDebugEnabled()) {
- LOG.debug("getFragments: fragment -> " + i + " -> " + fragment);
- }
+ fragment.setLength(getRegionSize(sizeCalculator, location.getRegionInfo().getRegionName()));
+ fragmentMap.put(regionStartKey, fragment);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getFragments: fragment -> " + i + " -> " + fragment);
}
}
}
}
+ }
+
+ return fragmentMap.values();
+ }
- List<HBaseFragment> fragments = new ArrayList<>(fragmentMap.values());
+ /**
+ * Returns the fragments of the HBase-mapped {@code table} that may contain rows matching
+ * {@code filterCondition}.
+ *
+ * @param inputSourceId the source id stored in every fragment
+ * @param table the HBase-mapped table
+ * @param filterCondition optional filter used to narrow the scanned row-key ranges
+ * @return an immutable list of fragments
+ */
+ @Override
+ public List<Fragment> getSplits(String inputSourceId,
+ TableDesc table,
+ @Nullable EvalNode filterCondition) throws IOException, TajoException {
+ // Widens List<HBaseFragment> to List<Fragment> without an unchecked raw cast; Guava avoids
+ // copying when the argument is already an ImmutableList, which getRawSplits returns.
+ return ImmutableList.<Fragment>copyOf(getRawSplits(inputSourceId, table, filterCondition));
+ }
+
+ /**
+ * Computes the HBase fragments of {@code table} that may contain rows matching
+ * {@code filterCondition}: one fragment per region overlapping the selected row-key ranges, or a
+ * single whole-table fragment when HBase reports no region boundaries. Each fragment's length is
+ * its region size or {@link TajoConstants#UNKNOWN_LENGTH}.
+ *
+ * NOTE(review): only the multi-region path sorts the fragments and calls {@code setLast(true)} on
+ * the final one; the single-fragment path does not — confirm whether that is intended.
+ *
+ * @param inputSourceId the source id stored in every fragment
+ * @param table the HBase-mapped table
+ * @param filterCondition optional filter used to narrow the scanned row-key ranges
+ * @return an immutable list of fragments
+ * @throws IOException if HBase region metadata cannot be read
+ */
+ private List<HBaseFragment> getRawSplits(String inputSourceId,
+ TableDesc table,
+ @Nullable EvalNode filterCondition) throws IOException, TajoException {
+ final ColumnMapping columnMapping = new ColumnMapping(table.getSchema(), table.getMeta().getPropertySet());
+
+ // the HTable is closed when this try-with-resources block exits
+ try (final HTable htable = new HTable(hbaseConf, table.getMeta().getProperty(HBaseStorageConstants.META_TABLE_KEY))) {
+ final RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(htable);
+ final org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> tableRange = htable.getStartEndKeys();
+ // no region boundaries reported: fall back to a single fragment covering the whole table
+ if (isEmptyRegion(tableRange)) {
+ return createEmptyFragment(table, inputSourceId, htable, sizeCalculator);
+ }
+
+ // translate the filter into the row-key ranges that have to be scanned
+ final Pair<List<byte []>, List<byte []>> selectedRange = getSelectedKeyRange(
+ columnMapping,
+ getIndexPredications(columnMapping, table, filterCondition));
+
+ // one fragment per region that overlaps a selected range
+ List<HBaseFragment> fragments = new ArrayList<>(convertRangeToFragment(table, inputSourceId, htable, sizeCalculator, tableRange, selectedRange));
+ // order by HBaseFragment's natural ordering and flag the final fragment
Collections.sort(fragments);
if (!fragments.isEmpty()) {
fragments.get(fragments.size() - 1).setLast(true);
}
- return (ArrayList<Fragment>) (ArrayList) fragments;
- } finally {
- if (htable != null) {
- htable.close();
- }
+ return ImmutableList.copyOf(fragments);
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/73a43d8b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/RegionSizeCalculator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/RegionSizeCalculator.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/RegionSizeCalculator.java
new file mode 100644
index 0000000..806320d
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/RegionSizeCalculator.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLoad;
+import org.apache.hadoop.hbase.ServerLoad;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Computes the size of each region of a given table.
+ *
+ * <p>This class is borrowed from HBase ({@code org.apache.hadoop.hbase.util.RegionSizeCalculator})
+ * and modified so that a region's size includes its mem store as well as its store files.
+ * Sizes are derived from {@link RegionLoad}'s megabyte counters, so a region smaller than one
+ * megabyte may be reported as 0 bytes.
+ * */
+@InterfaceStability.Evolving
+@InterfaceAudience.Private
+public class RegionSizeCalculator {
+
+ private static final Log LOG = LogFactory.getLog(RegionSizeCalculator.class);
+
+ /**
+ * Maps each region name to its size in bytes.
+ * */
+ private final Map<byte[], Long> sizeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+
+ /** Configuration key; setting it to false disables the calculation (enabled by default). */
+ static final String ENABLE_REGIONSIZECALCULATOR = "hbase.regionsizecalculator.enable";
+
+ /**
+ * Computes the size of each region of the given table.
+ *
+ * @deprecated Use {@link #RegionSizeCalculator(RegionLocator, Admin)} instead.
+ */
+ @Deprecated
+ public RegionSizeCalculator(HTable table) throws IOException {
+ // the admin is only needed while sizes are computed; close it even if init fails
+ try (HBaseAdmin admin = new HBaseAdmin(table.getConfiguration())) {
+ init(table.getRegionLocator(), admin);
+ }
+ }
+
+ /**
+ * Computes the size of each region of the table served by {@code regionLocator}.
+ * */
+ public RegionSizeCalculator(RegionLocator regionLocator, Admin admin) throws IOException {
+ init(regionLocator, admin);
+ }
+
+ private void init(RegionLocator regionLocator, Admin admin)
+ throws IOException {
+ if (!enabled(admin.getConfiguration())) {
+ LOG.info("Region size calculation disabled.");
+ return;
+ }
+
+ LOG.info("Calculating region sizes for table \"" + regionLocator.getName() + "\".");
+
+ // collect the names of the regions that belong to this table
+ List<HRegionLocation> tableRegionInfos = regionLocator.getAllRegionLocations();
+ Set<byte[]> tableRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+ for (HRegionLocation regionInfo : tableRegionInfos) {
+ tableRegions.add(regionInfo.getRegionInfo().getRegionName());
+ }
+
+ ClusterStatus clusterStatus = admin.getClusterStatus();
+ Collection<ServerName> servers = clusterStatus.getServers();
+ final long megaByte = 1024L * 1024L;
+
+ // iterate over all cluster regions, keeping only the regions of this table
+ for (ServerName serverName: servers) {
+ ServerLoad serverLoad = clusterStatus.getLoad(serverName);
+
+ for (RegionLoad regionLoad: serverLoad.getRegionsLoad().values()) {
+ byte[] regionId = regionLoad.getName();
+
+ if (tableRegions.contains(regionId)) {
+ // widen to long before adding in case the MB counters are ints
+ long regionSizeBytes =
+ ((long) regionLoad.getStorefileSizeMB() + regionLoad.getMemStoreSizeMB()) * megaByte;
+ sizeMap.put(regionId, regionSizeBytes);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Region " + regionLoad.getNameAsString() + " has size " + regionSizeBytes);
+ }
+ }
+ }
+ }
+ LOG.debug("Region sizes calculated");
+ }
+
+ boolean enabled(Configuration configuration) {
+ return configuration.getBoolean(ENABLE_REGIONSIZECALCULATOR, true);
+ }
+
+ /**
+ * Returns size of given region in bytes. Returns 0 if region was not found
+ * (or if its size is below the megabyte granularity).
+ * */
+ public long getRegionSize(byte[] regionId) {
+ Long size = sizeMap.get(regionId);
+ if (size == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Unknown region:" + Bytes.toStringBinary(regionId));
+ }
+ return 0;
+ } else {
+ return size;
+ }
+ }
+
+ /** Returns a read-only view of the region name to size (bytes) map. */
+ public Map<byte[], Long> getRegionSizeMap() {
+ return Collections.unmodifiableMap(sizeMap);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/73a43d8b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
index e50d587..35504af 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
@@ -125,8 +125,8 @@ public class FileTablespace extends Tablespace {
}
@Override
- public long getTableVolume(URI uri, Optional<EvalNode> filter) throws UnsupportedException {
- Path path = new Path(uri);
+ public long getTableVolume(TableDesc table, Optional<EvalNode> filter) throws UnsupportedException {
+ Path path = new Path(table.getUri());
ContentSummary summary;
try {
summary = fs.getContentSummary(path);
http://git-wip-us.apache.org/repos/asf/tajo/blob/73a43d8b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java
index 1f7f299..fa6cf48 100644
--- a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java
+++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java
@@ -110,7 +110,7 @@ public abstract class JdbcTablespace extends Tablespace {
}
@Override
- public long getTableVolume(URI uri, Optional<EvalNode> filter) throws UnsupportedException {
+ public long getTableVolume(TableDesc table, Optional<EvalNode> filter) throws UnsupportedException {
+ // volume estimation is not implemented for JDBC tablespaces
throw new UnsupportedException();
}