You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/08/08 23:32:25 UTC
git commit: PHOENIX-1143 Prevent race condition between creating
phoenix connection and closing phoenix driver/connection query services
(Samarth Jain)
Repository: phoenix
Updated Branches:
refs/heads/3.0 0272e2841 -> f450d8953
PHOENIX-1143 Prevent race condition between creating phoenix connection and closing phoenix driver/connection query services (Samarth Jain)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/f450d895
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/f450d895
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/f450d895
Branch: refs/heads/3.0
Commit: f450d8953f7a113b526561478050a188e0c19c96
Parents: 0272e28
Author: James Taylor <jt...@salesforce.com>
Authored: Fri Aug 8 14:35:59 2014 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Fri Aug 8 14:35:59 2014 -0700
----------------------------------------------------------------------
.../phoenix/end2end/DecodeFunctionIT.java | 9 +-
.../org/apache/phoenix/jdbc/PhoenixDriver.java | 163 ++++++++++++-------
.../phoenix/jdbc/PhoenixEmbeddedDriver.java | 3 +
.../query/ConnectionQueryServicesImpl.java | 75 ++++++---
.../apache/phoenix/jdbc/PhoenixTestDriver.java | 17 +-
5 files changed, 177 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f450d895/phoenix-core/src/it/java/org/apache/phoenix/end2end/DecodeFunctionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DecodeFunctionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DecodeFunctionIT.java
index 19dad3e..05e2504 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DecodeFunctionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DecodeFunctionIT.java
@@ -17,6 +17,9 @@
*/
package org.apache.phoenix.end2end;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
@@ -26,12 +29,10 @@ import java.sql.SQLException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.schema.IllegalDataException;
import org.apache.phoenix.schema.PDataType;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
import org.junit.Test;
+import org.junit.experimental.categories.Category;
+@Category(HBaseManagedTimeTest.class)
public class DecodeFunctionIT extends BaseHBaseManagedTimeIT {
@Test
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f450d895/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
index ac8f330..10b15dc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
@@ -17,6 +17,7 @@
*/
package org.apache.phoenix.jdbc;
+import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Collection;
@@ -24,6 +25,10 @@ import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import javax.annotation.concurrent.GuardedBy;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.ConnectionQueryServicesImpl;
@@ -52,6 +57,7 @@ import org.slf4j.LoggerFactory;
public final class PhoenixDriver extends PhoenixEmbeddedDriver {
private static final Logger logger = LoggerFactory.getLogger(PhoenixDriver.class);
public static final PhoenixDriver INSTANCE;
+ private static volatile String driverShutdownMsg;
static {
try {
DriverManager.registerDriver( INSTANCE = new PhoenixDriver() );
@@ -64,6 +70,8 @@ public final class PhoenixDriver extends PhoenixEmbeddedDriver {
INSTANCE.close();
} catch (SQLException e) {
logger.warn("Unable to close PhoenixDriver on shutdown", e);
+ } finally {
+ driverShutdownMsg = "Phoenix driver closed because server is shutting down";
}
}
});
@@ -78,27 +86,37 @@ public final class PhoenixDriver extends PhoenixEmbeddedDriver {
// Use production services implementation
super();
}
-
+
+ // writes guarded by "this"
private volatile QueryServices services;
+
+ @GuardedBy("closeLock")
private volatile boolean closed = false;
+ private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
+
@Override
public QueryServices getQueryServices() {
- checkClosed();
+ try {
+ closeLock.readLock().lock();
+ checkClosed();
- // Lazy initialize QueryServices so that we only attempt to create an HBase Configuration
- // object upon the first attempt to connect to any cluster. Otherwise, an attempt will be
- // made at driver initialization time which is too early for some systems.
- QueryServices result = services;
- if (result == null) {
- synchronized(this) {
- result = services;
- if(result == null) {
- services = result = new QueryServicesImpl(getDefaultProps());
- }
- }
- }
- return result;
+ // Lazy initialize QueryServices so that we only attempt to create an HBase Configuration
+ // object upon the first attempt to connect to any cluster. Otherwise, an attempt will be
+ // made at driver initialization time which is too early for some systems.
+ QueryServices result = services;
+ if (result == null) {
+ synchronized(this) {
+ result = services;
+ if(result == null) {
+ services = result = new QueryServicesImpl(getDefaultProps());
+ }
+ }
+ }
+ return result;
+ } finally {
+ closeLock.readLock().unlock();
+ }
}
@Override
@@ -106,71 +124,96 @@ public final class PhoenixDriver extends PhoenixEmbeddedDriver {
// Accept the url only if test=true attribute not set
return super.acceptsURL(url) && !isTestUrl(url);
}
-
+
+ @Override
+ public Connection connect(String url, Properties info) throws SQLException {
+ try {
+ closeLock.readLock().lock();
+ checkClosed();
+ return super.connect(url, info);
+ } finally {
+ closeLock.readLock().unlock();
+ }
+ }
+
@Override
protected ConnectionQueryServices getConnectionQueryServices(String url, Properties info) throws SQLException {
- checkClosed();
-
- ConnectionInfo connInfo = ConnectionInfo.create(url);
- QueryServices services = getQueryServices();
- ConnectionInfo normalizedConnInfo = connInfo.normalize(services.getProps());
- ConnectionQueryServices connectionQueryServices = connectionQueryServicesMap.get(normalizedConnInfo);
- if (connectionQueryServices == null) {
- if (normalizedConnInfo.isConnectionless()) {
- connectionQueryServices = new ConnectionlessQueryServicesImpl(services, normalizedConnInfo);
- } else {
- connectionQueryServices = new ConnectionQueryServicesImpl(services, normalizedConnInfo);
+ try {
+ closeLock.readLock().lock();
+ checkClosed();
+ ConnectionInfo connInfo = ConnectionInfo.create(url);
+ QueryServices services = getQueryServices();
+ ConnectionInfo normalizedConnInfo = connInfo.normalize(services.getProps());
+ ConnectionQueryServices connectionQueryServices = connectionQueryServicesMap.get(normalizedConnInfo);
+ if (connectionQueryServices == null) {
+ if (normalizedConnInfo.isConnectionless()) {
+ connectionQueryServices = new ConnectionlessQueryServicesImpl(services, normalizedConnInfo);
+ } else {
+ connectionQueryServices = new ConnectionQueryServicesImpl(services, normalizedConnInfo);
+ }
+ ConnectionQueryServices prevValue = connectionQueryServicesMap.putIfAbsent(normalizedConnInfo, connectionQueryServices);
+ if (prevValue != null) {
+ connectionQueryServices = prevValue;
+ }
}
- ConnectionQueryServices prevValue = connectionQueryServicesMap.putIfAbsent(normalizedConnInfo, connectionQueryServices);
- if (prevValue != null) {
- connectionQueryServices = prevValue;
+ boolean success = false;
+ SQLException sqlE = null;
+ try {
+ connectionQueryServices.init(url, info);
+ success = true;
+ } catch (SQLException e) {
+ sqlE = e;
}
- }
- boolean success = false;
- SQLException sqlE = null;
- try {
- connectionQueryServices.init(url, info);
- success = true;
- } catch (SQLException e) {
- sqlE = e;
- }
- finally {
- if (!success) {
- try {
- connectionQueryServices.close();
- } catch (SQLException e) {
- if (sqlE == null) {
- sqlE = e;
- } else {
- sqlE.setNextException(e);
- }
- } finally {
- // Remove from map, as initialization failed
- connectionQueryServicesMap.remove(normalizedConnInfo);
- if (sqlE != null) {
- throw sqlE;
+ finally {
+ if (!success) {
+ try {
+ connectionQueryServices.close();
+ } catch (SQLException e) {
+ if (sqlE == null) {
+ sqlE = e;
+ } else {
+ sqlE.setNextException(e);
+ }
+ } finally {
+ // Remove from map, as initialization failed
+ connectionQueryServicesMap.remove(normalizedConnInfo);
+ if (sqlE != null) {
+ throw sqlE;
+ }
}
}
}
+ return connectionQueryServices;
+ } finally {
+ closeLock.readLock().unlock();
}
- return connectionQueryServices;
}
private void checkClosed() {
if (closed) {
- throw new IllegalStateException("The Phoenix jdbc driver has been closed.");
+ throwDriverClosedException();
}
}
+
+ private void throwDriverClosedException() {
+ throw new IllegalStateException(driverShutdownMsg != null ? driverShutdownMsg : "The Phoenix jdbc driver has been closed.");
+ }
@Override
public synchronized void close() throws SQLException {
- if (closed) {
- return;
+ try {
+ closeLock.writeLock().lock();
+ if (closed) {
+ return;
+ }
+ closed = true;
+ } finally {
+ closeLock.writeLock().unlock();
}
- closed = true;
+
try {
Collection<ConnectionQueryServices> connectionQueryServices = connectionQueryServicesMap.values();
- try {
+ try {
SQLCloseables.closeAll(connectionQueryServices);
} finally {
connectionQueryServices.clear();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f450d895/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
index ca27075..c706486 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
@@ -29,6 +29,8 @@ import java.util.Properties;
import java.util.StringTokenizer;
import java.util.logging.Logger;
+import javax.annotation.concurrent.Immutable;
+
import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
@@ -50,6 +52,7 @@ import com.google.common.collect.Maps;
*
* @since 0.1
*/
+@Immutable
public abstract class PhoenixEmbeddedDriver implements Driver, org.apache.phoenix.jdbc.Jdbc7Shim.Driver, SQLCloseable {
/**
* The protocol for Phoenix Network Client
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f450d895/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index f59a672..12224f9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -38,6 +38,8 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import javax.annotation.concurrent.GuardedBy;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
@@ -129,9 +131,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
private final String userName;
private final ConcurrentHashMap<ImmutableBytesWritable,ConnectionQueryServices> childServices;
private final StatsManager statsManager;
+
// Cache the latest meta data here for future connections
- private volatile PMetaData latestMetaData;
+ @GuardedBy("latestMetaDataLock")
+ private PMetaData latestMetaData;
private final Object latestMetaDataLock = new Object();
+
// Lowest HBase version on the cluster.
private int lowestClusterHBaseVersion = Integer.MAX_VALUE;
private boolean hasInvalidIndexConfiguration = false;
@@ -139,7 +144,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
private HConnection connection;
private volatile boolean initialized;
+
+ // writes guarded by "this"
private volatile boolean closed;
+
private volatile SQLException initializationException;
protected ConcurrentMap<SequenceKey,Sequence> sequenceMap = Maps.newConcurrentMap();
private KeyValueBuilder kvBuilder;
@@ -275,7 +283,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
} finally {
try {
childServices.clear();
- latestMetaData = null;
+ synchronized (latestMetaDataLock) {
+ latestMetaData = null;
+ latestMetaDataLock.notifyAll();
+ }
if (connection != null) connection.close();
} catch (IOException e) {
if (sqlE == null) {
@@ -366,16 +377,16 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
@Override
public PMetaData addTable(PTable table) throws SQLException {
- try {
- // If existing table isn't older than new table, don't replace
- // If a client opens a connection at an earlier timestamp, this can happen
- PTable existingTable = latestMetaData.getTable(new PTableKey(table.getTenantId(), table.getName().getString()));
- if (existingTable.getTimeStamp() >= table.getTimeStamp()) {
- return latestMetaData;
- }
- } catch (TableNotFoundException e) {
- }
- synchronized(latestMetaDataLock) {
+ synchronized (latestMetaDataLock) {
+ try {
+ throwConnectionClosedIfNullMetaData();
+ // If existing table isn't older than new table, don't replace
+ // If a client opens a connection at an earlier timestamp, this can happen
+ PTable existingTable = latestMetaData.getTable(new PTableKey(table.getTenantId(), table.getName().getString()));
+ if (existingTable.getTimeStamp() >= table.getTimeStamp()) {
+ return latestMetaData;
+ }
+ } catch (TableNotFoundException e) {}
latestMetaData = latestMetaData.addTable(table);
latestMetaDataLock.notifyAll();
return latestMetaData;
@@ -391,7 +402,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
* @param tenantId TODO
*/
private PMetaData metaDataMutated(PName tenantId, String tableName, long tableSeqNum, Mutator mutator) throws SQLException {
- synchronized(latestMetaDataLock) {
+ synchronized (latestMetaDataLock) {
+ throwConnectionClosedIfNullMetaData();
PMetaData metaData = latestMetaData;
PTable table;
long endTime = System.currentTimeMillis() + DEFAULT_OUT_OF_ORDER_MUTATIONS_WAIT_TIME_MS;
@@ -451,7 +463,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
@Override
public PMetaData removeTable(PName tenantId, final String tableName) throws SQLException {
- synchronized(latestMetaDataLock) {
+ synchronized (latestMetaDataLock) {
+ throwConnectionClosedIfNullMetaData();
latestMetaData = latestMetaData.removeTable(tenantId, tableName);
latestMetaDataLock.notifyAll();
return latestMetaData;
@@ -476,7 +489,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
@Override
public PhoenixConnection connect(String url, Properties info) throws SQLException {
- return new PhoenixConnection(this, url, info, latestMetaData);
+ checkClosed();
+ synchronized (latestMetaDataLock) {
+ throwConnectionClosedIfNullMetaData();
+ latestMetaDataLock.notifyAll();
+ return new PhoenixConnection(this, url, info, latestMetaData);
+ }
}
@@ -1088,7 +1106,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
MetaDataUtil.VIEW_INDEX_TABLE_PREFIX_BYTES.length,
physicalIndexTableName.length-MetaDataUtil.VIEW_INDEX_TABLE_PREFIX_BYTES.length);
try {
- table = latestMetaData.getTable(new PTableKey(tenantId, name));
+ synchronized (latestMetaDataLock) {
+ throwConnectionClosedIfNullMetaData();
+ table = latestMetaData.getTable(new PTableKey(tenantId, name));
+ latestMetaDataLock.notifyAll();
+ }
if (table.getTimeStamp() >= timestamp) { // Table in cache is newer than client timestamp which shouldn't be the case
throw new TableNotFoundException(table.getSchemaName().getString(), table.getTableName().getString());
}
@@ -1268,10 +1290,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
return null;
}
- if (closed) {
- throw new SQLException("The connection to the cluster has been closed.");
- }
-
+ checkClosed();
SQLException sqlE = null;
PhoenixConnection metaConnection = null;
try {
@@ -1764,4 +1783,20 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
return userName;
}
+ private void checkClosed() {
+ if (closed) {
+ throwConnectionClosedException();
+ }
+ }
+
+ private void throwConnectionClosedIfNullMetaData() {
+ if (latestMetaData == null) {
+ throwConnectionClosedException();
+ }
+ }
+
+ private void throwConnectionClosedException() {
+ throw new IllegalStateException("Connection to the cluster is closed");
+ }
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f450d895/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java
index 2423344..0d3c461 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java
@@ -17,6 +17,7 @@
*/
package org.apache.phoenix.jdbc;
+import java.sql.Connection;
import java.sql.SQLException;
import java.util.Properties;
@@ -50,9 +51,8 @@ public class PhoenixTestDriver extends PhoenixEmbeddedDriver {
@GuardedBy("this")
private final QueryServices queryServices;
- //The only place it is modified is under a lock provided by "this".
- //So ok to have it just as volatile.
- private volatile boolean closed = false;
+ @GuardedBy("this")
+ private boolean closed = false;
public PhoenixTestDriver() {
this.overrideProps = ReadOnlyProps.EMPTY_PROPS;
@@ -73,11 +73,16 @@ public class PhoenixTestDriver extends PhoenixEmbeddedDriver {
@Override
public boolean acceptsURL(String url) throws SQLException {
- checkClosed();
// Accept the url only if test=true attribute set
return super.acceptsURL(url) && isTestUrl(url);
}
-
+
+ @Override
+ public synchronized Connection connect(String url, Properties info) throws SQLException {
+ checkClosed();
+ return super.connect(url, info);
+ }
+
@Override // public for testing
public synchronized ConnectionQueryServices getConnectionQueryServices(String url, Properties info) throws SQLException {
checkClosed();
@@ -92,7 +97,7 @@ public class PhoenixTestDriver extends PhoenixEmbeddedDriver {
return connectionQueryServices;
}
- private void checkClosed() {
+ private synchronized void checkClosed() {
if (closed) {
throw new IllegalStateException("The Phoenix jdbc test driver has been closed.");
}