You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2015/12/14 07:21:48 UTC

tajo git commit: TAJO-1990: Refine some parts in HBaseTablespace.

Repository: tajo
Updated Branches:
  refs/heads/master 252c311ea -> 2aefa0dad


TAJO-1990: Refine some parts in HBaseTablespace.

Closes #880


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/2aefa0da
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/2aefa0da
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/2aefa0da

Branch: refs/heads/master
Commit: 2aefa0dada15fbfe1f2c45534a160521f00c0758
Parents: 252c311
Author: Hyunsik Choi <hy...@apache.org>
Authored: Sun Dec 13 22:18:57 2015 -0800
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Sun Dec 13 22:18:57 2015 -0800

----------------------------------------------------------------------
 CHANGES                                         |  2 +
 .../tajo/storage/hbase/HBaseTablespace.java     | 92 ++++++++++----------
 .../tajo/storage/hbase/IndexPredication.java    | 31 +++----
 3 files changed, 60 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/2aefa0da/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 9b7842f..c297940 100644
--- a/CHANGES
+++ b/CHANGES
@@ -8,6 +8,8 @@ Release 0.12.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1990: Refine some parts in HBaseTablespace. (hyunsik)
+
     TAJO-2005: Add TableStatUpdateRewriter. (hyunsik)
 
     TAJO-1948: Change GroupbyNode::setAggFunctions and getAggFunctions to set 

http://git-wip-us.apache.org/repos/asf/tajo/blob/2aefa0da/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
index f06cc67..c1e8a2d 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
@@ -61,6 +61,7 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.net.URI;
 import java.util.*;
+import java.util.stream.Collectors;
 
 /**
  * Tablespace for HBase table.
@@ -72,6 +73,8 @@ public class HBaseTablespace extends Tablespace {
       new StorageProperty("hbase", false, true, false, false);
   public static final FormatProperty HFILE_FORMAT_PROPERTIES = new FormatProperty(true, false, true);
   public static final FormatProperty PUT_MODE_PROPERTIES = new FormatProperty(true, true, false);
+  public static final List<byte []> EMPTY_START_ROW_KEY = TUtil.newList(HConstants.EMPTY_START_ROW);
+  public static final List<byte []> EMPTY_END_ROW_KEY   = TUtil.newList(HConstants.EMPTY_END_ROW);
 
   private Configuration hbaseConf;
 
@@ -266,6 +269,7 @@ public class HBaseTablespace extends Tablespace {
       // If there are many split keys, Tajo allows them to be defined in a file.
       Path path = new Path(splitRowKeysFile);
       FileSystem fs = path.getFileSystem(conf);
+
       if (!fs.exists(path)) {
         throw new MissingTablePropertyException("hbase.split.rowkeys.file=" + path.toString() + " not exists.",
             hbaseTableName);
@@ -273,6 +277,7 @@ public class HBaseTablespace extends Tablespace {
 
       SortedSet<String> splitKeySet = new TreeSet<>();
       BufferedReader reader = null;
+
       try {
         reader = new BufferedReader(new InputStreamReader(fs.open(path)));
         String line = null;
@@ -438,32 +443,34 @@ public class HBaseTablespace extends Tablespace {
         return fragments;
       }
 
-      List<byte[]> startRows;
-      List<byte[]> stopRows;
+      final List<byte[]> startRows;
+      final List<byte[]> stopRows;
 
       if (indexPredications != null && !indexPredications.isEmpty()) {
         // indexPredications is a disjunctive set
-        startRows = new ArrayList<>();
-        stopRows = new ArrayList<>();
-        for (IndexPredication indexPredication: indexPredications) {
-          byte[] startRow;
-          byte[] stopRow;
-          if (indexPredication.getStartValue() != null) {
-            startRow = serialize(columnMapping, indexPredication, indexPredication.getStartValue());
-          } else {
-            startRow = HConstants.EMPTY_START_ROW;
-          }
-          if (indexPredication.getStopValue() != null) {
-            stopRow = serialize(columnMapping, indexPredication, indexPredication.getStopValue());
-          } else {
-            stopRow = HConstants.EMPTY_END_ROW;
-          }
-          startRows.add(startRow);
-          stopRows.add(stopRow);
-        }
+        startRows = indexPredications.stream()
+            .map(x -> {
+                if (x.getStartValue() != null) {
+                  return serialize(columnMapping, x, x.getStartValue());
+                } else {
+                  return HConstants.EMPTY_START_ROW;
+                }
+              })
+            .collect(Collectors.toList());
+
+        stopRows = indexPredications.stream()
+            .map(x -> {
+              if (x.getStopValue() != null) {
+                return serialize(columnMapping, x, x.getStopValue());
+              } else {
+                return HConstants.EMPTY_START_ROW;
+              }
+            })
+            .collect(Collectors.toList());
+
       } else {
-        startRows = TUtil.newList(HConstants.EMPTY_START_ROW);
-        stopRows = TUtil.newList(HConstants.EMPTY_END_ROW);
+        startRows = EMPTY_START_ROW_KEY;
+        stopRows  = EMPTY_END_ROW_KEY;
       }
 
       hAdmin =  new HBaseAdmin(hbaseConf);
@@ -544,6 +551,7 @@ public class HBaseTablespace extends Tablespace {
       if (!fragments.isEmpty()) {
         fragments.get(fragments.size() - 1).setLast(true);
       }
+
       return (ArrayList<Fragment>) (ArrayList) fragments;
     } finally {
       if (htable != null) {
@@ -556,7 +564,7 @@ public class HBaseTablespace extends Tablespace {
   }
 
   private byte[] serialize(ColumnMapping columnMapping,
-                           IndexPredication indexPredication, Datum datum) throws IOException {
+                           IndexPredication indexPredication, Datum datum) {
     if (columnMapping.getIsBinaryColumns()[indexPredication.getColumnId()]) {
       return HBaseBinarySerializerDeserializer.serialize(indexPredication.getColumn(), datum);
     } else {
@@ -690,10 +698,7 @@ public class HBaseTablespace extends Tablespace {
 
     @Override
     public String toString() {
-      return "HConnectionKey{" +
-          "properties=" + properties +
-          ", username='" + username + '\'' +
-          '}';
+      return "HConnectionKey{ properties=" + properties + ", username='" + username + '\'' + '}';
     }
   }
 
@@ -702,25 +707,22 @@ public class HBaseTablespace extends Tablespace {
                                                      @Nullable EvalNode filterCondition)
       throws IOException, MissingTablePropertyException, InvalidTablePropertyException {
 
-    List<IndexPredication> indexPredications = new ArrayList<>();
-    Column[] indexableColumns = getIndexableColumns(tableDesc);
-    if (indexableColumns != null && indexableColumns.length == 1) {
-      // Currently supports only single index column.
-      List<Set<EvalNode>> indexablePredicateList = findIndexablePredicateSet(filterCondition, indexableColumns);
-      for (Set<EvalNode> eachEvalSet: indexablePredicateList) {
-        Pair<Datum, Datum> indexPredicationValues = getIndexablePredicateValue(columnMapping, eachEvalSet);
-        if (indexPredicationValues != null) {
-          IndexPredication indexPredication = new IndexPredication();
-          indexPredication.setColumn(indexableColumns[0]);
-          indexPredication.setColumnId(tableDesc.getLogicalSchema().getColumnId(indexableColumns[0].getQualifiedName()));
-          indexPredication.setStartValue(indexPredicationValues.getFirst());
-          indexPredication.setStopValue(indexPredicationValues.getSecond());
-
-          indexPredications.add(indexPredication);
-        }
-      }
+    final Column[] indexableColumns = getIndexableColumns(tableDesc);
+    if (indexableColumns == null || indexableColumns.length == 0) {
+      return Collections.EMPTY_LIST;
     }
-    return indexPredications;
+
+    // Currently supports only single index column.
+    return findIndexablePredicateSet(filterCondition, indexableColumns).stream()
+        .map(set -> getIndexablePredicateValue(columnMapping, set))
+        .filter(value -> value != null)
+        .map(value ->
+            new IndexPredication(
+                indexableColumns[0],
+                tableDesc.getLogicalSchema().getColumnId(indexableColumns[0].getQualifiedName()),
+                value.getFirst(),
+                value.getSecond()))
+        .collect(Collectors.toList());
   }
 
   public List<Set<EvalNode>> findIndexablePredicateSet(@Nullable EvalNode qual,

http://git-wip-us.apache.org/repos/asf/tajo/blob/2aefa0da/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/IndexPredication.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/IndexPredication.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/IndexPredication.java
index 3a58e50..6241a94 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/IndexPredication.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/IndexPredication.java
@@ -22,40 +22,31 @@ import org.apache.tajo.catalog.Column;
 import org.apache.tajo.datum.Datum;
 
 public class IndexPredication {
-  private Column column;
-  private int columnId;
-  private Datum startValue;
-  private Datum stopValue;
+  final private Column column;
+  final private int columnId;
+  final private Datum startValue;
+  final private Datum stopValue;
+
+  public IndexPredication(Column c, int columnId, Datum startValue, Datum stopValue) {
+    this.column     = c;
+    this.columnId   = columnId;
+    this.startValue = startValue;
+    this.stopValue  = stopValue;
+  }
 
   public Column getColumn() {
     return column;
   }
 
-  public void setColumn(Column column) {
-    this.column = column;
-  }
-
   public int getColumnId() {
     return columnId;
   }
 
-  public void setColumnId(int columnId) {
-    this.columnId = columnId;
-  }
-
   public Datum getStartValue() {
     return startValue;
   }
 
-  public void setStartValue(Datum startValue) {
-    this.startValue = startValue;
-  }
-
   public Datum getStopValue() {
     return stopValue;
   }
-
-  public void setStopValue(Datum stopValue) {
-    this.stopValue = stopValue;
-  }
 }