You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by vi...@apache.org on 2017/10/19 21:28:48 UTC
phoenix git commit: PHOENIX-4242 Fix Indexer post-compact hook
logging of NPE and TableNotFound
Repository: phoenix
Updated Branches:
refs/heads/master 0461fe855 -> 53910f9f8
PHOENIX-4242 Fix Indexer post-compact hook logging of NPE and TableNotFound
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/53910f9f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/53910f9f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/53910f9f
Branch: refs/heads/master
Commit: 53910f9f88cd47f51f68691042306c89118b6ab3
Parents: 0461fe8
Author: Vincent Poon <vi...@apache.org>
Authored: Thu Oct 19 14:28:27 2017 -0700
Committer: Vincent Poon <vi...@apache.org>
Committed: Thu Oct 19 14:28:27 2017 -0700
----------------------------------------------------------------------
.../UngroupedAggregateRegionObserverIT.java | 171 +++++++++++++++++++
.../UngroupedAggregateRegionObserver.java | 103 ++++++-----
.../org/apache/phoenix/hbase/index/Indexer.java | 52 ------
.../apache/phoenix/schema/MetaDataClient.java | 3 +
4 files changed, 239 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/53910f9f/phoenix-core/src/it/java/org/apache/phoenix/end2end/UngroupedAggregateRegionObserverIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UngroupedAggregateRegionObserverIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UngroupedAggregateRegionObserverIT.java
new file mode 100644
index 0000000..3efd40e
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UngroupedAggregateRegionObserverIT.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.never;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.log4j.Appender;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.spi.LoggingEvent;
+import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class UngroupedAggregateRegionObserverIT extends ParallelStatsDisabledIT {
+
+ private String dataTableName;
+ private String indexTableName;
+ private String schemaName;
+ private String dataTableFullName;
+ private static String indexTableFullName;
+
+ @Mock
+ private Appender mockAppender;
+
+ @Captor
+ private ArgumentCaptor<LoggingEvent> captorLoggingEvent;
+ private UngroupedAggregateRegionObserver ungroupedObserver;
+
+ @Before
+ public void setup() {
+ ungroupedObserver = new UngroupedAggregateRegionObserver();
+ ungroupedObserver.setCompactionConfig(PropertiesUtil.cloneConfig(config));
+ }
+
+ /**
+ * Tests the that post compact hook doesn't log any NPE for a System table
+ */
+ @Test
+ public void testPostCompactSystemSequence() throws Exception {
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ startCapturingIndexLog();
+ // run the post-compact hook
+ ungroupedObserver.clearTsOnDisabledIndexes("SYSTEM.SEQUENCE");
+ stopCapturingIndexLog();
+ // uneventful - nothing should be logged
+ Mockito.verify(mockAppender, never())
+ .doAppend((LoggingEvent) captorLoggingEvent.capture());
+ }
+ }
+
+ /**
+ * Tests that calling the post compact hook on the data table permanently disables an index that
+ * is being rebuilt (i.e. already disabled or inactive)
+ */
+ @Test
+ public void testPostCompactDataTableDuringRebuild() throws Exception {
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ generateUniqueTableNames();
+ testRebuildPostCompact(conn, dataTableFullName);
+ }
+ }
+
+ /**
+ * Tests that calling the post compact hook on the index table permanently disables an index
+ * that is being rebuilt (i.e. already disabled or inactive)
+ */
+ @Test
+ public void testPostCompactIndexTableDuringRebuild() throws Exception {
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ generateUniqueTableNames();
+ testRebuildPostCompact(conn, indexTableFullName);
+ }
+ }
+
+ private void testRebuildPostCompact(Connection conn, String tableToCompact)
+ throws SQLException {
+ conn.createStatement().execute(
+ String.format(PartialScannerResultsDisabledIT.TEST_TABLE_DDL, dataTableFullName));
+ conn.createStatement().execute(String.format(PartialScannerResultsDisabledIT.INDEX_1_DDL,
+ indexTableName, dataTableFullName));
+ // disable the index, simulating an index write failure
+ PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+ IndexUtil.updateIndexState(pConn, indexTableFullName, PIndexState.DISABLE,
+ EnvironmentEdgeManager.currentTimeMillis());
+
+ // run the post-compact hook on the data table
+ startCapturingIndexLog();
+ ungroupedObserver.clearTsOnDisabledIndexes(tableToCompact);
+ stopCapturingIndexLog();
+ // an event should've been logged
+ Mockito.verify(mockAppender).doAppend((LoggingEvent) captorLoggingEvent.capture());
+ LoggingEvent loggingEvent = (LoggingEvent) captorLoggingEvent.getValue();
+ assertThat(loggingEvent.getLevel(), is(Level.INFO));
+ // index should be permanently disabled (disabletime of 0)
+ assertTrue(TestUtil.checkIndexState(pConn, indexTableFullName, PIndexState.DISABLE, 0L));
+ }
+
+ /**
+ * Tests that a non-Phoenix table (created purely through HBase) doesn't log a warning in
+ * postCompact
+ */
+ @Test
+ public void testPostCompactTableNotFound() throws Exception {
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ HBaseTestingUtility utility = getUtility();
+ String nonPhoenixTable = "NOT_A_PHOENIX_TABLE";
+ utility.getHBaseAdmin().createTable(utility.createTableDescriptor(nonPhoenixTable));
+ startCapturingIndexLog();
+ ungroupedObserver.clearTsOnDisabledIndexes(nonPhoenixTable);
+ stopCapturingIndexLog();
+ // a debug level event should've been logged
+ Mockito.verify(mockAppender).doAppend((LoggingEvent) captorLoggingEvent.capture());
+ LoggingEvent loggingEvent = (LoggingEvent) captorLoggingEvent.getValue();
+ assertThat(loggingEvent.getLevel(), is(Level.DEBUG));
+ }
+ }
+
+ private void stopCapturingIndexLog() {
+ LogManager.getLogger(UngroupedAggregateRegionObserver.class).removeAppender(mockAppender);
+ }
+
+ private void startCapturingIndexLog() {
+ LogManager.getLogger(UngroupedAggregateRegionObserver.class).addAppender(mockAppender);
+ }
+
+ private void generateUniqueTableNames() {
+ schemaName = generateUniqueName();
+ dataTableName = generateUniqueName() + "_DATA";
+ dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+ indexTableName = generateUniqueName() + "_IDX";
+ indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/53910f9f/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index c3024a7..af50420 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -97,6 +97,7 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.join.HashJoinInfo;
import org.apache.phoenix.query.QueryConstants;
@@ -108,8 +109,10 @@ import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PRow;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableImpl;
+import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.RowKeySchema;
import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.ValueSchema.Field;
import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker;
@@ -133,7 +136,10 @@ import org.apache.phoenix.util.ExpressionUtil;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.KeyValueUtil;
import org.apache.phoenix.util.LogUtil;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.ServerUtil;
@@ -142,7 +148,10 @@ import org.apache.phoenix.util.TimeKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
@@ -926,7 +935,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
@Override
- public void postCompact(final ObserverContext<RegionCoprocessorEnvironment> e, final Store store,
+ public void postCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
final StoreFile resultFile, CompactionRequest request) throws IOException {
// If we're compacting all files, then delete markers are removed
// and we must permanently disable an index that needs to be
@@ -940,49 +949,67 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
- MutationCode mutationCode = null;
- long disableIndexTimestamp = 0;
-
- try (CoprocessorHConnection coprocessorHConnection =
- new CoprocessorHConnection(compactionConfig,
- (HRegionServer) e.getEnvironment()
- .getRegionServerServices());
- HTableInterface htable =
- coprocessorHConnection
- .getTable(SchemaUtil.getPhysicalTableName(
- PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES,
- compactionConfig))) {
- String tableName = e.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
- // FIXME: if this is an index on a view, we won't find a row for it in SYSTEM.CATALOG
- // Instead, we need to disable all indexes on the view.
- byte[] tableKey = SchemaUtil.getTableKeyFromFullName(tableName);
- Get get = new Get(tableKey);
- get.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES);
- Result result = htable.get(get);
- if (!result.isEmpty()) {
- Cell cell = result.listCells().get(0);
- if (cell.getValueLength() > 0) {
- disableIndexTimestamp = PLong.INSTANCE.getCodec().decodeLong(cell.getValueArray(), cell.getValueOffset(), SortOrder.getDefault());
- if (disableIndexTimestamp != 0) {
- logger.info("Major compaction running while index on table is disabled. Clearing index disable timestamp: " + tableName);
- mutationCode = IndexUtil.updateIndexState(tableKey, 0L, htable, PIndexState.DISABLE).getMutationCode();
- }
- }
- }
- } catch (Throwable t) { // log, but swallow exception as we don't want to impact compaction
- logger.warn("Potential failure to permanently disable index during compaction " + e.getEnvironment().getRegionInfo().getTable().getNameAsString(), t);
- } finally {
- if (disableIndexTimestamp != 0 && mutationCode != MutationCode.TABLE_ALREADY_EXISTS && mutationCode != MutationCode.TABLE_NOT_FOUND) {
- logger.warn("Attempt to permanently disable index " + e.getEnvironment().getRegionInfo().getTable().getNameAsString() +
- " during compaction" + (mutationCode == null ? "" : " failed with code = " + mutationCode));
- }
- }
+ String fullTableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
+ clearTsOnDisabledIndexes(fullTableName);
return null;
}
});
}
}
+ @VisibleForTesting
+ public void clearTsOnDisabledIndexes(final String fullTableName) {
+ try (PhoenixConnection conn =
+ QueryUtil.getConnectionOnServer(compactionConfig).unwrap(PhoenixConnection.class)) {
+ String baseTable = fullTableName;
+ PTable table = PhoenixRuntime.getTableNoCache(conn, baseTable);
+ List<PTable> indexes;
+ // if it's an index table, we just need to check if it's disabled
+ if (PTableType.INDEX.equals(table.getType())) {
+ indexes = Lists.newArrayList(table.getIndexes());
+ indexes.add(table);
+ } else {
+ // for a data table, check all its indexes
+ indexes = table.getIndexes();
+ }
+ // FIXME need handle views and indexes on views as well
+ // if any index is disabled, we won't have all the data for a rebuild after compaction
+ for (PTable index : indexes) {
+ if (index.getIndexDisableTimestamp() != 0) {
+ try {
+ logger.info(
+ "Major compaction running while index on table is disabled. Clearing index disable timestamp: "
+ + index);
+ IndexUtil.updateIndexState(conn, index.getName().getString(),
+ PIndexState.DISABLE, Long.valueOf(0L));
+ } catch (SQLException e) {
+ logger.warn(
+ "Unable to permanently disable index " + index.getName().getString(),
+ e);
+ }
+ }
+ }
+ } catch (Exception e) {
+ if (e instanceof TableNotFoundException) {
+ logger.debug("Ignoring HBase table that is not a Phoenix table: " + fullTableName);
+ // non-Phoenix HBase tables won't be found, do nothing
+ return;
+ }
+ // If we can't reach the stats table, don't interrupt the normal
+ // compaction operation, just log a warning.
+ if (logger.isWarnEnabled()) {
+ logger.warn("Unable to permanently disable indexes being partially rebuild for "
+ + fullTableName,
+ e);
+ }
+ }
+ }
+
+ @VisibleForTesting
+ public void setCompactionConfig(Configuration compactionConfig) {
+ this.compactionConfig = compactionConfig;
+ }
+
private static PTable deserializeTable(byte[] b) {
try {
PTableProtos.PTable ptableProto = PTableProtos.PTable.parseFrom(b);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/53910f9f/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index 8957b30..24eeab5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -25,7 +25,6 @@ import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
-import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -63,8 +62,6 @@ import org.apache.hadoop.hbase.regionserver.OperationStatus;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.security.User;
@@ -90,18 +87,12 @@ import org.apache.phoenix.hbase.index.write.IndexWriter;
import org.apache.phoenix.hbase.index.write.RecoveryIndexWriter;
import org.apache.phoenix.hbase.index.write.recovery.PerRegionIndexWriteCache;
import org.apache.phoenix.hbase.index.write.recovery.StoreFailuresInCachePolicy;
-import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.schema.PIndexState;
-import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.trace.TracingUtils;
import org.apache.phoenix.trace.util.NullSpan;
import org.apache.phoenix.util.EnvironmentEdgeManager;
-import org.apache.phoenix.util.IndexUtil;
-import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ServerUtil;
import com.google.common.collect.Lists;
@@ -840,48 +831,5 @@ public class Indexer extends BaseRegionObserver {
properties.put(Indexer.INDEX_BUILDER_CONF_KEY, builder.getName());
desc.addCoprocessor(Indexer.class.getName(), null, priority, properties);
}
-
- @Override
- public void postCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
- final StoreFile resultFile, CompactionRequest request) throws IOException {
- // If we're compacting all files, then delete markers are removed
- // and we must permanently disable an index that needs to be
- // partially rebuild because we're potentially losing the information
- // we need to successfully rebuilt it.
- if (request.isAllFiles() || request.isMajor()) {
- // Compaction and split upcalls run with the effective user context of the requesting user.
- // This will lead to failure of cross cluster RPC if the effective user is not
- // the login user. Switch to the login user context to ensure we have the expected
- // security context.
- User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- String fullTableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
- try {
- PhoenixConnection conn = QueryUtil.getConnectionOnServer(compactionConfig).unwrap(PhoenixConnection.class);
- PTable table = PhoenixRuntime.getTableNoCache(conn, fullTableName);
- // FIXME: we may need to recurse into children of this table too
- for (PTable index : table.getIndexes()) {
- if (index.getIndexDisableTimestamp() != 0) {
- try {
- LOG.info("Major compaction running while index on table is disabled. Clearing index disable timestamp: " + fullTableName);
- IndexUtil.updateIndexState(conn, index.getName().getString(), PIndexState.DISABLE, Long.valueOf(0L));
- } catch (SQLException e) {
- LOG.warn("Unable to permanently disable index " + index.getName().getString(), e);
- }
- }
- }
- } catch (Exception e) {
- // If we can't reach the stats table, don't interrupt the normal
- // compaction operation, just log a warning.
- if (LOG.isWarnEnabled()) {
- LOG.warn("Unable to permanently disable indexes being partially rebuild for " + fullTableName, e);
- }
- }
- return null;
- }
- });
- }
- }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/53910f9f/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 0f6bab2..0ce4246 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -602,6 +602,9 @@ public class MetaDataClient {
}
if (SYSTEM_CATALOG_SCHEMA.equals(schemaName)) {
+ if (result.getMutationCode() == MutationCode.TABLE_ALREADY_EXISTS && result.getTable() == null) {
+ result.setTable(table);
+ }
return result;
}
MutationCode code = result.getMutationCode();