You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2017/09/13 00:05:01 UTC

[3/4] phoenix git commit: PHOENIX-4169 Explicitly cap timeout for index disable RPC on compaction (Vincent Poon)

PHOENIX-4169 Explicitly cap timeout for index disable RPC on compaction (Vincent Poon)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/2e5986a7
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/2e5986a7
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/2e5986a7

Branch: refs/heads/master
Commit: 2e5986a76aa171a62b342a46cc984a00a3a36746
Parents: 33b12c7
Author: James Taylor <ja...@apache.org>
Authored: Tue Sep 12 17:00:47 2017 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Tue Sep 12 17:04:44 2017 -0700

----------------------------------------------------------------------
 .../UngroupedAggregateRegionObserver.java       | 29 ++++++++++++++++----
 .../org/apache/phoenix/hbase/index/Indexer.java | 14 +++++++++-
 .../org/apache/phoenix/query/QueryServices.java |  4 +++
 .../phoenix/query/QueryServicesOptions.java     |  5 ++++
 4 files changed, 46 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/2e5986a7/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index a61f502..0773ebc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.CoprocessorHConnection;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
@@ -67,6 +68,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -98,6 +100,7 @@ import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
 import org.apache.phoenix.schema.PColumn;
@@ -192,6 +195,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
     private static final Logger logger = LoggerFactory.getLogger(UngroupedAggregateRegionObserver.class);
     private KeyValueBuilder kvBuilder;
     private Configuration upsertSelectConfig;
+    private Configuration compactionConfig;
 
     @Override
     public void start(CoprocessorEnvironment e) throws IOException {
@@ -212,6 +216,15 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
          */
         upsertSelectConfig.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
             InterRegionServerIndexRpcControllerFactory.class, RpcControllerFactory.class);
+
+        compactionConfig = PropertiesUtil.cloneConfig(e.getConfiguration());
+        // lower the number of rpc retries, so we don't hang the compaction
+        compactionConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+            e.getConfiguration().getInt(QueryServices.METADATA_WRITE_RETRIES_NUMBER,
+                QueryServicesOptions.DEFAULT_METADATA_WRITE_RETRIES_NUMBER));
+        compactionConfig.setInt(HConstants.HBASE_CLIENT_PAUSE,
+            e.getConfiguration().getInt(QueryServices.METADATA_WRITE_RETRY_PAUSE,
+                QueryServicesOptions.DEFAULT_METADATA_WRITE_RETRY_PAUSE));
     }
 
     private void commitBatch(Region region, List<Mutation> mutations, long blockingMemstoreSize) throws IOException {
@@ -924,11 +937,16 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                 public Void run() throws Exception {
                     MutationCode mutationCode = null;
                     long disableIndexTimestamp = 0;
-                    
-                    try (HTableInterface htable = e.getEnvironment().getTable(
-                                SchemaUtil.getPhysicalTableName(
-                                        PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES,
-                                        e.getEnvironment().getConfiguration()))) {
+
+                    try (CoprocessorHConnection coprocessorHConnection =
+                            new CoprocessorHConnection(compactionConfig,
+                                    (HRegionServer) e.getEnvironment()
+                                            .getRegionServerServices());
+                            HTableInterface htable =
+                                    coprocessorHConnection
+                                            .getTable(SchemaUtil.getPhysicalTableName(
+                                                PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES,
+                                                compactionConfig))) {
                         String tableName = e.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
                         // FIXME: if this is an index on a view, we won't find a row for it in SYSTEM.CATALOG
                         // Instead, we need to disable all indexes on the view.
@@ -941,6 +959,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                             if (cell.getValueLength() > 0) {
                                 disableIndexTimestamp = PLong.INSTANCE.getCodec().decodeLong(cell.getValueArray(), cell.getValueOffset(), SortOrder.getDefault());
                                 if (disableIndexTimestamp != 0) {
+                                    logger.info("Major compaction running while index on table is disabled.  Clearing index disable timestamp: " + tableName);
                                     mutationCode = IndexUtil.updateIndexState(tableKey, 0L, htable, PIndexState.DISABLE).getMutationCode();
                                 }
                             }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2e5986a7/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index ad03abb..e9d735d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -91,6 +91,7 @@ import org.apache.phoenix.hbase.index.write.RecoveryIndexWriter;
 import org.apache.phoenix.hbase.index.write.recovery.PerRegionIndexWriteCache;
 import org.apache.phoenix.hbase.index.write.recovery.StoreFailuresInCachePolicy;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTable;
@@ -194,6 +195,7 @@ public class Indexer extends BaseRegionObserver {
   private long slowPostOpenThreshold;
   private long slowPreIncrementThreshold;
   private int rowLockWaitDuration;
+  private Configuration compactionConfig;
   
   public static final String RecoveryFailurePolicyKeyForTesting = INDEX_RECOVERY_FAILURE_POLICY_KEY;
 
@@ -249,6 +251,15 @@ public class Indexer extends BaseRegionObserver {
         this.metricSource = MetricsIndexerSourceFactory.getInstance().create();
         setSlowThresholds(e.getConfiguration());
 
+        compactionConfig = PropertiesUtil.cloneConfig(e.getConfiguration());
+        // lower the number of rpc retries, so we don't hang the compaction
+        compactionConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+            e.getConfiguration().getInt(QueryServices.METADATA_WRITE_RETRIES_NUMBER,
+                QueryServicesOptions.DEFAULT_METADATA_WRITE_RETRIES_NUMBER));
+        compactionConfig.setInt(HConstants.HBASE_CLIENT_PAUSE,
+            e.getConfiguration().getInt(QueryServices.METADATA_WRITE_RETRY_PAUSE,
+                QueryServicesOptions.DEFAULT_METADATA_WRITE_RETRY_PAUSE));
+
         try {
           // get the specified failure policy. We only ever override it in tests, but we need to do it
           // here
@@ -856,12 +867,13 @@ public class Indexer extends BaseRegionObserver {
               public Void run() throws Exception {
                   String fullTableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
                   try {
-                      PhoenixConnection conn =  QueryUtil.getConnectionOnServer(c.getEnvironment().getConfiguration()).unwrap(PhoenixConnection.class);
+                      PhoenixConnection conn =  QueryUtil.getConnectionOnServer(compactionConfig).unwrap(PhoenixConnection.class);
                       PTable table = PhoenixRuntime.getTableNoCache(conn, fullTableName);
                       // FIXME: we may need to recurse into children of this table too
                       for (PTable index : table.getIndexes()) {
                           if (index.getIndexDisableTimestamp() != 0) {
                               try {
+                                  LOG.info("Major compaction running while index on table is disabled.  Clearing index disable timestamp: " + fullTableName);
                                   IndexUtil.updateIndexState(conn, index.getName().getString(), PIndexState.DISABLE, Long.valueOf(0L));
                               } catch (SQLException e) {
                                   LOG.warn("Unable to permanently disable index " + index.getName().getString(), e);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2e5986a7/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index a1d9761..70d9878 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -162,6 +162,10 @@ public interface QueryServices extends SQLCloseable {
     public static final String METADATA_PRIOIRTY_ATTRIB = "phoenix.metadata.rpc.priority";
     public static final String ALLOW_LOCAL_INDEX_ATTRIB = "phoenix.index.allowLocalIndex";
 
+    // Retries when doing server side writes to SYSTEM.CATALOG
+    public static final String METADATA_WRITE_RETRIES_NUMBER = "phoenix.metadata.rpc.retries.number";
+    public static final String METADATA_WRITE_RETRY_PAUSE = "phoenix.metadata.rpc.pause";
+
     // Config parameters for for configuring tracing
     public static final String TRACING_FREQ_ATTRIB = "phoenix.trace.frequency";
     public static final String TRACING_PAGE_SIZE_ATTRIB = "phoenix.trace.read.pagesize";

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2e5986a7/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 6ff096f..4e0d4cf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -205,6 +205,11 @@ public class QueryServicesOptions {
     public static final int DEFAULT_INDEX_HANDLER_COUNT = 30;
     public static final int DEFAULT_METADATA_HANDLER_COUNT = 30;
 
+    // Retries when doing server side writes to SYSTEM.CATALOG
+    // 20 retries with 100 ms pause = 230 seconds total retry time
+    public static final int DEFAULT_METADATA_WRITE_RETRIES_NUMBER = 20;
+    public static final int DEFAULT_METADATA_WRITE_RETRY_PAUSE = 100;
+
     public static final int DEFAULT_TRACING_PAGE_SIZE = 100;
     /**
      * Configuration key to overwrite the tablename that should be used as the target table