You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ra...@apache.org on 2016/08/01 11:11:50 UTC
phoenix git commit: PHOENIX-3111 Possible Deadlock/delay while
building index, upsert select, delete rows at server(Rajeshbabu)
Repository: phoenix
Updated Branches:
refs/heads/master 3251ac58a -> 7a27282f2
PHOENIX-3111 Possible Deadlock/delay while building index, upsert select, delete rows at server(Rajeshbabu)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/7a27282f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/7a27282f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/7a27282f
Branch: refs/heads/master
Commit: 7a27282f237ff0fdcbcfc06cbd6062a764703b20
Parents: 3251ac5
Author: Rajeshbabu Chintaguntla <ra...@apache.org>
Authored: Mon Aug 1 16:49:18 2016 +0530
Committer: Rajeshbabu Chintaguntla <ra...@apache.org>
Committed: Mon Aug 1 16:49:18 2016 +0530
----------------------------------------------------------------------
.../UngroupedAggregateRegionObserver.java | 167 +++++++++++++++++--
1 file changed, 151 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a27282f/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index d783670..eda59d1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -39,12 +39,17 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
+import javax.annotation.concurrent.GuardedBy;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTableInterface;
@@ -88,7 +93,6 @@ import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableImpl;
import org.apache.phoenix.schema.RowKeySchema;
import org.apache.phoenix.schema.SortOrder;
-import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
import org.apache.phoenix.schema.ValueSchema.Field;
import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker;
import org.apache.phoenix.schema.stats.StatisticsCollector;
@@ -104,7 +108,6 @@ import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.KeyValueUtil;
import org.apache.phoenix.util.LogUtil;
-import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.ServerUtil;
@@ -135,6 +138,37 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
public static final String DELETE_CQ = "DeleteCQ";
public static final String DELETE_CF = "DeleteCF";
public static final String EMPTY_CF = "EmptyCF";
+ /**
+ * This lock is used to synchronize the state of the
+ * {@link UngroupedAggregateRegionObserver#scansReferenceCount} and
+ * {@link UngroupedAggregateRegionObserver#isRegionClosing} variables, which avoid a possible
+ * deadlock in the following sequence of steps:
+ * 1. We take the region read lock when we start writing local indexes, deletes, etc.
+ * 2. When the memstore reaches its threshold, flushes happen. Since they use the read (shared)
+ * lock, they proceed without any problem until someone tries to obtain the write lock.
+ * 3. At some moment we decide to split/bulkload/close and try to acquire the write lock.
+ * 4. From that moment all attempts to get the read lock are blocked, i.e. no more
+ * flushes happen. But we continue to fill the memstore with local index batches and
+ * finally get a RegionTooBusyException (RTBE).
+ *
+ * The solution is to disallow or delay the operations that acquire the write lock:
+ * 1) In case of a split we just throw an IOException, so the split doesn't happen; this is harmless.
+ * 2) In case of a bulkload we fail it by throwing an exception.
+ * 3) In case of a region close by the balancer/move we wait before closing the region, and fail
+ * the query that writes after reading.
+ *
+ * See PHOENIX-3111 for more info.
+ */
+
+ private final Object lock = new Object();
+ /**
+ * To maintain the number of scans used for create index, delete and upsert select operations
+ * which reads and writes to same region in coprocessors.
+ */
+ @GuardedBy("lock")
+ private int scansReferenceCount = 0;
+ @GuardedBy("lock")
+ private boolean isRegionClosing = false;
private static final Logger logger = LoggerFactory.getLogger(UngroupedAggregateRegionObserver.class);
private KeyValueBuilder kvBuilder;
@@ -146,7 +180,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
this.kvBuilder = GenericKeyValueBuilder.INSTANCE;
}
- private static void commitBatch(Region region, List<Mutation> mutations, byte[] indexUUID) throws IOException {
+ private void commitBatch(Region region, List<Mutation> mutations, byte[] indexUUID,
+ long blockingMemstoreSize) throws IOException {
if (indexUUID != null) {
for (Mutation m : mutations) {
m.setAttribute(PhoenixIndexCodec.INDEX_UUID, indexUUID);
@@ -155,7 +190,40 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
Mutation[] mutationArray = new Mutation[mutations.size()];
// TODO: should we use the one that is all or none?
logger.debug("Committing bactch of " + mutations.size() + " mutations for " + region.getRegionInfo().getTable().getNameAsString());
- region.batchMutate(mutations.toArray(mutationArray), HConstants.NO_NONCE, HConstants.NO_NONCE);
+ try {
+ region.batchMutate(mutations.toArray(mutationArray), HConstants.NO_NONCE, HConstants.NO_NONCE);
+ } catch (RegionTooBusyException rtbe) {
+ // When memstore size reaches blockingMemstoreSize we are waiting 3 seconds for the
+ // flush happen which decrease the memstore size and then writes allowed on the region.
+ for (int i = 0; region.getMemstoreSize() > blockingMemstoreSize && i < 30; i++) {
+ try {
+ checkForRegionClosing();
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException(e);
+ }
+ }
+ if (region.getMemstoreSize() > blockingMemstoreSize) {
+ throw rtbe;
+ }
+ region.batchMutate(mutationArray, HConstants.NO_NONCE, HConstants.NO_NONCE);
+ }
+ }
+
+ /**
+ * Fails the current write once preClose has flagged the region as closing (balancer/move/merge):
+ * in that state a flush is unlikely to free the memstore, so waiting on it is pointless.
+ * @throws IOException if the region has been flagged as closing
+ */
+ private void checkForRegionClosing() throws IOException {
+ synchronized (lock) {
+ if (!isRegionClosing) {
+ return;
+ }
+ lock.notifyAll();
+ throw new IOException("Region is getting closed. Not allowing to write to avoid possible deadlock.");
+ }
}
public static void serializeIntoScan(Scan scan) {
@@ -276,8 +344,26 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
int batchSize = 0;
List<Mutation> mutations = Collections.emptyList();
+ boolean needToWrite = false;
+ Configuration conf = c.getEnvironment().getConfiguration();
+ long flushSize = region.getTableDesc().getMemStoreFlushSize();
+
+ if (flushSize <= 0) {
+ flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
+ HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
+ }
+
+ /**
+ * Upper bound of memstore size allowed for region. Updates will be blocked until the flush
+ * happen if the memstore reaches this threshold.
+ */
+ final long blockingMemStoreSize = flushSize * (
+ conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
+ HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER)-1) ;
+
boolean buildLocalIndex = indexMaintainers != null && dataColumns==null && !localIndexScan;
if (isDescRowKeyOrderUpgrade || isDelete || isUpsert || (deleteCQ != null && deleteCF != null) || emptyCF != null || buildLocalIndex) {
+ needToWrite = true;
// TODO: size better
mutations = Lists.newArrayListWithExpectedSize(1024);
batchSize = env.getConfiguration().getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
@@ -295,6 +381,11 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
final RegionScanner innerScanner = theScanner;
boolean acquiredLock = false;
try {
+ if(needToWrite) {
+ synchronized (lock) {
+ scansReferenceCount++;
+ }
+ }
region.startRegionOperation();
acquiredLock = true;
synchronized (innerScanner) {
@@ -405,7 +496,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
ImmutableBytesPtr.copyBytesIfNecessary(ptr),
results);
Put put = maintainer.buildUpdateMutation(kvBuilder,
- valueGetter, ptr, ts,
+ valueGetter, ptr, results.get(0).getTimestamp(),
env.getRegion().getRegionInfo().getStartKey(),
env.getRegion().getRegionInfo().getEndKey());
indexMutations.add(put);
@@ -511,13 +602,13 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
// Commit in batches based on UPSERT_BATCH_SIZE_ATTRIB in config
if (!mutations.isEmpty() && batchSize > 0 &&
mutations.size() % batchSize == 0) {
- commitBatch(region, mutations, indexUUID);
+ commitBatch(region, mutations, indexUUID, blockingMemStoreSize);
mutations.clear();
}
// Commit in batches based on UPSERT_BATCH_SIZE_ATTRIB in config
if (!indexMutations.isEmpty() && batchSize > 0 &&
indexMutations.size() % batchSize == 0) {
- commitBatch(region, indexMutations, null);
+ commitBatch(region, indexMutations, null, blockingMemStoreSize);
indexMutations.clear();
}
} catch (ConstraintViolationException e) {
@@ -532,8 +623,21 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
hasAny = true;
}
} while (hasMore);
+ if (!mutations.isEmpty()) {
+ commitBatch(region,mutations, indexUUID, blockingMemStoreSize);
+ }
+
+ if (!indexMutations.isEmpty()) {
+ commitBatch(region,indexMutations, null, blockingMemStoreSize);
+ indexMutations.clear();
+ }
}
} finally {
+ if(needToWrite) {
+ synchronized (lock) {
+ scansReferenceCount--;
+ }
+ }
try {
innerScanner.close();
} finally {
@@ -544,15 +648,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
logger.debug(LogUtil.addCustomAnnotations("Finished scanning " + rowCount + " rows for ungrouped coprocessor scan " + scan, ScanUtil.getCustomAnnotations(scan)));
}
- if (!mutations.isEmpty()) {
- commitBatch(region,mutations, indexUUID);
- }
-
- if (!indexMutations.isEmpty()) {
- commitBatch(region,indexMutations, null);
- indexMutations.clear();
- }
-
final boolean hadAny = hasAny;
KeyValue keyValue = null;
if (hadAny) {
@@ -819,6 +914,57 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
@Override
+ public void preSplit(ObserverContext<RegionCoprocessorEnvironment> c, byte[] splitRow)
+ throws IOException {
+ // Don't allow splitting while operations that read and write the same region are running in
+ // the coprocessors, to avoid a deadlock. See PHOENIX-3111.
+ synchronized (lock) {
+ if (scansReferenceCount != 0) {
+ throw new IOException("Operations like local index building/delete/upsert select"
+ + " might be going on so not allowing to split.");
+ }
+ }
+ }
+
+ @Override
+ public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> c,
+ List<Pair<byte[], String>> familyPaths) throws IOException {
+ // Don't allow bulkload while operations that read and write the same region are running in
+ // the coprocessors, to avoid a deadlock. See PHOENIX-3111.
+ synchronized (lock) {
+ if (scansReferenceCount != 0) {
+ throw new DoNotRetryIOException("Operations like local index building/delete/upsert select"
+ + " might be going on so not allowing to bulkload.");
+ }
+ }
+ }
+
+ @Override
+ public void preClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested)
+ throws IOException {
+ // Delay the close while scans that read and write this region are in flight, and mark the
+ // region as closing so that those scans fail in checkForRegionClosing() instead of waiting
+ // for a memstore flush that may never come. See PHOENIX-3111.
+ boolean interrupted = false;
+ synchronized (lock) {
+ while (scansReferenceCount != 0) {
+ isRegionClosing = true;
+ try {
+ // Scans do not notify this lock when they finish, so re-check at least every second.
+ lock.wait(1000);
+ } catch (InterruptedException e) {
+ // Keep waiting as before, but remember the interrupt instead of swallowing it.
+ interrupted = true;
+ }
+ }
+ }
+ if (interrupted) {
+ // Restore the interrupt status for the caller.
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ @Override
protected boolean isRegionObserverFor(Scan scan) {
return scan.getAttribute(BaseScannerRegionObserver.UNGROUPED_AGG) != null;
}