Posted to commits@phoenix.apache.org by ja...@apache.org on 2017/09/07 18:32:22 UTC

phoenix git commit: PHOENIX-3953 Clear INDEX_DISABLED_TIMESTAMP and disable index on compaction (addendum 2)

Repository: phoenix
Updated Branches:
  refs/heads/master 814276d4b -> 64b808971


PHOENIX-3953 Clear INDEX_DISABLED_TIMESTAMP and disable index on compaction (addendum 2)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/64b80897
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/64b80897
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/64b80897

Branch: refs/heads/master
Commit: 64b808971698880980d06f17b0924e6e22d95e12
Parents: 814276d
Author: James Taylor <jt...@salesforce.com>
Authored: Thu Sep 7 11:26:47 2017 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Thu Sep 7 11:26:47 2017 -0700

----------------------------------------------------------------------
 .../UngroupedAggregateRegionObserver.java       | 62 ++++++++++++++-
 .../org/apache/phoenix/hbase/index/Indexer.java | 42 +++++-----
 .../stats/DefaultStatisticsCollector.java       | 83 ++++++--------------
 .../schema/stats/NoOpStatisticsCollector.java   |  2 +-
 .../schema/stats/StatisticsCollector.java       |  2 +-
 5 files changed, 111 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/64b80897/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 31c83e4..a61f502 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -55,9 +55,12 @@ import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -69,11 +72,14 @@ import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.cache.ServerCacheClient;
+import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
 import org.apache.phoenix.coprocessor.generated.PTableProtos;
 import org.apache.phoenix.exception.DataExceedsCapacityException;
 import org.apache.phoenix.execute.TupleProjector;
@@ -89,11 +95,13 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
 import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PRow;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableImpl;
@@ -899,6 +907,58 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         });
     }
 
+    @Override
+    public void postCompact(final ObserverContext<RegionCoprocessorEnvironment> e, final Store store,
+            final StoreFile resultFile, CompactionRequest request) throws IOException {
+        // If we're compacting all files, then delete markers are removed
+        // and we must permanently disable an index that needs to be
+        // partially rebuilt, because we're potentially losing the information
+        // we need to successfully rebuild it.
+        if (request.isAllFiles() || request.isMajor()) {
+            // Compaction and split upcalls run with the effective user context of the requesting user.
+            // This will lead to failure of cross cluster RPC if the effective user is not
+            // the login user. Switch to the login user context to ensure we have the expected
+            // security context.
+            User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
+                @Override
+                public Void run() throws Exception {
+                    MutationCode mutationCode = null;
+                    long disableIndexTimestamp = 0;
+                    
+                    try (HTableInterface htable = e.getEnvironment().getTable(
+                                SchemaUtil.getPhysicalTableName(
+                                        PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES,
+                                        e.getEnvironment().getConfiguration()))) {
+                        String tableName = e.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
+                        // FIXME: if this is an index on a view, we won't find a row for it in SYSTEM.CATALOG
+                        // Instead, we need to disable all indexes on the view.
+                        byte[] tableKey = SchemaUtil.getTableKeyFromFullName(tableName);
+                        Get get = new Get(tableKey);
+                        get.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES);
+                        Result result = htable.get(get);
+                        if (!result.isEmpty()) {
+                            Cell cell = result.listCells().get(0);
+                            if (cell.getValueLength() > 0) {
+                                disableIndexTimestamp = PLong.INSTANCE.getCodec().decodeLong(cell.getValueArray(), cell.getValueOffset(), SortOrder.getDefault());
+                                if (disableIndexTimestamp != 0) {
+                                    mutationCode = IndexUtil.updateIndexState(tableKey, 0L, htable, PIndexState.DISABLE).getMutationCode();
+                                }
+                            }
+                        }
+                    } catch (Throwable t) { // log, but swallow exception as we don't want to impact compaction
+                        logger.warn("Potential failure to permanently disable index during compaction " +  e.getEnvironment().getRegionInfo().getTable().getNameAsString(), t);
+                    } finally {
+                        if (disableIndexTimestamp != 0 && mutationCode != MutationCode.TABLE_ALREADY_EXISTS && mutationCode != MutationCode.TABLE_NOT_FOUND) {
+                            logger.warn("Attempt to permanently disable index " + e.getEnvironment().getRegionInfo().getTable().getNameAsString() + 
+                                    " during compaction" + (mutationCode == null ? "" : " failed with code = " + mutationCode));
+                        }
+                    }
+                    return null;
+                }
+            });
+        }
+    }
+
     private static PTable deserializeTable(byte[] b) {
         try {
             PTableProtos.PTable ptableProto = PTableProtos.PTable.parseFrom(b);
@@ -1115,7 +1175,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             long rowCount = 0;
             try {
                 if (!compactionRunning) {
-                    stats.init(false);
+                    stats.init();
                     synchronized (innerScanner) {
                         do {
                             List<Cell> results = new ArrayList<Cell>();

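As the comments in the hunk above note, compaction and split upcalls run under
the effective user context of the requesting user, so a cross-cluster RPC
issued from the hook must first switch to the login user. A minimal standalone
sketch of that pattern, assuming only the HBase 1.x security API (the class
and method names below are illustrative, not part of this patch):

    import java.io.IOException;
    import java.security.PrivilegedExceptionAction;
    import org.apache.hadoop.hbase.security.User;

    final class LoginUserRpcSketch {
        // Sketch only: run an RPC from a coprocessor upcall under the login
        // user's security context rather than the effective request user.
        static void rpcAsLoginUser() throws IOException {
            User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
                @Override
                public Void run() throws Exception {
                    // issue the cross-cluster RPC here, e.g. a Get against
                    // SYSTEM.CATALOG as in postCompact() above
                    return null;
                }
            });
        }
    }
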
http://git-wip-us.apache.org/repos/asf/phoenix/blob/64b80897/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index b76752d..ad03abb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -63,6 +63,8 @@ import org.apache.hadoop.hbase.regionserver.OperationStatus;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.security.User;
@@ -838,21 +840,25 @@ public class Indexer extends BaseRegionObserver {
   }
   
   @Override
-  public InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
-          final InternalScanner scanner, final ScanType scanType) throws IOException {
-      // Compaction and split upcalls run with the effective user context of the requesting user.
-      // This will lead to failure of cross cluster RPC if the effective user is not
-      // the login user. Switch to the login user context to ensure we have the expected
-      // security context.
-      return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() {
-          @Override
-          public InternalScanner run() throws Exception {
-              InternalScanner internalScanner = scanner;
-              if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) {
+  public void postCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
+          final StoreFile resultFile, CompactionRequest request) throws IOException {
+      // If we're compacting all files, then delete markers are removed
+      // and we must permanently disable an index that needs to be
+      // partially rebuilt, because we're potentially losing the information
+      // we need to successfully rebuild it.
+      if (request.isAllFiles() || request.isMajor()) {
+          // Compaction and split upcalls run with the effective user context of the requesting user.
+          // This will lead to failure of cross cluster RPC if the effective user is not
+          // the login user. Switch to the login user context to ensure we have the expected
+          // security context.
+          User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
+              @Override
+              public Void run() throws Exception {
                   String fullTableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
                   try {
                       PhoenixConnection conn =  QueryUtil.getConnectionOnServer(c.getEnvironment().getConfiguration()).unwrap(PhoenixConnection.class);
                       PTable table = PhoenixRuntime.getTableNoCache(conn, fullTableName);
+                      // FIXME: we may need to recurse into children of this table too
                       for (PTable index : table.getIndexes()) {
                           if (index.getIndexDisableTimestamp() != 0) {
                               try {
@@ -864,15 +870,15 @@ public class Indexer extends BaseRegionObserver {
                       }
                   } catch (Exception e) {
                       // If we can't reach the stats table, don't interrupt the normal
-                    // compaction operation, just log a warning.
-                    if (LOG.isWarnEnabled()) {
-                        LOG.warn("Unable to permanently disable indexes being partially rebuild for " + fullTableName, e);
-                    }
+                      // compaction operation, just log a warning.
+                      if (LOG.isWarnEnabled()) {
+                          LOG.warn("Unable to permanently disable indexes being partially rebuild for " + fullTableName, e);
+                      }
                   }
+                  return null;
               }
-              return internalScanner;
-          }
-      });
+          });
+      }
   }
 }
 

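The hook here moves from preCompact(), which keyed off
ScanType.COMPACT_DROP_DELETES, to postCompact() guarded by
request.isAllFiles() || request.isMajor(): delete markers can only have been
purged once every store file participated or the compaction was major. A
minimal sketch of the new hook shape against the HBase 1.x coprocessor API
(the class name and body are illustrative):

    import java.io.IOException;
    import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
    import org.apache.hadoop.hbase.coprocessor.ObserverContext;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.hadoop.hbase.regionserver.Store;
    import org.apache.hadoop.hbase.regionserver.StoreFile;
    import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;

    public class CompactionHookSketch extends BaseRegionObserver {
        @Override
        public void postCompact(ObserverContext<RegionCoprocessorEnvironment> c,
                Store store, StoreFile resultFile, CompactionRequest request)
                throws IOException {
            // Act only when delete markers may have been purged, i.e. when
            // all store files were compacted or the compaction was major.
            if (request.isAllFiles() || request.isMajor()) {
                // take action here; this patch permanently disables indexes
                // that were pending a partial rebuild
            }
        }
    }
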
http://git-wip-us.apache.org/repos/asf/phoenix/blob/64b80897/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java
index 61a6fa2..b8ba759 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -42,16 +41,13 @@ import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
-import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.schema.types.PLong;
-import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TimeKeeper;
@@ -113,10 +109,9 @@ class DefaultStatisticsCollector implements StatisticsCollector {
         }
     }
     
-    private void initGuidepostDepth(boolean isMajorCompaction) throws IOException {
+    private void initGuidepostDepth() throws IOException {
         // First check is if guidepost info set on statement itself
-        boolean guidepostOnStatement = guidePostPerRegionBytes != null || guidePostWidthBytes != null;
-        if (guidepostOnStatement) {
+        if (guidePostPerRegionBytes != null || guidePostWidthBytes != null) {
             int guidepostPerRegion = 0;
             long guidepostWidth = QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES;
             if (guidePostPerRegionBytes != null) {
@@ -127,48 +122,20 @@ class DefaultStatisticsCollector implements StatisticsCollector {
             }
             this.guidePostDepth = StatisticsUtil.getGuidePostDepth(guidepostPerRegion, guidepostWidth,
                     env.getRegion().getTableDesc());
-        }
-        
-        if (!guidepostOnStatement || isMajorCompaction) {
+        } else {
             long guidepostWidth = -1;
             HTableInterface htable = null;
             try {
-                // Next check for GUIDE_POST_WIDTH and INDEX_DISABLE_TIMESTAMP on table
-                TableName htableName = SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, env.getConfiguration());
-                htable = env.getTable(htableName);
+                // Next check for GUIDE_POSTS_WIDTH on table
+                htable = env.getTable(
+                        SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, env.getConfiguration()));
                 Get get = new Get(ptableKey);
                 get.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH_BYTES);
-                get.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES);
                 Result result = htable.get(get);
                 if (!result.isEmpty()) {
-                    Cell gpwCell = result.getColumnLatestCell(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH_BYTES);
-                    if (gpwCell != null) {
-                        guidepostWidth = PLong.INSTANCE.getCodec().decodeLong(gpwCell.getValueArray(), gpwCell.getValueOffset(), SortOrder.getDefault());
-                    }
-                    if (isMajorCompaction) {
-                        Cell idtsCell = result.getColumnLatestCell(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES);
-                        if (idtsCell != null) {
-                            long indexDisableTimestamp = PLong.INSTANCE.getCodec().decodeLong(idtsCell.getValueArray(), idtsCell.getValueOffset(), SortOrder.getDefault());
-                            // If we have a non zero value for INDEX_DISABLE_TIMESTAMP, that means that our global mutable
-                            // secondary index needs to be partially rebuilt. If we're  compacting, though, we may cleanup
-                            // the delete markers of an index *before* the puts for the same row occur during replay. At
-                            // this point the partially index rebuild would leave the index out of sync with the data
-                            // table. In that case, it's better to just permanently disable the index and force it to be
-                            // manually rebuilt
-                            if (indexDisableTimestamp != 0) {
-                                MutationCode mutationCode = IndexUtil.updateIndexState(ptableKey, 0L, htable, PIndexState.DISABLE).getMutationCode();
-                                if (mutationCode != MutationCode.TABLE_ALREADY_EXISTS && mutationCode != MutationCode.TABLE_NOT_FOUND) {
-                                    LOG.warn("Attempt to permanently disable index " + env.getRegionInfo().getTable().getNameAsString() + 
-                                            " during compaction failed with code = " + mutationCode);
-                                }
-                            }
-                        }
-                    }
+                    Cell cell = result.listCells().get(0);
+                    guidepostWidth = PLong.INSTANCE.getCodec().decodeLong(cell.getValueArray(), cell.getValueOffset(), SortOrder.getDefault());
                 }
-            } catch (IOException e) {
-                throw e;
-            } catch (Throwable t) {
-                throw new IOException(t);
             } finally {
                 if (htable != null) {
                     try {
@@ -178,21 +145,19 @@ class DefaultStatisticsCollector implements StatisticsCollector {
                     }
                 }
             }
-            if (!guidepostOnStatement) {
-                if (guidepostWidth >= 0) {
-                    this.guidePostDepth = guidepostWidth;
-                } else {
-                    // Last use global config value
-                    Configuration config = env.getConfiguration();
-                    this.guidePostDepth = StatisticsUtil.getGuidePostDepth(
-                            config.getInt(
-                                QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB,
-                                QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_PER_REGION),
-                            config.getLong(
-                                QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
-                                QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES),
-                            env.getRegion().getTableDesc());
-                }
+            if (guidepostWidth >= 0) {
+                this.guidePostDepth = guidepostWidth;
+            } else {
+                // Lastly, use the global config value
+                Configuration config = env.getConfiguration();
+                this.guidePostDepth = StatisticsUtil.getGuidePostDepth(
+                        config.getInt(
+                            QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB,
+                            QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_PER_REGION),
+                        config.getLong(
+                            QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
+                            QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES),
+                        env.getRegion().getTableDesc());
             }
         }
     }
@@ -350,13 +315,13 @@ class DefaultStatisticsCollector implements StatisticsCollector {
         StatisticsScanner scanner = new StatisticsScanner(this, statsWriter, env, internalScan, family);
         // We need to initialize the scanner synchronously and potentially perform a cross region Get
         // in order to use the correct guide posts width for the table being compacted.
-        init(true);
+        init();
         return scanner;
     }
 
     @Override
-    public void init(boolean isMajorCompaction) throws IOException {
-        initGuidepostDepth(isMajorCompaction);
+    public void init() throws IOException {
+        initGuidepostDepth();
     }
 
     @Override

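After this change the guidepost depth is resolved in a fixed order: hints on
the statement itself, then the GUIDE_POSTS_WIDTH column in SYSTEM.CATALOG,
then the global configuration defaults; the INDEX_DISABLE_TIMESTAMP handling
has moved out of the stats collector into the postCompact() hooks above. A
hypothetical condensation of that lookup order (the helper methods are
illustrative stubs, not Phoenix APIs):

    // Hypothetical sketch of the three-level fallback implemented in
    // initGuidepostDepth() above; the helper methods are stubs.
    abstract class GuidePostDepthSketch {
        byte[] guidePostPerRegionBytes; // statement-level hint, may be null
        byte[] guidePostWidthBytes;     // statement-level hint, may be null

        long resolveGuidePostDepth() {
            if (guidePostPerRegionBytes != null || guidePostWidthBytes != null) {
                return depthFromStatementHints();         // 1. statement hints
            }
            long width = readGuidePostWidthFromCatalog(); // 2. SYSTEM.CATALOG
            if (width >= 0) {
                return width;
            }
            return depthFromGlobalConfig();               // 3. global config
        }

        abstract long depthFromStatementHints();
        abstract long readGuidePostWidthFromCatalog();
        abstract long depthFromGlobalConfig();
    }
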
http://git-wip-us.apache.org/repos/asf/phoenix/blob/64b80897/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/NoOpStatisticsCollector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/NoOpStatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/NoOpStatisticsCollector.java
index a13a722..74d1710 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/NoOpStatisticsCollector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/NoOpStatisticsCollector.java
@@ -61,7 +61,7 @@ public class NoOpStatisticsCollector implements StatisticsCollector {
     }
 
     @Override 
-    public void init(boolean isMajorCompaction) {
+    public void init() {
         // No-op
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/64b80897/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
index 9550469..60e83a8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
@@ -64,7 +64,7 @@ public interface StatisticsCollector extends Closeable {
      * Called before beginning the collection of statistics through {@link #collectStatistics(List)}
      * @throws IOException 
      */
-    void init(boolean isMajorCompaction) throws IOException;
+    void init() throws IOException;
 
     /**
      * Retrieve the calculated guide post info for the given column family.
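
With the isMajorCompaction flag gone from the interface, every implementation
initializes the same way and callers simply invoke the zero-argument init()
before collecting, as at the patched call site in
UngroupedAggregateRegionObserver. A hedged usage fragment (stats and rows are
placeholders, not names from this patch):

    // Usage sketch: `stats` is any StatisticsCollector, `rows` the lists of
    // cells produced by the region scanner.
    stats.init();                     // resolve guidepost depth up front
    for (List<Cell> row : rows) {
        stats.collectStatistics(row); // accumulate stats per row
    }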