Posted to commits@phoenix.apache.org by ja...@apache.org on 2017/11/15 18:34:44 UTC

[16/40] phoenix git commit: PHOENIX-4242 Fix Indexer post-compact hook logging of NPE and TableNotFound

PHOENIX-4242 Fix Indexer post-compact hook logging of NPE and TableNotFound


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/632cbba8
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/632cbba8
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/632cbba8

Branch: refs/heads/4.x-HBase-1.2
Commit: 632cbba8914ed541c89ba4930b1b549709a237b9
Parents: 3b536f4
Author: Vincent Poon <vi...@apache.org>
Authored: Thu Oct 19 14:28:27 2017 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Wed Nov 15 10:02:13 2017 -0800

----------------------------------------------------------------------
 .../UngroupedAggregateRegionObserverIT.java     | 171 +++++++++++++++++++
 .../UngroupedAggregateRegionObserver.java       | 103 ++++++-----
 .../org/apache/phoenix/hbase/index/Indexer.java |  52 ------
 .../apache/phoenix/schema/MetaDataClient.java   |   3 +
 4 files changed, 239 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
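
In outline, the commit replaces Indexer's inline postCompact logic with a single, testable entry point on UngroupedAggregateRegionObserver. A condensed sketch of the resulting shape, paraphrased from the diff below (imports and the method body are elided, so this is a reading aid rather than a drop-in class):

    public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver {
        @Override
        public void postCompact(final ObserverContext<RegionCoprocessorEnvironment> c,
                final Store store, final StoreFile resultFile, CompactionRequest request)
                throws IOException {
            // Major/all-files compactions purge delete markers, so a disabled index
            // caught mid-rebuild can no longer be rebuilt and must stay disabled.
            if (request.isAllFiles() || request.isMajor()) {
                User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
                    @Override
                    public Void run() throws Exception {
                        clearTsOnDisabledIndexes(c.getEnvironment().getRegion()
                                .getRegionInfo().getTable().getNameAsString());
                        return null;
                    }
                });
            }
        }

        @VisibleForTesting
        public void clearTsOnDisabledIndexes(String fullTableName) { /* body in diff */ }
    }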


http://git-wip-us.apache.org/repos/asf/phoenix/blob/632cbba8/phoenix-core/src/it/java/org/apache/phoenix/end2end/UngroupedAggregateRegionObserverIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UngroupedAggregateRegionObserverIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UngroupedAggregateRegionObserverIT.java
new file mode 100644
index 0000000..3efd40e
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UngroupedAggregateRegionObserverIT.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.never;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.log4j.Appender;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.spi.LoggingEvent;
+import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class UngroupedAggregateRegionObserverIT extends ParallelStatsDisabledIT {
+
+    private String dataTableName;
+    private String indexTableName;
+    private String schemaName;
+    private String dataTableFullName;
+    private static String indexTableFullName;
+
+    @Mock
+    private Appender mockAppender;
+
+    @Captor
+    private ArgumentCaptor<LoggingEvent> captorLoggingEvent;
+    private UngroupedAggregateRegionObserver ungroupedObserver;
+
+    @Before
+    public void setup() {
+        ungroupedObserver = new UngroupedAggregateRegionObserver();
+        ungroupedObserver.setCompactionConfig(PropertiesUtil.cloneConfig(config));
+    }
+
+    /**
+     * Tests that the post-compact hook doesn't log an NPE for a SYSTEM table
+     */
+    @Test
+    public void testPostCompactSystemSequence() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            startCapturingIndexLog();
+            // run the post-compact hook
+            ungroupedObserver.clearTsOnDisabledIndexes("SYSTEM.SEQUENCE");
+            stopCapturingIndexLog();
+            // uneventful - nothing should be logged
+            Mockito.verify(mockAppender, never())
+                    .doAppend((LoggingEvent) captorLoggingEvent.capture());
+        }
+    }
+
+    /**
+     * Tests that calling the post-compact hook on the data table permanently disables an index that
+     * is being rebuilt (i.e. already disabled or inactive)
+     */
+    @Test
+    public void testPostCompactDataTableDuringRebuild() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            generateUniqueTableNames();
+            testRebuildPostCompact(conn, dataTableFullName);
+        }
+    }
+
+    /**
+     * Tests that calling the post-compact hook on the index table permanently disables an index
+     * that is being rebuilt (i.e. already disabled or inactive)
+     */
+    @Test
+    public void testPostCompactIndexTableDuringRebuild() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            generateUniqueTableNames();
+            testRebuildPostCompact(conn, indexTableFullName);
+        }
+    }
+
+    private void testRebuildPostCompact(Connection conn, String tableToCompact)
+            throws SQLException {
+        conn.createStatement().execute(
+            String.format(PartialScannerResultsDisabledIT.TEST_TABLE_DDL, dataTableFullName));
+        conn.createStatement().execute(String.format(PartialScannerResultsDisabledIT.INDEX_1_DDL,
+            indexTableName, dataTableFullName));
+        // disable the index, simulating an index write failure
+        PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+        IndexUtil.updateIndexState(pConn, indexTableFullName, PIndexState.DISABLE,
+            EnvironmentEdgeManager.currentTimeMillis());
+
+        // run the post-compact hook on the data table
+        startCapturingIndexLog();
+        ungroupedObserver.clearTsOnDisabledIndexes(tableToCompact);
+        stopCapturingIndexLog();
+        // an event should've been logged
+        Mockito.verify(mockAppender).doAppend((LoggingEvent) captorLoggingEvent.capture());
+        LoggingEvent loggingEvent = (LoggingEvent) captorLoggingEvent.getValue();
+        assertThat(loggingEvent.getLevel(), is(Level.INFO));
+        // index should be permanently disabled (disabletime of 0)
+        assertTrue(TestUtil.checkIndexState(pConn, indexTableFullName, PIndexState.DISABLE, 0L));
+    }
+
+    /**
+     * Tests that the post-compact hook doesn't log a warning for a non-Phoenix table (created
+     * purely through HBase)
+     */
+    @Test
+    public void testPostCompactTableNotFound() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            HBaseTestingUtility utility = getUtility();
+            String nonPhoenixTable = "NOT_A_PHOENIX_TABLE";
+            utility.getHBaseAdmin().createTable(utility.createTableDescriptor(nonPhoenixTable));
+            startCapturingIndexLog();
+            ungroupedObserver.clearTsOnDisabledIndexes(nonPhoenixTable);
+            stopCapturingIndexLog();
+            // a debug level event should've been logged
+            Mockito.verify(mockAppender).doAppend((LoggingEvent) captorLoggingEvent.capture());
+            LoggingEvent loggingEvent = (LoggingEvent) captorLoggingEvent.getValue();
+            assertThat(loggingEvent.getLevel(), is(Level.DEBUG));
+        }
+    }
+
+    private void stopCapturingIndexLog() {
+        LogManager.getLogger(UngroupedAggregateRegionObserver.class).removeAppender(mockAppender);
+    }
+
+    private void startCapturingIndexLog() {
+        LogManager.getLogger(UngroupedAggregateRegionObserver.class).addAppender(mockAppender);
+    }
+
+    private void generateUniqueTableNames() {
+        schemaName = generateUniqueName();
+        dataTableName = generateUniqueName() + "_DATA";
+        dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        indexTableName = generateUniqueName() + "_IDX";
+        indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+    }
+}
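
The test above captures the coprocessor's log output by attaching a Mockito-mocked log4j Appender to the class logger. A minimal, self-contained sketch of that capture pattern (the class and message here are illustrative, not from the commit):

    import org.apache.log4j.Appender;
    import org.apache.log4j.Level;
    import org.apache.log4j.LogManager;
    import org.apache.log4j.Logger;
    import org.apache.log4j.spi.LoggingEvent;
    import org.mockito.ArgumentCaptor;
    import org.mockito.Mockito;

    public class LogCaptureSketch {
        public static void main(String[] args) {
            Appender mockAppender = Mockito.mock(Appender.class);
            Logger logger = LogManager.getLogger(LogCaptureSketch.class);
            logger.addAppender(mockAppender);        // start capturing
            try {
                logger.info("hello");                // stands in for the code under test
            } finally {
                logger.removeAppender(mockAppender); // stop capturing
            }
            ArgumentCaptor<LoggingEvent> captor = ArgumentCaptor.forClass(LoggingEvent.class);
            Mockito.verify(mockAppender).doAppend(captor.capture());
            System.out.println(captor.getValue().getLevel() == Level.INFO); // prints true
        }
    }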

http://git-wip-us.apache.org/repos/asf/phoenix/blob/632cbba8/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index c3024a7..af50420 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -97,6 +97,7 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.query.QueryConstants;
@@ -108,8 +109,10 @@ import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PRow;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableImpl;
+import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.ValueSchema.Field;
 import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker;
@@ -133,7 +136,10 @@ import org.apache.phoenix.util.ExpressionUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.KeyValueUtil;
 import org.apache.phoenix.util.LogUtil;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
@@ -142,7 +148,10 @@ import org.apache.phoenix.util.TimeKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
 import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.primitives.Ints;
@@ -926,7 +935,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
     }
 
     @Override
-    public void postCompact(final ObserverContext<RegionCoprocessorEnvironment> e, final Store store,
+    public void postCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
             final StoreFile resultFile, CompactionRequest request) throws IOException {
         // If we're compacting all files, then delete markers are removed
         // and we must permanently disable an index that needs to be
@@ -940,49 +949,67 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
                 @Override
                 public Void run() throws Exception {
-                    MutationCode mutationCode = null;
-                    long disableIndexTimestamp = 0;
-
-                    try (CoprocessorHConnection coprocessorHConnection =
-                            new CoprocessorHConnection(compactionConfig,
-                                    (HRegionServer) e.getEnvironment()
-                                            .getRegionServerServices());
-                            HTableInterface htable =
-                                    coprocessorHConnection
-                                            .getTable(SchemaUtil.getPhysicalTableName(
-                                                PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES,
-                                                compactionConfig))) {
-                        String tableName = e.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
-                        // FIXME: if this is an index on a view, we won't find a row for it in SYSTEM.CATALOG
-                        // Instead, we need to disable all indexes on the view.
-                        byte[] tableKey = SchemaUtil.getTableKeyFromFullName(tableName);
-                        Get get = new Get(tableKey);
-                        get.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES);
-                        Result result = htable.get(get);
-                        if (!result.isEmpty()) {
-                            Cell cell = result.listCells().get(0);
-                            if (cell.getValueLength() > 0) {
-                                disableIndexTimestamp = PLong.INSTANCE.getCodec().decodeLong(cell.getValueArray(), cell.getValueOffset(), SortOrder.getDefault());
-                                if (disableIndexTimestamp != 0) {
-                                    logger.info("Major compaction running while index on table is disabled.  Clearing index disable timestamp: " + tableName);
-                                    mutationCode = IndexUtil.updateIndexState(tableKey, 0L, htable, PIndexState.DISABLE).getMutationCode();
-                                }
-                            }
-                        }
-                    } catch (Throwable t) { // log, but swallow exception as we don't want to impact compaction
-                        logger.warn("Potential failure to permanently disable index during compaction " +  e.getEnvironment().getRegionInfo().getTable().getNameAsString(), t);
-                    } finally {
-                        if (disableIndexTimestamp != 0 && mutationCode != MutationCode.TABLE_ALREADY_EXISTS && mutationCode != MutationCode.TABLE_NOT_FOUND) {
-                            logger.warn("Attempt to permanently disable index " + e.getEnvironment().getRegionInfo().getTable().getNameAsString() + 
-                                    " during compaction" + (mutationCode == null ? "" : " failed with code = " + mutationCode));
-                        }
-                    }
+                    String fullTableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
+                    clearTsOnDisabledIndexes(fullTableName);
                     return null;
                 }
             });
         }
     }
 
+    @VisibleForTesting
+    public void clearTsOnDisabledIndexes(final String fullTableName) {
+        try (PhoenixConnection conn =
+                QueryUtil.getConnectionOnServer(compactionConfig).unwrap(PhoenixConnection.class)) {
+            String baseTable = fullTableName;
+            PTable table = PhoenixRuntime.getTableNoCache(conn, baseTable);
+            List<PTable> indexes;
+            // if it's an index table, we just need to check if it's disabled
+            if (PTableType.INDEX.equals(table.getType())) {
+                indexes = Lists.newArrayList(table.getIndexes());
+                indexes.add(table);
+            } else {
+                // for a data table, check all its indexes
+                indexes = table.getIndexes();
+            }
+            // FIXME: need to handle views and indexes on views as well
+            // if any index is disabled, we won't have all the data for a rebuild after compaction
+            for (PTable index : indexes) {
+                if (index.getIndexDisableTimestamp() != 0) {
+                    try {
+                        logger.info(
+                            "Major compaction running while index on table is disabled.  Clearing index disable timestamp: "
+                                    + index);
+                        IndexUtil.updateIndexState(conn, index.getName().getString(),
+                            PIndexState.DISABLE, Long.valueOf(0L));
+                    } catch (SQLException e) {
+                        logger.warn(
+                            "Unable to permanently disable index " + index.getName().getString(),
+                            e);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            if (e instanceof TableNotFoundException) {
+                logger.debug("Ignoring HBase table that is not a Phoenix table: " + fullTableName);
+                // non-Phoenix HBase tables won't be found, do nothing
+                return;
+            }
+            // If we can't reach SYSTEM.CATALOG, don't interrupt the normal
+            // compaction operation, just log a warning.
+            if (logger.isWarnEnabled()) {
+                logger.warn("Unable to permanently disable indexes being partially rebuild for "
+                        + fullTableName,
+                    e);
+            }
+        }
+    }
+
+    @VisibleForTesting
+    public void setCompactionConfig(Configuration compactionConfig) {
+        this.compactionConfig = compactionConfig;
+    }
+
     private static PTable deserializeTable(byte[] b) {
         try {
             PTableProtos.PTable ptableProto = PTableProtos.PTable.parseFrom(b);
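
The heart of the fix is above: instead of a raw Get against SYSTEM.CATALOG keyed by the compacted table's name (which failed for index tables and surfaced TableNotFound for plain HBase tables), the hook now resolves the PTable through a server-side Phoenix connection and treats "not a Phoenix table" as a DEBUG-level non-event. A distilled sketch of that lookup-and-guard pattern, with the logging and IndexUtil.updateIndexState call elided and 'conf' standing in for the coprocessor's compaction Configuration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.phoenix.jdbc.PhoenixConnection;
    import org.apache.phoenix.schema.PTable;
    import org.apache.phoenix.schema.TableNotFoundException;
    import org.apache.phoenix.util.PhoenixRuntime;
    import org.apache.phoenix.util.QueryUtil;

    final class DisabledIndexGuardSketch {
        static void inspect(Configuration conf, String fullTableName) {
            try (PhoenixConnection conn = QueryUtil.getConnectionOnServer(conf)
                    .unwrap(PhoenixConnection.class)) {
                PTable table = PhoenixRuntime.getTableNoCache(conn, fullTableName);
                for (PTable index : table.getIndexes()) {
                    if (index.getIndexDisableTimestamp() != 0) {
                        // a disabled index was caught by a major compaction; the real
                        // hook permanently disables it (disable timestamp cleared to 0)
                    }
                }
            } catch (TableNotFoundException e) {
                // raw HBase tables have no SYSTEM.CATALOG row: expected, DEBUG only
            } catch (Exception e) {
                // never fail the compaction itself; the real hook logs a warning
            }
        }
    }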

http://git-wip-us.apache.org/repos/asf/phoenix/blob/632cbba8/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index 4ac4ab5..9686789 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -25,7 +25,6 @@ import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
-import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -62,8 +61,6 @@ import org.apache.hadoop.hbase.regionserver.OperationStatus;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.security.User;
@@ -89,18 +86,12 @@ import org.apache.phoenix.hbase.index.write.IndexWriter;
 import org.apache.phoenix.hbase.index.write.RecoveryIndexWriter;
 import org.apache.phoenix.hbase.index.write.recovery.PerRegionIndexWriteCache;
 import org.apache.phoenix.hbase.index.write.recovery.StoreFailuresInCachePolicy;
-import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.schema.PIndexState;
-import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.trace.TracingUtils;
 import org.apache.phoenix.trace.util.NullSpan;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
-import org.apache.phoenix.util.IndexUtil;
-import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ServerUtil;
 
 import com.google.common.collect.Lists;
@@ -847,48 +838,5 @@ public class Indexer extends BaseRegionObserver {
     properties.put(Indexer.INDEX_BUILDER_CONF_KEY, builder.getName());
     desc.addCoprocessor(Indexer.class.getName(), null, priority, properties);
   }
-  
-  @Override
-  public void postCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
-          final StoreFile resultFile, CompactionRequest request) throws IOException {
-      // If we're compacting all files, then delete markers are removed
-      // and we must permanently disable an index that needs to be
-      // partially rebuild because we're potentially losing the information
-      // we need to successfully rebuilt it.
-      if (request.isAllFiles() || request.isMajor()) {
-          // Compaction and split upcalls run with the effective user context of the requesting user.
-          // This will lead to failure of cross cluster RPC if the effective user is not
-          // the login user. Switch to the login user context to ensure we have the expected
-          // security context.
-          User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
-              @Override
-              public Void run() throws Exception {
-                  String fullTableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
-                  try {
-                      PhoenixConnection conn =  QueryUtil.getConnectionOnServer(compactionConfig).unwrap(PhoenixConnection.class);
-                      PTable table = PhoenixRuntime.getTableNoCache(conn, fullTableName);
-                      // FIXME: we may need to recurse into children of this table too
-                      for (PTable index : table.getIndexes()) {
-                          if (index.getIndexDisableTimestamp() != 0) {
-                              try {
-                                  LOG.info("Major compaction running while index on table is disabled.  Clearing index disable timestamp: " + fullTableName);
-                                  IndexUtil.updateIndexState(conn, index.getName().getString(), PIndexState.DISABLE, Long.valueOf(0L));
-                              } catch (SQLException e) {
-                                  LOG.warn("Unable to permanently disable index " + index.getName().getString(), e);
-                              }
-                          }
-                      }
-                  } catch (Exception e) {
-                      // If we can't reach the stats table, don't interrupt the normal
-                      // compaction operation, just log a warning.
-                      if (LOG.isWarnEnabled()) {
-                          LOG.warn("Unable to permanently disable indexes being partially rebuild for " + fullTableName, e);
-                      }
-                  }
-                  return null;
-              }
-          });
-      }
-  }
 }
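
One detail survives the removal above: both the deleted Indexer hook and its replacement wrap the catalog update in User.runAsLoginUser, because compaction and split upcalls run with the requesting user's context, which breaks cross-cluster RPC. A minimal sketch of that pattern (the run() body is a placeholder):

    import java.security.PrivilegedExceptionAction;
    import org.apache.hadoop.hbase.security.User;

    final class LoginUserSketch {
        static void updateAsLoginUser() throws Exception {
            // switch to the RegionServer's login user before making RPCs that must
            // carry the server's own identity rather than the caller's
            User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
                @Override
                public Void run() throws Exception {
                    // server-side metadata RPCs go here
                    return null;
                }
            });
        }
    }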
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/632cbba8/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 0f6bab2..0ce4246 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -602,6 +602,9 @@ public class MetaDataClient {
             }
 
             if (SYSTEM_CATALOG_SCHEMA.equals(schemaName)) {
+                if (result.getMutationCode() == MutationCode.TABLE_ALREADY_EXISTS && result.getTable() == null) {
+                    result.setTable(table);
+                }
                 return result;
             }
             MutationCode code = result.getMutationCode();