You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by vi...@apache.org on 2018/02/23 00:46:52 UTC
phoenix git commit: PHOENIX-4530 Do not collect delete markers during
major compaction of table with disabled mutable indexes
Repository: phoenix
Updated Branches:
refs/heads/master f355be008 -> 0fca6ca21
PHOENIX-4530 Do not collect delete markers during major compaction of table with disabled mutable indexes
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/0fca6ca2
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/0fca6ca2
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/0fca6ca2
Branch: refs/heads/master
Commit: 0fca6ca215e3f585069c9df23a7ec5c8b64e8e77
Parents: f355be0
Author: Vincent Poon <vi...@apache.org>
Authored: Tue Feb 20 18:14:50 2018 -0800
Committer: Vincent Poon <vi...@apache.org>
Committed: Thu Feb 22 16:46:45 2018 -0800
----------------------------------------------------------------------
.../PartialScannerResultsDisabledIT.java | 2 +-
.../UngroupedAggregateRegionObserverIT.java | 171 -------------------
.../phoenix/end2end/index/MutableIndexIT.java | 55 ++++++
.../end2end/index/PartialIndexRebuilderIT.java | 39 -----
.../UngroupedAggregateRegionObserver.java | 121 +++++--------
.../java/org/apache/phoenix/util/TestUtil.java | 19 +++
6 files changed, 116 insertions(+), 291 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0fca6ca2/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialScannerResultsDisabledIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialScannerResultsDisabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialScannerResultsDisabledIT.java
index 817b0bd..59471dd 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialScannerResultsDisabledIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialScannerResultsDisabledIT.java
@@ -151,7 +151,7 @@ public class PartialScannerResultsDisabledIT extends ParallelStatsDisabledIT {
return RandomStringUtils.randomAlphabetic(length);
}
- private void writeSingleBatch(Connection connection, int batchSize, int numBatches, String tableName) throws Exception {
+ public static void writeSingleBatch(Connection connection, int batchSize, int numBatches, String tableName) throws Exception {
for (int j = 0; j < numBatches; j++) {
try (PreparedStatement statement =
connection.prepareStatement(String.format(UPSERT_INTO_DATA_TABLE, tableName))) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0fca6ca2/phoenix-core/src/it/java/org/apache/phoenix/end2end/UngroupedAggregateRegionObserverIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UngroupedAggregateRegionObserverIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UngroupedAggregateRegionObserverIT.java
deleted file mode 100644
index 0ae1bb5..0000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UngroupedAggregateRegionObserverIT.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.end2end;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.never;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.log4j.Appender;
-import org.apache.log4j.Level;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.spi.LoggingEvent;
-import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.schema.PIndexState;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
-import org.apache.phoenix.util.IndexUtil;
-import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.SchemaUtil;
-import org.apache.phoenix.util.TestUtil;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.runners.MockitoJUnitRunner;
-
-@RunWith(MockitoJUnitRunner.class)
-public class UngroupedAggregateRegionObserverIT extends ParallelStatsDisabledIT {
-
- private String dataTableName;
- private String indexTableName;
- private String schemaName;
- private String dataTableFullName;
- private static String indexTableFullName;
-
- @Mock
- private Appender mockAppender;
-
- @Captor
- private ArgumentCaptor<LoggingEvent> captorLoggingEvent;
- private UngroupedAggregateRegionObserver ungroupedObserver;
-
- @Before
- public void setup() {
- ungroupedObserver = new UngroupedAggregateRegionObserver();
- ungroupedObserver.setCompactionConfig(PropertiesUtil.cloneConfig(config));
- }
-
- /**
- * Tests the that post compact hook doesn't log any NPE for a System table
- */
- @Test
- public void testPostCompactSystemSequence() throws Exception {
- try (Connection conn = DriverManager.getConnection(getUrl())) {
- startCapturingIndexLog();
- // run the post-compact hook
- ungroupedObserver.clearTsOnDisabledIndexes("SYSTEM.SEQUENCE");
- stopCapturingIndexLog();
- // uneventful - nothing should be logged
- Mockito.verify(mockAppender, never())
- .doAppend(captorLoggingEvent.capture());
- }
- }
-
- /**
- * Tests that calling the post compact hook on the data table permanently disables an index that
- * is being rebuilt (i.e. already disabled or inactive)
- */
- @Test
- public void testPostCompactDataTableDuringRebuild() throws Exception {
- try (Connection conn = DriverManager.getConnection(getUrl())) {
- generateUniqueTableNames();
- testRebuildPostCompact(conn, dataTableFullName);
- }
- }
-
- /**
- * Tests that calling the post compact hook on the index table permanently disables an index
- * that is being rebuilt (i.e. already disabled or inactive)
- */
- @Test
- public void testPostCompactIndexTableDuringRebuild() throws Exception {
- try (Connection conn = DriverManager.getConnection(getUrl())) {
- generateUniqueTableNames();
- testRebuildPostCompact(conn, indexTableFullName);
- }
- }
-
- private void testRebuildPostCompact(Connection conn, String tableToCompact)
- throws SQLException {
- conn.createStatement().execute(
- String.format(PartialScannerResultsDisabledIT.TEST_TABLE_DDL, dataTableFullName));
- conn.createStatement().execute(String.format(PartialScannerResultsDisabledIT.INDEX_1_DDL,
- indexTableName, dataTableFullName));
- // disable the index, simulating an index write failure
- PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
- IndexUtil.updateIndexState(pConn, indexTableFullName, PIndexState.DISABLE,
- EnvironmentEdgeManager.currentTimeMillis());
-
- // run the post-compact hook on the data table
- startCapturingIndexLog();
- ungroupedObserver.clearTsOnDisabledIndexes(tableToCompact);
- stopCapturingIndexLog();
- // an event should've been logged
- Mockito.verify(mockAppender).doAppend(captorLoggingEvent.capture());
- LoggingEvent loggingEvent = captorLoggingEvent.getValue();
- assertThat(loggingEvent.getLevel(), is(Level.INFO));
- // index should be permanently disabled (disabletime of 0)
- assertTrue(TestUtil.checkIndexState(pConn, indexTableFullName, PIndexState.DISABLE, 0L));
- }
-
- /**
- * Tests that a non-Phoenix table (created purely through HBase) doesn't log a warning in
- * postCompact
- */
- @Test
- public void testPostCompactTableNotFound() throws Exception {
- try (Connection conn = DriverManager.getConnection(getUrl())) {
- HBaseTestingUtility utility = getUtility();
- String nonPhoenixTable = "NOT_A_PHOENIX_TABLE";
- utility.getHBaseAdmin().createTable(utility.createTableDescriptor(nonPhoenixTable));
- startCapturingIndexLog();
- ungroupedObserver.clearTsOnDisabledIndexes(nonPhoenixTable);
- stopCapturingIndexLog();
- // a debug level event should've been logged
- Mockito.verify(mockAppender).doAppend(captorLoggingEvent.capture());
- LoggingEvent loggingEvent = captorLoggingEvent.getValue();
- assertThat(loggingEvent.getLevel(), is(Level.DEBUG));
- }
- }
-
- private void stopCapturingIndexLog() {
- LogManager.getLogger(UngroupedAggregateRegionObserver.class).removeAppender(mockAppender);
- }
-
- private void startCapturingIndexLog() {
- LogManager.getLogger(UngroupedAggregateRegionObserver.class).addAppender(mockAppender);
- }
-
- private void generateUniqueTableNames() {
- schemaName = generateUniqueName();
- dataTableName = generateUniqueName() + "_DATA";
- dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
- indexTableName = generateUniqueName() + "_IDX";
- indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
- }
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0fca6ca2/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
index e46a213..efae15e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
@@ -42,16 +42,23 @@ import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.end2end.PartialScannerResultsDisabledIT;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
@@ -812,6 +819,54 @@ public class MutableIndexIT extends ParallelStatsDisabledIT {
}
}
+ // Tests that if major compaction is run on a table with a disabled index,
+ // deleted cells are kept
+ @Test
+ public void testCompactDisabledIndex() throws Exception {
+ try (Connection conn = getConnection()) {
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName() + "_DATA";
+ String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+ String indexTableName = generateUniqueName() + "_IDX";
+ String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+ conn.createStatement().execute(
+ String.format(PartialScannerResultsDisabledIT.TEST_TABLE_DDL, dataTableFullName));
+ conn.createStatement().execute(String.format(PartialScannerResultsDisabledIT.INDEX_1_DDL,
+ indexTableName, dataTableFullName));
+
+ //insert a row, and delete it
+ PartialScannerResultsDisabledIT.writeSingleBatch(conn, 1, 1, dataTableFullName);
+ conn.createStatement().execute("DELETE FROM " + dataTableFullName);
+ conn.commit();
+
+ // disable the index, simulating an index write failure
+ PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+ IndexUtil.updateIndexState(pConn, indexTableFullName, PIndexState.DISABLE,
+ EnvironmentEdgeManager.currentTimeMillis());
+
+ // major compaction should not remove the deleted row
+ List<HRegion> regions = getUtility().getHBaseCluster().getRegions(TableName.valueOf(dataTableFullName));
+ HRegion hRegion = regions.get(0);
+ hRegion.flush(true);
+ HStore store = (HStore) hRegion.getStore(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES);
+ store.triggerMajorCompaction();
+ store.compactRecentForTestingAssumingDefaultPolicy(1);
+ HTableInterface dataHTI = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(dataTableFullName));
+ assertEquals(1, TestUtil.getRawRowCount(dataHTI));
+
+ // reenable the index
+ IndexUtil.updateIndexState(pConn, indexTableFullName, PIndexState.INACTIVE,
+ EnvironmentEdgeManager.currentTimeMillis());
+ IndexUtil.updateIndexState(pConn, indexTableFullName, PIndexState.ACTIVE, 0L);
+
+ // now major compaction should remove the deleted row
+ store.triggerMajorCompaction();
+ store.compactRecentForTestingAssumingDefaultPolicy(1);
+ dataHTI = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(dataTableFullName));
+ assertEquals(0, TestUtil.getRawRowCount(dataHTI));
+ }
+ }
+
private void upsertRow(String dml, Connection tenantConn, int i) throws SQLException {
PreparedStatement stmt = tenantConn.prepareStatement(dml);
stmt.setString(1, "00000000000000" + String.valueOf(i));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0fca6ca2/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
index 3961d32..46443e3 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
@@ -318,45 +318,6 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
conn.commit();
return hasInactiveIndex;
}
-
- @Test
- public void testCompactionDuringRebuild() throws Throwable {
- String schemaName = generateUniqueName();
- String tableName = generateUniqueName();
- String indexName1 = generateUniqueName();
- String indexName2 = generateUniqueName();
- final String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
- String fullIndexName1 = SchemaUtil.getTableName(schemaName, indexName1);
- String fullIndexName2 = SchemaUtil.getTableName(schemaName, indexName2);
- final MyClock clock = new MyClock(1000);
- // Use our own clock to prevent race between partial rebuilder and compaction
- EnvironmentEdgeManager.injectEdge(clock);
- try (Connection conn = DriverManager.getConnection(getUrl())) {
- conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k INTEGER PRIMARY KEY, v1 INTEGER, v2 INTEGER) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true, GUIDE_POSTS_WIDTH=1000");
- clock.time += 100;
- conn.createStatement().execute("CREATE INDEX " + indexName1 + " ON " + fullTableName + " (v1) INCLUDE (v2)");
- clock.time += 100;
- conn.createStatement().execute("CREATE INDEX " + indexName2 + " ON " + fullTableName + " (v2) INCLUDE (v1)");
- clock.time += 100;
- conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES(1, 2, 3)");
- conn.commit();
- clock.time += 100;
- long disableTS = EnvironmentEdgeManager.currentTimeMillis();
- HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
- IndexUtil.updateIndexState(fullIndexName1, disableTS, metaTable, PIndexState.DISABLE);
- IndexUtil.updateIndexState(fullIndexName2, disableTS, metaTable, PIndexState.DISABLE);
- clock.time += 100;
- TestUtil.doMajorCompaction(conn, fullIndexName1);
- clock.time += 100;
- assertTrue(TestUtil.checkIndexState(conn, fullIndexName1, PIndexState.DISABLE, 0L));
- assertFalse(TestUtil.checkIndexState(conn, fullIndexName2, PIndexState.DISABLE, 0L));
- TestUtil.doMajorCompaction(conn, fullTableName);
- clock.time += 100;
- assertTrue(TestUtil.checkIndexState(conn, fullIndexName2, PIndexState.DISABLE, 0L));
- } finally {
- EnvironmentEdgeManager.injectEdge(null);
- }
- }
@Test
@Repeat(5)
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0fca6ca2/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 93b42bc..6108aca 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -65,11 +65,12 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
@@ -102,14 +103,12 @@ import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
import org.apache.phoenix.schema.PColumn;
-import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PRow;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableImpl;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.RowKeySchema;
import org.apache.phoenix.schema.SortOrder;
-import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.ValueSchema.Field;
import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker;
@@ -144,7 +143,6 @@ import org.apache.phoenix.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -986,82 +984,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
});
}
- @Override
- public void postCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
- final StoreFile resultFile, CompactionRequest request) throws IOException {
- // If we're compacting all files, then delete markers are removed
- // and we must permanently disable an index that needs to be
- // partially rebuild because we're potentially losing the information
- // we need to successfully rebuilt it.
- if (request.isAllFiles() || request.isMajor()) {
- // Compaction and split upcalls run with the effective user context of the requesting user.
- // This will lead to failure of cross cluster RPC if the effective user is not
- // the login user. Switch to the login user context to ensure we have the expected
- // security context.
- User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- String fullTableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
- clearTsOnDisabledIndexes(fullTableName);
- return null;
- }
- });
- }
- }
-
- @VisibleForTesting
- public void clearTsOnDisabledIndexes(final String fullTableName) {
- try (PhoenixConnection conn =
- QueryUtil.getConnectionOnServer(compactionConfig).unwrap(PhoenixConnection.class)) {
- String baseTable = fullTableName;
- PTable table = PhoenixRuntime.getTableNoCache(conn, baseTable);
- List<PTable> indexes;
- // if it's an index table, we just need to check if it's disabled
- if (PTableType.INDEX.equals(table.getType())) {
- indexes = Lists.newArrayList(table.getIndexes());
- indexes.add(table);
- } else {
- // for a data table, check all its indexes
- indexes = table.getIndexes();
- }
- // FIXME need handle views and indexes on views as well
- // if any index is disabled, we won't have all the data for a rebuild after compaction
- for (PTable index : indexes) {
- if (index.getIndexDisableTimestamp() != 0) {
- try {
- logger.info(
- "Major compaction running while index on table is disabled. Clearing index disable timestamp: "
- + index);
- IndexUtil.updateIndexState(conn, index.getName().getString(),
- PIndexState.DISABLE, Long.valueOf(0L));
- } catch (SQLException e) {
- logger.warn(
- "Unable to permanently disable index " + index.getName().getString(),
- e);
- }
- }
- }
- } catch (Exception e) {
- if (e instanceof TableNotFoundException) {
- logger.debug("Ignoring HBase table that is not a Phoenix table: " + fullTableName);
- // non-Phoenix HBase tables won't be found, do nothing
- return;
- }
- // If we can't reach the stats table, don't interrupt the normal
- // compaction operation, just log a warning.
- if (logger.isWarnEnabled()) {
- logger.warn("Unable to permanently disable indexes being partially rebuild for "
- + fullTableName,
- e);
- }
- }
- }
-
- @VisibleForTesting
- public void setCompactionConfig(Configuration compactionConfig) {
- this.compactionConfig = compactionConfig;
- }
-
private static PTable deserializeTable(byte[] b) {
try {
PTableProtos.PTable ptableProto = PTableProtos.PTable.parseFrom(b);
@@ -1422,4 +1344,43 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
protected boolean isRegionObserverFor(Scan scan) {
return scan.getAttribute(BaseScannerRegionObserver.UNGROUPED_AGG) != null;
}
+
+ /**
+  * For a major compaction, looks up the Phoenix metadata of the table being compacted and,
+  * if the table is an index (or has an index) with a non-zero index disable timestamp,
+  * replaces the compaction scanner with one that retains deleted cells.
+  *
+  * @return a {@link StoreScanner} opened with {@link ScanType#COMPACT_RETAIN_DELETES} when a
+  *         disabled index is found; otherwise the scanner {@code s} that was passed in
+  * @throws IOException declared by the overridden hook; failures other than the table being
+  *         unknown to Phoenix are not handled here
+  */
+ @Override
+ public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+         final Store store, final List<? extends KeyValueScanner> scanners, ScanType scanType,
+         long earliestPutTs, final InternalScanner s, final CompactionRequest request) throws IOException {
+     // Compaction and split upcalls run with the effective user context of the requesting user.
+     // This will lead to failure of cross cluster RPC if the effective user is not
+     // the login user. Switch to the login user context to ensure we have the expected
+     // security context.
+     return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() {
+         @Override
+         public InternalScanner run() throws Exception {
+             // Only major compactions are intercepted; everything else keeps its scanner.
+             if (!request.isMajor()) {
+                 return s;
+             }
+             String fullTableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
+             try (PhoenixConnection conn =
+                     QueryUtil.getConnectionOnServer(compactionConfig).unwrap(PhoenixConnection.class)) {
+                 PTable table = PhoenixRuntime.getTableNoCache(conn, fullTableName);
+                 List<PTable> indexes = PTableType.INDEX.equals(table.getType()) ? Lists.newArrayList(table) : table.getIndexes();
+                 // FIXME need to handle views and indexes on views as well
+                 for (PTable index : indexes) {
+                     if (index.getIndexDisableTimestamp() == 0) {
+                         continue;
+                     }
+                     // If the index is disabled, keep the deleted cells so the rebuild doesn't corrupt the index
+                     logger.info(
+                         "Modifying major compaction scanner to retain deleted cells for a table with disabled index: "
+                             + fullTableName);
+                     Scan scan = new Scan();
+                     scan.setMaxVersions();
+                     return new StoreScanner(store, store.getScanInfo(), scan, scanners,
+                         ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(),
+                         HConstants.OLDEST_TIMESTAMP);
+                 }
+             } catch (org.apache.phoenix.schema.TableNotFoundException e) {
+                 // Fully qualified because this class no longer imports the type.
+                 // An HBase table that Phoenix has no metadata for has no Phoenix index to
+                 // protect, so the compaction proceeds with the original scanner instead of failing.
+                 logger.debug("Ignoring HBase table that is not a Phoenix table: " + fullTableName);
+             }
+             return s;
+         }
+     });
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0fca6ca2/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index 4a105f6..d50589f 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -853,6 +853,25 @@ public class TestUtil {
System.out.println("-----------------------------------------------");
}
+ /**
+  * Counts the rows returned by a raw, all-versions scan over the given table.
+  *
+  * @param table the table to scan; it is not closed by this method
+  * @return the number of {@link Result}s returned by the scanner
+  * @throws IOException if opening or advancing the scanner fails
+  */
+ public static int getRawRowCount(HTableInterface table) throws IOException {
+     Scan s = new Scan();
+     s.setRaw(true);
+     s.setMaxVersions();
+     int rows = 0;
+     try (ResultScanner scanner = table.getScanner(s)) {
+         // Only the number of results matters; the cells themselves are not inspected.
+         while (scanner.next() != null) {
+             rows++;
+         }
+     }
+     return rows;
+ }
+
public static void dumpIndexStatus(Connection conn, String indexName) throws IOException, SQLException {
try (HTableInterface table = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES)) {
System.out.println("************ dumping index status for " + indexName + " **************");