Posted to commits@drill.apache.org by bo...@apache.org on 2018/05/27 20:23:17 UTC

[drill] 01/02: DRILL-6442: Adjust Hbase disk cost & row count estimation when filter push down is applied

This is an automated email from the ASF dual-hosted git repository.

boaz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 18766950c640c6963ffd1c94224c4a984bedd3c1
Author: Arina Ielchiieva <ar...@gmail.com>
AuthorDate: Wed May 23 16:17:11 2018 +0000

    DRILL-6442: Adjust Hbase disk cost & row count estimation when filter push down is applied
    
    closes #1288
---
 .../drill/exec/store/hbase/HBaseGroupScan.java     | 64 ++++++++++-----------
 .../exec/store/hbase/TableStatsCalculator.java     | 67 ++++++++++++----------
 2 files changed, 66 insertions(+), 65 deletions(-)
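
The heart of the change is in HBaseGroupScan.getScanStats(): the base estimates are derived first, and the filter push down discount is applied afterwards. A minimal standalone sketch of that logic, with hypothetical parameters standing in for the group scan's fields (not the exact Drill code):

    // Row count estimate: scan size over average row size, halved when a
    // filter has been pushed down so the filtered plan always costs less.
    static long estimateRowCount(long scanSizeInBytes, int avgRowSizeInBytes, boolean filterPushedDown) {
      long rowCount = scanSizeInBytes / avgRowSizeInBytes;
      return filterPushedDown ? (long) (rowCount * 0.5) : rowCount;
    }

    // Disk cost estimate: scan size scaled by the fraction of columns read,
    // halved only when the filter is pushed down AND sampling had to fall
    // back to the default row count (an exact count needs no extra discount).
    static float estimateDiskCost(long scanSizeInBytes, int columnCount, int colsPerRow,
                                  boolean filterPushedDown, boolean usedDefaultRowCount) {
      float diskCost = scanSizeInBytes * (columnCount == 0 ? 1 : (float) columnCount / colsPerRow);
      return filterPushedDown && usedDefaultRowCount ? diskCost * 0.5F : diskCost;
    }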

diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
index 9eeba24..97c9a95 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
@@ -22,7 +22,9 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -63,20 +65,13 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 
 @JsonTypeName("hbase-scan")
 public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConstants {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseGroupScan.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseGroupScan.class);
 
-  private static final Comparator<List<HBaseSubScanSpec>> LIST_SIZE_COMPARATOR = new Comparator<List<HBaseSubScanSpec>>() {
-    @Override
-    public int compare(List<HBaseSubScanSpec> list1, List<HBaseSubScanSpec> list2) {
-      return list1.size() - list2.size();
-    }
-  };
+  private static final Comparator<List<HBaseSubScanSpec>> LIST_SIZE_COMPARATOR = (list1, list2) -> list1.size() - list2.size();
 
   private static final Comparator<List<HBaseSubScanSpec>> LIST_SIZE_COMPARATOR_REV = Collections.reverseOrder(LIST_SIZE_COMPARATOR);
 
@@ -182,12 +177,12 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
   public List<EndpointAffinity> getOperatorAffinity() {
     watch.reset();
     watch.start();
-    Map<String, DrillbitEndpoint> endpointMap = new HashMap<String, DrillbitEndpoint>();
+    Map<String, DrillbitEndpoint> endpointMap = new HashMap<>();
     for (DrillbitEndpoint ep : storagePlugin.getContext().getBits()) {
       endpointMap.put(ep.getAddress(), ep);
     }
 
-    Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<DrillbitEndpoint, EndpointAffinity>();
+    Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<>();
     for (ServerName sn : regionsToScan.values()) {
       DrillbitEndpoint ep = endpointMap.get(sn.getHostname());
       if (ep != null) {
@@ -199,14 +194,10 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
         }
       }
     }
-    logger.debug("Took {} µs to get operator affinity", watch.elapsed(TimeUnit.NANOSECONDS)/1000);
-    return Lists.newArrayList(affinityMap.values());
+    logger.debug("Took {} µs to get operator affinity", watch.elapsed(TimeUnit.NANOSECONDS) / 1000);
+    return new ArrayList<>(affinityMap.values());
   }
 
-  /**
-   *
-   * @param incomingEndpoints
-   */
   @Override
   public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
     watch.reset();
@@ -230,23 +221,23 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
     /*
      * another map with endpoint (hostname => corresponding index list) in 'incomingEndpoints' list
      */
-    Map<String, Queue<Integer>> endpointHostIndexListMap = Maps.newHashMap();
+    Map<String, Queue<Integer>> endpointHostIndexListMap = new HashMap<>();
 
     /*
      * Initialize these two maps
      */
     for (int i = 0; i < numSlots; ++i) {
-      endpointFragmentMapping.put(i, new ArrayList<HBaseSubScanSpec>(maxPerEndpointSlot));
+      endpointFragmentMapping.put(i, new ArrayList<>(maxPerEndpointSlot));
       String hostname = incomingEndpoints.get(i).getAddress();
       Queue<Integer> hostIndexQueue = endpointHostIndexListMap.get(hostname);
       if (hostIndexQueue == null) {
-        hostIndexQueue = Lists.newLinkedList();
+        hostIndexQueue = new LinkedList<>();
         endpointHostIndexListMap.put(hostname, hostIndexQueue);
       }
       hostIndexQueue.add(i);
     }
 
-    Set<Entry<HRegionInfo, ServerName>> regionsToAssignSet = Sets.newHashSet(regionsToScan.entrySet());
+    Set<Entry<HRegionInfo, ServerName>> regionsToAssignSet = new HashSet<>(regionsToScan.entrySet());
 
     /*
      * First, we assign regions which are hosted on region servers running on drillbit endpoints
@@ -256,13 +247,13 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
       /*
        * Test if there is a drillbit endpoint which is also an HBase RegionServer that hosts the current HBase region
        */
-      Queue<Integer> endpointIndexlist = endpointHostIndexListMap.get(regionEntry.getValue().getHostname());
-      if (endpointIndexlist != null) {
-        Integer slotIndex = endpointIndexlist.poll();
+      Queue<Integer> endpointIndexList = endpointHostIndexListMap.get(regionEntry.getValue().getHostname());
+      if (endpointIndexList != null) {
+        Integer slotIndex = endpointIndexList.poll();
         List<HBaseSubScanSpec> endpointSlotScanList = endpointFragmentMapping.get(slotIndex);
         endpointSlotScanList.add(regionInfoToSubScanSpec(regionEntry.getKey()));
         // add to the tail of the slot list, to add more later in round robin fashion
-        endpointIndexlist.offer(slotIndex);
+        endpointIndexList.offer(slotIndex);
         // this region has been assigned
         regionsIterator.remove();
       }
@@ -271,8 +262,8 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
     /*
      * Build two priority queues of slots: one with slots that have fewer tasks than 'minPerEndpointSlot' and another with slots that have more.
      */
-    PriorityQueue<List<HBaseSubScanSpec>> minHeap = new PriorityQueue<List<HBaseSubScanSpec>>(numSlots, LIST_SIZE_COMPARATOR);
-    PriorityQueue<List<HBaseSubScanSpec>> maxHeap = new PriorityQueue<List<HBaseSubScanSpec>>(numSlots, LIST_SIZE_COMPARATOR_REV);
+    PriorityQueue<List<HBaseSubScanSpec>> minHeap = new PriorityQueue<>(numSlots, LIST_SIZE_COMPARATOR);
+    PriorityQueue<List<HBaseSubScanSpec>> maxHeap = new PriorityQueue<>(numSlots, LIST_SIZE_COMPARATOR_REV);
     for(List<HBaseSubScanSpec> listOfScan : endpointFragmentMapping.values()) {
       if (listOfScan.size() < minPerEndpointSlot) {
         minHeap.offer(listOfScan);
@@ -310,12 +301,11 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
     }
 
     /* no slot should be empty at this point */
-    assert (minHeap.peek() == null || minHeap.peek().size() > 0) : String.format(
-        "Unable to assign tasks to some endpoints.\nEndpoints: {}.\nAssignment Map: {}.",
-        incomingEndpoints, endpointFragmentMapping.toString());
+    assert (minHeap.peek() == null || minHeap.peek().size() > 0) :
+      String.format("Unable to assign tasks to some endpoints.\nEndpoints: %s.\nAssignment Map: %s.", incomingEndpoints, endpointFragmentMapping.toString());
 
     logger.debug("Built assignment map in {} µs.\nEndpoints: {}.\nAssignment Map: {}",
-        watch.elapsed(TimeUnit.NANOSECONDS)/1000, incomingEndpoints, endpointFragmentMapping.toString());
+        watch.elapsed(TimeUnit.NANOSECONDS) / 1000, incomingEndpoints, endpointFragmentMapping.toString());
   }
 
   private HBaseSubScanSpec regionInfoToSubScanSpec(HRegionInfo ri) {
@@ -347,9 +337,15 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
 
   @Override
   public ScanStats getScanStats() {
-    long rowCount = (long) ((scanSizeInBytes / statsCalculator.getAvgRowSizeInBytes()) * (hbaseScanSpec.getFilter() != null ? 0.5 : 1));
-    // the following calculation is not precise since 'columns' could specify CFs while getColsPerRow() returns the number of qualifier.
-    float diskCost = scanSizeInBytes * ((columns == null || columns.isEmpty()) ? 1 : columns.size()/statsCalculator.getColsPerRow());
+    long rowCount = scanSizeInBytes / statsCalculator.getAvgRowSizeInBytes();
+    // the following calculation is not precise since 'columns' could specify CFs while getColsPerRow() returns the number of qualifiers
+    float diskCost = scanSizeInBytes * ((columns == null || columns.isEmpty()) ? 1 : (float) columns.size() / statsCalculator.getColsPerRow());
+    // if filter push down is used, reduce estimated row count and disk cost by half to ensure plan cost will be less than without it
+    if (hbaseScanSpec.getFilter() != null) {
+      rowCount = (long) (rowCount * 0.5);
+      // if during sampling we found out the exact row count, no need to reduce the disk cost
+      diskCost = statsCalculator.usedDefaultRowCount() ? diskCost * 0.5F : diskCost;
+    }
     return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, rowCount, 1, diskCost);
   }
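
For a sense of scale, with hypothetical numbers (not taken from the patch): a 1 GiB scan with 100-byte average rows and a pushed-down filter works out to

    // scanSizeInBytes = 1_073_741_824 (1 GiB), avgRowSizeInBytes = 100
    // base rowCount   = 1_073_741_824 / 100        = 10_737_418 rows
    // filtered        = (long) (10_737_418 * 0.5)  =  5_368_709 rows
    // diskCost starts at 1 GiB (all columns selected) and drops to ~512 MiB
    // only if sampling fell back to DEFAULT_ROW_COUNT; with an exact sampled
    // row count it stays at 1 GiB.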
 
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/TableStatsCalculator.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/TableStatsCalculator.java
index 379fb7c..b435fbd 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/TableStatsCalculator.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/TableStatsCalculator.java
@@ -48,36 +48,36 @@ import org.apache.hadoop.hbase.util.Bytes;
  * Computes size of each region for given table.
  */
 public class TableStatsCalculator {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TableStatsCalculator.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TableStatsCalculator.class);
 
-  public static final long DEFAULT_ROW_COUNT = 1024L * 1024L;
+  public static final long DEFAULT_ROW_COUNT = 1024L * 1024L; // 1 million rows
 
   private static final String DRILL_EXEC_HBASE_SCAN_SAMPLE_ROWS_COUNT = "drill.exec.hbase.scan.samplerows.count";
 
   private static final int DEFAULT_SAMPLE_SIZE = 100;
 
-  /**
-   * Maps each region to its size in bytes.
-   */
+  // Maps each region to its size in bytes.
   private Map<byte[], Long> sizeMap = null;
 
   private int avgRowSizeInBytes = 1;
 
   private int colsPerRow = 1;
 
+  private long estimatedRowCount = DEFAULT_ROW_COUNT;
+
   /**
    * Computes size of each region for table.
    *
-   * @param conn
-   * @param hbaseScanSpec
-   * @param config
-   * @throws IOException
+   * @param connection connection to HBase
+   * @param hbaseScanSpec scan specification
+   * @param config Drill configuration
+   * @param storageConfig HBase storage plugin configuration
    */
-  public TableStatsCalculator(Connection conn, HBaseScanSpec hbaseScanSpec, DrillConfig config, HBaseStoragePluginConfig storageConfig) throws IOException {
+  public TableStatsCalculator(Connection connection, HBaseScanSpec hbaseScanSpec, DrillConfig config, HBaseStoragePluginConfig storageConfig) throws IOException {
     TableName tableName = TableName.valueOf(hbaseScanSpec.getTableName());
-    try (Admin admin = conn.getAdmin();
-         Table table = conn.getTable(tableName);
-         RegionLocator locator = conn.getRegionLocator(tableName)) {
+    try (Admin admin = connection.getAdmin();
+         Table table = connection.getTable(tableName);
+         RegionLocator locator = connection.getRegionLocator(tableName)) {
       int rowsToSample = rowsToSample(config);
       if (rowsToSample > 0) {
         Scan scan = new Scan(hbaseScanSpec.getStartRow(), hbaseScanSpec.getStopRow());
@@ -100,22 +100,25 @@ public class TableStatsCalculator {
           }
         }
         if (rowCount > 0) {
-          avgRowSizeInBytes = (int) (rowSizeSum/rowCount);
-          colsPerRow = numColumnsSum/rowCount;
+          avgRowSizeInBytes = (int) (rowSizeSum / rowCount);
+          colsPerRow = numColumnsSum / rowCount;
+          // if during sampling we received fewer rows than expected, we can use this number instead of the default
+          estimatedRowCount = rowCount == rowsToSample ? estimatedRowCount : rowCount;
         }
+
         scanner.close();
       }
 
       if (!enabled(storageConfig)) {
-        logger.info("Region size calculation disabled.");
+        logger.debug("Region size calculation is disabled.");
         return;
       }
 
-      logger.info("Calculating region sizes for table '{}'.", tableName.getNameAsString());
+      logger.debug("Calculating region sizes for table '{}'.", tableName.getNameAsString());
 
-      //get regions for table
+      // get regions for table
       List<HRegionLocation> tableRegionInfos = locator.getAllRegionLocations();
-      Set<byte[]> tableRegions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+      Set<byte[]> tableRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
       for (HRegionLocation regionInfo : tableRegionInfos) {
         tableRegions.add(regionInfo.getRegionInfo().getRegionName());
       }
@@ -124,17 +127,17 @@ public class TableStatsCalculator {
       try {
         clusterStatus = admin.getClusterStatus();
       } catch (Exception e) {
-        logger.debug(e.getMessage());
+        logger.debug(e.getMessage(), e);
       } finally {
         if (clusterStatus == null) {
           return;
         }
       }
 
-      sizeMap = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+      sizeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
 
       Collection<ServerName> servers = clusterStatus.getServers();
-      //iterate all cluster regions, filter regions from our table and compute their size
+      // iterate all cluster regions, filter regions from our table and compute their size
       for (ServerName serverName : servers) {
         ServerLoad serverLoad = clusterStatus.getLoad(serverName);
 
@@ -143,14 +146,12 @@ public class TableStatsCalculator {
 
           if (tableRegions.contains(regionId)) {
             long regionSizeMB = regionLoad.getMemStoreSizeMB() + regionLoad.getStorefileSizeMB();
-            sizeMap.put(regionId, (regionSizeMB > 0 ? regionSizeMB : 1) * (1024*1024));
-            if (logger.isDebugEnabled()) {
-              logger.debug("Region " + regionLoad.getNameAsString() + " has size " + regionSizeMB + "MB");
-            }
+            sizeMap.put(regionId, (regionSizeMB > 0 ? regionSizeMB : 1) * (1024 * 1024)); // convert region size from MB to bytes
+            logger.debug("Region {} has size {} MB.", regionLoad.getNameAsString(), regionSizeMB);
           }
         }
       }
-      logger.debug("Region sizes calculated");
+      logger.debug("Region sizes calculated.");
     }
 
   }
@@ -160,8 +161,8 @@ public class TableStatsCalculator {
   }
 
   private int rowsToSample(DrillConfig config) {
-    return config.hasPath(DRILL_EXEC_HBASE_SCAN_SAMPLE_ROWS_COUNT)
-        ? config.getInt(DRILL_EXEC_HBASE_SCAN_SAMPLE_ROWS_COUNT) : DEFAULT_SAMPLE_SIZE;
+    return config.hasPath(DRILL_EXEC_HBASE_SCAN_SAMPLE_ROWS_COUNT) ?
+      config.getInt(DRILL_EXEC_HBASE_SCAN_SAMPLE_ROWS_COUNT) : DEFAULT_SAMPLE_SIZE;
   }
 
   /**
@@ -169,11 +170,11 @@ public class TableStatsCalculator {
    */
   public long getRegionSizeInBytes(byte[] regionId) {
     if (sizeMap == null) {
-      return (long) avgRowSizeInBytes * DEFAULT_ROW_COUNT; // 1 million rows
+      return (long) avgRowSizeInBytes * estimatedRowCount;
     } else {
       Long size = sizeMap.get(regionId);
       if (size == null) {
-        logger.debug("Unknown region:" + Arrays.toString(regionId));
+        logger.debug("Unknown region: {}.", Arrays.toString(regionId));
         return 0;
       } else {
         return size;
@@ -189,4 +190,8 @@ public class TableStatsCalculator {
     return colsPerRow;
   }
 
+  public boolean usedDefaultRowCount() {
+    return estimatedRowCount == DEFAULT_ROW_COUNT;
+  }
+
 }
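
The sampling contract behind the new usedDefaultRowCount() method can be summarized as follows; a condensed sketch of the TableStatsCalculator flow, assuming a ResultScanner capped at rowsToSample rows (not a verbatim excerpt):

    long estimatedRowCount = DEFAULT_ROW_COUNT;  // pessimistic default, ~1M rows
    long rowCount = 0;
    for (Result row : scanner) {                 // scanner limited to rowsToSample rows
      rowCount++;
      // ... accumulate row sizes and column counts for the averages ...
    }
    if (rowCount > 0 && rowCount < rowsToSample) {
      // the scan ran dry before hitting the cap, so rowCount is the exact
      // number of rows in the scanned range and beats the default guess
      estimatedRowCount = rowCount;
    }
    // HBaseGroupScan.getScanStats() later checks usedDefaultRowCount(), i.e.
    // estimatedRowCount == DEFAULT_ROW_COUNT, to decide whether the disk cost
    // discount for filter push down still applies.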

-- 
To stop receiving notification emails like this one, please contact
boaz@apache.org.