You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/08/08 23:32:25 UTC

git commit: PHOENIX-1143 Prevent race condition between creating phoenix connection and closing phoenix driver/connection query services (Samarth Jain)

Repository: phoenix
Updated Branches:
  refs/heads/3.0 0272e2841 -> f450d8953


PHOENIX-1143 Prevent race condition between creating phoenix connection and closing phoenix driver/connection query services (Samarth Jain)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/f450d895
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/f450d895
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/f450d895

Branch: refs/heads/3.0
Commit: f450d8953f7a113b526561478050a188e0c19c96
Parents: 0272e28
Author: James Taylor <jt...@salesforce.com>
Authored: Fri Aug 8 14:35:59 2014 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Fri Aug 8 14:35:59 2014 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/DecodeFunctionIT.java       |   9 +-
 .../org/apache/phoenix/jdbc/PhoenixDriver.java  | 163 ++++++++++++-------
 .../phoenix/jdbc/PhoenixEmbeddedDriver.java     |   3 +
 .../query/ConnectionQueryServicesImpl.java      |  75 ++++++---
 .../apache/phoenix/jdbc/PhoenixTestDriver.java  |  17 +-
 5 files changed, 177 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/f450d895/phoenix-core/src/it/java/org/apache/phoenix/end2end/DecodeFunctionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DecodeFunctionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DecodeFunctionIT.java
index 19dad3e..05e2504 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DecodeFunctionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DecodeFunctionIT.java
@@ -17,6 +17,9 @@
  */
 package org.apache.phoenix.end2end;
 
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
@@ -26,12 +29,10 @@ import java.sql.SQLException;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.schema.IllegalDataException;
 import org.apache.phoenix.schema.PDataType;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
+@Category(HBaseManagedTimeTest.class)
 public class DecodeFunctionIT extends BaseHBaseManagedTimeIT {
 
 	@Test

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f450d895/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
index ac8f330..10b15dc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.jdbc;
 
+import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.util.Collection;
@@ -24,6 +25,10 @@ import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import javax.annotation.concurrent.GuardedBy;
 
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.ConnectionQueryServicesImpl;
@@ -52,6 +57,7 @@ import org.slf4j.LoggerFactory;
 public final class PhoenixDriver extends PhoenixEmbeddedDriver {
     private static final Logger logger = LoggerFactory.getLogger(PhoenixDriver.class);
     public static final PhoenixDriver INSTANCE;
+    private static volatile String driverShutdownMsg;
     static {
         try {
             DriverManager.registerDriver( INSTANCE = new PhoenixDriver() );
@@ -64,6 +70,8 @@ public final class PhoenixDriver extends PhoenixEmbeddedDriver {
                         INSTANCE.close();
                     } catch (SQLException e) {
                         logger.warn("Unable to close PhoenixDriver on shutdown", e);
+                    } finally {
+                        driverShutdownMsg = "Phoenix driver closed because server is shutting down";
                     }
                 }
             });
@@ -78,27 +86,37 @@ public final class PhoenixDriver extends PhoenixEmbeddedDriver {
         // Use production services implementation
         super();
     }
-
+    
+    // writes guarded by "this"
     private volatile QueryServices services;
+    
+    @GuardedBy("closeLock")
     private volatile boolean closed = false;
+    private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
+    
 
     @Override
     public QueryServices getQueryServices() {
-        checkClosed();
+        try {
+            closeLock.readLock().lock();
+            checkClosed();
 
-        // Lazy initialize QueryServices so that we only attempt to create an HBase Configuration
-    	// object upon the first attempt to connect to any cluster. Otherwise, an attempt will be
-    	// made at driver initialization time which is too early for some systems.
-    	QueryServices result = services;
-    	if (result == null) {
-    		synchronized(this) {
-    			result = services;
-    			if(result == null) {
-    				services = result = new QueryServicesImpl(getDefaultProps());
-    			}
-    		}
-    	}
-    	return result;
+            // Lazy initialize QueryServices so that we only attempt to create an HBase Configuration
+            // object upon the first attempt to connect to any cluster. Otherwise, an attempt will be
+            // made at driver initialization time which is too early for some systems.
+            QueryServices result = services;
+            if (result == null) {
+                synchronized(this) {
+                    result = services;
+                    if(result == null) {
+                        services = result = new QueryServicesImpl(getDefaultProps());
+                    }
+                }
+            }
+            return result;
+        } finally {
+            closeLock.readLock().unlock();
+        }
     }
 
     @Override
@@ -106,71 +124,96 @@ public final class PhoenixDriver extends PhoenixEmbeddedDriver {
         // Accept the url only if test=true attribute not set
         return super.acceptsURL(url) && !isTestUrl(url);
     }
-
+    
+    @Override
+    public Connection connect(String url, Properties info) throws SQLException {
+        try {
+            closeLock.readLock().lock();
+            checkClosed();
+            return super.connect(url, info);
+        } finally {
+            closeLock.readLock().unlock();
+        }
+    }
+    
     @Override
     protected ConnectionQueryServices getConnectionQueryServices(String url, Properties info) throws SQLException {
-        checkClosed();
-
-        ConnectionInfo connInfo = ConnectionInfo.create(url);
-        QueryServices services = getQueryServices();
-        ConnectionInfo normalizedConnInfo = connInfo.normalize(services.getProps());
-        ConnectionQueryServices connectionQueryServices = connectionQueryServicesMap.get(normalizedConnInfo);
-        if (connectionQueryServices == null) {
-            if (normalizedConnInfo.isConnectionless()) {
-                connectionQueryServices = new ConnectionlessQueryServicesImpl(services, normalizedConnInfo);
-            } else {
-                connectionQueryServices = new ConnectionQueryServicesImpl(services, normalizedConnInfo);
+        try {
+            closeLock.readLock().lock();
+            checkClosed();
+            ConnectionInfo connInfo = ConnectionInfo.create(url);
+            QueryServices services = getQueryServices();
+            ConnectionInfo normalizedConnInfo = connInfo.normalize(services.getProps());
+            ConnectionQueryServices connectionQueryServices = connectionQueryServicesMap.get(normalizedConnInfo);
+            if (connectionQueryServices == null) {
+                if (normalizedConnInfo.isConnectionless()) {
+                    connectionQueryServices = new ConnectionlessQueryServicesImpl(services, normalizedConnInfo);
+                } else {
+                    connectionQueryServices = new ConnectionQueryServicesImpl(services, normalizedConnInfo);
+                }
+                ConnectionQueryServices prevValue = connectionQueryServicesMap.putIfAbsent(normalizedConnInfo, connectionQueryServices);
+                if (prevValue != null) {
+                    connectionQueryServices = prevValue;
+                }
             }
-            ConnectionQueryServices prevValue = connectionQueryServicesMap.putIfAbsent(normalizedConnInfo, connectionQueryServices);
-            if (prevValue != null) {
-                connectionQueryServices = prevValue;
+            boolean success = false;
+            SQLException sqlE = null;
+            try {
+                connectionQueryServices.init(url, info);
+                success = true;
+            } catch (SQLException e) {
+                sqlE = e;
             }
-        }
-        boolean success = false;
-        SQLException sqlE = null;
-        try {
-            connectionQueryServices.init(url, info);
-            success = true;
-        } catch (SQLException e) {
-            sqlE = e;
-        }
-        finally {
-            if (!success) {
-                try {
-                    connectionQueryServices.close();
-                } catch (SQLException e) {
-                    if (sqlE == null) {
-                        sqlE = e;
-                    } else {
-                        sqlE.setNextException(e);
-                    }
-                } finally {
-                    // Remove from map, as initialization failed
-                    connectionQueryServicesMap.remove(normalizedConnInfo);
-                    if (sqlE != null) {
-                        throw sqlE;
+            finally {
+                if (!success) {
+                    try {
+                        connectionQueryServices.close();
+                    } catch (SQLException e) {
+                        if (sqlE == null) {
+                            sqlE = e;
+                        } else {
+                            sqlE.setNextException(e);
+                        }
+                    } finally {
+                        // Remove from map, as initialization failed
+                        connectionQueryServicesMap.remove(normalizedConnInfo);
+                        if (sqlE != null) {
+                            throw sqlE;
+                        }
                     }
                 }
             }
+            return connectionQueryServices;
+        } finally {
+            closeLock.readLock().unlock();
         }
-        return connectionQueryServices;
     }
 
     private void checkClosed() {
         if (closed) {
-            throw new IllegalStateException("The Phoenix jdbc driver has been closed.");
+            throwDriverClosedException();
         }
     }
+    
+    private void throwDriverClosedException() {
+        throw new IllegalStateException(driverShutdownMsg != null ? driverShutdownMsg : "The Phoenix jdbc driver has been closed.");
+    }
 
     @Override
     public synchronized void close() throws SQLException {
-        if (closed) {
-            return;
+        try {
+            closeLock.writeLock().lock();
+            if (closed) {
+                return;
+            }
+            closed = true;
+        } finally {
+            closeLock.writeLock().unlock();
         }
-        closed = true;
+
         try {
             Collection<ConnectionQueryServices> connectionQueryServices = connectionQueryServicesMap.values();
-             try {
+            try {
                 SQLCloseables.closeAll(connectionQueryServices);
             } finally {
                 connectionQueryServices.clear();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f450d895/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
index ca27075..c706486 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
@@ -29,6 +29,8 @@ import java.util.Properties;
 import java.util.StringTokenizer;
 import java.util.logging.Logger;
 
+import javax.annotation.concurrent.Immutable;
+
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
@@ -50,6 +52,7 @@ import com.google.common.collect.Maps;
  * 
  * @since 0.1
  */
+@Immutable
 public abstract class PhoenixEmbeddedDriver implements Driver, org.apache.phoenix.jdbc.Jdbc7Shim.Driver, SQLCloseable {
     /**
      * The protocol for Phoenix Network Client 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f450d895/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index f59a672..12224f9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -38,6 +38,8 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import javax.annotation.concurrent.GuardedBy;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
@@ -129,9 +131,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     private final String userName;
     private final ConcurrentHashMap<ImmutableBytesWritable,ConnectionQueryServices> childServices;
     private final StatsManager statsManager;
+    
     // Cache the latest meta data here for future connections
-    private volatile PMetaData latestMetaData;
+    @GuardedBy("latestMetaDataLock")
+    private PMetaData latestMetaData;
     private final Object latestMetaDataLock = new Object();
+    
     // Lowest HBase version on the cluster.
     private int lowestClusterHBaseVersion = Integer.MAX_VALUE;
     private boolean hasInvalidIndexConfiguration = false;
@@ -139,7 +144,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
 
     private HConnection connection;
     private volatile boolean initialized;
+    
+    // writes guarded by "this"
     private volatile boolean closed;
+    
     private volatile SQLException initializationException;
     protected ConcurrentMap<SequenceKey,Sequence> sequenceMap = Maps.newConcurrentMap();
     private KeyValueBuilder kvBuilder;
@@ -275,7 +283,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 } finally {
                     try {
                         childServices.clear();
-                        latestMetaData = null;
+                        synchronized (latestMetaDataLock) {
+                            latestMetaData = null;
+                            latestMetaDataLock.notifyAll();
+                        }
                         if (connection != null) connection.close();
                     } catch (IOException e) {
                         if (sqlE == null) {
@@ -366,16 +377,16 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
 
     @Override
     public PMetaData addTable(PTable table) throws SQLException {
-        try {
-            // If existing table isn't older than new table, don't replace
-            // If a client opens a connection at an earlier timestamp, this can happen
-            PTable existingTable = latestMetaData.getTable(new PTableKey(table.getTenantId(), table.getName().getString()));
-            if (existingTable.getTimeStamp() >= table.getTimeStamp()) {
-                return latestMetaData;
-            }
-        } catch (TableNotFoundException e) {
-        }
-        synchronized(latestMetaDataLock) {
+        synchronized (latestMetaDataLock) {
+            try {
+                throwConnectionClosedIfNullMetaData();
+                // If existing table isn't older than new table, don't replace
+                // If a client opens a connection at an earlier timestamp, this can happen
+                PTable existingTable = latestMetaData.getTable(new PTableKey(table.getTenantId(), table.getName().getString()));
+                if (existingTable.getTimeStamp() >= table.getTimeStamp()) { 
+                    return latestMetaData; 
+                }
+            } catch (TableNotFoundException e) {}
             latestMetaData = latestMetaData.addTable(table);
             latestMetaDataLock.notifyAll();
             return latestMetaData;
@@ -391,7 +402,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
      * @param tenantId TODO
      */
     private PMetaData metaDataMutated(PName tenantId, String tableName, long tableSeqNum, Mutator mutator) throws SQLException {
-        synchronized(latestMetaDataLock) {
+        synchronized (latestMetaDataLock) {
+            throwConnectionClosedIfNullMetaData();
             PMetaData metaData = latestMetaData;
             PTable table;
             long endTime = System.currentTimeMillis() + DEFAULT_OUT_OF_ORDER_MUTATIONS_WAIT_TIME_MS;
@@ -451,7 +463,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
 
     @Override
     public PMetaData removeTable(PName tenantId, final String tableName) throws SQLException {
-        synchronized(latestMetaDataLock) {
+        synchronized (latestMetaDataLock) {
+            throwConnectionClosedIfNullMetaData();
             latestMetaData = latestMetaData.removeTable(tenantId, tableName);
             latestMetaDataLock.notifyAll();
             return latestMetaData;
@@ -476,7 +489,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
 
     @Override
     public PhoenixConnection connect(String url, Properties info) throws SQLException {
-        return new PhoenixConnection(this, url, info, latestMetaData);
+        checkClosed();
+        synchronized (latestMetaDataLock) {
+            throwConnectionClosedIfNullMetaData();
+            latestMetaDataLock.notifyAll();
+            return new PhoenixConnection(this, url, info, latestMetaData);
+        }
     }
 
 
@@ -1088,7 +1106,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 MetaDataUtil.VIEW_INDEX_TABLE_PREFIX_BYTES.length,
                 physicalIndexTableName.length-MetaDataUtil.VIEW_INDEX_TABLE_PREFIX_BYTES.length);
         try {
-            table = latestMetaData.getTable(new PTableKey(tenantId, name));
+            synchronized (latestMetaDataLock) {
+                throwConnectionClosedIfNullMetaData();
+                table = latestMetaData.getTable(new PTableKey(tenantId, name));
+                latestMetaDataLock.notifyAll();
+            }
             if (table.getTimeStamp() >= timestamp) { // Table in cache is newer than client timestamp which shouldn't be the case
                 throw new TableNotFoundException(table.getSchemaName().getString(), table.getTableName().getString());
             }
@@ -1268,10 +1290,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                             }
                             return null;
                         }
-                        if (closed) {
-                            throw new SQLException("The connection to the cluster has been closed.");
-                        }
-
+                        checkClosed();
                         SQLException sqlE = null;
                         PhoenixConnection metaConnection = null;
                         try {
@@ -1764,4 +1783,20 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         return userName;
     }
     
+    private void checkClosed() {
+        if (closed) {
+            throwConnectionClosedException();
+        }
+    }
+    
+    private void throwConnectionClosedIfNullMetaData() {
+        if (latestMetaData == null) {
+            throwConnectionClosedException();
+        }
+    }
+    
+    private void throwConnectionClosedException() {
+        throw new IllegalStateException("Connection to the cluster is closed");
+    }
+    
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f450d895/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java
index 2423344..0d3c461 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.jdbc;
 
+import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.Properties;
 
@@ -50,9 +51,8 @@ public class PhoenixTestDriver extends PhoenixEmbeddedDriver {
     @GuardedBy("this")
     private final QueryServices queryServices;
     
-    //The only place it is modified is under a lock provided by "this". 
-    //So ok to have it just as volatile.
-    private volatile boolean closed = false;
+    @GuardedBy("this")
+    private boolean closed = false;
 
     public PhoenixTestDriver() {
         this.overrideProps = ReadOnlyProps.EMPTY_PROPS;
@@ -73,11 +73,16 @@ public class PhoenixTestDriver extends PhoenixEmbeddedDriver {
 
     @Override
     public boolean acceptsURL(String url) throws SQLException {
-        checkClosed();
         // Accept the url only if test=true attribute set
         return super.acceptsURL(url) && isTestUrl(url);
     }
-
+    
+    @Override
+    public synchronized Connection connect(String url, Properties info) throws SQLException {
+        checkClosed();
+        return super.connect(url, info);
+    }
+    
     @Override // public for testing
     public synchronized ConnectionQueryServices getConnectionQueryServices(String url, Properties info) throws SQLException {
         checkClosed();
@@ -92,7 +97,7 @@ public class PhoenixTestDriver extends PhoenixEmbeddedDriver {
         return connectionQueryServices;
     }
     
-    private void checkClosed() {
+    private synchronized void checkClosed() {
         if (closed) {
             throw new IllegalStateException("The Phoenix jdbc test driver has been closed.");
         }