You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ka...@apache.org on 2019/10/24 06:56:36 UTC
[phoenix] branch 4.14-HBase-1.4 updated: PHOENIX-5478 IndexTool mapper task should not timeout
This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch 4.14-HBase-1.4
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.14-HBase-1.4 by this push:
new 2c908e0 PHOENIX-5478 IndexTool mapper task should not timeout
2c908e0 is described below
commit 2c908e01f549fd3095d2d7c0fbfee87775b23982
Author: Kadir <ko...@salesforce.com>
AuthorDate: Wed Oct 23 22:38:13 2019 -0700
PHOENIX-5478 IndexTool mapper task should not timeout
---
.../org/apache/phoenix/end2end/IndexToolIT.java | 2 +-
.../coprocessor/BaseScannerRegionObserver.java | 2 +
.../UngroupedAggregateRegionObserver.java | 220 +++++++++++----------
.../apache/phoenix/index/GlobalIndexChecker.java | 4 -
.../PhoenixServerBuildIndexInputFormat.java | 2 +
.../apache/phoenix/mapreduce/index/IndexTool.java | 12 --
.../org/apache/phoenix/query/QueryServices.java | 2 +
.../apache/phoenix/query/QueryServicesOptions.java | 3 +-
8 files changed, 130 insertions(+), 117 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index 9cc2393..6fc01bd 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -81,7 +81,6 @@ import com.google.common.collect.Maps;
@RunWith(Parameterized.class)
@Category(NeedsOwnMiniClusterTest.class)
public class IndexToolIT extends ParallelStatsEnabledIT {
-
private final boolean localIndex;
private final boolean transactional;
private final boolean directApi;
@@ -117,6 +116,7 @@ public class IndexToolIT extends ParallelStatsEnabledIT {
Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(2);
serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
+ serverProps.put(QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS, Long.toString(8));
Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2);
clientProps.put(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString());
clientProps.put(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.TRUE.toString());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index b73615f..cb4d0af 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -75,6 +75,8 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
public static final String GROUP_BY_LIMIT = "_GroupByLimit";
public static final String LOCAL_INDEX = "_LocalIndex";
public static final String LOCAL_INDEX_BUILD = "_LocalIndexBuild";
+ // The number of index rows to be rebuilt in one RPC call
+ public static final String INDEX_REBUILD_PAGING = "_IndexRebuildPaging";
/*
* Attribute to denote that the index maintainer has been serialized using its proto-buf presentation.
* Needed for backward compatibility purposes. TODO: get rid of this in next major release.
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 0166206..3cae671 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -21,6 +21,7 @@ import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
+import static org.apache.phoenix.query.QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS;
import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB;
import static org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker.COMPACTION_UPDATE_STATS_ROW_COUNT;
@@ -1034,116 +1035,137 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
throw new RuntimeException(e);
}
}
-
- private RegionScanner rebuildIndices(final RegionScanner innerScanner, final Region region, final Scan scan,
- Configuration config) throws IOException {
- byte[] indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
- boolean useProto = true;
- // for backward compatibility fall back to look up by the old attribute
- if (indexMetaData == null) {
- useProto = false;
- indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
+
+ private class IndexRebuildRegionScanner extends BaseRegionScanner {
+ private long pageSizeInRows = Long.MAX_VALUE;
+ private boolean hasMore;
+ private final int maxBatchSize;
+ private MutationList mutations;
+ private final long maxBatchSizeBytes;
+ private final long blockingMemstoreSize;
+ private final byte[] clientVersionBytes;
+ private List<Cell> results = new ArrayList<Cell>();
+ private byte[] indexMetaData;
+ private boolean useProto = true;
+ private Scan scan;
+ private RegionScanner innerScanner;
+ final Region region;
+
+ IndexRebuildRegionScanner (final RegionScanner innerScanner, final Region region, final Scan scan,
+ final Configuration config) {
+ super(innerScanner);
+ if (scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING) != null) {
+ pageSizeInRows = config.getLong(INDEX_REBUILD_PAGE_SIZE_IN_ROWS,
+ QueryServicesOptions.DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS);
+ }
+
+ maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
+ mutations = new MutationList(maxBatchSize);
+ maxBatchSizeBytes = config.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
+ QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
+ blockingMemstoreSize = getBlockingMemstoreSize(region, config);
+ clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
+ indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
+ if (indexMetaData == null) {
+ useProto = false;
+ indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
+ }
+ this.scan = scan;
+ this.innerScanner = innerScanner;
+ this.region = region;
}
- byte[] clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
- boolean hasMore;
- int rowCount = 0;
- try {
- int maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
- long maxBatchSizeBytes = config.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
- QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
- final long blockingMemstoreSize = getBlockingMemstoreSize(region, config);
- MutationList mutations = new MutationList(maxBatchSize);
- region.startRegionOperation();
- byte[] uuidValue = ServerCacheClient.generateId();
- synchronized (innerScanner) {
- do {
- List<Cell> results = new ArrayList<Cell>();
- hasMore = innerScanner.nextRaw(results);
- if (!results.isEmpty()) {
- Put put = null;
- Delete del = null;
- for (Cell cell : results) {
-
- if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
- if (put == null) {
- put = new Put(CellUtil.cloneRow(cell));
- put.setAttribute(useProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
- put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
- put.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
- BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
- put.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
- mutations.add(put);
- // Since we're replaying existing mutations, it makes no sense to write them to the wal
- put.setDurability(Durability.SKIP_WAL);
- }
- put.add(cell);
- } else {
- if (del == null) {
- del = new Delete(CellUtil.cloneRow(cell));
- del.setAttribute(useProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
- del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
- del.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
- BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
- del.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
- mutations.add(del);
- // Since we're replaying existing mutations, it makes no sense to write them to the wal
- del.setDurability(Durability.SKIP_WAL);
+ @Override
+ public HRegionInfo getRegionInfo() {
+ return region.getRegionInfo();
+ }
+
+ @Override
+ public boolean isFilterDone() { return hasMore; }
+
+ @Override
+ public void close() throws IOException { innerScanner.close(); }
+
+ private void setMutationAttributes(Mutation m, byte[] uuidValue) {
+ m.setAttribute(useProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
+ m.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+ m.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
+ BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
+ m.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
+ // Since we're replaying existing mutations, it makes no sense to write them to the wal
+ m.setDurability(Durability.SKIP_WAL);
+ }
+
+ @Override
+ public boolean next(List<Cell> results) throws IOException {
+ int rowCount = 0;
+ try {
+ byte[] uuidValue = ServerCacheClient.generateId();
+ synchronized (innerScanner) {
+ do {
+ List<Cell> row = new ArrayList<Cell>();
+ hasMore = innerScanner.nextRaw(row);
+ if (!row.isEmpty()) {
+ Put put = null;
+ Delete del = null;
+ for (Cell cell : row) {
+ if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
+ if (put == null) {
+ put = new Put(CellUtil.cloneRow(cell));
+ setMutationAttributes(put, uuidValue);
+ mutations.add(put);
+ }
+ put.add(cell);
+ } else {
+ if (del == null) {
+ del = new Delete(CellUtil.cloneRow(cell));
+ setMutationAttributes(del, uuidValue);
+ mutations.add(del);
+ }
+ del.addDeleteMarker(cell);
}
- del.addDeleteMarker(cell);
}
+ if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
+ checkForRegionClosingOrSplitting();
+ commitBatchWithRetries(region, mutations, blockingMemstoreSize);
+ uuidValue = ServerCacheClient.generateId();
+ mutations.clear();
+ }
+ rowCount++;
}
- if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
- checkForRegionClosingOrSplitting();
- commitBatchWithRetries(region, mutations, blockingMemstoreSize);
- uuidValue = ServerCacheClient.generateId();
- mutations.clear();
- }
- rowCount++;
+
+ } while (hasMore && rowCount < pageSizeInRows);
+ if (!mutations.isEmpty()) {
+ checkForRegionClosingOrSplitting();
+ commitBatchWithRetries(region, mutations, blockingMemstoreSize);
}
-
- } while (hasMore);
- if (!mutations.isEmpty()) {
- checkForRegionClosingOrSplitting();
- commitBatchWithRetries(region, mutations, blockingMemstoreSize);
+ }
+ } catch (IOException e) {
+ hasMore = false;
+ LOGGER.error("IOException during rebuilding: " + Throwables.getStackTraceAsString(e));
+ throw e;
+ } finally {
+ if (!hasMore) {
+ region.closeRegionOperation();
}
}
- } catch (IOException e) {
- LOGGER.error("IOException during rebuilding: " + Throwables.getStackTraceAsString(e));
- throw e;
- } finally {
- region.closeRegionOperation();
+ byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount));
+ final KeyValue aggKeyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY,
+ SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length);
+ results.add(aggKeyValue);
+ return hasMore;
}
- byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount));
- final KeyValue aggKeyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY,
- SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length);
-
- RegionScanner scanner = new BaseRegionScanner(innerScanner) {
- @Override
- public HRegionInfo getRegionInfo() {
- return region.getRegionInfo();
- }
-
- @Override
- public boolean isFilterDone() {
- return true;
- }
- @Override
- public void close() throws IOException {
- innerScanner.close();
- }
+ @Override
+ public long getMaxResultSize() {
+ return scan.getMaxResultSize();
+ }
+ }
- @Override
- public boolean next(List<Cell> results) throws IOException {
- results.add(aggKeyValue);
- return false;
- }
+ private RegionScanner rebuildIndices(final RegionScanner innerScanner, final Region region, final Scan scan,
+ final Configuration config) throws IOException {
- @Override
- public long getMaxResultSize() {
- return scan.getMaxResultSize();
- }
- };
+ region.startRegionOperation();
+ RegionScanner scanner = new IndexRebuildRegionScanner(innerScanner, region, scan, config);
return scanner;
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index 21c2e69..7848047 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -253,10 +253,6 @@ public class GlobalIndexChecker extends BaseRegionObserver {
return result.getRow();
}
};
- for (Cell cell : result.rawCells()) {
- String cellString = cell.toString();
- LOG.debug("Rebuilt row :" + cellString + " value : " + Bytes.toStringBinary(CellUtil.cloneValue(cell)));
- }
byte[] builtIndexRowKey = indexMaintainer.buildRowKey(getter, new ImmutableBytesWritable(dataRowKey), null, null, maxTimestamp);
if (Bytes.compareTo(builtIndexRowKey, 0, builtIndexRowKey.length,
indexRowKey, 0, indexRowKey.length) != 0) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
index 7bd30e6..1feee81 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
@@ -42,6 +42,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolDataTableName;
import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolIndexTableName;
+import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
/**
* {@link InputFormat} implementation from Phoenix for building index
@@ -89,6 +90,7 @@ public class PhoenixServerBuildIndexInputFormat<T extends DBWritable> extends Ph
try {
scan.setTimeRange(0, scn);
+ scan.setAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING, TRUE_BYTES);
} catch (IOException e) {
throw new SQLException(e);
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index 4bc57b5..5f43f02 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -481,17 +481,6 @@ public class IndexTool extends Configured implements Tool {
private Job configureJobForServerBuildIndex()
throws Exception {
- long indexRebuildQueryTimeoutMs =
- configuration.getLong(QueryServices.INDEX_REBUILD_QUERY_TIMEOUT_ATTRIB,
- QueryServicesOptions.DEFAULT_INDEX_REBUILD_QUERY_TIMEOUT);
- // Set various phoenix and hbase level timeouts and rpc retries
- configuration.set(QueryServices.THREAD_TIMEOUT_MS_ATTRIB,
- Long.toString(indexRebuildQueryTimeoutMs));
- configuration.set(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
- Long.toString(indexRebuildQueryTimeoutMs));
- configuration.set(HConstants.HBASE_RPC_TIMEOUT_KEY,
- Long.toString(indexRebuildQueryTimeoutMs));
-
PhoenixConfigurationUtil.setIndexToolDataTableName(configuration, qDataTable);
PhoenixConfigurationUtil.setIndexToolIndexTableName(configuration, qIndexTable);
@@ -506,7 +495,6 @@ public class IndexTool extends Configured implements Tool {
fs = outputPath.getFileSystem(configuration);
fs.delete(outputPath, true);
- configuration.set("mapreduce.task.timeout", Long.toString(indexRebuildQueryTimeoutMs));
final String jobName = String.format(INDEX_JOB_NAME_TEMPLATE, schemaName, dataTable, indexTable);
final Job job = Job.getInstance(configuration, jobName);
job.setJarByClass(IndexTool.class);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 6ad397f..9c87ecb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -322,6 +322,8 @@ public interface QueryServices extends SQLCloseable {
public static final String GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB = "phoenix.global.index.row.age.threshold.to.delete.ms";
// Enable the IndexRegionObserver Coprocessor
public static final String INDEX_REGION_OBSERVER_ENABLED_ATTRIB = "phoenix.index.region.observer.enabled";
+ // The number of index rows to be rebuilt in one RPC call
+ public static final String INDEX_REBUILD_PAGE_SIZE_IN_ROWS = "phoenix.index.rebuild_page_size_in_rows";
/**
* Get executor service used for parallel scans
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 923403b..afb7153 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -200,7 +200,7 @@ public class QueryServicesOptions {
public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_BACKWARD_TIME = 1; // 1 ms
public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME = 60000 * 3; // 3 mins
// 30 min rpc timeout * 5 tries, with 2100ms total pause time between retries
- public static final long DEFAULT_INDEX_REBUILD_QUERY_TIMEOUT = 60000 * 60 * 24; // 24 hrs
+ public static final long DEFAULT_INDEX_REBUILD_QUERY_TIMEOUT = (5 * 30000 * 60) + 2100;
public static final long DEFAULT_INDEX_REBUILD_RPC_TIMEOUT = 30000 * 60; // 30 mins
public static final long DEFAULT_INDEX_REBUILD_CLIENT_SCANNER_TIMEOUT = 30000 * 60; // 30 mins
public static final int DEFAULT_INDEX_REBUILD_RPC_RETRIES_COUNTER = 5; // 5 total tries at rpc level
@@ -342,6 +342,7 @@ public class QueryServicesOptions {
public static final long DEFAULT_GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS = 7*24*60*60*1000; /* 7 days */
public static final boolean DEFAULT_INDEX_REGION_OBSERVER_ENABLED = true;
+ public static final long DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS = 1024*1024;
public static final boolean DEFAULT_PROPERTY_POLICY_PROVIDER_ENABLED = true;