You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ra...@apache.org on 2016/08/01 11:11:50 UTC

phoenix git commit: PHOENIX-3111 Possible Deadlock/delay while building index, upsert select, delete rows at server(Rajeshbabu)

Repository: phoenix
Updated Branches:
  refs/heads/master 3251ac58a -> 7a27282f2


PHOENIX-3111 Possible Deadlock/delay while building index, upsert select, delete rows at server(Rajeshbabu)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/7a27282f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/7a27282f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/7a27282f

Branch: refs/heads/master
Commit: 7a27282f237ff0fdcbcfc06cbd6062a764703b20
Parents: 3251ac5
Author: Rajeshbabu Chintaguntla <ra...@apache.org>
Authored: Mon Aug 1 16:49:18 2016 +0530
Committer: Rajeshbabu Chintaguntla <ra...@apache.org>
Committed: Mon Aug 1 16:49:18 2016 +0530

----------------------------------------------------------------------
 .../UngroupedAggregateRegionObserver.java       | 167 +++++++++++++++++--
 1 file changed, 151 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a27282f/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index d783670..eda59d1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -39,12 +39,17 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Callable;
 
+import javax.annotation.concurrent.GuardedBy;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.RegionTooBusyException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTableInterface;
@@ -88,7 +93,6 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.SortOrder;
-import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
 import org.apache.phoenix.schema.ValueSchema.Field;
 import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker;
 import org.apache.phoenix.schema.stats.StatisticsCollector;
@@ -104,7 +108,6 @@ import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.KeyValueUtil;
 import org.apache.phoenix.util.LogUtil;
-import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
@@ -135,6 +138,37 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
     public static final String DELETE_CQ = "DeleteCQ";
     public static final String DELETE_CF = "DeleteCF";
     public static final String EMPTY_CF = "EmptyCF";
+    /**
+     * This lock used for synchronizing the state of
+     * {@link UngroupedAggregateRegionObserver#scansReferenceCount},
+     * {@link UngroupedAggregateRegionObserver#isRegionClosing} variables used to avoid possible
+     * dead lock situation in case below steps: 
+     * 1. We get read lock when we start writing local indexes, deletes etc.. 
+     * 2. when memstore reach threshold, flushes happen. Since they use read (shared) lock they 
+     * happen without any problem until someone tries to obtain write lock. 
+     * 3. at one moment we decide to split/bulkload/close and try to acquire write lock. 
+     * 4. Since that moment all attempts to get read lock will be blocked. I.e. no more 
+     * flushes will happen. But we continue to fill memstore with local index batches and 
+     * finally we get RTBE.
+     * 
+     * The solution to this is to not allow or delay operations acquire the write lock.
+     * 1) In case of split we just throw IOException so split won't happen but it will not cause any harm.
+     * 2) In case of bulkload failing it by throwing the exception. 
+     * 3) In case of region close by balancer/move wait before closing the reason and fail the query which 
+     * does write after reading. 
+     * 
+     * See PHOENIX-3111 for more info.
+     */
+
+    private final Object lock = new Object();
+    /**
+     * To maintain the number of scans used for create index, delete and upsert select operations
+     * which reads and writes to same region in coprocessors.
+     */
+    @GuardedBy("lock")
+    private int scansReferenceCount = 0;
+    @GuardedBy("lock")
+    private boolean isRegionClosing = false;
     private static final Logger logger = LoggerFactory.getLogger(UngroupedAggregateRegionObserver.class);
     private KeyValueBuilder kvBuilder;
 
@@ -146,7 +180,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         this.kvBuilder = GenericKeyValueBuilder.INSTANCE;
     }
 
-    private static void commitBatch(Region region, List<Mutation> mutations, byte[] indexUUID) throws IOException {
+    private void commitBatch(Region region, List<Mutation> mutations, byte[] indexUUID,
+            long blockingMemstoreSize) throws IOException {
       if (indexUUID != null) {
           for (Mutation m : mutations) {
               m.setAttribute(PhoenixIndexCodec.INDEX_UUID, indexUUID);
@@ -155,7 +190,40 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
       Mutation[] mutationArray = new Mutation[mutations.size()];
       // TODO: should we use the one that is all or none?
       logger.debug("Committing bactch of " + mutations.size() + " mutations for " + region.getRegionInfo().getTable().getNameAsString());
-      region.batchMutate(mutations.toArray(mutationArray), HConstants.NO_NONCE, HConstants.NO_NONCE);
+      try {
+          region.batchMutate(mutations.toArray(mutationArray), HConstants.NO_NONCE, HConstants.NO_NONCE);
+      } catch (RegionTooBusyException rtbe) {
+            // When memstore size reaches blockingMemstoreSize we are waiting 3 seconds for the
+            // flush happen which decrease the memstore size and then writes allowed on the region.
+            for (int i = 0; region.getMemstoreSize() > blockingMemstoreSize && i < 30; i++) {
+                try {
+                    checkForRegionClosing();
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new IOException(e);
+                }
+            }
+            if (region.getMemstoreSize() > blockingMemstoreSize) {
+                throw rtbe;
+            }
+            region.batchMutate(mutationArray, HConstants.NO_NONCE, HConstants.NO_NONCE);
+      }
+    }
+
+    /**
+     * There is a chance that region might be closing while running balancer/move/merge. In this
+     * case if the memstore size reaches blockingMemstoreSize better to fail query because there is
+     * a high chance that flush might not proceed and memstore won't be freed up.
+     * @throws IOException
+     */
+    private void checkForRegionClosing() throws IOException {
+        synchronized (lock) {
+            if(isRegionClosing) {
+                lock.notifyAll();
+                throw new IOException("Region is getting closed. Not allowing to write to avoid possible deadlock.");
+            }
+        }
     }
 
     public static void serializeIntoScan(Scan scan) {
@@ -276,8 +344,26 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         
         int batchSize = 0;
         List<Mutation> mutations = Collections.emptyList();
+        boolean needToWrite = false;
+        Configuration conf = c.getEnvironment().getConfiguration();
+        long flushSize = region.getTableDesc().getMemStoreFlushSize();
+
+        if (flushSize <= 0) {
+            flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
+                    HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
+        }
+
+        /**
+         * Upper bound of memstore size allowed for region. Updates will be blocked until the flush
+         * happen if the memstore reaches this threshold.
+         */
+        final long blockingMemStoreSize = flushSize * (
+                conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
+                        HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER)-1) ;
+
         boolean buildLocalIndex = indexMaintainers != null && dataColumns==null && !localIndexScan;
         if (isDescRowKeyOrderUpgrade || isDelete || isUpsert || (deleteCQ != null && deleteCF != null) || emptyCF != null || buildLocalIndex) {
+            needToWrite = true;
             // TODO: size better
             mutations = Lists.newArrayListWithExpectedSize(1024);
             batchSize = env.getConfiguration().getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
@@ -295,6 +381,11 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         final RegionScanner innerScanner = theScanner;
         boolean acquiredLock = false;
         try {
+            if(needToWrite) {
+                synchronized (lock) {
+                    scansReferenceCount++;
+                }
+            }
             region.startRegionOperation();
             acquiredLock = true;
             synchronized (innerScanner) {
@@ -405,7 +496,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                                                     ImmutableBytesPtr.copyBytesIfNecessary(ptr),
                                                     results);
                                         Put put = maintainer.buildUpdateMutation(kvBuilder,
-                                            valueGetter, ptr, ts,
+                                            valueGetter, ptr, results.get(0).getTimestamp(),
                                             env.getRegion().getRegionInfo().getStartKey(),
                                             env.getRegion().getRegionInfo().getEndKey());
                                         indexMutations.add(put);
@@ -511,13 +602,13 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                             // Commit in batches based on UPSERT_BATCH_SIZE_ATTRIB in config
                             if (!mutations.isEmpty() && batchSize > 0 &&
                                     mutations.size() % batchSize == 0) {
-                                commitBatch(region, mutations, indexUUID);
+                                commitBatch(region, mutations, indexUUID, blockingMemStoreSize);
                                 mutations.clear();
                             }
                             // Commit in batches based on UPSERT_BATCH_SIZE_ATTRIB in config
                             if (!indexMutations.isEmpty() && batchSize > 0 &&
                                     indexMutations.size() % batchSize == 0) {
-                                commitBatch(region, indexMutations, null);
+                                commitBatch(region, indexMutations, null, blockingMemStoreSize);
                                 indexMutations.clear();
                             }
                         } catch (ConstraintViolationException e) {
@@ -532,8 +623,21 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                         hasAny = true;
                     }
                 } while (hasMore);
+                if (!mutations.isEmpty()) {
+                    commitBatch(region,mutations, indexUUID, blockingMemStoreSize);
+                }
+
+                if (!indexMutations.isEmpty()) {
+                    commitBatch(region,indexMutations, null, blockingMemStoreSize);
+                    indexMutations.clear();
+                }
             }
         } finally {
+            if(needToWrite) {
+                synchronized (lock) {
+                    scansReferenceCount--;
+                }
+            }
             try {
                 innerScanner.close();
             } finally {
@@ -544,15 +648,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             logger.debug(LogUtil.addCustomAnnotations("Finished scanning " + rowCount + " rows for ungrouped coprocessor scan " + scan, ScanUtil.getCustomAnnotations(scan)));
         }
 
-        if (!mutations.isEmpty()) {
-            commitBatch(region,mutations, indexUUID);
-        }
-
-        if (!indexMutations.isEmpty()) {
-            commitBatch(region,indexMutations, null);
-            indexMutations.clear();
-        }
-
         final boolean hadAny = hasAny;
         KeyValue keyValue = null;
         if (hadAny) {
@@ -819,6 +914,46 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
     }
 
     @Override
+    public void preSplit(ObserverContext<RegionCoprocessorEnvironment> c, byte[] splitRow)
+            throws IOException {
+        // Don't allow splitting if operations need read and write to same region are going on in the
+        // the coprocessors to avoid dead lock scenario. See PHOENIX-3111.
+        synchronized (lock) {
+            if (scansReferenceCount != 0) {
+                throw new IOException("Operations like local index building/delete/upsert select"
+                        + " might be going on so not allowing to split.");
+            }
+        }
+    }
+
+    @Override
+    public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> c,
+            List<Pair<byte[], String>> familyPaths) throws IOException {
+        // Don't allow bulkload if operations need read and write to same region are going on in the
+        // the coprocessors to avoid dead lock scenario. See PHOENIX-3111.
+        synchronized (lock) {
+            if (scansReferenceCount != 0) {
+                throw new DoNotRetryIOException("Operations like local index building/delete/upsert select"
+                        + " might be going on so not allowing to bulkload.");
+            }
+        }
+    }
+
+    @Override
+    public void preClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested)
+            throws IOException {
+        synchronized (lock) {
+            while (scansReferenceCount != 0) {
+                isRegionClosing = true;
+                try {
+                    lock.wait(1000);
+                } catch (InterruptedException e) {
+                }
+            }
+        }
+    }
+
+    @Override
     protected boolean isRegionObserverFor(Scan scan) {
         return scan.getAttribute(BaseScannerRegionObserver.UNGROUPED_AGG) != null;
     }