You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2017/03/16 21:11:59 UTC
[37/50] [abbrv] phoenix git commit: PHOENIX-3706 RenewLeaseTask should give up and reattempt later to renewlease if lock cannot be acquired
PHOENIX-3706 RenewLeaseTask should give up and reattempt later to renewlease if lock cannot be acquired
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/336a82d4
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/336a82d4
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/336a82d4
Branch: refs/heads/calcite
Commit: 336a82d410057d10c16d0bfef6aebd94c63026f5
Parents: 023f863
Author: Samarth <sa...@salesforce.com>
Authored: Mon Mar 6 13:29:11 2017 -0800
Committer: Samarth <sa...@salesforce.com>
Committed: Mon Mar 6 13:29:11 2017 -0800
----------------------------------------------------------------------
.../iterate/RenewLeaseOnlyTableIterator.java | 17 +-
.../phoenix/iterate/TableResultIterator.java | 186 +++++++++++--------
.../query/ConnectionQueryServicesImpl.java | 6 +-
.../phoenix/query/ScannerLeaseRenewalTest.java | 21 +--
4 files changed, 137 insertions(+), 93 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/336a82d4/phoenix-core/src/it/java/org/apache/phoenix/iterate/RenewLeaseOnlyTableIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/iterate/RenewLeaseOnlyTableIterator.java b/phoenix-core/src/it/java/org/apache/phoenix/iterate/RenewLeaseOnlyTableIterator.java
index 5fa4126..e123fa3 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/iterate/RenewLeaseOnlyTableIterator.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/iterate/RenewLeaseOnlyTableIterator.java
@@ -19,7 +19,7 @@ package org.apache.phoenix.iterate;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.CLOSED;
-import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.NOT_RENEWED;
+import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.LOCK_NOT_ACQUIRED;
import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.RENEWED;
import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.THRESHOLD_NOT_REACHED;
@@ -29,16 +29,18 @@ public class RenewLeaseOnlyTableIterator extends TableResultIterator {
private final int numberOfLeaseRenewals;
private final int thresholdNotReachedAt;
- private final int doNotRenewLeaseAt;
+ private final int failToAcquireLockAt;
+ private final int failLeaseRenewalAt;
private int counter = 0;
private RenewLeaseStatus lastRenewLeaseStatus;
- public RenewLeaseOnlyTableIterator(int renewLeaseCount, int skipRenewLeaseAt, int doNotRenewLeaseAt) throws SQLException {
+ public RenewLeaseOnlyTableIterator(int renewLeaseCount, int skipRenewLeaseAt, int failToAcquireLockAt, int doNotRenewLeaseAt) throws SQLException {
super();
checkArgument(renewLeaseCount >= skipRenewLeaseAt);
this.numberOfLeaseRenewals = renewLeaseCount;
this.thresholdNotReachedAt = skipRenewLeaseAt;
- this.doNotRenewLeaseAt = doNotRenewLeaseAt;
+ this.failToAcquireLockAt = failToAcquireLockAt;
+ this.failLeaseRenewalAt = doNotRenewLeaseAt;
}
@Override
@@ -46,8 +48,11 @@ public class RenewLeaseOnlyTableIterator extends TableResultIterator {
counter++;
if (counter == thresholdNotReachedAt) {
lastRenewLeaseStatus = THRESHOLD_NOT_REACHED;
- } else if (counter == doNotRenewLeaseAt) {
- lastRenewLeaseStatus = NOT_RENEWED;
+ } else if (counter == failLeaseRenewalAt) {
+ lastRenewLeaseStatus = null;
+ throw new RuntimeException("Failing lease renewal");
+ } else if (counter == failToAcquireLockAt) {
+ lastRenewLeaseStatus = LOCK_NOT_ACQUIRED;
} else if (counter <= numberOfLeaseRenewals) {
lastRenewLeaseStatus = RENEWED;
} else {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/336a82d4/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index b1e2615..c6fcc1d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -20,7 +20,9 @@ package org.apache.phoenix.iterate;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX;
import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.CLOSED;
+import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.LOCK_NOT_ACQUIRED;
import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.NOT_RENEWED;
+import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.NOT_SUPPORTED;
import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.RENEWED;
import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.THRESHOLD_NOT_REACHED;
import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.UNINITIALIZED;
@@ -28,6 +30,8 @@ import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.UN
import java.io.IOException;
import java.sql.SQLException;
import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.concurrent.GuardedBy;
@@ -47,7 +51,6 @@ import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.ServerUtil;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Throwables;
/**
@@ -69,15 +72,17 @@ public class TableResultIterator implements ResultIterator {
private Tuple lastTuple = null;
private ImmutableBytesWritable ptr = new ImmutableBytesWritable();
- @GuardedBy("this")
+ @GuardedBy("renewLeaseLock")
private ResultIterator scanIterator;
- @GuardedBy("this")
+ @GuardedBy("renewLeaseLock")
private boolean closed = false;
- @GuardedBy("this")
+ @GuardedBy("renewLeaseLock")
private long renewLeaseTime = 0;
+ private final Lock renewLeaseLock = new ReentrantLock();
+
@VisibleForTesting // Exposed for testing. DON'T USE ANYWHERE ELSE!
TableResultIterator() {
this.scanMetrics = null;
@@ -89,7 +94,7 @@ public class TableResultIterator implements ResultIterator {
}
public static enum RenewLeaseStatus {
- RENEWED, CLOSED, UNINITIALIZED, THRESHOLD_NOT_REACHED, NOT_RENEWED
+ RENEWED, NOT_RENEWED, CLOSED, UNINITIALIZED, THRESHOLD_NOT_REACHED, LOCK_NOT_ACQUIRED, NOT_SUPPORTED
};
public TableResultIterator(MutationState mutationState, Scan scan, CombinableMetric scanMetrics,
@@ -105,74 +110,90 @@ public class TableResultIterator implements ResultIterator {
}
@Override
- public synchronized void close() throws SQLException {
- closed = true; // ok to say closed even if the below code throws an exception
+ public void close() throws SQLException {
try {
- scanIterator.close();
- } finally {
+ renewLeaseLock.lock();
+ closed = true; // ok to say closed even if the below code throws an exception
try {
- scanIterator = UNINITIALIZED_SCANNER;
- htable.close();
- } catch (IOException e) {
- throw ServerUtil.parseServerException(e);
+ scanIterator.close();
+ } finally {
+ try {
+ scanIterator = UNINITIALIZED_SCANNER;
+ htable.close();
+ } catch (IOException e) {
+ throw ServerUtil.parseServerException(e);
+ }
}
+ } finally {
+ renewLeaseLock.unlock();
}
+
}
@Override
- public synchronized Tuple next() throws SQLException {
- initScanner();
+ public Tuple next() throws SQLException {
try {
- lastTuple = scanIterator.next();
- if (lastTuple != null) {
- ImmutableBytesWritable ptr = new ImmutableBytesWritable();
- lastTuple.getKey(ptr);
- }
- } catch (SQLException e) {
+ renewLeaseLock.lock();
+ initScanner();
try {
- throw ServerUtil.parseServerException(e);
- } catch(StaleRegionBoundaryCacheException e1) {
- if(ScanUtil.isNonAggregateScan(scan)) {
- // For non aggregate queries if we get stale region boundary exception we can
- // continue scanning from the next value of lasted fetched result.
- Scan newScan = ScanUtil.newScan(scan);
- newScan.setStartRow(newScan.getAttribute(SCAN_ACTUAL_START_ROW));
- if(lastTuple != null) {
- lastTuple.getKey(ptr);
- byte[] startRowSuffix = ByteUtil.copyKeyBytesIfNecessary(ptr);
- if(ScanUtil.isLocalIndex(newScan)) {
- // If we just set scan start row suffix then server side we prepare
- // actual scan boundaries by prefixing the region start key.
- newScan.setAttribute(SCAN_START_ROW_SUFFIX, ByteUtil.nextKey(startRowSuffix));
- } else {
- newScan.setStartRow(ByteUtil.nextKey(startRowSuffix));
+ lastTuple = scanIterator.next();
+ if (lastTuple != null) {
+ ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+ lastTuple.getKey(ptr);
+ }
+ } catch (SQLException e) {
+ try {
+ throw ServerUtil.parseServerException(e);
+ } catch(StaleRegionBoundaryCacheException e1) {
+ if(ScanUtil.isNonAggregateScan(scan)) {
+ // For non-aggregate queries, if we get a stale region boundary exception we can
+ // continue scanning from the key following the last fetched result.
+ Scan newScan = ScanUtil.newScan(scan);
+ newScan.setStartRow(newScan.getAttribute(SCAN_ACTUAL_START_ROW));
+ if(lastTuple != null) {
+ lastTuple.getKey(ptr);
+ byte[] startRowSuffix = ByteUtil.copyKeyBytesIfNecessary(ptr);
+ if(ScanUtil.isLocalIndex(newScan)) {
+ // If we just set scan start row suffix then server side we prepare
+ // actual scan boundaries by prefixing the region start key.
+ newScan.setAttribute(SCAN_START_ROW_SUFFIX, ByteUtil.nextKey(startRowSuffix));
+ } else {
+ newScan.setStartRow(ByteUtil.nextKey(startRowSuffix));
+ }
}
+ plan.getContext().getConnection().getQueryServices().clearTableRegionCache(htable.getTableName());
+ this.scanIterator =
+ plan.iterator(scanGrouper, newScan);
+ lastTuple = scanIterator.next();
+ } else {
+ throw e;
}
- plan.getContext().getConnection().getQueryServices().clearTableRegionCache(htable.getTableName());
- this.scanIterator =
- plan.iterator(scanGrouper, newScan);
- lastTuple = scanIterator.next();
- } else {
- throw e;
}
}
+ return lastTuple;
+ } finally {
+ renewLeaseLock.unlock();
}
- return lastTuple;
}
- public synchronized void initScanner() throws SQLException {
- if (closed) {
- return;
- }
- ResultIterator delegate = this.scanIterator;
- if (delegate == UNINITIALIZED_SCANNER) {
- try {
- this.scanIterator =
- new ScanningResultIterator(htable.getScanner(scan), scanMetrics);
- } catch (IOException e) {
- Closeables.closeQuietly(htable);
- throw ServerUtil.parseServerException(e);
+ public void initScanner() throws SQLException {
+ try {
+ renewLeaseLock.lock();
+ if (closed) {
+ return;
+ }
+ ResultIterator delegate = this.scanIterator;
+ if (delegate == UNINITIALIZED_SCANNER) {
+ try {
+ this.scanIterator =
+ new ScanningResultIterator(htable.getScanner(scan), scanMetrics);
+ } catch (IOException e) {
+ Closeables.closeQuietly(htable);
+ throw ServerUtil.parseServerException(e);
+ }
}
+ } finally {
+ renewLeaseLock.unlock();
}
}
@@ -181,27 +202,42 @@ public class TableResultIterator implements ResultIterator {
return "TableResultIterator [htable=" + htable + ", scan=" + scan + "]";
}
- public synchronized RenewLeaseStatus renewLease() {
- if (closed) {
- return CLOSED;
- }
- if (scanIterator == UNINITIALIZED_SCANNER) {
- return UNINITIALIZED;
- }
- long delay = now() - renewLeaseTime;
- if (delay < renewLeaseThreshold) {
- return THRESHOLD_NOT_REACHED;
- }
- if (scanIterator instanceof ScanningResultIterator
- && ((ScanningResultIterator)scanIterator).getScanner() instanceof AbstractClientScanner) {
- // Need this explicit cast because HBase's ResultScanner doesn't have this method exposed.
- boolean leaseRenewed = ((AbstractClientScanner)((ScanningResultIterator)scanIterator).getScanner()).renewLease();
- if (leaseRenewed) {
- renewLeaseTime = now();
- return RENEWED;
+ public RenewLeaseStatus renewLease() {
+ boolean lockAcquired = false;
+ try {
+ lockAcquired = renewLeaseLock.tryLock();
+ if (lockAcquired) {
+ if (closed) {
+ return CLOSED;
+ }
+ if (scanIterator == UNINITIALIZED_SCANNER) {
+ return UNINITIALIZED;
+ }
+ long delay = now() - renewLeaseTime;
+ if (delay < renewLeaseThreshold) {
+ return THRESHOLD_NOT_REACHED;
+ }
+ if (scanIterator instanceof ScanningResultIterator
+ && ((ScanningResultIterator)scanIterator).getScanner() instanceof AbstractClientScanner) {
+ // Need this explicit cast because HBase's ResultScanner doesn't have this method exposed.
+ boolean leaseRenewed = ((AbstractClientScanner)((ScanningResultIterator)scanIterator).getScanner()).renewLease();
+ if (leaseRenewed) {
+ renewLeaseTime = now();
+ return RENEWED;
+ } else {
+ return NOT_RENEWED;
+ }
+ } else {
+ return NOT_SUPPORTED;
+ }
+ }
+ return LOCK_NOT_ACQUIRED;
+ }
+ finally {
+ if (lockAcquired) {
+ renewLeaseLock.unlock();
}
}
- return NOT_RENEWED;
}
private static long now() {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/336a82d4/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 03a5e13..8ba2c81 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -4077,8 +4077,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
scanningItr));
logger.info("Lease renewed for scanner: " + scanningItr);
break;
+ // Scanner not initialized probably because next() hasn't been called on it yet. Enqueue it back to attempt lease renewal later.
case UNINITIALIZED:
+ // Threshold not yet reached. Re-enqueue to renew lease later.
case THRESHOLD_NOT_REACHED:
+ // Another scanner operation in progress. Re-enqueue to attempt renewing lease later.
+ case LOCK_NOT_ACQUIRED:
// add it back at the tail
scannerQueue.offer(new WeakReference<TableResultIterator>(
scanningItr));
@@ -4086,7 +4090,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
// if lease wasn't renewed or scanner was closed, don't add the
// scanner back to the queue.
case CLOSED:
- case NOT_RENEWED:
+ case NOT_SUPPORTED:
break;
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/336a82d4/phoenix-core/src/test/java/org/apache/phoenix/query/ScannerLeaseRenewalTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ScannerLeaseRenewalTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ScannerLeaseRenewalTest.java
index 7d8904d..2969fdc 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/ScannerLeaseRenewalTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ScannerLeaseRenewalTest.java
@@ -18,7 +18,7 @@
package org.apache.phoenix.query;
import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.CLOSED;
-import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.NOT_RENEWED;
+import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.LOCK_NOT_ACQUIRED;
import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.RENEWED;
import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.THRESHOLD_NOT_REACHED;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
@@ -48,7 +48,8 @@ public class ScannerLeaseRenewalTest extends BaseConnectionlessQueryTest {
// create a scanner and add it to the queue
int numLeaseRenewals = 4;
int skipRenewLeaseCount = 2;
- RenewLeaseOnlyTableIterator itr = new RenewLeaseOnlyTableIterator(numLeaseRenewals, skipRenewLeaseCount, -1);
+ int failToAcquireLockAt = 3;
+ RenewLeaseOnlyTableIterator itr = new RenewLeaseOnlyTableIterator(numLeaseRenewals, skipRenewLeaseCount, failToAcquireLockAt, -1);
LinkedBlockingQueue<WeakReference<TableResultIterator>> scannerQueue = pconn.getScanners();
scannerQueue.add(new WeakReference<TableResultIterator>(itr));
@@ -69,7 +70,7 @@ public class ScannerLeaseRenewalTest extends BaseConnectionlessQueryTest {
task.run();
assertTrue(scannerQueue.size() == 1);
assertTrue(connectionsQueue.size() == 1);
- assertEquals(RENEWED, itr.getLastRenewLeaseStatus()); // lease renewed
+ assertEquals(LOCK_NOT_ACQUIRED, itr.getLastRenewLeaseStatus()); // lock couldn't be acquired
task.run();
assertTrue(scannerQueue.size() == 1);
@@ -96,9 +97,10 @@ public class ScannerLeaseRenewalTest extends BaseConnectionlessQueryTest {
// create a scanner and add it to the queue
int numLeaseRenewals = 4;
+ int lockNotAcquiredAt = 1;
int thresholdNotReachedCount = 2;
- int leaseNotRenewedCount = 3;
- RenewLeaseOnlyTableIterator itr = new RenewLeaseOnlyTableIterator(numLeaseRenewals, thresholdNotReachedCount, leaseNotRenewedCount);
+ int failLeaseRenewalAt = 3;
+ RenewLeaseOnlyTableIterator itr = new RenewLeaseOnlyTableIterator(numLeaseRenewals, thresholdNotReachedCount, lockNotAcquiredAt, failLeaseRenewalAt);
LinkedBlockingQueue<WeakReference<TableResultIterator>> scannerQueue = pconn.getScanners();
scannerQueue.add(new WeakReference<TableResultIterator>(itr));
@@ -108,8 +110,8 @@ public class ScannerLeaseRenewalTest extends BaseConnectionlessQueryTest {
task.run();
assertTrue(connectionsQueue.size() == 1);
- assertTrue(scannerQueue.size() == 1); // lease renewed
- assertEquals(RENEWED, itr.getLastRenewLeaseStatus());
+ assertTrue(scannerQueue.size() == 1); // lock not acquired
+ assertEquals(LOCK_NOT_ACQUIRED, itr.getLastRenewLeaseStatus());
task.run();
assertTrue(scannerQueue.size() == 1);
@@ -118,10 +120,7 @@ public class ScannerLeaseRenewalTest extends BaseConnectionlessQueryTest {
task.run();
assertTrue(scannerQueue.size() == 0);
- assertTrue(connectionsQueue.size() == 1);
- // Lease not renewed due to error or some other reason.
- // In this case we don't call renew lease on the scanner anymore.
- assertEquals(NOT_RENEWED, itr.getLastRenewLeaseStatus());
+ assertTrue(connectionsQueue.size() == 0); // there was only one connection in the connectionsQueue and it wasn't added back because of error
pconn.close();
task.run();