You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2017/09/07 18:32:22 UTC
phoenix git commit: PHOENIX-3953 Clear INDEX_DISABLED_TIMESTAMP and
disable index on compaction (addendum 2)
Repository: phoenix
Updated Branches:
refs/heads/master 814276d4b -> 64b808971
PHOENIX-3953 Clear INDEX_DISABLED_TIMESTAMP and disable index on compaction (addendum 2)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/64b80897
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/64b80897
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/64b80897
Branch: refs/heads/master
Commit: 64b808971698880980d06f17b0924e6e22d95e12
Parents: 814276d
Author: James Taylor <jt...@salesforce.com>
Authored: Thu Sep 7 11:26:47 2017 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Thu Sep 7 11:26:47 2017 -0700
----------------------------------------------------------------------
.../UngroupedAggregateRegionObserver.java | 62 ++++++++++++++-
.../org/apache/phoenix/hbase/index/Indexer.java | 42 +++++-----
.../stats/DefaultStatisticsCollector.java | 83 ++++++--------------
.../schema/stats/NoOpStatisticsCollector.java | 2 +-
.../schema/stats/StatisticsCollector.java | 2 +-
5 files changed, 111 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/64b80897/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 31c83e4..a61f502 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -55,9 +55,12 @@ import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -69,11 +72,14 @@ import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.cache.ServerCacheClient;
+import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
import org.apache.phoenix.coprocessor.generated.PTableProtos;
import org.apache.phoenix.exception.DataExceedsCapacityException;
import org.apache.phoenix.execute.TupleProjector;
@@ -89,11 +95,13 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.join.HashJoinInfo;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PRow;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableImpl;
@@ -899,6 +907,58 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
});
}
+ @Override
+ public void postCompact(final ObserverContext<RegionCoprocessorEnvironment> e, final Store store,
+ final StoreFile resultFile, CompactionRequest request) throws IOException {
+ // If we're compacting all files, then delete markers are removed
+ // and we must permanently disable an index that needs to be
+ // partially rebuild because we're potentially losing the information
+ // we need to successfully rebuilt it.
+ if (request.isAllFiles() || request.isMajor()) {
+ // Compaction and split upcalls run with the effective user context of the requesting user.
+ // This will lead to failure of cross cluster RPC if the effective user is not
+ // the login user. Switch to the login user context to ensure we have the expected
+ // security context.
+ User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ MutationCode mutationCode = null;
+ long disableIndexTimestamp = 0;
+
+ try (HTableInterface htable = e.getEnvironment().getTable(
+ SchemaUtil.getPhysicalTableName(
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES,
+ e.getEnvironment().getConfiguration()))) {
+ String tableName = e.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
+ // FIXME: if this is an index on a view, we won't find a row for it in SYSTEM.CATALOG
+ // Instead, we need to disable all indexes on the view.
+ byte[] tableKey = SchemaUtil.getTableKeyFromFullName(tableName);
+ Get get = new Get(tableKey);
+ get.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES);
+ Result result = htable.get(get);
+ if (!result.isEmpty()) {
+ Cell cell = result.listCells().get(0);
+ if (cell.getValueLength() > 0) {
+ disableIndexTimestamp = PLong.INSTANCE.getCodec().decodeLong(cell.getValueArray(), cell.getValueOffset(), SortOrder.getDefault());
+ if (disableIndexTimestamp != 0) {
+ mutationCode = IndexUtil.updateIndexState(tableKey, 0L, htable, PIndexState.DISABLE).getMutationCode();
+ }
+ }
+ }
+ } catch (Throwable t) { // log, but swallow exception as we don't want to impact compaction
+ logger.warn("Potential failure to permanently disable index during compaction " + e.getEnvironment().getRegionInfo().getTable().getNameAsString(), t);
+ } finally {
+ if (disableIndexTimestamp != 0 && mutationCode != MutationCode.TABLE_ALREADY_EXISTS && mutationCode != MutationCode.TABLE_NOT_FOUND) {
+ logger.warn("Attempt to permanently disable index " + e.getEnvironment().getRegionInfo().getTable().getNameAsString() +
+ " during compaction" + (mutationCode == null ? "" : " failed with code = " + mutationCode));
+ }
+ }
+ return null;
+ }
+ });
+ }
+ }
+
private static PTable deserializeTable(byte[] b) {
try {
PTableProtos.PTable ptableProto = PTableProtos.PTable.parseFrom(b);
@@ -1115,7 +1175,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
long rowCount = 0;
try {
if (!compactionRunning) {
- stats.init(false);
+ stats.init();
synchronized (innerScanner) {
do {
List<Cell> results = new ArrayList<Cell>();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/64b80897/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index b76752d..ad03abb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -63,6 +63,8 @@ import org.apache.hadoop.hbase.regionserver.OperationStatus;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.security.User;
@@ -838,21 +840,25 @@ public class Indexer extends BaseRegionObserver {
}
@Override
- public InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
- final InternalScanner scanner, final ScanType scanType) throws IOException {
- // Compaction and split upcalls run with the effective user context of the requesting user.
- // This will lead to failure of cross cluster RPC if the effective user is not
- // the login user. Switch to the login user context to ensure we have the expected
- // security context.
- return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() {
- @Override
- public InternalScanner run() throws Exception {
- InternalScanner internalScanner = scanner;
- if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) {
+ public void postCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
+ final StoreFile resultFile, CompactionRequest request) throws IOException {
+ // If we're compacting all files, then delete markers are removed
+ // and we must permanently disable an index that needs to be
+ // partially rebuild because we're potentially losing the information
+ // we need to successfully rebuilt it.
+ if (request.isAllFiles() || request.isMajor()) {
+ // Compaction and split upcalls run with the effective user context of the requesting user.
+ // This will lead to failure of cross cluster RPC if the effective user is not
+ // the login user. Switch to the login user context to ensure we have the expected
+ // security context.
+ User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
String fullTableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
try {
PhoenixConnection conn = QueryUtil.getConnectionOnServer(c.getEnvironment().getConfiguration()).unwrap(PhoenixConnection.class);
PTable table = PhoenixRuntime.getTableNoCache(conn, fullTableName);
+ // FIXME: we may need to recurse into children of this table too
for (PTable index : table.getIndexes()) {
if (index.getIndexDisableTimestamp() != 0) {
try {
@@ -864,15 +870,15 @@ public class Indexer extends BaseRegionObserver {
}
} catch (Exception e) {
// If we can't reach the stats table, don't interrupt the normal
- // compaction operation, just log a warning.
- if (LOG.isWarnEnabled()) {
- LOG.warn("Unable to permanently disable indexes being partially rebuild for " + fullTableName, e);
- }
+ // compaction operation, just log a warning.
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Unable to permanently disable indexes being partially rebuild for " + fullTableName, e);
+ }
}
+ return null;
}
- return internalScanner;
- }
- });
+ });
+ }
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/64b80897/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java
index 61a6fa2..b8ba759 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
@@ -42,16 +41,13 @@ import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
-import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.schema.types.PLong;
-import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TimeKeeper;
@@ -113,10 +109,9 @@ class DefaultStatisticsCollector implements StatisticsCollector {
}
}
- private void initGuidepostDepth(boolean isMajorCompaction) throws IOException {
+ private void initGuidepostDepth() throws IOException {
// First check is if guidepost info set on statement itself
- boolean guidepostOnStatement = guidePostPerRegionBytes != null || guidePostWidthBytes != null;
- if (guidepostOnStatement) {
+ if (guidePostPerRegionBytes != null || guidePostWidthBytes != null) {
int guidepostPerRegion = 0;
long guidepostWidth = QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES;
if (guidePostPerRegionBytes != null) {
@@ -127,48 +122,20 @@ class DefaultStatisticsCollector implements StatisticsCollector {
}
this.guidePostDepth = StatisticsUtil.getGuidePostDepth(guidepostPerRegion, guidepostWidth,
env.getRegion().getTableDesc());
- }
-
- if (!guidepostOnStatement || isMajorCompaction) {
+ } else {
long guidepostWidth = -1;
HTableInterface htable = null;
try {
- // Next check for GUIDE_POST_WIDTH and INDEX_DISABLE_TIMESTAMP on table
- TableName htableName = SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, env.getConfiguration());
- htable = env.getTable(htableName);
+ // Next check for GUIDE_POST_WIDTH on table
+ htable = env.getTable(
+ SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, env.getConfiguration()));
Get get = new Get(ptableKey);
get.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH_BYTES);
- get.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES);
Result result = htable.get(get);
if (!result.isEmpty()) {
- Cell gpwCell = result.getColumnLatestCell(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH_BYTES);
- if (gpwCell != null) {
- guidepostWidth = PLong.INSTANCE.getCodec().decodeLong(gpwCell.getValueArray(), gpwCell.getValueOffset(), SortOrder.getDefault());
- }
- if (isMajorCompaction) {
- Cell idtsCell = result.getColumnLatestCell(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES);
- if (idtsCell != null) {
- long indexDisableTimestamp = PLong.INSTANCE.getCodec().decodeLong(idtsCell.getValueArray(), idtsCell.getValueOffset(), SortOrder.getDefault());
- // If we have a non zero value for INDEX_DISABLE_TIMESTAMP, that means that our global mutable
- // secondary index needs to be partially rebuilt. If we're compacting, though, we may cleanup
- // the delete markers of an index *before* the puts for the same row occur during replay. At
- // this point the partially index rebuild would leave the index out of sync with the data
- // table. In that case, it's better to just permanently disable the index and force it to be
- // manually rebuilt
- if (indexDisableTimestamp != 0) {
- MutationCode mutationCode = IndexUtil.updateIndexState(ptableKey, 0L, htable, PIndexState.DISABLE).getMutationCode();
- if (mutationCode != MutationCode.TABLE_ALREADY_EXISTS && mutationCode != MutationCode.TABLE_NOT_FOUND) {
- LOG.warn("Attempt to permanently disable index " + env.getRegionInfo().getTable().getNameAsString() +
- " during compaction failed with code = " + mutationCode);
- }
- }
- }
- }
+ Cell cell = result.listCells().get(0);
+ guidepostWidth = PLong.INSTANCE.getCodec().decodeLong(cell.getValueArray(), cell.getValueOffset(), SortOrder.getDefault());
}
- } catch (IOException e) {
- throw e;
- } catch (Throwable t) {
- throw new IOException(t);
} finally {
if (htable != null) {
try {
@@ -178,21 +145,19 @@ class DefaultStatisticsCollector implements StatisticsCollector {
}
}
}
- if (!guidepostOnStatement) {
- if (guidepostWidth >= 0) {
- this.guidePostDepth = guidepostWidth;
- } else {
- // Last use global config value
- Configuration config = env.getConfiguration();
- this.guidePostDepth = StatisticsUtil.getGuidePostDepth(
- config.getInt(
- QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB,
- QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_PER_REGION),
- config.getLong(
- QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
- QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES),
- env.getRegion().getTableDesc());
- }
+ if (guidepostWidth >= 0) {
+ this.guidePostDepth = guidepostWidth;
+ } else {
+ // Last use global config value
+ Configuration config = env.getConfiguration();
+ this.guidePostDepth = StatisticsUtil.getGuidePostDepth(
+ config.getInt(
+ QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB,
+ QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_PER_REGION),
+ config.getLong(
+ QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
+ QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES),
+ env.getRegion().getTableDesc());
}
}
}
@@ -350,13 +315,13 @@ class DefaultStatisticsCollector implements StatisticsCollector {
StatisticsScanner scanner = new StatisticsScanner(this, statsWriter, env, internalScan, family);
// We need to initialize the scanner synchronously and potentially perform a cross region Get
// in order to use the correct guide posts width for the table being compacted.
- init(true);
+ init();
return scanner;
}
@Override
- public void init(boolean isMajorCompaction) throws IOException {
- initGuidepostDepth(isMajorCompaction);
+ public void init() throws IOException {
+ initGuidepostDepth();
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/64b80897/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/NoOpStatisticsCollector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/NoOpStatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/NoOpStatisticsCollector.java
index a13a722..74d1710 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/NoOpStatisticsCollector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/NoOpStatisticsCollector.java
@@ -61,7 +61,7 @@ public class NoOpStatisticsCollector implements StatisticsCollector {
}
@Override
- public void init(boolean isMajorCompaction) {
+ public void init() {
// No-op
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/64b80897/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
index 9550469..60e83a8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
@@ -64,7 +64,7 @@ public interface StatisticsCollector extends Closeable {
* Called before beginning the collection of statistics through {@link #collectStatistics(List)}
* @throws IOException
*/
- void init(boolean isMajorCompaction) throws IOException;
+ void init() throws IOException;
/**
* Retrieve the calculated guide post info for the given column family.