You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by bo...@apache.org on 2018/05/27 20:23:17 UTC
[drill] 01/02: DRILL-6442: Adjust Hbase disk cost & row count
estimation when filter push down is applied
This is an automated email from the ASF dual-hosted git repository.
boaz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit 18766950c640c6963ffd1c94224c4a984bedd3c1
Author: Arina Ielchiieva <ar...@gmail.com>
AuthorDate: Wed May 23 16:17:11 2018 +0000
DRILL-6442: Adjust Hbase disk cost & row count estimation when filter push down is applied
closes #1288
---
.../drill/exec/store/hbase/HBaseGroupScan.java | 64 ++++++++++-----------
.../exec/store/hbase/TableStatsCalculator.java | 67 ++++++++++++----------
2 files changed, 66 insertions(+), 65 deletions(-)
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
index 9eeba24..97c9a95 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
@@ -22,7 +22,9 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -63,20 +65,13 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
@JsonTypeName("hbase-scan")
public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConstants {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseGroupScan.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseGroupScan.class);
- private static final Comparator<List<HBaseSubScanSpec>> LIST_SIZE_COMPARATOR = new Comparator<List<HBaseSubScanSpec>>() {
- @Override
- public int compare(List<HBaseSubScanSpec> list1, List<HBaseSubScanSpec> list2) {
- return list1.size() - list2.size();
- }
- };
+ private static final Comparator<List<HBaseSubScanSpec>> LIST_SIZE_COMPARATOR = (list1, list2) -> list1.size() - list2.size();
private static final Comparator<List<HBaseSubScanSpec>> LIST_SIZE_COMPARATOR_REV = Collections.reverseOrder(LIST_SIZE_COMPARATOR);
@@ -182,12 +177,12 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
public List<EndpointAffinity> getOperatorAffinity() {
watch.reset();
watch.start();
- Map<String, DrillbitEndpoint> endpointMap = new HashMap<String, DrillbitEndpoint>();
+ Map<String, DrillbitEndpoint> endpointMap = new HashMap<>();
for (DrillbitEndpoint ep : storagePlugin.getContext().getBits()) {
endpointMap.put(ep.getAddress(), ep);
}
- Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<DrillbitEndpoint, EndpointAffinity>();
+ Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<>();
for (ServerName sn : regionsToScan.values()) {
DrillbitEndpoint ep = endpointMap.get(sn.getHostname());
if (ep != null) {
@@ -199,14 +194,10 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
}
}
}
- logger.debug("Took {} µs to get operator affinity", watch.elapsed(TimeUnit.NANOSECONDS)/1000);
- return Lists.newArrayList(affinityMap.values());
+ logger.debug("Took {} µs to get operator affinity", watch.elapsed(TimeUnit.NANOSECONDS) / 1000);
+ return new ArrayList<>(affinityMap.values());
}
- /**
- *
- * @param incomingEndpoints
- */
@Override
public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
watch.reset();
@@ -230,23 +221,23 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
/*
* another map with endpoint (hostname => corresponding index list) in 'incomingEndpoints' list
*/
- Map<String, Queue<Integer>> endpointHostIndexListMap = Maps.newHashMap();
+ Map<String, Queue<Integer>> endpointHostIndexListMap = new HashMap<>();
/*
* Initialize these two maps
*/
for (int i = 0; i < numSlots; ++i) {
- endpointFragmentMapping.put(i, new ArrayList<HBaseSubScanSpec>(maxPerEndpointSlot));
+ endpointFragmentMapping.put(i, new ArrayList<>(maxPerEndpointSlot));
String hostname = incomingEndpoints.get(i).getAddress();
Queue<Integer> hostIndexQueue = endpointHostIndexListMap.get(hostname);
if (hostIndexQueue == null) {
- hostIndexQueue = Lists.newLinkedList();
+ hostIndexQueue = new LinkedList<>();
endpointHostIndexListMap.put(hostname, hostIndexQueue);
}
hostIndexQueue.add(i);
}
- Set<Entry<HRegionInfo, ServerName>> regionsToAssignSet = Sets.newHashSet(regionsToScan.entrySet());
+ Set<Entry<HRegionInfo, ServerName>> regionsToAssignSet = new HashSet<>(regionsToScan.entrySet());
/*
* First, we assign regions which are hosted on region servers running on drillbit endpoints
@@ -256,13 +247,13 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
/*
* Test if there is a drillbit endpoint which is also an HBase RegionServer that hosts the current HBase region
*/
- Queue<Integer> endpointIndexlist = endpointHostIndexListMap.get(regionEntry.getValue().getHostname());
- if (endpointIndexlist != null) {
- Integer slotIndex = endpointIndexlist.poll();
+ Queue<Integer> endpointIndexList = endpointHostIndexListMap.get(regionEntry.getValue().getHostname());
+ if (endpointIndexList != null) {
+ Integer slotIndex = endpointIndexList.poll();
List<HBaseSubScanSpec> endpointSlotScanList = endpointFragmentMapping.get(slotIndex);
endpointSlotScanList.add(regionInfoToSubScanSpec(regionEntry.getKey()));
// add to the tail of the slot list, to add more later in round robin fashion
- endpointIndexlist.offer(slotIndex);
+ endpointIndexList.offer(slotIndex);
// this region has been assigned
regionsIterator.remove();
}
@@ -271,8 +262,8 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
/*
* Build priority queues of slots, with ones which has tasks lesser than 'minPerEndpointSlot' and another which have more.
*/
- PriorityQueue<List<HBaseSubScanSpec>> minHeap = new PriorityQueue<List<HBaseSubScanSpec>>(numSlots, LIST_SIZE_COMPARATOR);
- PriorityQueue<List<HBaseSubScanSpec>> maxHeap = new PriorityQueue<List<HBaseSubScanSpec>>(numSlots, LIST_SIZE_COMPARATOR_REV);
+ PriorityQueue<List<HBaseSubScanSpec>> minHeap = new PriorityQueue<>(numSlots, LIST_SIZE_COMPARATOR);
+ PriorityQueue<List<HBaseSubScanSpec>> maxHeap = new PriorityQueue<>(numSlots, LIST_SIZE_COMPARATOR_REV);
for(List<HBaseSubScanSpec> listOfScan : endpointFragmentMapping.values()) {
if (listOfScan.size() < minPerEndpointSlot) {
minHeap.offer(listOfScan);
@@ -310,12 +301,11 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
}
/* no slot should be empty at this point */
- assert (minHeap.peek() == null || minHeap.peek().size() > 0) : String.format(
- "Unable to assign tasks to some endpoints.\nEndpoints: {}.\nAssignment Map: {}.",
- incomingEndpoints, endpointFragmentMapping.toString());
+ assert (minHeap.peek() == null || minHeap.peek().size() > 0) :
+ String.format("Unable to assign tasks to some endpoints.\nEndpoints: %s.\nAssignment Map: %s.", incomingEndpoints, endpointFragmentMapping.toString());
logger.debug("Built assignment map in {} µs.\nEndpoints: {}.\nAssignment Map: {}",
- watch.elapsed(TimeUnit.NANOSECONDS)/1000, incomingEndpoints, endpointFragmentMapping.toString());
+ watch.elapsed(TimeUnit.NANOSECONDS) / 1000, incomingEndpoints, endpointFragmentMapping.toString());
}
private HBaseSubScanSpec regionInfoToSubScanSpec(HRegionInfo ri) {
@@ -347,9 +337,15 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
@Override
public ScanStats getScanStats() {
- long rowCount = (long) ((scanSizeInBytes / statsCalculator.getAvgRowSizeInBytes()) * (hbaseScanSpec.getFilter() != null ? 0.5 : 1));
- // the following calculation is not precise since 'columns' could specify CFs while getColsPerRow() returns the number of qualifier.
- float diskCost = scanSizeInBytes * ((columns == null || columns.isEmpty()) ? 1 : columns.size()/statsCalculator.getColsPerRow());
+ long rowCount = scanSizeInBytes / statsCalculator.getAvgRowSizeInBytes();
+ // the following calculation is not precise since 'columns' could specify CFs while getColsPerRow() returns the number of qualifiers
+ float diskCost = scanSizeInBytes * ((columns == null || columns.isEmpty()) ? 1 : columns.size() / statsCalculator.getColsPerRow());
+ // if filter push down is used, reduce the estimated row count (and possibly the disk cost) by half so the plan cost is lower than without push down
+ if (hbaseScanSpec.getFilter() != null) {
+ rowCount = (long) (rowCount * 0.5);
+ // if sampling determined the exact row count (default row count was not used), keep the disk cost unchanged; otherwise halve it as well
+ diskCost = statsCalculator.usedDefaultRowCount() ? diskCost * 0.5F : diskCost;
+ }
return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, rowCount, 1, diskCost);
}
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/TableStatsCalculator.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/TableStatsCalculator.java
index 379fb7c..b435fbd 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/TableStatsCalculator.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/TableStatsCalculator.java
@@ -48,36 +48,36 @@ import org.apache.hadoop.hbase.util.Bytes;
* Computes size of each region for given table.
*/
public class TableStatsCalculator {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TableStatsCalculator.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TableStatsCalculator.class);
- public static final long DEFAULT_ROW_COUNT = 1024L * 1024L;
+ public static final long DEFAULT_ROW_COUNT = 1024L * 1024L; // 1 million rows
private static final String DRILL_EXEC_HBASE_SCAN_SAMPLE_ROWS_COUNT = "drill.exec.hbase.scan.samplerows.count";
private static final int DEFAULT_SAMPLE_SIZE = 100;
- /**
- * Maps each region to its size in bytes.
- */
+ // Maps each region to its size in bytes.
private Map<byte[], Long> sizeMap = null;
private int avgRowSizeInBytes = 1;
private int colsPerRow = 1;
+ private long estimatedRowCount = DEFAULT_ROW_COUNT;
+
/**
* Computes size of each region for table.
*
- * @param conn
- * @param hbaseScanSpec
- * @param config
- * @throws IOException
+ * @param connection connection to the HBase cluster
+ * @param hbaseScanSpec scan specification
+ * @param config drill configuration
+ * @param storageConfig HBase storage plugin configuration
*/
- public TableStatsCalculator(Connection conn, HBaseScanSpec hbaseScanSpec, DrillConfig config, HBaseStoragePluginConfig storageConfig) throws IOException {
+ public TableStatsCalculator(Connection connection, HBaseScanSpec hbaseScanSpec, DrillConfig config, HBaseStoragePluginConfig storageConfig) throws IOException {
TableName tableName = TableName.valueOf(hbaseScanSpec.getTableName());
- try (Admin admin = conn.getAdmin();
- Table table = conn.getTable(tableName);
- RegionLocator locator = conn.getRegionLocator(tableName)) {
+ try (Admin admin = connection.getAdmin();
+ Table table = connection.getTable(tableName);
+ RegionLocator locator = connection.getRegionLocator(tableName)) {
int rowsToSample = rowsToSample(config);
if (rowsToSample > 0) {
Scan scan = new Scan(hbaseScanSpec.getStartRow(), hbaseScanSpec.getStopRow());
@@ -100,22 +100,25 @@ public class TableStatsCalculator {
}
}
if (rowCount > 0) {
- avgRowSizeInBytes = (int) (rowSizeSum/rowCount);
- colsPerRow = numColumnsSum/rowCount;
+ avgRowSizeInBytes = (int) (rowSizeSum / rowCount);
+ colsPerRow = numColumnsSum / rowCount;
+ // if during sampling we receive less rows than expected, then we can use this number instead of default
+ estimatedRowCount = rowCount == rowsToSample ? estimatedRowCount : rowCount;
}
+
scanner.close();
}
if (!enabled(storageConfig)) {
- logger.info("Region size calculation disabled.");
+ logger.debug("Region size calculation is disabled.");
return;
}
- logger.info("Calculating region sizes for table '{}'.", tableName.getNameAsString());
+ logger.debug("Calculating region sizes for table '{}'.", tableName.getNameAsString());
- //get regions for table
+ // get regions for table
List<HRegionLocation> tableRegionInfos = locator.getAllRegionLocations();
- Set<byte[]> tableRegions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+ Set<byte[]> tableRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
for (HRegionLocation regionInfo : tableRegionInfos) {
tableRegions.add(regionInfo.getRegionInfo().getRegionName());
}
@@ -124,17 +127,17 @@ public class TableStatsCalculator {
try {
clusterStatus = admin.getClusterStatus();
} catch (Exception e) {
- logger.debug(e.getMessage());
+ logger.debug(e.getMessage(), e);
} finally {
if (clusterStatus == null) {
return;
}
}
- sizeMap = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+ sizeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
Collection<ServerName> servers = clusterStatus.getServers();
- //iterate all cluster regions, filter regions from our table and compute their size
+ // iterate all cluster regions, filter regions from our table and compute their size
for (ServerName serverName : servers) {
ServerLoad serverLoad = clusterStatus.getLoad(serverName);
@@ -143,14 +146,12 @@ public class TableStatsCalculator {
if (tableRegions.contains(regionId)) {
long regionSizeMB = regionLoad.getMemStoreSizeMB() + regionLoad.getStorefileSizeMB();
- sizeMap.put(regionId, (regionSizeMB > 0 ? regionSizeMB : 1) * (1024*1024));
- if (logger.isDebugEnabled()) {
- logger.debug("Region " + regionLoad.getNameAsString() + " has size " + regionSizeMB + "MB");
- }
+ sizeMap.put(regionId, (regionSizeMB > 0 ? regionSizeMB : 1) * estimatedRowCount);
+ logger.debug("Region {} has size {} MB.", regionLoad.getNameAsString(), regionSizeMB);
}
}
}
- logger.debug("Region sizes calculated");
+ logger.debug("Region sizes calculated.");
}
}
@@ -160,8 +161,8 @@ public class TableStatsCalculator {
}
private int rowsToSample(DrillConfig config) {
- return config.hasPath(DRILL_EXEC_HBASE_SCAN_SAMPLE_ROWS_COUNT)
- ? config.getInt(DRILL_EXEC_HBASE_SCAN_SAMPLE_ROWS_COUNT) : DEFAULT_SAMPLE_SIZE;
+ return config.hasPath(DRILL_EXEC_HBASE_SCAN_SAMPLE_ROWS_COUNT) ?
+ config.getInt(DRILL_EXEC_HBASE_SCAN_SAMPLE_ROWS_COUNT) : DEFAULT_SAMPLE_SIZE;
}
/**
@@ -169,11 +170,11 @@ public class TableStatsCalculator {
*/
public long getRegionSizeInBytes(byte[] regionId) {
if (sizeMap == null) {
- return (long) avgRowSizeInBytes * DEFAULT_ROW_COUNT; // 1 million rows
+ return (long) avgRowSizeInBytes * estimatedRowCount;
} else {
Long size = sizeMap.get(regionId);
if (size == null) {
- logger.debug("Unknown region:" + Arrays.toString(regionId));
+ logger.debug("Unknown region: {}.", Arrays.toString(regionId));
return 0;
} else {
return size;
@@ -189,4 +190,8 @@ public class TableStatsCalculator {
return colsPerRow;
}
+ public boolean usedDefaultRowCount() {
+ return estimatedRowCount == DEFAULT_ROW_COUNT;
+ }
+
}
--
To stop receiving notification emails like this one, please contact
boaz@apache.org.