You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2015/12/14 07:21:48 UTC
tajo git commit: TAJO-1990: Refine some parts in HBaseTablespace.
Repository: tajo
Updated Branches:
refs/heads/master 252c311ea -> 2aefa0dad
TAJO-1990: Refine some parts in HBaseTablespace.
Closes #880
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/2aefa0da
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/2aefa0da
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/2aefa0da
Branch: refs/heads/master
Commit: 2aefa0dada15fbfe1f2c45534a160521f00c0758
Parents: 252c311
Author: Hyunsik Choi <hy...@apache.org>
Authored: Sun Dec 13 22:18:57 2015 -0800
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Sun Dec 13 22:18:57 2015 -0800
----------------------------------------------------------------------
CHANGES | 2 +
.../tajo/storage/hbase/HBaseTablespace.java | 92 ++++++++++----------
.../tajo/storage/hbase/IndexPredication.java | 31 +++----
3 files changed, 60 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/2aefa0da/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 9b7842f..c297940 100644
--- a/CHANGES
+++ b/CHANGES
@@ -8,6 +8,8 @@ Release 0.12.0 - unreleased
IMPROVEMENT
+ TAJO-1990: Refine some parts in HBaseTablespace. (hyunsik)
+
TAJO-2005: Add TableStatUpdateRewriter. (hyunsik)
TAJO-1948: Change GroupbyNode::setAggFunctions and getAggFunctions to set
http://git-wip-us.apache.org/repos/asf/tajo/blob/2aefa0da/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
index f06cc67..c1e8a2d 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
@@ -61,6 +61,7 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.*;
+import java.util.stream.Collectors;
/**
* Tablespace for HBase table.
@@ -72,6 +73,8 @@ public class HBaseTablespace extends Tablespace {
new StorageProperty("hbase", false, true, false, false);
public static final FormatProperty HFILE_FORMAT_PROPERTIES = new FormatProperty(true, false, true);
public static final FormatProperty PUT_MODE_PROPERTIES = new FormatProperty(true, true, false);
+ public static final List<byte []> EMPTY_START_ROW_KEY = TUtil.newList(HConstants.EMPTY_START_ROW);
+ public static final List<byte []> EMPTY_END_ROW_KEY = TUtil.newList(HConstants.EMPTY_END_ROW);
private Configuration hbaseConf;
@@ -266,6 +269,7 @@ public class HBaseTablespace extends Tablespace {
// If there is many split keys, Tajo allows to define in the file.
Path path = new Path(splitRowKeysFile);
FileSystem fs = path.getFileSystem(conf);
+
if (!fs.exists(path)) {
throw new MissingTablePropertyException("hbase.split.rowkeys.file=" + path.toString() + " not exists.",
hbaseTableName);
@@ -273,6 +277,7 @@ public class HBaseTablespace extends Tablespace {
SortedSet<String> splitKeySet = new TreeSet<>();
BufferedReader reader = null;
+
try {
reader = new BufferedReader(new InputStreamReader(fs.open(path)));
String line = null;
@@ -438,32 +443,34 @@ public class HBaseTablespace extends Tablespace {
return fragments;
}
- List<byte[]> startRows;
- List<byte[]> stopRows;
+ final List<byte[]> startRows;
+ final List<byte[]> stopRows;
if (indexPredications != null && !indexPredications.isEmpty()) {
// indexPredications is Disjunctive set
- startRows = new ArrayList<>();
- stopRows = new ArrayList<>();
- for (IndexPredication indexPredication: indexPredications) {
- byte[] startRow;
- byte[] stopRow;
- if (indexPredication.getStartValue() != null) {
- startRow = serialize(columnMapping, indexPredication, indexPredication.getStartValue());
- } else {
- startRow = HConstants.EMPTY_START_ROW;
- }
- if (indexPredication.getStopValue() != null) {
- stopRow = serialize(columnMapping, indexPredication, indexPredication.getStopValue());
- } else {
- stopRow = HConstants.EMPTY_END_ROW;
- }
- startRows.add(startRow);
- stopRows.add(stopRow);
- }
+ startRows = indexPredications.stream()
+ .map(x -> {
+ if (x.getStartValue() != null) {
+ return serialize(columnMapping, x, x.getStartValue());
+ } else {
+ return HConstants.EMPTY_START_ROW;
+ }
+ })
+ .collect(Collectors.toList());
+
+ stopRows = indexPredications.stream()
+ .map(x -> {
+ if (x.getStopValue() != null) {
+ return serialize(columnMapping, x, x.getStopValue());
+ } else {
+ return HConstants.EMPTY_END_ROW; // no stop value given: use the end-row sentinel, mirroring the start-row branch
+ }
+ })
+ .collect(Collectors.toList());
+
} else {
- startRows = TUtil.newList(HConstants.EMPTY_START_ROW);
- stopRows = TUtil.newList(HConstants.EMPTY_END_ROW);
+ startRows = EMPTY_START_ROW_KEY;
+ stopRows = EMPTY_END_ROW_KEY;
}
hAdmin = new HBaseAdmin(hbaseConf);
@@ -544,6 +551,7 @@ public class HBaseTablespace extends Tablespace {
if (!fragments.isEmpty()) {
fragments.get(fragments.size() - 1).setLast(true);
}
+
return (ArrayList<Fragment>) (ArrayList) fragments;
} finally {
if (htable != null) {
@@ -556,7 +564,7 @@ public class HBaseTablespace extends Tablespace {
}
private byte[] serialize(ColumnMapping columnMapping,
- IndexPredication indexPredication, Datum datum) throws IOException {
+ IndexPredication indexPredication, Datum datum) {
if (columnMapping.getIsBinaryColumns()[indexPredication.getColumnId()]) {
return HBaseBinarySerializerDeserializer.serialize(indexPredication.getColumn(), datum);
} else {
@@ -690,10 +698,7 @@ public class HBaseTablespace extends Tablespace {
@Override
public String toString() {
- return "HConnectionKey{" +
- "properties=" + properties +
- ", username='" + username + '\'' +
- '}';
+ return "HConnectionKey{ properties=" + properties + ", username='" + username + '\'' + '}';
}
}
@@ -702,25 +707,22 @@ public class HBaseTablespace extends Tablespace {
@Nullable EvalNode filterCondition)
throws IOException, MissingTablePropertyException, InvalidTablePropertyException {
- List<IndexPredication> indexPredications = new ArrayList<>();
- Column[] indexableColumns = getIndexableColumns(tableDesc);
- if (indexableColumns != null && indexableColumns.length == 1) {
- // Currently supports only single index column.
- List<Set<EvalNode>> indexablePredicateList = findIndexablePredicateSet(filterCondition, indexableColumns);
- for (Set<EvalNode> eachEvalSet: indexablePredicateList) {
- Pair<Datum, Datum> indexPredicationValues = getIndexablePredicateValue(columnMapping, eachEvalSet);
- if (indexPredicationValues != null) {
- IndexPredication indexPredication = new IndexPredication();
- indexPredication.setColumn(indexableColumns[0]);
- indexPredication.setColumnId(tableDesc.getLogicalSchema().getColumnId(indexableColumns[0].getQualifiedName()));
- indexPredication.setStartValue(indexPredicationValues.getFirst());
- indexPredication.setStopValue(indexPredicationValues.getSecond());
-
- indexPredications.add(indexPredication);
- }
- }
+ final Column[] indexableColumns = getIndexableColumns(tableDesc);
+ if (indexableColumns == null || indexableColumns.length == 0) {
+ return Collections.emptyList(); // no indexable column: no predications (typed, unlike raw EMPTY_LIST)
 }
- return indexPredications;
+
+ // Currently supports only single index column.
+ return findIndexablePredicateSet(filterCondition, indexableColumns).stream()
+ .map(set -> getIndexablePredicateValue(columnMapping, set))
+ .filter(value -> value != null)
+ .map(value ->
+ new IndexPredication(
+ indexableColumns[0],
+ tableDesc.getLogicalSchema().getColumnId(indexableColumns[0].getQualifiedName()),
+ value.getFirst(),
+ value.getSecond()))
+ .collect(Collectors.toList());
}
public List<Set<EvalNode>> findIndexablePredicateSet(@Nullable EvalNode qual,
http://git-wip-us.apache.org/repos/asf/tajo/blob/2aefa0da/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/IndexPredication.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/IndexPredication.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/IndexPredication.java
index 3a58e50..6241a94 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/IndexPredication.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/IndexPredication.java
@@ -22,40 +22,31 @@ import org.apache.tajo.catalog.Column;
import org.apache.tajo.datum.Datum;
public class IndexPredication {
- private Column column;
- private int columnId;
- private Datum startValue;
- private Datum stopValue;
+ final private Column column;
+ final private int columnId;
+ final private Datum startValue;
+ final private Datum stopValue;
+
+ public IndexPredication(Column c, int columnId, Datum startValue, Datum stopValue) {
+ this.column = c;
+ this.columnId = columnId;
+ this.startValue = startValue;
+ this.stopValue = stopValue;
+ }
public Column getColumn() {
return column;
}
- public void setColumn(Column column) {
- this.column = column;
- }
-
public int getColumnId() {
return columnId;
}
- public void setColumnId(int columnId) {
- this.columnId = columnId;
- }
-
public Datum getStartValue() {
return startValue;
}
- public void setStartValue(Datum startValue) {
- this.startValue = startValue;
- }
-
public Datum getStopValue() {
return stopValue;
}
-
- public void setStopValue(Datum stopValue) {
- this.stopValue = stopValue;
- }
}