Posted to commits@phoenix.apache.org by st...@apache.org on 2023/05/17 13:18:52 UTC

[phoenix] branch 5.1 updated: PHOENIX-6944 Randomize mapper task ordering for Index MR tools

This is an automated email from the ASF dual-hosted git repository.

stoty pushed a commit to branch 5.1
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/5.1 by this push:
     new f6f0016c43 PHOENIX-6944 Randomize mapper task ordering for Index MR tools
f6f0016c43 is described below

commit f6f0016c439b4df39a01622eccf5d898c1a4ad96
Author: Istvan Toth <st...@apache.org>
AuthorDate: Fri Apr 28 08:16:35 2023 +0200

    PHOENIX-6944 Randomize mapper task ordering for Index MR tools
---
 .../phoenix/mapreduce/PhoenixInputFormat.java      | 129 +++++++++++++--------
 .../phoenix/mapreduce/PhoenixInputSplit.java       |  21 ++--
 .../phoenix/mapreduce/index/IndexScrutinyTool.java |   4 +
 .../apache/phoenix/mapreduce/index/IndexTool.java  |   4 +
 .../mapreduce/util/PhoenixConfigurationUtil.java   |  13 +++
 5 files changed, 114 insertions(+), 57 deletions(-)

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
index c294fede6f..f9c93e0554 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
@@ -55,6 +55,7 @@ import java.sql.Statement;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.ThreadLocalRandom;
 
 /**
  * {@link InputFormat} implementation from Phoenix.
@@ -87,74 +88,106 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr
         return generateSplits(queryPlan, configuration);
     }
 
-    private List<InputSplit> generateSplits(final QueryPlan qplan, Configuration config) throws IOException {
+    /**
+     * Randomize the length parameter of the splits to ensure random execution order.
+     * Yarn orders splits by size before execution.
+     *
+     * @param splits the input splits whose reported lengths are overwritten in place
+     */
+    protected void randomizeSplitLength(List<InputSplit> splits) {
+        LOGGER.info("Randomizing split size");
+        if (splits.size() == 0) {
+            return;
+        }
+        double defaultLength = 1000000d;
+        double totalLength = splits.stream().mapToDouble(s -> {
+            try {
+                return (double) s.getLength();
+            } catch (IOException | InterruptedException e1) {
+                return defaultLength;
+            }
+        }).sum();
+        long avgLength = (long) (totalLength / splits.size());
+        splits.stream().forEach(s -> ((PhoenixInputSplit) s)
+                .setLength(avgLength + ThreadLocalRandom.current().nextInt(10000)));
+    }
+
+    protected List<InputSplit> generateSplits(final QueryPlan qplan, Configuration config)
+            throws IOException {
         // We must call this in order to initialize the scans and splits from the query plan
         setupParallelScansFromQueryPlan(qplan);
         final List<KeyRange> splits = qplan.getSplits();
         Preconditions.checkNotNull(splits);
 
         // Get the RegionSizeCalculator
-        try(org.apache.hadoop.hbase.client.Connection connection =
-                    HBaseFactoryProvider.getHConnectionFactory().createConnection(config)) {
-        RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(qplan
-                .getTableRef().getTable().getPhysicalName().toString()));
-        RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(regionLocator, connection
-                .getAdmin());
+        try (org.apache.hadoop.hbase.client.Connection connection =
+                HBaseFactoryProvider.getHConnectionFactory().createConnection(config)) {
+            RegionLocator regionLocator =
+                    connection.getRegionLocator(TableName
+                            .valueOf(qplan.getTableRef().getTable().getPhysicalName().toString()));
+            RegionSizeCalculator sizeCalculator =
+                    new RegionSizeCalculator(regionLocator, connection.getAdmin());
 
-        final List<InputSplit> psplits = Lists.newArrayListWithExpectedSize(splits.size());
-        for (List<Scan> scans : qplan.getScans()) {
-            // Get the region location
-            HRegionLocation location = regionLocator.getRegionLocation(
-                    scans.get(0).getStartRow(),
-                    false
-            );
+            final List<InputSplit> psplits = Lists.newArrayListWithExpectedSize(splits.size());
+            for (List<Scan> scans : qplan.getScans()) {
+                // Get the region location
+                HRegionLocation location =
+                        regionLocator.getRegionLocation(scans.get(0).getStartRow(), false);
 
-            String regionLocation = location.getHostname();
+                String regionLocation = location.getHostname();
 
-            // Get the region size
-            long regionSize = sizeCalculator.getRegionSize(
-                    location.getRegion().getRegionName()
-            );
+                // Get the region size
+                long regionSize =
+                        sizeCalculator.getRegionSize(location.getRegion().getRegionName());
 
-            // Generate splits based off statistics, or just region splits?
-            boolean splitByStats = PhoenixConfigurationUtil.getSplitByStats(config);
+                // Generate splits based off statistics, or just region splits?
+                boolean splitByStats = PhoenixConfigurationUtil.getSplitByStats(config);
 
-            if (splitByStats) {
-                for (Scan aScan : scans) {
-                    if (LOGGER.isDebugEnabled()) {
-                        LOGGER.debug("Split for  scan : " + aScan + "with scanAttribute : " + aScan
-                                .getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : [" +
-                                aScan.getCaching() + ", " + aScan.getCacheBlocks() + ", " + aScan
-                                .getBatch() + "] and  regionLocation : " + regionLocation);
-                    }
+                if (splitByStats) {
+                    for (Scan aScan : scans) {
+                        if (LOGGER.isDebugEnabled()) {
+                            LOGGER.debug("Split for  scan : " + aScan + "with scanAttribute : "
+                                    + aScan.getAttributesMap()
+                                    + " [scanCache, cacheBlock, scanBatch] : [" + aScan.getCaching()
+                                    + ", " + aScan.getCacheBlocks() + ", " + aScan.getBatch()
+                                    + "] and  regionLocation : " + regionLocation);
+                        }
 
-                    psplits.add(new PhoenixInputSplit(Collections.singletonList(aScan), regionSize, regionLocation));
-                }
+                        // The size is bogus, but it's not a problem
+                        psplits.add(new PhoenixInputSplit(Collections.singletonList(aScan),
+                                regionSize, regionLocation));
+                    }
                 } else {
-                if (LOGGER.isDebugEnabled()) {
-                    LOGGER.debug("Scan count[" + scans.size() + "] : " + Bytes.toStringBinary(scans
-                            .get(0).getStartRow()) + " ~ " + Bytes.toStringBinary(scans.get(scans
-                            .size() - 1).getStopRow()));
-                    LOGGER.debug("First scan : " + scans.get(0) + "with scanAttribute : " + scans
-                            .get(0).getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : " +
-                            "[" + scans.get(0).getCaching() + ", " + scans.get(0).getCacheBlocks()
-                            + ", " + scans.get(0).getBatch() + "] and  regionLocation : " +
-                            regionLocation);
+                    if (LOGGER.isDebugEnabled()) {
+                        LOGGER.debug("Scan count[" + scans.size() + "] : "
+                                + Bytes.toStringBinary(scans.get(0).getStartRow()) + " ~ "
+                                + Bytes.toStringBinary(scans.get(scans.size() - 1).getStopRow()));
+                        LOGGER.debug("First scan : " + scans.get(0) + "with scanAttribute : "
+                                + scans.get(0).getAttributesMap()
+                                + " [scanCache, cacheBlock, scanBatch] : " + "["
+                                + scans.get(0).getCaching() + ", " + scans.get(0).getCacheBlocks()
+                                + ", " + scans.get(0).getBatch() + "] and  regionLocation : "
+                                + regionLocation);
 
-                    for (int i = 0, limit = scans.size(); i < limit; i++) {
-                        LOGGER.debug("EXPECTED_UPPER_REGION_KEY[" + i + "] : " + Bytes
-                                .toStringBinary(scans.get(i).getAttribute
-                                        (BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY)));
+                        for (int i = 0, limit = scans.size(); i < limit; i++) {
+                            LOGGER.debug("EXPECTED_UPPER_REGION_KEY[" + i + "] : "
+                                    + Bytes.toStringBinary(scans.get(i).getAttribute(
+                                        BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY)));
+                        }
                     }
+
+                    psplits.add(new PhoenixInputSplit(scans, regionSize, regionLocation));
                 }
+            }
 
-                psplits.add(new PhoenixInputSplit(scans, regionSize, regionLocation));
+            if (PhoenixConfigurationUtil.isMRRandomizeMapperExecutionOrder(config)) {
+                randomizeSplitLength(psplits);
             }
+
+            return psplits;
         }
-        return psplits;
     }
-    }
-    
+
     /**
      * Returns the query plan associated with the select query.
      * @param context
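
[Editor's note] The core trick of the patch, in isolation: the scheduler sorts
splits by their reported length, so overwriting every split's length with the
average length plus a small random jitter makes a size-based sort produce an
effectively random execution order. Below is a minimal, self-contained sketch
of that technique; the SimpleSplit class and all names are hypothetical
stand-ins for illustration, not Phoenix code, which operates on
PhoenixInputSplit instead.

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;
    import java.util.concurrent.ThreadLocalRandom;

    public class RandomizeSplitLengthSketch {
        // Hypothetical stand-in for an InputSplit whose length can be overridden.
        static final class SimpleSplit {
            long length;
            SimpleSplit(long length) { this.length = length; }
        }

        // Same technique as randomizeSplitLength() above: replace each length
        // with the average length plus a random jitter in [0, 10000).
        static void randomizeSplitLength(List<SimpleSplit> splits) {
            if (splits.isEmpty()) {
                return;
            }
            double totalLength = splits.stream().mapToDouble(s -> (double) s.length).sum();
            long avgLength = (long) (totalLength / splits.size());
            splits.forEach(s -> s.length = avgLength + ThreadLocalRandom.current().nextInt(10000));
        }

        public static void main(String[] args) {
            List<SimpleSplit> splits = new ArrayList<>();
            for (long size : new long[] { 100L, 5_000_000L, 42L, 900_000L }) {
                splits.add(new SimpleSplit(size));
            }
            randomizeSplitLength(splits);
            // A size-based sort (largest first) now yields an effectively random
            // order rather than always scheduling the biggest regions first.
            splits.sort(Comparator.comparingLong((SimpleSplit s) -> s.length).reversed());
            splits.forEach(s -> System.out.println(s.length));
        }
    }
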
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
index 7819c5318c..a4dc1b789e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
@@ -41,7 +41,7 @@ public class PhoenixInputSplit extends InputSplit implements Writable {
     private List<Scan> scans;
     private KeyRange keyRange;
     private String regionLocation = null;
-    private long regionSize = 0;
+    private long splitSize = 0;
    
     /**
      * No Arg constructor
@@ -57,11 +57,11 @@ public class PhoenixInputSplit extends InputSplit implements Writable {
         this(scans, 0, null);
     }
 
-    public PhoenixInputSplit(final List<Scan> scans, long regionSize, String regionLocation) {
+    public PhoenixInputSplit(final List<Scan> scans, long splitSize, String regionLocation) {
         Preconditions.checkNotNull(scans);
         Preconditions.checkState(!scans.isEmpty());
         this.scans = scans;
-        this.regionSize = regionSize;
+        this.splitSize = splitSize;
         this.regionLocation = regionLocation;
         init();
     }
@@ -81,7 +81,7 @@ public class PhoenixInputSplit extends InputSplit implements Writable {
     @Override
     public void readFields(DataInput input) throws IOException {
         regionLocation = WritableUtils.readString(input);
-        regionSize = WritableUtils.readVLong(input);
+        splitSize = WritableUtils.readVLong(input);
         int count = WritableUtils.readVInt(input);
         scans = Lists.newArrayListWithExpectedSize(count);
         for (int i = 0; i < count; i++) {
@@ -97,7 +97,7 @@ public class PhoenixInputSplit extends InputSplit implements Writable {
     @Override
     public void write(DataOutput output) throws IOException {
         WritableUtils.writeString(output, regionLocation);
-        WritableUtils.writeVLong(output, regionSize);
+        WritableUtils.writeVLong(output, splitSize);
 
         Preconditions.checkNotNull(scans);
         WritableUtils.writeVInt(output, scans.size());
@@ -111,15 +111,14 @@ public class PhoenixInputSplit extends InputSplit implements Writable {
 
     @Override
     public long getLength() throws IOException, InterruptedException {
-         return regionSize;
+        return splitSize;
     }
 
     @Override
     public String[] getLocations() throws IOException, InterruptedException {
-        if(regionLocation == null) {
+        if (regionLocation == null) {
             return new String[]{};
-        }
-        else {
+        } else {
             return new String[]{regionLocation};
         }
     }
@@ -144,4 +143,8 @@ public class PhoenixInputSplit extends InputSplit implements Writable {
         return true;
     }
 
+    public void setLength(long length) {
+        this.splitSize = length;
+    }
+
 }
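
[Editor's note] The regionSize-to-splitSize rename reflects that, once
randomizeSplitLength() has run, the value no longer measures a region; it is
simply whatever getLength() should report, and it must survive the Writable
round trip to the scheduler. A short sketch of that serialization path for the
two fields shown in the hunks above, using only Hadoop utilities the class
already relies on (field layout abbreviated, class name hypothetical):

    import java.io.IOException;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.io.WritableUtils;

    public class SplitSizeRoundTrip {
        public static void main(String[] args) throws IOException {
            long splitSize = 123_456L;               // e.g. avgLength + random jitter
            String regionLocation = "rs1.example.com";

            // Serialize the same way PhoenixInputSplit.write() does for these fields.
            DataOutputBuffer out = new DataOutputBuffer();
            WritableUtils.writeString(out, regionLocation);
            WritableUtils.writeVLong(out, splitSize);

            // Deserialize the way readFields() does; the randomized length survives.
            DataInputBuffer in = new DataInputBuffer();
            in.reset(out.getData(), out.getLength());
            System.out.println(WritableUtils.readString(in)); // rs1.example.com
            System.out.println(WritableUtils.readVLong(in));  // 123456
        }
    }
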
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java
index 1f775d6370..151fac2a54 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java
@@ -264,6 +264,10 @@ public class IndexScrutinyTool extends Configured implements Tool {
             final PTable pdataTable = PhoenixRuntime.getTable(connection, qDataTable);
             final PTable pindexTable = PhoenixRuntime.getTable(connection, qIndexTable);
 
+            // Randomize execution order, unless explicitly set
+            configuration.setBooleanIfUnset(
+                PhoenixConfigurationUtil.MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER, true);
+
             // set CURRENT_SCN for our scan so that incoming writes don't throw off scrutiny
             configuration.set(PhoenixConfigurationUtil.CURRENT_SCN_VALUE, Long.toString(ts));
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index 9dd384a024..bc7997c5fa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -723,6 +723,10 @@ public class IndexTool extends Configured implements Tool {
                     Long.toString(indexRebuildRpcRetriesCounter));
             configuration.set("mapreduce.task.timeout", Long.toString(indexRebuildQueryTimeoutMs));
 
+            // Randomize execution order, unless explicitly set
+            configuration.setBooleanIfUnset(
+                PhoenixConfigurationUtil.MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER, true);
+
             PhoenixConfigurationUtil.setIndexToolDataTableName(configuration, dataTableWithSchema);
             PhoenixConfigurationUtil.setIndexToolIndexTableName(configuration, qIndexTable);
             PhoenixConfigurationUtil.setIndexToolSourceTable(configuration, sourceTable);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index f17510e68d..1fa590829b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -201,6 +201,13 @@ public final class PhoenixConfigurationUtil {
     // by default MR snapshot restore is handled internally by phoenix
     public static final boolean DEFAULT_MAPREDUCE_EXTERNAL_SNAPSHOT_RESTORE = false;
 
+    // Randomize mapper execution order
+    public static final String MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER =
+            "phoenix.mapreduce.randomize.mapper.execution.order";
+
+    // non-index jobs benefit less from this
+    public static final boolean DEFAULT_MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER = false;
+
     /**
      * Determines type of Phoenix Map Reduce job.
      * 1. QUERY allows running arbitrary queries without aggregates
@@ -904,4 +911,10 @@ public final class PhoenixConfigurationUtil {
         return isSnapshotRestoreManagedExternally;
     }
 
+    public static boolean isMRRandomizeMapperExecutionOrder(final Configuration configuration) {
+        Preconditions.checkNotNull(configuration);
+        return configuration.getBoolean(MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER,
+            DEFAULT_MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER);
+    }
+
 }
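
[Editor's note] For everything other than the two index tools the property
defaults to false, so other jobs must opt in explicitly. A hedged sketch of
what a caller might do before job submission, using the constant and guard
added above (the surrounding job setup is omitted, class name hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;

    public class RandomizeOptInExample {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // Off by default for non-index jobs; opt in explicitly.
            conf.setBoolean(
                    PhoenixConfigurationUtil.MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER, true);
            // PhoenixInputFormat consults this guard while generating splits.
            System.out.println(PhoenixConfigurationUtil.isMRRandomizeMapperExecutionOrder(conf));
        }
    }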