You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by vj...@apache.org on 2023/08/01 04:24:50 UTC
[phoenix] branch master updated: PHOENIX-6907 (ADDENDUM) Explain Plan should output region locations with servers (#1650)
This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new ed66920434 PHOENIX-6907 (ADDENDUM) Explain Plan should output region locations with servers (#1650)
ed66920434 is described below
commit ed669204346343b00786b3df4b503b3e5edaf05d
Author: Viraj Jasani <vj...@apache.org>
AuthorDate: Mon Jul 31 21:24:45 2023 -0700
PHOENIX-6907 (ADDENDUM) Explain Plan should output region locations with servers (#1650)
---
.../phoenix/iterate/BaseResultIterators.java | 31 ++++---
.../org/apache/phoenix/iterate/ExplainTable.java | 94 +++++-----------------
.../phoenix/iterate/ParallelScansCollector.java | 12 ++-
.../phoenix/iterate/ScansWithRegionLocations.java | 47 +++++++++++
4 files changed, 94 insertions(+), 90 deletions(-)
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index cecab8626f..097ee74a92 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -146,6 +146,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
private static final int ESTIMATED_GUIDEPOSTS_PER_REGION = 20;
private static final int MIN_SEEK_TO_COLUMN_VERSION = VersionUtil.encodeVersion("0", "98", "12");
private final List<List<Scan>> scans;
+ private final List<HRegionLocation> regionLocations;
private final List<KeyRange> splits;
private final byte[] physicalTableName;
protected final QueryPlan plan;
@@ -567,7 +568,9 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
initializeScan(plan, perScanLimit, offset, scan);
this.useStatsForParallelization = PhoenixConfigurationUtil.getStatsForParallelizationProp(context.getConnection(), table);
- this.scans = getParallelScans();
+ ScansWithRegionLocations scansWithRegionLocations = getParallelScans();
+ this.scans = scansWithRegionLocations.getScans();
+ this.regionLocations = scansWithRegionLocations.getRegionLocations();
List<KeyRange> splitRanges = Lists.newArrayListWithExpectedSize(scans.size() * ESTIMATED_GUIDEPOSTS_PER_REGION);
for (List<Scan> scanList : scans) {
for (Scan aScan : scanList) {
@@ -673,7 +676,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
gps.getGuidePostTimestamps()[guideIndex]);
}
- private List<List<Scan>> getParallelScans() throws SQLException {
+ private ScansWithRegionLocations getParallelScans() throws SQLException {
// If the scan boundaries are not matching with scan in context that means we need to get
// parallel scans for the chunk after split/merge.
if (!ScanUtil.isContextScan(scan, context)) {
@@ -690,7 +693,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
* @return
* @throws SQLException
*/
- private List<List<Scan>> getParallelScans(Scan scan) throws SQLException {
+ private ScansWithRegionLocations getParallelScans(Scan scan) throws SQLException {
List<HRegionLocation> regionLocations = getRegionBoundaries(scanGrouper);
List<byte[]> regionBoundaries = toBoundaries(regionLocations);
int regionIndex = 0;
@@ -724,10 +727,11 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
newScan.setAttribute(BaseScannerRegionObserver.SCAN_REGION_SERVER,
regionLocation.getServerName().getVersionedBytes());
}
- parallelScans.addNewScan(plan, newScan, true);
+ parallelScans.addNewScan(plan, newScan, true, regionLocation);
regionIndex++;
}
- return parallelScans.getParallelScans();
+ return new ScansWithRegionLocations(parallelScans.getParallelScans(),
+ parallelScans.getRegionLocations());
}
private static class GuidePostEstimate {
@@ -924,7 +928,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
* @return list of parallel scans to run for a given query.
* @throws SQLException
*/
- private List<List<Scan>> getParallelScans(byte[] startKey, byte[] stopKey) throws SQLException {
+ private ScansWithRegionLocations getParallelScans(byte[] startKey, byte[] stopKey)
+ throws SQLException {
ScanRanges scanRanges = context.getScanRanges();
PTable table = getTable();
boolean isLocalIndex = table.getIndexType() == IndexType.LOCAL;
@@ -937,7 +942,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
generateEstimates(scanRanges, table, GuidePostsInfo.NO_GUIDEPOST,
GuidePostsInfo.NO_GUIDEPOST.isEmptyGuidePost(), parallelScans, estimates,
Long.MAX_VALUE, false);
- return parallelScans;
+ // we don't retrieve region location for the given scan range
+ return new ScansWithRegionLocations(parallelScans, null);
}
byte[] sampleProcessedSaltByte =
SchemaUtil.processSplit(new byte[] { 0 }, table.getPKColumns());
@@ -1115,7 +1121,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
}
boolean lastOfNew = newScanIdx == newScans.size() - 1;
parallelScanCollector.addNewScan(plan, newScan,
- gpsComparedToEndKey == 0 && lastOfNew);
+ gpsComparedToEndKey == 0 && lastOfNew, regionLocation);
}
}
if (newScans.size() > 0) {
@@ -1163,7 +1169,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
regionLocation.getServerName().getVersionedBytes());
}
boolean lastOfNew = newScanIdx == newScans.size() - 1;
- parallelScanCollector.addNewScan(plan, newScan, lastOfNew);
+ parallelScanCollector.addNewScan(plan, newScan, lastOfNew, regionLocation);
}
if (newScans.size() > 0) {
// Boundary case of no GP in region after delaying adding of estimates
@@ -1201,7 +1207,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
if (stream != null) Closeables.closeQuietly(stream);
}
sampleScans(parallelScanCollector.getParallelScans(),this.plan.getStatement().getTableSamplingRate());
- return parallelScanCollector.getParallelScans();
+ return new ScansWithRegionLocations(parallelScanCollector.getParallelScans(),
+ parallelScanCollector.getRegionLocations());
}
private void generateEstimates(ScanRanges scanRanges, PTable table, GuidePostsInfo gps,
@@ -1524,7 +1531,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
byte[] startKey = oldScan.getAttribute(SCAN_ACTUAL_START_ROW);
byte[] endKey = oldScan.getStopRow();
- List<List<Scan>> newNestedScans = this.getParallelScans(startKey, endKey);
+ List<List<Scan>> newNestedScans = this.getParallelScans(startKey, endKey).getScans();
// Add any concatIterators that were successful so far
// as we need these to be in order
addIterator(iterators, concatIterators);
@@ -1723,7 +1730,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
}
}
- explain(buf.toString(), planSteps, explainPlanAttributesBuilder, scans);
+ explain(buf.toString(), planSteps, explainPlanAttributesBuilder, regionLocations);
}
@Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
index 23b7632cb5..494226327b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
@@ -17,8 +17,6 @@
*/
package org.apache.phoenix.iterate;
-import java.io.IOException;
-import java.sql.SQLException;
import java.text.Format;
import java.util.ArrayList;
import java.util.Arrays;
@@ -29,11 +27,9 @@ import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.filter.PageFilter;
@@ -127,65 +123,10 @@ public abstract class ExplainTable {
return buf.toString();
}
- /**
- * Get regions that represent the given range of start and end key for the given table, and
- * all the regions to the regionLocations list.
- *
- * @param tableName the table name.
- * @param startKey the start rowkey.
- * @param endKey the end rowkey.
- * @param includeEndKey true if end key needs to be included.
- * @param reload true if reload from meta is necessary.
- * @param regionBoundaries set of region boundaries to get the unique list of region locations.
- * @param regionLocations the list of region locations as output.
- * @throws IOException if something goes wrong while creating connection or querying region
- * locations.
- */
- private void getRegionsInRange(final byte[] tableName,
- final byte[] startKey,
- final byte[] endKey,
- final boolean includeEndKey,
- final boolean reload,
- Set<RegionBoundary> regionBoundaries,
- List<HRegionLocation> regionLocations)
- throws IOException, SQLException {
- final boolean endKeyIsEndOfTable = Bytes.equals(endKey, HConstants.EMPTY_END_ROW);
- if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
- throw new IllegalArgumentException(
- "Invalid range: " + Bytes.toStringBinary(startKey) + " > " +
- Bytes.toStringBinary(endKey));
- }
- byte[] currentKey = startKey;
- try (Table table = context.getConnection().getQueryServices().getTable(tableName)) {
- // include all regions that include key range from the given start key
- // and end key
- do {
- HRegionLocation regionLocation =
- table.getRegionLocator().getRegionLocation(currentKey, reload);
- RegionBoundary regionBoundary =
- new RegionBoundary(regionLocation.getRegion().getStartKey(),
- regionLocation.getRegion().getEndKey());
- if (!regionBoundaries.contains(regionBoundary)) {
- regionLocations.add(regionLocation);
- regionBoundaries.add(regionBoundary);
- }
- currentKey = regionLocation.getRegion().getEndKey();
- // condition1 = currentKey != END_ROW_KEY
- // condition2 = endKeyIsEndOfTable == true
- // condition3 = currentKey < endKey
- // condition4 = includeEndKey == true
- // condition5 = currentKey == endKey
- // while (condition1 && (condition2 || condition3 || (condition4 && condition5)))
- } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
- && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0
- || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
- }
- }
-
protected void explain(String prefix,
List<String> planSteps,
ExplainPlanAttributesBuilder explainPlanAttributesBuilder,
- List<List<Scan>> scansList) {
+ List<HRegionLocation> regionLocations) {
StringBuilder buf = new StringBuilder(prefix);
ScanRanges scanRanges = context.getScanRanges();
Scan scan = context.getScan();
@@ -348,7 +289,7 @@ public abstract class ExplainTable {
if (groupByLimitBytes != null) {
groupByLimit = (Integer) PInteger.INSTANCE.toObject(groupByLimitBytes);
}
- getRegionLocations(planSteps, explainPlanAttributesBuilder, scansList);
+ getRegionLocations(planSteps, explainPlanAttributesBuilder, regionLocations);
groupBy.explain(planSteps, groupByLimit, explainPlanAttributesBuilder);
if (scan.getAttribute(BaseScannerRegionObserver.SPECIFIC_ARRAY_INDEX) != null) {
planSteps.add(" SERVER ARRAY ELEMENT PROJECTION");
@@ -363,13 +304,13 @@ public abstract class ExplainTable {
*
* @param planSteps list of plan steps to add explain plan output to.
* @param explainPlanAttributesBuilder explain plan v2 attributes builder instance.
- * @param scansList list of the list of scans, to be used for parallel scans.
+ * @param regionLocations region locations.
*/
private void getRegionLocations(List<String> planSteps,
ExplainPlanAttributesBuilder explainPlanAttributesBuilder,
- List<List<Scan>> scansList) {
+ List<HRegionLocation> regionLocations) {
String regionLocationPlan = getRegionLocationsForExplainPlan(explainPlanAttributesBuilder,
- scansList);
+ regionLocations);
if (regionLocationPlan.length() > 0) {
planSteps.add(regionLocationPlan);
}
@@ -381,25 +322,26 @@ public abstract class ExplainTable {
* print num of total list size.
*
* @param explainPlanAttributesBuilder explain plan v2 attributes builder instance.
- * @param scansList list of the list of scans, to be used for parallel scans.
+ * @param regionLocationsFromResultIterator region locations.
* @return region locations to be added to the explain plan output.
*/
private String getRegionLocationsForExplainPlan(
ExplainPlanAttributesBuilder explainPlanAttributesBuilder,
- List<List<Scan>> scansList) {
+ List<HRegionLocation> regionLocationsFromResultIterator) {
+ if (regionLocationsFromResultIterator == null) {
+ return "";
+ }
try {
StringBuilder buf = new StringBuilder().append(REGION_LOCATIONS);
Set<RegionBoundary> regionBoundaries = new HashSet<>();
List<HRegionLocation> regionLocations = new ArrayList<>();
- for (List<Scan> scans : scansList) {
- for (Scan eachScan : scans) {
- getRegionsInRange(tableRef.getTable().getPhysicalName().getBytes(),
- eachScan.getStartRow(),
- eachScan.getStopRow(),
- true,
- false,
- regionBoundaries,
- regionLocations);
+ for (HRegionLocation regionLocation : regionLocationsFromResultIterator) {
+ RegionBoundary regionBoundary =
+ new RegionBoundary(regionLocation.getRegion().getStartKey(),
+ regionLocation.getRegion().getEndKey());
+ if (!regionBoundaries.contains(regionBoundary)) {
+ regionLocations.add(regionLocation);
+ regionBoundaries.add(regionBoundary);
}
}
int maxLimitRegionLoc = context.getConnection().getQueryServices().getConfiguration()
@@ -421,7 +363,7 @@ public abstract class ExplainTable {
}
buf.append(") ");
return buf.toString();
- } catch (IOException | SQLException | UnsupportedOperationException e) {
+ } catch (Exception e) {
LOGGER.error("Explain table unable to add region locations.", e);
return "";
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelScansCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelScansCollector.java
index e1c99e2836..c9d7147efc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelScansCollector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelScansCollector.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.iterate;
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.phoenix.compile.QueryPlan;
@@ -33,19 +34,22 @@ public class ParallelScansCollector {
private final List<List<Scan>> parallelScans = new ArrayList<>();
private List<Scan> lastBatch = new ArrayList<>();
private Scan lastScan = null;
+ private final List<HRegionLocation> regionLocations = new ArrayList<>();
public ParallelScansCollector(ParallelScanGrouper grouper) {
this.grouper = grouper;
parallelScans.add(lastBatch);
}
- public void addNewScan(QueryPlan plan, Scan newScan, boolean crossesRegionBoundary) {
+ public void addNewScan(QueryPlan plan, Scan newScan, boolean crossesRegionBoundary,
+ HRegionLocation regionLocation) {
if (grouper.shouldStartNewScan(plan, lastScan, newScan.getStartRow(),
- lastScanCrossedRegionBoundary)) {
+ lastScanCrossedRegionBoundary)) {
lastBatch = new ArrayList<>();
parallelScans.add(lastBatch);
}
lastBatch.add(newScan);
+ regionLocations.add(regionLocation);
lastScanCrossedRegionBoundary = crossesRegionBoundary;
lastScan = newScan;
@@ -54,4 +58,8 @@ public class ParallelScansCollector {
public List<List<Scan>> getParallelScans() {
return parallelScans;
}
+
+ public List<HRegionLocation> getRegionLocations() {
+ return regionLocations;
+ }
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScansWithRegionLocations.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScansWithRegionLocations.java
new file mode 100644
index 0000000000..1bd3f45c56
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScansWithRegionLocations.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.iterate;
+
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.client.Scan;
+
+import java.util.List;
+
+/**
+ * Scan list to be retrieved for the BaseResultIterators with the list of region locations the
+ * scans would be served from.
+ */
+public class ScansWithRegionLocations {
+
+ private final List<List<Scan>> scans;
+ private final List<HRegionLocation> regionLocations;
+
+ public ScansWithRegionLocations(List<List<Scan>> scans,
+ List<HRegionLocation> regionLocations) {
+ this.scans = scans;
+ this.regionLocations = regionLocations;
+ }
+
+ public List<List<Scan>> getScans() {
+ return scans;
+ }
+
+ public List<HRegionLocation> getRegionLocations() {
+ return regionLocations;
+ }
+}