You are viewing a plain-text version of this content; the canonical link to the original is available on the mailing-list archive.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2016/09/15 08:14:15 UTC
phoenix git commit: PHOENIX-3280 Automatic attempt to rebuild all
disabled index
Repository: phoenix
Updated Branches:
refs/heads/master 27697b364 -> 2a223adfb
PHOENIX-3280 Automatic attempt to rebuild all disabled index
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/2a223adf
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/2a223adf
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/2a223adf
Branch: refs/heads/master
Commit: 2a223adfbeb32f598308da1dc6d3251ee0980d79
Parents: 27697b3
Author: James Taylor <ja...@apache.org>
Authored: Thu Sep 15 00:48:24 2016 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Thu Sep 15 00:48:24 2016 -0700
----------------------------------------------------------------------
.../coprocessor/MetaDataRegionObserver.java | 179 +++++++++++--------
1 file changed, 104 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a223adf/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index f1dc982..00981f5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -20,8 +20,8 @@ package org.apache.phoenix.coprocessor;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.TimerTask;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -79,6 +79,7 @@ import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.UpgradeUtil;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
/**
@@ -223,13 +224,11 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES);
- PTable dataPTable = null;
+ Map<PTable, List<PTable>> dataTableToIndexesMap = null;
MetaDataClient client = null;
boolean hasMore = false;
List<Cell> results = new ArrayList<Cell>();
- List<PTable> indexesToPartiallyRebuild = Collections.emptyList();
scanner = this.env.getRegion().getScanner(scan);
- long earliestDisableTimestamp = Long.MAX_VALUE;
do {
results.clear();
@@ -249,19 +248,12 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
if (disabledTimeStampVal <= 0) {
continue;
}
- if (disabledTimeStampVal < earliestDisableTimestamp) {
- earliestDisableTimestamp = disabledTimeStampVal;
- }
-
byte[] dataTable = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES);
- byte[] indexStat = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+ byte[] indexState = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.INDEX_STATE_BYTES);
- if ((dataTable == null || dataTable.length == 0) || (indexStat == null || indexStat.length == 0)
- || (dataPTable != null
- && Bytes.compareTo(dataPTable.getName().getBytes(), dataTable) != 0)) {
+ if ((dataTable == null || dataTable.length == 0) || (indexState == null || indexState.length == 0)) {
// data table name can't be empty
- // we need to build indexes of same data table. so skip other indexes for this task.
continue;
}
@@ -284,14 +276,19 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
// don't run a second index populations upsert select
props.setProperty(QueryServices.INDEX_POPULATION_SLEEP_TIME, "0");
conn = QueryUtil.getConnectionOnServer(props, env.getConfiguration()).unwrap(PhoenixConnection.class);
- String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTable);
- dataPTable = PhoenixRuntime.getTable(conn, dataTableFullName);
- indexesToPartiallyRebuild = Lists.newArrayListWithExpectedSize(dataPTable.getIndexes().size());
client = new MetaDataClient(conn);
+ dataTableToIndexesMap = Maps.newHashMap();
}
+ String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTable);
+ PTable dataPTable = PhoenixRuntime.getTableNoCache(conn, dataTableFullName);
String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTable);
- PTable indexPTable = PhoenixRuntime.getTable(conn, indexTableFullName);
+ PTable indexPTable = PhoenixRuntime.getTableNoCache(conn, indexTableFullName);
+ // Sanity check in case index was removed from table
+ if (!dataPTable.getIndexes().contains(indexPTable)) {
+ continue;
+ }
+
if (!MetaDataUtil.tableRegionsOnline(this.env.getConfiguration(), indexPTable)) {
LOG.debug("Index rebuild has been skipped because not all regions of index table="
+ indexPTable.getName() + " are online.");
@@ -299,82 +296,114 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
}
// Allow index to begin incremental maintenance as index is back online and we
// cannot transition directly from DISABLED -> ACTIVE
- if (Bytes.compareTo(PIndexState.DISABLE.getSerializedBytes(), indexStat) == 0) {
+ if (Bytes.compareTo(PIndexState.DISABLE.getSerializedBytes(), indexState) == 0) {
AlterIndexStatement statement = new AlterIndexStatement(
NamedTableNode.create(indexPTable.getSchemaName().getString(), indexPTable.getTableName().getString()),
dataPTable.getTableName().getString(),
false, PIndexState.INACTIVE);
client.alterIndex(statement);
}
+ List<PTable> indexesToPartiallyRebuild = dataTableToIndexesMap.get(dataPTable);
+ if (indexesToPartiallyRebuild == null) {
+ indexesToPartiallyRebuild = Lists.newArrayListWithExpectedSize(dataPTable.getIndexes().size());
+ dataTableToIndexesMap.put(dataPTable, indexesToPartiallyRebuild);
+ }
indexesToPartiallyRebuild.add(indexPTable);
} while (hasMore);
- if (!indexesToPartiallyRebuild.isEmpty()) {
+ if (dataTableToIndexesMap != null) {
long overlapTime = env.getConfiguration().getLong(
- QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB,
- QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME);
- long timeStamp = Math.max(0, earliestDisableTimestamp - overlapTime);
-
- LOG.info("Starting to build indexes=" + indexesToPartiallyRebuild + " from timestamp=" + timeStamp);
- new Scan();
- List<IndexMaintainer> maintainers = Lists.newArrayListWithExpectedSize(indexesToPartiallyRebuild.size());
- for (PTable index : indexesToPartiallyRebuild) {
- maintainers.add(index.getIndexMaintainer(dataPTable, conn));
- }
- Scan dataTableScan = IndexManagementUtil.newLocalStateScan(maintainers);
- dataTableScan.setTimeRange(timeStamp, HConstants.LATEST_TIMESTAMP);
- byte[] physicalTableName = dataPTable.getPhysicalName().getBytes();
- try (HTableInterface dataHTable = conn.getQueryServices().getTable(physicalTableName)) {
- Result result;
- try (ResultScanner dataTableScanner = dataHTable.getScanner(dataTableScan)) {
- int batchSize = conn.getMutateBatchSize();
- List<Mutation> mutations = Lists.newArrayListWithExpectedSize(batchSize);
- ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY);
- IndexMaintainer.serializeAdditional(dataPTable, indexMetaDataPtr, indexesToPartiallyRebuild, conn);
- byte[] attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
- byte[] uuidValue = ServerCacheClient.generateId();
-
- while ((result = dataTableScanner.next()) != null && !result.isEmpty()) {
- Put put = null;
- Delete del = null;
- for (Cell cell : result.rawCells()) {
- if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
- if (put == null) {
- put = new Put(CellUtil.cloneRow(cell));
- put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
- put.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
- put.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES);
- mutations.add(put);
+ QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB,
+ QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME);
+ for (Map.Entry<PTable, List<PTable>> entry : dataTableToIndexesMap.entrySet()) {
+ PTable dataPTable = entry.getKey();
+ List<PTable> indexesToPartiallyRebuild = entry.getValue();
+ try {
+ long earliestDisableTimestamp = Long.MAX_VALUE;
+ List<IndexMaintainer> maintainers = Lists
+ .newArrayListWithExpectedSize(indexesToPartiallyRebuild.size());
+ for (PTable index : indexesToPartiallyRebuild) {
+ long disabledTimeStampVal = index.getIndexDisableTimestamp();
+ if (disabledTimeStampVal > 0) {
+ if (disabledTimeStampVal < earliestDisableTimestamp) {
+ earliestDisableTimestamp = disabledTimeStampVal;
+ }
+
+ maintainers.add(index.getIndexMaintainer(dataPTable, conn));
+ }
+ }
+ // No indexes are disabled, so skip this table
+ if (earliestDisableTimestamp == Long.MAX_VALUE) {
+ continue;
+ }
+
+ long timeStamp = Math.max(0, earliestDisableTimestamp - overlapTime);
+ LOG.info("Starting to build " + dataPTable + " indexes " + indexesToPartiallyRebuild
+ + " from timestamp=" + timeStamp);
+ Scan dataTableScan = IndexManagementUtil.newLocalStateScan(maintainers);
+ dataTableScan.setTimeRange(timeStamp, HConstants.LATEST_TIMESTAMP);
+ byte[] physicalTableName = dataPTable.getPhysicalName().getBytes();
+ try (HTableInterface dataHTable = conn.getQueryServices().getTable(physicalTableName)) {
+ Result result;
+ try (ResultScanner dataTableScanner = dataHTable.getScanner(dataTableScan)) {
+ int batchSize = conn.getMutateBatchSize();
+ List<Mutation> mutations = Lists.newArrayListWithExpectedSize(batchSize);
+ ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(
+ ByteUtil.EMPTY_BYTE_ARRAY);
+ IndexMaintainer.serializeAdditional(dataPTable, indexMetaDataPtr,
+ indexesToPartiallyRebuild, conn);
+ byte[] attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
+ byte[] uuidValue = ServerCacheClient.generateId();
+
+ while ((result = dataTableScanner.next()) != null && !result.isEmpty()) {
+ Put put = null;
+ Delete del = null;
+ for (Cell cell : result.rawCells()) {
+ if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
+ if (put == null) {
+ put = new Put(CellUtil.cloneRow(cell));
+ put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+ put.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
+ put.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS,
+ PDataType.TRUE_BYTES);
+ mutations.add(put);
+ }
+ put.add(cell);
+ } else {
+ if (del == null) {
+ del = new Delete(CellUtil.cloneRow(cell));
+ del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+ del.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
+ del.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS,
+ PDataType.TRUE_BYTES);
+ mutations.add(del);
+ }
+ del.addDeleteMarker(cell);
+ }
}
- put.add(cell);
- } else {
- if (del == null) {
- del = new Delete(CellUtil.cloneRow(cell));
- del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
- del.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
- del.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES);
- mutations.add(del);
+ if (mutations.size() == batchSize) {
+ dataHTable.batch(mutations);
+ uuidValue = ServerCacheClient.generateId();
+ mutations.clear();
}
- del.addDeleteMarker(cell);
}
- }
- if (mutations.size() == batchSize) {
- dataHTable.batch(mutations);
- uuidValue = ServerCacheClient.generateId();
+ if (!mutations.isEmpty()) {
+ dataHTable.batch(mutations);
+ }
}
}
- if (!mutations.isEmpty()) {
- dataHTable.batch(mutations);
+ for (PTable indexPTable : indexesToPartiallyRebuild) {
+ AlterIndexStatement statement = new AlterIndexStatement(
+ NamedTableNode.create(indexPTable.getSchemaName().getString(), indexPTable
+ .getTableName().getString()), dataPTable.getTableName().getString(),
+ false, PIndexState.ACTIVE);
+ client.alterIndex(statement);
}
+ } catch (Exception e) { // Log, but try next table's indexes
+ LOG.warn("Unable to rebuild " + dataPTable + " indexes " + indexesToPartiallyRebuild
+ + ". Will try again next on next scheduled invocation.", e);
}
}
- for (PTable indexPTable : indexesToPartiallyRebuild) {
- AlterIndexStatement statement = new AlterIndexStatement(
- NamedTableNode.create(indexPTable.getSchemaName().getString(), indexPTable.getTableName().getString()),
- dataPTable.getTableName().getString(),
- false, PIndexState.ACTIVE);
- client.alterIndex(statement);
- }
}
} catch (Throwable t) {
LOG.warn("ScheduledBuildIndexTask failed!", t);