Posted to commits@phoenix.apache.org by st...@apache.org on 2023/05/17 13:18:52 UTC
[phoenix] branch 5.1 updated: PHOENIX-6944 Randomize mapper task ordering for Index MR tools
This is an automated email from the ASF dual-hosted git repository.
stoty pushed a commit to branch 5.1
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.1 by this push:
new f6f0016c43 PHOENIX-6944 Randomize mapper task ordering for Index MR tools
f6f0016c43 is described below
commit f6f0016c439b4df39a01622eccf5d898c1a4ad96
Author: Istvan Toth <st...@apache.org>
AuthorDate: Fri Apr 28 08:16:35 2023 +0200
PHOENIX-6944 Randomize mapper task ordering for Index MR tools
---
.../phoenix/mapreduce/PhoenixInputFormat.java | 129 +++++++++++++--------
.../phoenix/mapreduce/PhoenixInputSplit.java | 21 ++--
.../phoenix/mapreduce/index/IndexScrutinyTool.java | 4 +
.../apache/phoenix/mapreduce/index/IndexTool.java | 4 +
.../mapreduce/util/PhoenixConfigurationUtil.java | 13 +++
5 files changed, 114 insertions(+), 57 deletions(-)
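For context: Yarn orders input splits by reported size before execution (per the javadoc added below), so the mappers of an index job otherwise launch in a deterministic, size-driven order. This patch overwrites each split's reported length with the average length plus a small random offset, randomizing that order. A new configuration property controls the behavior; here is a minimal sketch of setting it on a generic Phoenix MR job (the class name is illustrative, not part of the patch):

    import org.apache.hadoop.conf.Configuration;

    public class EnableRandomOrder {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // Property added by this patch; it defaults to false for generic MR jobs
            // and is switched on by IndexTool/IndexScrutinyTool unless set explicitly.
            conf.setBoolean("phoenix.mapreduce.randomize.mapper.execution.order", true);
            System.out.println(conf.getBoolean(
                "phoenix.mapreduce.randomize.mapper.execution.order", false));
        }
    }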
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
index c294fede6f..f9c93e0554 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
@@ -55,6 +55,7 @@ import java.sql.Statement;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.ThreadLocalRandom;
/**
* {@link InputFormat} implementation from Phoenix.
@@ -87,74 +88,106 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr
return generateSplits(queryPlan, configuration);
}
- private List<InputSplit> generateSplits(final QueryPlan qplan, Configuration config) throws IOException {
+ /**
+ * Randomize the length parameter of the splits to ensure random execution order.
+ * Yarn orders splits by size before execution.
+ *
+ * @param splits the splits whose reported lengths are overwritten in place
+ */
+ protected void randomizeSplitLength(List<InputSplit> splits) {
+ LOGGER.info("Randomizing split size");
+ if (splits.size() == 0) {
+ return;
+ }
+ double defaultLength = 1000000d;
+ double totalLength = splits.stream().mapToDouble(s -> {
+ try {
+ return (double) s.getLength();
+ } catch (IOException | InterruptedException e1) {
+ return defaultLength;
+ }
+ }).sum();
+ long avgLength = (long) (totalLength / splits.size());
+ splits.stream().forEach(s -> ((PhoenixInputSplit) s)
+ .setLength(avgLength + ThreadLocalRandom.current().nextInt(10000)));
+ }
+
+ protected List<InputSplit> generateSplits(final QueryPlan qplan, Configuration config)
+ throws IOException {
// We must call this in order to initialize the scans and splits from the query plan
setupParallelScansFromQueryPlan(qplan);
final List<KeyRange> splits = qplan.getSplits();
Preconditions.checkNotNull(splits);
// Get the RegionSizeCalculator
- try(org.apache.hadoop.hbase.client.Connection connection =
- HBaseFactoryProvider.getHConnectionFactory().createConnection(config)) {
- RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(qplan
- .getTableRef().getTable().getPhysicalName().toString()));
- RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(regionLocator, connection
- .getAdmin());
+ try (org.apache.hadoop.hbase.client.Connection connection =
+ HBaseFactoryProvider.getHConnectionFactory().createConnection(config)) {
+ RegionLocator regionLocator =
+ connection.getRegionLocator(TableName
+ .valueOf(qplan.getTableRef().getTable().getPhysicalName().toString()));
+ RegionSizeCalculator sizeCalculator =
+ new RegionSizeCalculator(regionLocator, connection.getAdmin());
- final List<InputSplit> psplits = Lists.newArrayListWithExpectedSize(splits.size());
- for (List<Scan> scans : qplan.getScans()) {
- // Get the region location
- HRegionLocation location = regionLocator.getRegionLocation(
- scans.get(0).getStartRow(),
- false
- );
+ final List<InputSplit> psplits = Lists.newArrayListWithExpectedSize(splits.size());
+ for (List<Scan> scans : qplan.getScans()) {
+ // Get the region location
+ HRegionLocation location =
+ regionLocator.getRegionLocation(scans.get(0).getStartRow(), false);
- String regionLocation = location.getHostname();
+ String regionLocation = location.getHostname();
- // Get the region size
- long regionSize = sizeCalculator.getRegionSize(
- location.getRegion().getRegionName()
- );
+ // Get the region size
+ long regionSize =
+ sizeCalculator.getRegionSize(location.getRegion().getRegionName());
- // Generate splits based off statistics, or just region splits?
- boolean splitByStats = PhoenixConfigurationUtil.getSplitByStats(config);
+ // Generate splits based off statistics, or just region splits?
+ boolean splitByStats = PhoenixConfigurationUtil.getSplitByStats(config);
- if (splitByStats) {
- for (Scan aScan : scans) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Split for scan : " + aScan + "with scanAttribute : " + aScan
- .getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : [" +
- aScan.getCaching() + ", " + aScan.getCacheBlocks() + ", " + aScan
- .getBatch() + "] and regionLocation : " + regionLocation);
- }
+ if (splitByStats) {
+ for (Scan aScan : scans) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Split for scan : " + aScan + "with scanAttribute : "
+ + aScan.getAttributesMap()
+ + " [scanCache, cacheBlock, scanBatch] : [" + aScan.getCaching()
+ + ", " + aScan.getCacheBlocks() + ", " + aScan.getBatch()
+ + "] and regionLocation : " + regionLocation);
+ }
- psplits.add(new PhoenixInputSplit(Collections.singletonList(aScan), regionSize, regionLocation));
- }
+ // The size is bogus, but it's not a problem
+ psplits.add(new PhoenixInputSplit(Collections.singletonList(aScan),
+ regionSize, regionLocation));
+ }
} else {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Scan count[" + scans.size() + "] : " + Bytes.toStringBinary(scans
- .get(0).getStartRow()) + " ~ " + Bytes.toStringBinary(scans.get(scans
- .size() - 1).getStopRow()));
- LOGGER.debug("First scan : " + scans.get(0) + "with scanAttribute : " + scans
- .get(0).getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : " +
- "[" + scans.get(0).getCaching() + ", " + scans.get(0).getCacheBlocks()
- + ", " + scans.get(0).getBatch() + "] and regionLocation : " +
- regionLocation);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Scan count[" + scans.size() + "] : "
+ + Bytes.toStringBinary(scans.get(0).getStartRow()) + " ~ "
+ + Bytes.toStringBinary(scans.get(scans.size() - 1).getStopRow()));
+ LOGGER.debug("First scan : " + scans.get(0) + "with scanAttribute : "
+ + scans.get(0).getAttributesMap()
+ + " [scanCache, cacheBlock, scanBatch] : " + "["
+ + scans.get(0).getCaching() + ", " + scans.get(0).getCacheBlocks()
+ + ", " + scans.get(0).getBatch() + "] and regionLocation : "
+ + regionLocation);
- for (int i = 0, limit = scans.size(); i < limit; i++) {
- LOGGER.debug("EXPECTED_UPPER_REGION_KEY[" + i + "] : " + Bytes
- .toStringBinary(scans.get(i).getAttribute
- (BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY)));
+ for (int i = 0, limit = scans.size(); i < limit; i++) {
+ LOGGER.debug("EXPECTED_UPPER_REGION_KEY[" + i + "] : "
+ + Bytes.toStringBinary(scans.get(i).getAttribute(
+ BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY)));
+ }
}
+
+ psplits.add(new PhoenixInputSplit(scans, regionSize, regionLocation));
}
+ }
- psplits.add(new PhoenixInputSplit(scans, regionSize, regionLocation));
+ if (PhoenixConfigurationUtil.isMRRandomizeMapperExecutionOrder(config)) {
+ randomizeSplitLength(psplits);
}
+
+ return psplits;
}
- return psplits;
}
- }
-
+
/**
* Returns the query plan associated with the select query.
* @param context
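The randomizeSplitLength() method above is the heart of the change: it averages the real split lengths (falling back to 1,000,000 when getLength() throws), then assigns every split that average plus a random offset below 10,000, so a size-based sort of the splits yields a random order while the job's total size estimate stays roughly correct. A self-contained sketch of the same arithmetic on plain longs (class and variable names are illustrative, not from the patch):

    import java.util.Arrays;
    import java.util.concurrent.ThreadLocalRandom;

    public class RandomizeLengthDemo {
        public static void main(String[] args) {
            // Stand-ins for the real region sizes reported by RegionSizeCalculator.
            long[] sizes = { 5_000_000L, 10L, 900_000L, 42_000_000L };
            // The average preserves the job's total size estimate...
            long avg = (long) Arrays.stream(sizes).average().orElse(1_000_000d);
            // ...while the random offset makes any size-based ordering unpredictable.
            long[] randomized = Arrays.stream(sizes)
                .map(s -> avg + ThreadLocalRandom.current().nextInt(10_000))
                .toArray();
            System.out.println(Arrays.toString(randomized));
        }
    }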
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
index 7819c5318c..a4dc1b789e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
@@ -41,7 +41,7 @@ public class PhoenixInputSplit extends InputSplit implements Writable {
private List<Scan> scans;
private KeyRange keyRange;
private String regionLocation = null;
- private long regionSize = 0;
+ private long splitSize = 0;
/**
* No Arg constructor
@@ -57,11 +57,11 @@ public class PhoenixInputSplit extends InputSplit implements Writable {
this(scans, 0, null);
}
- public PhoenixInputSplit(final List<Scan> scans, long regionSize, String regionLocation) {
+ public PhoenixInputSplit(final List<Scan> scans, long splitSize, String regionLocation) {
Preconditions.checkNotNull(scans);
Preconditions.checkState(!scans.isEmpty());
this.scans = scans;
- this.regionSize = regionSize;
+ this.splitSize = splitSize;
this.regionLocation = regionLocation;
init();
}
@@ -81,7 +81,7 @@ public class PhoenixInputSplit extends InputSplit implements Writable {
@Override
public void readFields(DataInput input) throws IOException {
regionLocation = WritableUtils.readString(input);
- regionSize = WritableUtils.readVLong(input);
+ splitSize = WritableUtils.readVLong(input);
int count = WritableUtils.readVInt(input);
scans = Lists.newArrayListWithExpectedSize(count);
for (int i = 0; i < count; i++) {
@@ -97,7 +97,7 @@ public class PhoenixInputSplit extends InputSplit implements Writable {
@Override
public void write(DataOutput output) throws IOException {
WritableUtils.writeString(output, regionLocation);
- WritableUtils.writeVLong(output, regionSize);
+ WritableUtils.writeVLong(output, splitSize);
Preconditions.checkNotNull(scans);
WritableUtils.writeVInt(output, scans.size());
@@ -111,15 +111,14 @@ public class PhoenixInputSplit extends InputSplit implements Writable {
@Override
public long getLength() throws IOException, InterruptedException {
- return regionSize;
+ return splitSize;
}
@Override
public String[] getLocations() throws IOException, InterruptedException {
- if(regionLocation == null) {
+ if (regionLocation == null) {
return new String[]{};
- }
- else {
+ } else {
return new String[]{regionLocation};
}
}
@@ -144,4 +143,8 @@ public class PhoenixInputSplit extends InputSplit implements Writable {
return true;
}
+ public void setLength(long length) {
+ this.splitSize = length;
+ }
+
}
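The rename from regionSize to splitSize reflects that, once randomizeSplitLength() has run, the field no longer holds a region size but an artificial scheduling weight, and the new setLength() setter is what the input format uses to overwrite it. Because splits are written to the job's split file, splitSize also round-trips through the Writable methods above, hence the symmetric writeVLong/readVLong pair. A minimal round-trip sketch of that encoding (the class name is illustrative):

    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.io.WritableUtils;

    public class VLongRoundTrip {
        public static void main(String[] args) throws Exception {
            DataOutputBuffer out = new DataOutputBuffer();
            // Same encoding PhoenixInputSplit.write() uses for splitSize.
            WritableUtils.writeVLong(out, 123_456_789L);
            DataInputBuffer in = new DataInputBuffer();
            in.reset(out.getData(), out.getLength());
            System.out.println(WritableUtils.readVLong(in)); // prints 123456789
        }
    }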
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java
index 1f775d6370..151fac2a54 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java
@@ -264,6 +264,10 @@ public class IndexScrutinyTool extends Configured implements Tool {
final PTable pdataTable = PhoenixRuntime.getTable(connection, qDataTable);
final PTable pindexTable = PhoenixRuntime.getTable(connection, qIndexTable);
+ // Randomize execution order, unless explicitly set
+ configuration.setBooleanIfUnset(
+ PhoenixConfigurationUtil.MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER, true);
+
// set CURRENT_SCN for our scan so that incoming writes don't throw off scrutiny
configuration.set(PhoenixConfigurationUtil.CURRENT_SCN_VALUE, Long.toString(ts));
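setBooleanIfUnset() is what implements the "unless explicitly set" comment above: the tool turns randomization on by default, but a value the user has already placed in the configuration wins. A small sketch of that semantics (the class name is illustrative):

    import org.apache.hadoop.conf.Configuration;

    public class IfUnsetDemo {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // The user opts out explicitly...
            conf.setBoolean("phoenix.mapreduce.randomize.mapper.execution.order", false);
            // ...so the tool's setBooleanIfUnset() call is a no-op.
            conf.setBooleanIfUnset("phoenix.mapreduce.randomize.mapper.execution.order", true);
            System.out.println(conf.getBoolean(
                "phoenix.mapreduce.randomize.mapper.execution.order", true)); // prints false
        }
    }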
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index 9dd384a024..bc7997c5fa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -723,6 +723,10 @@ public class IndexTool extends Configured implements Tool {
Long.toString(indexRebuildRpcRetriesCounter));
configuration.set("mapreduce.task.timeout", Long.toString(indexRebuildQueryTimeoutMs));
+ // Randomize execution order, unless explicitly set
+ configuration.setBooleanIfUnset(
+ PhoenixConfigurationUtil.MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER, true);
+
PhoenixConfigurationUtil.setIndexToolDataTableName(configuration, dataTableWithSchema);
PhoenixConfigurationUtil.setIndexToolIndexTableName(configuration, qIndexTable);
PhoenixConfigurationUtil.setIndexToolSourceTable(configuration, sourceTable);
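IndexTool gets the same unless-explicitly-set default as IndexScrutinyTool. To run a rebuild with the old, deterministic ordering, it should therefore be enough to set the key to false before the tool runs, for example when launching it programmatically; a sketch under that assumption (the wrapper class is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.phoenix.mapreduce.index.IndexTool;

    public class RunIndexToolWithoutRandomization {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // An explicit value here survives the tool's setBooleanIfUnset() call.
            conf.setBoolean("phoenix.mapreduce.randomize.mapper.execution.order", false);
            System.exit(ToolRunner.run(conf, new IndexTool(), args));
        }
    }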
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index f17510e68d..1fa590829b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -201,6 +201,13 @@ public final class PhoenixConfigurationUtil {
// by default MR snapshot restore is handled internally by phoenix
public static final boolean DEFAULT_MAPREDUCE_EXTERNAL_SNAPSHOT_RESTORE = false;
+ // Randomize mapper execution order
+ public static final String MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER =
+ "phoenix.mapreduce.randomize.mapper.execution.order";
+
+ // non-index jobs benefit less from this
+ public static final boolean DEFAULT_MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER = false;
+
/**
* Determines type of Phoenix Map Reduce job.
* 1. QUERY allows running arbitrary queries without aggregates
@@ -904,4 +911,10 @@ public final class PhoenixConfigurationUtil {
return isSnapshotRestoreManagedExternally;
}
+ public static boolean isMRRandomizeMapperExecutionOrder(final Configuration configuration) {
+ Preconditions.checkNotNull(configuration);
+ return configuration.getBoolean(MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER,
+ DEFAULT_MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER);
+ }
+
}