You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by td...@apache.org on 2017/03/09 23:02:55 UTC

[42/50] [abbrv] phoenix git commit: PHOENIX-3706 RenewLeaseTask should give up and reattempt later to renewlease if lock cannot be acquired

PHOENIX-3706 RenewLeaseTask should give up and reattempt later to renewlease if lock cannot be acquired


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/336a82d4
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/336a82d4
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/336a82d4

Branch: refs/heads/omid
Commit: 336a82d410057d10c16d0bfef6aebd94c63026f5
Parents: 023f863
Author: Samarth <sa...@salesforce.com>
Authored: Mon Mar 6 13:29:11 2017 -0800
Committer: Samarth <sa...@salesforce.com>
Committed: Mon Mar 6 13:29:11 2017 -0800

----------------------------------------------------------------------
 .../iterate/RenewLeaseOnlyTableIterator.java    |  17 +-
 .../phoenix/iterate/TableResultIterator.java    | 186 +++++++++++--------
 .../query/ConnectionQueryServicesImpl.java      |   6 +-
 .../phoenix/query/ScannerLeaseRenewalTest.java  |  21 +--
 4 files changed, 137 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/336a82d4/phoenix-core/src/it/java/org/apache/phoenix/iterate/RenewLeaseOnlyTableIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/iterate/RenewLeaseOnlyTableIterator.java b/phoenix-core/src/it/java/org/apache/phoenix/iterate/RenewLeaseOnlyTableIterator.java
index 5fa4126..e123fa3 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/iterate/RenewLeaseOnlyTableIterator.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/iterate/RenewLeaseOnlyTableIterator.java
@@ -19,7 +19,7 @@ package org.apache.phoenix.iterate;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.CLOSED;
-import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.NOT_RENEWED;
+import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.LOCK_NOT_ACQUIRED;
 import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.RENEWED;
 import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.THRESHOLD_NOT_REACHED;
 
@@ -29,16 +29,18 @@ public class RenewLeaseOnlyTableIterator extends TableResultIterator {
 
     private final int numberOfLeaseRenewals;
     private final int thresholdNotReachedAt;
-    private final int doNotRenewLeaseAt;
+    private final int failToAcquireLockAt;
+    private final int failLeaseRenewalAt;
     private int counter = 0;
     private RenewLeaseStatus lastRenewLeaseStatus;
 
-    public RenewLeaseOnlyTableIterator(int renewLeaseCount, int skipRenewLeaseAt, int doNotRenewLeaseAt) throws SQLException {
+    public RenewLeaseOnlyTableIterator(int renewLeaseCount, int skipRenewLeaseAt, int failToAcquireLockAt, int doNotRenewLeaseAt) throws SQLException {
         super();
         checkArgument(renewLeaseCount >= skipRenewLeaseAt);
         this.numberOfLeaseRenewals = renewLeaseCount;
         this.thresholdNotReachedAt = skipRenewLeaseAt;
-        this.doNotRenewLeaseAt = doNotRenewLeaseAt;
+        this.failToAcquireLockAt = failToAcquireLockAt;
+        this.failLeaseRenewalAt = doNotRenewLeaseAt;
     }
 
     @Override
@@ -46,8 +48,11 @@ public class RenewLeaseOnlyTableIterator extends TableResultIterator {
         counter++;
         if (counter == thresholdNotReachedAt) {
             lastRenewLeaseStatus = THRESHOLD_NOT_REACHED;
-        } else if (counter == doNotRenewLeaseAt) {
-            lastRenewLeaseStatus = NOT_RENEWED;
+        } else if (counter == failLeaseRenewalAt) {
+            lastRenewLeaseStatus = null;
+            throw new RuntimeException("Failing lease renewal");
+        } else if (counter == failToAcquireLockAt) {
+            lastRenewLeaseStatus = LOCK_NOT_ACQUIRED;
         } else if (counter <= numberOfLeaseRenewals) {
             lastRenewLeaseStatus = RENEWED;
         } else {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/336a82d4/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index b1e2615..c6fcc1d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -20,7 +20,9 @@ package org.apache.phoenix.iterate;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX;
 import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.CLOSED;
+import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.LOCK_NOT_ACQUIRED;
 import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.NOT_RENEWED;
+import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.NOT_SUPPORTED;
 import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.RENEWED;
 import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.THRESHOLD_NOT_REACHED;
 import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.UNINITIALIZED;
@@ -28,6 +30,8 @@ import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.UN
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import javax.annotation.concurrent.GuardedBy;
 
@@ -47,7 +51,6 @@ import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Throwables;
 
 
 /**
@@ -69,15 +72,17 @@ public class TableResultIterator implements ResultIterator {
     private Tuple lastTuple = null;
     private ImmutableBytesWritable ptr = new ImmutableBytesWritable();
 
-    @GuardedBy("this")
+    @GuardedBy("renewLeaseLock")
     private ResultIterator scanIterator;
 
-    @GuardedBy("this")
+    @GuardedBy("renewLeaseLock")
     private boolean closed = false;
 
-    @GuardedBy("this")
+    @GuardedBy("renewLeaseLock")
     private long renewLeaseTime = 0;
     
+    private final Lock renewLeaseLock = new ReentrantLock();
+
     @VisibleForTesting // Exposed for testing. DON'T USE ANYWHERE ELSE!
     TableResultIterator() {
         this.scanMetrics = null;
@@ -89,7 +94,7 @@ public class TableResultIterator implements ResultIterator {
     }
 
     public static enum RenewLeaseStatus {
-        RENEWED, CLOSED, UNINITIALIZED, THRESHOLD_NOT_REACHED, NOT_RENEWED
+        RENEWED, NOT_RENEWED, CLOSED, UNINITIALIZED, THRESHOLD_NOT_REACHED, LOCK_NOT_ACQUIRED, NOT_SUPPORTED
     };
 
     public TableResultIterator(MutationState mutationState, Scan scan, CombinableMetric scanMetrics,
@@ -105,74 +110,90 @@ public class TableResultIterator implements ResultIterator {
     }
 
     @Override
-    public synchronized void close() throws SQLException {
-        closed = true; // ok to say closed even if the below code throws an exception
+    public void close() throws SQLException {
         try {
-            scanIterator.close();
-        } finally {
+            renewLeaseLock.lock();
+            closed = true; // ok to say closed even if the below code throws an exception
             try {
-                scanIterator = UNINITIALIZED_SCANNER;
-                htable.close();
-            } catch (IOException e) {
-                throw ServerUtil.parseServerException(e);
+                scanIterator.close();
+            } finally {
+                try {
+                    scanIterator = UNINITIALIZED_SCANNER;
+                    htable.close();
+                } catch (IOException e) {
+                    throw ServerUtil.parseServerException(e);
+                }
             }
+        } finally {
+            renewLeaseLock.unlock();
         }
+
     }
     
     @Override
-    public synchronized Tuple next() throws SQLException {
-        initScanner();
+    public Tuple next() throws SQLException {
         try {
-            lastTuple = scanIterator.next();
-            if (lastTuple != null) {
-                ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-                lastTuple.getKey(ptr);
-            }
-        } catch (SQLException e) {
+            renewLeaseLock.lock();
+            initScanner();
             try {
-                throw ServerUtil.parseServerException(e);
-            } catch(StaleRegionBoundaryCacheException e1) {
-                if(ScanUtil.isNonAggregateScan(scan)) {
-                    // For non aggregate queries if we get stale region boundary exception we can
-                    // continue scanning from the next value of lasted fetched result.
-                    Scan newScan = ScanUtil.newScan(scan);
-                    newScan.setStartRow(newScan.getAttribute(SCAN_ACTUAL_START_ROW));
-                    if(lastTuple != null) {
-                        lastTuple.getKey(ptr);
-                        byte[] startRowSuffix = ByteUtil.copyKeyBytesIfNecessary(ptr);
-                        if(ScanUtil.isLocalIndex(newScan)) {
-                            // If we just set scan start row suffix then server side we prepare
-                            // actual scan boundaries by prefixing the region start key.
-                            newScan.setAttribute(SCAN_START_ROW_SUFFIX, ByteUtil.nextKey(startRowSuffix));
-                        } else {
-                            newScan.setStartRow(ByteUtil.nextKey(startRowSuffix));
+                lastTuple = scanIterator.next();
+                if (lastTuple != null) {
+                    ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+                    lastTuple.getKey(ptr);
+                }
+            } catch (SQLException e) {
+                try {
+                    throw ServerUtil.parseServerException(e);
+                } catch(StaleRegionBoundaryCacheException e1) {
+                    if(ScanUtil.isNonAggregateScan(scan)) {
+                        // For non aggregate queries if we get stale region boundary exception we can
+                        // continue scanning from the next value of last fetched result.
+                        Scan newScan = ScanUtil.newScan(scan);
+                        newScan.setStartRow(newScan.getAttribute(SCAN_ACTUAL_START_ROW));
+                        if(lastTuple != null) {
+                            lastTuple.getKey(ptr);
+                            byte[] startRowSuffix = ByteUtil.copyKeyBytesIfNecessary(ptr);
+                            if(ScanUtil.isLocalIndex(newScan)) {
+                                // If we just set scan start row suffix then server side we prepare
+                                // actual scan boundaries by prefixing the region start key.
+                                newScan.setAttribute(SCAN_START_ROW_SUFFIX, ByteUtil.nextKey(startRowSuffix));
+                            } else {
+                                newScan.setStartRow(ByteUtil.nextKey(startRowSuffix));
+                            }
                         }
+                        plan.getContext().getConnection().getQueryServices().clearTableRegionCache(htable.getTableName());
+                        this.scanIterator =
+                                plan.iterator(scanGrouper, newScan);
+                        lastTuple = scanIterator.next();
+                    } else {
+                        throw e;
                     }
-                    plan.getContext().getConnection().getQueryServices().clearTableRegionCache(htable.getTableName());
-                    this.scanIterator =
-                            plan.iterator(scanGrouper, newScan);
-                    lastTuple = scanIterator.next();
-                } else {
-                    throw e;
                 }
             }
+            return lastTuple;
+        } finally {
+            renewLeaseLock.unlock();
         }
-        return lastTuple;
     }
 
-    public synchronized void initScanner() throws SQLException {
-        if (closed) {
-            return;
-        }
-        ResultIterator delegate = this.scanIterator;
-        if (delegate == UNINITIALIZED_SCANNER) {
-            try {
-                this.scanIterator =
-                        new ScanningResultIterator(htable.getScanner(scan), scanMetrics);
-            } catch (IOException e) {
-                Closeables.closeQuietly(htable);
-                throw ServerUtil.parseServerException(e);
+    public void initScanner() throws SQLException {
+        try {
+            renewLeaseLock.lock();
+            if (closed) {
+                return;
+            }
+            ResultIterator delegate = this.scanIterator;
+            if (delegate == UNINITIALIZED_SCANNER) {
+                try {
+                    this.scanIterator =
+                            new ScanningResultIterator(htable.getScanner(scan), scanMetrics);
+                } catch (IOException e) {
+                    Closeables.closeQuietly(htable);
+                    throw ServerUtil.parseServerException(e);
+                }
             }
+        } finally {
+            renewLeaseLock.unlock();
         }
     }
 
@@ -181,27 +202,42 @@ public class TableResultIterator implements ResultIterator {
         return "TableResultIterator [htable=" + htable + ", scan=" + scan  + "]";
     }
 
-    public synchronized RenewLeaseStatus renewLease() {
-        if (closed) {
-            return CLOSED;
-        }
-        if (scanIterator == UNINITIALIZED_SCANNER) {
-            return UNINITIALIZED;
-        }
-        long delay = now() - renewLeaseTime;
-        if (delay < renewLeaseThreshold) {
-            return THRESHOLD_NOT_REACHED;
-        }
-        if (scanIterator instanceof ScanningResultIterator
-                && ((ScanningResultIterator)scanIterator).getScanner() instanceof AbstractClientScanner) {
-            // Need this explicit cast because HBase's ResultScanner doesn't have this method exposed.
-            boolean leaseRenewed = ((AbstractClientScanner)((ScanningResultIterator)scanIterator).getScanner()).renewLease();
-            if (leaseRenewed) {
-                renewLeaseTime = now();
-                return RENEWED;
+    public RenewLeaseStatus renewLease() {
+        boolean lockAcquired = false;
+        try {
+            lockAcquired = renewLeaseLock.tryLock();
+            if (lockAcquired) {
+                if (closed) {
+                    return CLOSED;
+                }
+                if (scanIterator == UNINITIALIZED_SCANNER) {
+                    return UNINITIALIZED;
+                }
+                long delay = now() - renewLeaseTime;
+                if (delay < renewLeaseThreshold) {
+                    return THRESHOLD_NOT_REACHED;
+                }
+                if (scanIterator instanceof ScanningResultIterator
+                        && ((ScanningResultIterator)scanIterator).getScanner() instanceof AbstractClientScanner) {
+                    // Need this explicit cast because HBase's ResultScanner doesn't have this method exposed.
+                    boolean leaseRenewed = ((AbstractClientScanner)((ScanningResultIterator)scanIterator).getScanner()).renewLease();
+                    if (leaseRenewed) {
+                        renewLeaseTime = now();
+                        return RENEWED;
+                    } else {
+                        return NOT_RENEWED;
+                    }
+                } else {
+                    return NOT_SUPPORTED;
+                }
+            }
+            return LOCK_NOT_ACQUIRED;
+        } 
+        finally {
+            if (lockAcquired) {
+                renewLeaseLock.unlock();
             }
         }
-        return NOT_RENEWED;
     }
 
     private static long now() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/336a82d4/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 03a5e13..8ba2c81 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -4077,8 +4077,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                                             scanningItr));
                                     logger.info("Lease renewed for scanner: " + scanningItr);
                                     break;
+                                // Scanner not initialized probably because next() hasn't been called on it yet. Enqueue it back to attempt lease renewal later.
                                 case UNINITIALIZED:
+                                // Threshold not yet reached. Re-enqueue to renew lease later.
                                 case THRESHOLD_NOT_REACHED:
+                                // Another scanner operation in progress. Re-enqueue to attempt renewing lease later.
+                                case LOCK_NOT_ACQUIRED:
                                     // add it back at the tail
                                     scannerQueue.offer(new WeakReference<TableResultIterator>(
                                             scanningItr));
@@ -4086,7 +4090,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                                     // if lease wasn't renewed or scanner was closed, don't add the
                                     // scanner back to the queue.
                                 case CLOSED:
-                                case NOT_RENEWED:
+                                case NOT_SUPPORTED:
                                     break;
                                 }
                             }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/336a82d4/phoenix-core/src/test/java/org/apache/phoenix/query/ScannerLeaseRenewalTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ScannerLeaseRenewalTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ScannerLeaseRenewalTest.java
index 7d8904d..2969fdc 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/ScannerLeaseRenewalTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ScannerLeaseRenewalTest.java
@@ -18,7 +18,7 @@
 package org.apache.phoenix.query;
 
 import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.CLOSED;
-import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.NOT_RENEWED;
+import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.LOCK_NOT_ACQUIRED;
 import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.RENEWED;
 import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.THRESHOLD_NOT_REACHED;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
@@ -48,7 +48,8 @@ public class ScannerLeaseRenewalTest extends BaseConnectionlessQueryTest {
         // create a scanner and add it to the queue
         int numLeaseRenewals = 4;
         int skipRenewLeaseCount = 2;
-        RenewLeaseOnlyTableIterator itr = new RenewLeaseOnlyTableIterator(numLeaseRenewals, skipRenewLeaseCount, -1);
+        int failToAcquireLockAt = 3;
+        RenewLeaseOnlyTableIterator itr = new RenewLeaseOnlyTableIterator(numLeaseRenewals, skipRenewLeaseCount, failToAcquireLockAt, -1);
         LinkedBlockingQueue<WeakReference<TableResultIterator>> scannerQueue = pconn.getScanners();
         scannerQueue.add(new WeakReference<TableResultIterator>(itr));
         
@@ -69,7 +70,7 @@ public class ScannerLeaseRenewalTest extends BaseConnectionlessQueryTest {
         task.run();
         assertTrue(scannerQueue.size() == 1);
         assertTrue(connectionsQueue.size() == 1);
-        assertEquals(RENEWED, itr.getLastRenewLeaseStatus()); // lease renewed
+        assertEquals(LOCK_NOT_ACQUIRED, itr.getLastRenewLeaseStatus()); // lock couldn't be acquired
         
         task.run();
         assertTrue(scannerQueue.size() == 1);
@@ -96,9 +97,10 @@ public class ScannerLeaseRenewalTest extends BaseConnectionlessQueryTest {
         
         // create a scanner and add it to the queue
         int numLeaseRenewals = 4;
+        int lockNotAcquiredAt = 1;
         int thresholdNotReachedCount = 2;
-        int leaseNotRenewedCount = 3;
-        RenewLeaseOnlyTableIterator itr = new RenewLeaseOnlyTableIterator(numLeaseRenewals, thresholdNotReachedCount, leaseNotRenewedCount);
+        int failLeaseRenewalAt = 3;
+        RenewLeaseOnlyTableIterator itr = new RenewLeaseOnlyTableIterator(numLeaseRenewals, thresholdNotReachedCount, lockNotAcquiredAt, failLeaseRenewalAt);
         LinkedBlockingQueue<WeakReference<TableResultIterator>> scannerQueue = pconn.getScanners();
         scannerQueue.add(new WeakReference<TableResultIterator>(itr));
         
@@ -108,8 +110,8 @@ public class ScannerLeaseRenewalTest extends BaseConnectionlessQueryTest {
         
         task.run();
         assertTrue(connectionsQueue.size() == 1); 
-        assertTrue(scannerQueue.size() == 1); // lease renewed
-        assertEquals(RENEWED, itr.getLastRenewLeaseStatus());
+        assertTrue(scannerQueue.size() == 1); // lock not acquired
+        assertEquals(LOCK_NOT_ACQUIRED, itr.getLastRenewLeaseStatus());
         
         task.run();
         assertTrue(scannerQueue.size() == 1);
@@ -118,10 +120,7 @@ public class ScannerLeaseRenewalTest extends BaseConnectionlessQueryTest {
         
         task.run();
         assertTrue(scannerQueue.size() == 0);
-        assertTrue(connectionsQueue.size() == 1);
-        // Lease not renewed due to error or some other reason.
-        // In this case we don't call renew lease on the scanner anymore.
-        assertEquals(NOT_RENEWED, itr.getLastRenewLeaseStatus());
+        assertTrue(connectionsQueue.size() == 0); // there was only one connection in the connectionsQueue and it wasn't added back because of error
         
         pconn.close();
         task.run();