You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by gr...@apache.org on 2014/05/14 11:02:57 UTC
git commit: PHOENIX-966 Context classloader for Phoenix classes
Repository: incubator-phoenix
Updated Branches:
refs/heads/master e7cd8b960 -> f50ba17af
PHOENIX-966 Context classloader for Phoenix classes
Temporarily set the current thread's context classloader in calls
that require creating an HBase configuration or accessing Phoenix
coprocessor endpoint classes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/commit/f50ba17a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/tree/f50ba17a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/diff/f50ba17a
Branch: refs/heads/master
Commit: f50ba17af36cd0ee76b1ec331c8fe0f109596df5
Parents: e7cd8b9
Author: Gabriel Reid <gr...@apache.org>
Authored: Sat May 3 19:00:36 2014 +0200
Committer: Gabriel Reid <ga...@ngdata.com>
Committed: Wed May 14 08:25:33 2014 +0200
----------------------------------------------------------------------
.../phoenix/end2end/ContextClassloaderIT.java | 174 +++++++++++++++++++
.../apache/phoenix/jdbc/PhoenixStatement.java | 123 +++++++------
.../java/org/apache/phoenix/job/JobManager.java | 37 +++-
.../phoenix/query/ConfigurationFactory.java | 22 ++-
.../query/ConnectionQueryServicesImpl.java | 141 ++++++++-------
.../phoenix/util/PhoenixContextExecutor.java | 75 ++++++++
.../util/PhoenixContextExecutorTest.java | 49 ++++++
7 files changed, 506 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/f50ba17a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ContextClassloaderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ContextClassloaderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ContextClassloaderIT.java
new file mode 100644
index 0000000..c484dce
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ContextClassloaderIT.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import java.io.File;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.ConfigUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** Verifies that JDBC calls work on a thread whose context classloader cannot load Phoenix classes. */
+public class ContextClassloaderIT {
+
+ private static HBaseTestingUtility hbaseTestUtil;
+ private static ClassLoader badContextClassloader;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ hbaseTestUtil = new HBaseTestingUtility();
+ ConfigUtil.setReplicationConfigIfAbsent(hbaseTestUtil.getConfiguration());
+ hbaseTestUtil.getConfiguration().setInt(QueryServices.MASTER_INFO_PORT_ATTRIB, -1);
+ hbaseTestUtil.getConfiguration().setInt(QueryServices.REGIONSERVER_INFO_PORT_ATTRIB, -1);
+ hbaseTestUtil.startMiniCluster();
+ Connection conn = DriverManager.getConnection(getUrl());
+ Statement stmt = conn.createStatement();
+ stmt.execute("CREATE TABLE test (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR)");
+ stmt.execute("UPSERT INTO test VALUES (1, 'name1')");
+ stmt.execute("UPSERT INTO test VALUES (2, 'name2')");
+ stmt.close();
+ conn.commit();
+ conn.close();
+ // Parentless loader over an empty placeholder file; the file is removed when the JVM exits
+ File invalidJar = File.createTempFile("invalid", ".jar");
+ invalidJar.deleteOnExit();
+ badContextClassloader = new URLClassLoader(new URL[] { invalidJar.toURI().toURL() }, null);
+ }
+
+ private static String getUrl() {
+ return "jdbc:phoenix:localhost:" + hbaseTestUtil.getZkCluster().getClientPort();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ hbaseTestUtil.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testQueryWithDifferentContextClassloader() throws SQLException, InterruptedException {
+ Runnable target = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Connection conn = DriverManager.getConnection(getUrl());
+ Statement stmt = conn.createStatement();
+ ResultSet rs = stmt.executeQuery("select * from test where name = 'name2'");
+ while (rs.next()) {
+ // Just make sure we run over all records
+ }
+ rs.close();
+ stmt.close();
+ conn.close();
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ runWithBadContextClassloader(target);
+ }
+
+ @Test
+ public void testGetDatabaseMetadataWithDifferentContextClassloader() throws InterruptedException {
+ Runnable target = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Connection conn = DriverManager.getConnection(getUrl());
+ ResultSet tablesRs = conn.getMetaData().getTables(null, null, null, null);
+ while (tablesRs.next()) {
+ // Just make sure we run over all records
+ }
+ tablesRs.close();
+ conn.close();
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ runWithBadContextClassloader(target);
+ }
+
+ @Test
+ public void testExecuteDdlWithDifferentContextClassloader() throws InterruptedException {
+ Runnable target = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Connection conn = DriverManager.getConnection(getUrl());
+ Statement stmt = conn.createStatement();
+ stmt.execute("CREATE TABLE T2 (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR)");
+ stmt.execute("UPSERT INTO T2 VALUES (1, 'name1')");
+ conn.commit();
+ ResultSet rs = stmt.executeQuery("SELECT * FROM T2");
+ assertTrue(rs.next());
+ assertFalse(rs.next());
+ rs.close();
+ stmt.close();
+ conn.close();
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ runWithBadContextClassloader(target);
+ }
+
+ /** Runs {@code target} on a thread using the unusable context classloader and asserts it did not fail. */
+ private static void runWithBadContextClassloader(Runnable target) throws InterruptedException {
+ BadContextClassloaderThread t = new BadContextClassloaderThread(target);
+ t.start();
+ t.join();
+ assertFalse(t.failed);
+ }
+
+ static class BadContextClassloaderThread extends Thread {
+
+ private final Runnable target;
+ boolean failed = false;
+
+ public BadContextClassloaderThread(Runnable target) {
+ super("BadContextClassloaderThread");
+ this.target = target;
+ setContextClassLoader(badContextClassloader);
+ }
+
+ @Override
+ public void run() {
+ try {
+ target.run();
+ } catch (Throwable t) {
+ failed = true;
+ throw new RuntimeException(t);
+ }
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/f50ba17a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 74eaecf..3c27cc9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -31,7 +31,11 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.compile.ColumnProjector;
import org.apache.phoenix.compile.ColumnResolver;
@@ -43,7 +47,6 @@ import org.apache.phoenix.compile.DropSequenceCompiler;
import org.apache.phoenix.compile.ExplainPlan;
import org.apache.phoenix.compile.ExpressionProjector;
import org.apache.phoenix.compile.FromCompiler;
-import org.apache.phoenix.compile.SubselectRewriter;
import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
import org.apache.phoenix.compile.MutationPlan;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
@@ -53,6 +56,7 @@ import org.apache.phoenix.compile.RowProjector;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.compile.StatementNormalizer;
import org.apache.phoenix.compile.StatementPlan;
+import org.apache.phoenix.compile.SubselectRewriter;
import org.apache.phoenix.compile.UpsertCompiler;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.exception.BatchUpdateExecution;
@@ -109,13 +113,11 @@ import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.PhoenixContextExecutor;
import org.apache.phoenix.util.SQLCloseable;
import org.apache.phoenix.util.SQLCloseables;
import org.apache.phoenix.util.ServerUtil;
-import com.google.common.collect.ListMultimap;
-import com.google.common.collect.Lists;
-
/**
*
@@ -195,58 +197,81 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
return connection.getQueryServices().getOptimizer().optimize(this, plan);
}
- protected PhoenixResultSet executeQuery(CompilableStatement stmt) throws SQLException {
+ protected PhoenixResultSet executeQuery(final CompilableStatement stmt) throws SQLException {
try {
- QueryPlan plan = stmt.compilePlan(this);
- plan = connection.getQueryServices().getOptimizer().optimize(this, plan);
- plan.getContext().getSequenceManager().validateSequences(stmt.getSequenceAction());;
- PhoenixResultSet rs = newResultSet(plan.iterator(), plan.getProjector());
- resultSets.add(rs);
- setLastQueryPlan(plan);
- setLastResultSet(rs);
- setLastUpdateCount(NO_UPDATE);
- setLastUpdateOperation(stmt.getOperation());
- return rs;
- } catch (RuntimeException e) {
- // FIXME: Expression.evaluate does not throw SQLException
- // so this will unwrap throws from that.
- if (e.getCause() instanceof SQLException) {
- throw (SQLException) e.getCause();
- }
- throw e;
+ return PhoenixContextExecutor.call(new Callable<PhoenixResultSet>() {
+ @Override
+ public PhoenixResultSet call() throws Exception {
+ try {
+ QueryPlan plan = stmt.compilePlan(PhoenixStatement.this);
+ plan = connection.getQueryServices().getOptimizer().optimize(
+ PhoenixStatement.this, plan);
+ plan.getContext().getSequenceManager().validateSequences(stmt.getSequenceAction());
+ PhoenixResultSet rs = newResultSet(plan.iterator(), plan.getProjector());
+ resultSets.add(rs);
+ setLastQueryPlan(plan);
+ setLastResultSet(rs);
+ setLastUpdateCount(NO_UPDATE);
+ setLastUpdateOperation(stmt.getOperation());
+ return rs;
+ } catch (RuntimeException e) {
+ // FIXME: Expression.evaluate does not throw SQLException
+ // so this will unwrap throws from that.
+ if (e.getCause() instanceof SQLException) {
+ throw (SQLException) e.getCause();
+ }
+ throw e;
+ }
+ }
+ });
+ } catch (Exception e) {
+ Throwables.propagateIfInstanceOf(e, SQLException.class);
+ throw Throwables.propagate(e);
}
}
- protected int executeMutation(CompilableStatement stmt) throws SQLException {
- // Note that the upsert select statements will need to commit any open transaction here,
- // since they'd update data directly from coprocessors, and should thus operate on
- // the latest state
+ protected int executeMutation(final CompilableStatement stmt) throws SQLException {
try {
- MutationPlan plan = stmt.compilePlan(this);
- plan.getContext().getSequenceManager().validateSequences(stmt.getSequenceAction());;
- MutationState state = plan.execute();
- connection.getMutationState().join(state);
- if (connection.getAutoCommit()) {
- connection.commit();
- }
- setLastResultSet(null);
- setLastQueryPlan(null);
- // Unfortunately, JDBC uses an int for update count, so we
- // just max out at Integer.MAX_VALUE
- int lastUpdateCount = (int)Math.min(Integer.MAX_VALUE, state.getUpdateCount());
- setLastUpdateCount(lastUpdateCount);
- setLastUpdateOperation(stmt.getOperation());
- return lastUpdateCount;
- } catch (RuntimeException e) {
- // FIXME: Expression.evaluate does not throw SQLException
- // so this will unwrap throws from that.
- if (e.getCause() instanceof SQLException) {
- throw (SQLException) e.getCause();
- }
- throw e;
+ return PhoenixContextExecutor.call(
+ new Callable<Integer>() {
+ @Override
+ public Integer call() throws Exception {
+
+ // Note that the upsert select statements will need to commit any open transaction here,
+ // since they'd update data directly from coprocessors, and should thus operate on
+ // the latest state
+ try {
+ MutationPlan plan = stmt.compilePlan(PhoenixStatement.this);
+ plan.getContext().getSequenceManager().validateSequences(stmt.getSequenceAction());
+ MutationState state = plan.execute();
+ connection.getMutationState().join(state);
+ if (connection.getAutoCommit()) {
+ connection.commit();
+ }
+ setLastResultSet(null);
+ setLastQueryPlan(null);
+ // Unfortunately, JDBC uses an int for update count, so we
+ // just max out at Integer.MAX_VALUE
+ int lastUpdateCount = (int) Math.min(Integer.MAX_VALUE, state.getUpdateCount());
+ setLastUpdateCount(lastUpdateCount);
+ setLastUpdateOperation(stmt.getOperation());
+ return lastUpdateCount;
+ } catch (RuntimeException e) {
+ // FIXME: Expression.evaluate does not throw SQLException
+ // so this will unwrap throws from that.
+ if (e.getCause() instanceof SQLException) {
+ throw (SQLException) e.getCause();
+ }
+ throw e;
+ }
+ }
+ });
+ } catch (Exception e) {
+ Throwables.propagateIfInstanceOf(e, SQLException.class);
+ throw Throwables.propagate(e);
}
}
-
+
protected static interface CompilableStatement extends BindableStatement {
public <T extends StatementPlan> T compilePlan (PhoenixStatement stmt) throws SQLException;
}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/f50ba17a/phoenix-core/src/main/java/org/apache/phoenix/job/JobManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/job/JobManager.java b/phoenix-core/src/main/java/org/apache/phoenix/job/JobManager.java
index eee4249..de10042 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/job/JobManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/job/JobManager.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.job;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.SynchronousQueue;
@@ -61,9 +62,13 @@ public class JobManager<T> extends AbstractRoundRobinQueue<T> {
} else {
queue = new JobManager<Runnable>(queueSize);
}
- ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(
- "phoenix-" + PHOENIX_POOL_INDEX.getAndIncrement()
- + "-thread-%s").setDaemon(true).build();
+
+ ThreadFactory threadFactory = new ThreadFactoryBuilder()
+ .setNameFormat("phoenix-" + PHOENIX_POOL_INDEX.getAndIncrement() + "-thread-%s")
+ .setDaemon(true)
+ .setThreadFactory(
+ new ContextClassLoaderThreadFactory(JobManager.class.getClassLoader()))
+ .build();
// For thread pool, set core threads = max threads -- we don't ever want to exceed core threads, but want to go up to core threads *before* using the queue.
ThreadPoolExecutor exec = new ThreadPoolExecutor(size, size, keepAliveMs, TimeUnit.MILLISECONDS, queue, threadFactory) {
@Override
@@ -120,5 +125,31 @@ public class JobManager<T> extends AbstractRoundRobinQueue<T> {
public static interface JobCallable<T> extends Callable<T> {
public Object getJobId();
}
+
+
+ /**
+ * Thread factory that delegates thread creation to {@code Executors.defaultThreadFactory}
+ * and then pins a fixed context classloader on each new thread, instead of leaving the
+ * thread with the context classloader of the calling thread.
+ * <p>
+ * The reason for overriding the context classloader is described in
+ * {@link org.apache.phoenix.util.PhoenixContextExecutor}.
+ */
+ static class ContextClassLoaderThreadFactory implements ThreadFactory {
+ private final ClassLoader contextClassLoader;
+ private final ThreadFactory delegate = Executors.defaultThreadFactory();
+
+ public ContextClassLoaderThreadFactory(ClassLoader contextClassLoader) {
+ this.contextClassLoader = contextClassLoader;
+ }
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread created = delegate.newThread(r);
+ // Replace whatever context classloader the delegate left on the new thread
+ created.setContextClassLoader(contextClassLoader);
+ return created;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/f50ba17a/phoenix-core/src/main/java/org/apache/phoenix/query/ConfigurationFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConfigurationFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConfigurationFactory.java
index a21b6d9..3e6f834 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConfigurationFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConfigurationFactory.java
@@ -17,8 +17,11 @@
*/
package org.apache.phoenix.query;
+import java.util.concurrent.Callable;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.phoenix.util.PhoenixContextExecutor;
/**
* Creates {@link Configuration} instances that contain HBase/Hadoop settings.
@@ -32,13 +35,30 @@ public interface ConfigurationFactory {
*/
Configuration getConfiguration();
+ Configuration getConfiguration(Configuration conf);
+
/**
* Default implementation uses {@link org.apache.hadoop.hbase.HBaseConfiguration#create()}.
*/
static class ConfigurationFactoryImpl implements ConfigurationFactory {
@Override
public Configuration getConfiguration() {
- return HBaseConfiguration.create();
+ return PhoenixContextExecutor.callWithoutPropagation(new Callable<Configuration>() {
+ @Override
+ public Configuration call() throws Exception {
+ return HBaseConfiguration.create();
+ }
+ });
+ }
+
+ @Override
+ public Configuration getConfiguration(final Configuration conf) {
+ return PhoenixContextExecutor.callWithoutPropagation(new Callable<Configuration>() {
+ @Override
+ public Configuration call() throws Exception {
+ return HBaseConfiguration.create(conf);
+ }
+ });
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/f50ba17a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 1faf52f..cce5f0a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -60,12 +60,13 @@ import java.util.Properties;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.TreeMap;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import com.google.protobuf.HBaseZeroCopyByteString;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
@@ -153,6 +154,7 @@ import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.ConfigUtil;
import org.apache.phoenix.util.JDBCUtil;
import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixContextExecutor;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ReadOnlyProps;
@@ -162,13 +164,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Objects;
+import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.Closeables;
-import com.google.protobuf.HBaseZeroCopyByteString;
+
public class ConnectionQueryServicesImpl extends DelegateQueryServices implements ConnectionQueryServices {
private static final Logger logger = LoggerFactory.getLogger(ConnectionQueryServicesImpl.class);
@@ -230,7 +233,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
// Without making a copy of the configuration we cons up, we lose some of our properties
// on the server side during testing.
- this.config = HBaseConfiguration.create(config);
+ this.config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration(config);
// set replication required parameter
ConfigUtil.setReplicationConfigIfAbsent(this.config);
this.props = new ReadOnlyProps(this.config.iterator());
@@ -1417,69 +1420,83 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
@Override
- public void init(String url, Properties props) throws SQLException {
- if (initialized) {
- if (initializationException != null) {
- // Throw previous initialization exception, as we won't resuse this instance
- throw initializationException;
- }
- return;
- }
- synchronized (this) {
- if (initialized) {
- if (initializationException != null) {
- // Throw previous initialization exception, as we won't resuse this instance
- throw initializationException;
- }
- return;
- }
- if (closed) {
- throw new SQLException("The connection to the cluster has been closed.");
- }
-
- SQLException sqlE = null;
- PhoenixConnection metaConnection = null;
- try {
- openConnection();
- Properties scnProps = PropertiesUtil.deepCopy(props);
- scnProps.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP));
- scnProps.remove(PhoenixRuntime.TENANT_ID_ATTRIB);
- metaConnection = new PhoenixConnection(this, url, scnProps, newEmptyMetaData());
- try {
- metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_TABLE_METADATA);
- } catch (NewerTableAlreadyExistsException ignore) {
- // Ignore, as this will happen if the SYSTEM.TABLE already exists at this fixed timestamp.
- // A TableAlreadyExistsException is not thrown, since the table only exists *after* this fixed timestamp.
- }
- try {
- metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_SEQUENCE_METADATA);
- } catch (NewerTableAlreadyExistsException ignore) {
- // Ignore, as this will happen if the SYSTEM.SEQUENCE already exists at this fixed timestamp.
- // A TableAlreadyExistsException is not thrown, since the table only exists *after* this fixed timestamp.
- }
- upgradeMetaDataTo3_0(url, props);
- } catch (SQLException e) {
- sqlE = e;
- } finally {
- try {
- if (metaConnection != null) metaConnection.close();
- } catch (SQLException e) {
- if (sqlE != null) {
- sqlE.setNextException(e);
- } else {
- sqlE = e;
+ public void init(final String url, final Properties props) throws SQLException {
+ try {
+ PhoenixContextExecutor.call(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ if (initialized) {
+ if (initializationException != null) {
+ // Throw previous initialization exception, as we won't reuse this instance
+ throw initializationException;
+ }
+ return null;
}
- } finally {
- try {
- if (sqlE != null) {
- initializationException = sqlE;
- throw sqlE;
+ synchronized (ConnectionQueryServicesImpl.this) { // lock the service; plain "this" is the per-call Callable
+ if (initialized) {
+ if (initializationException != null) {
+ // Throw previous initialization exception, as we won't reuse this instance
+ throw initializationException;
+ }
+ return null;
+ }
+ if (closed) {
+ throw new SQLException("The connection to the cluster has been closed.");
+ }
+
+ SQLException sqlE = null;
+ PhoenixConnection metaConnection = null;
+ try {
+ openConnection();
+ Properties scnProps = PropertiesUtil.deepCopy(props);
+ scnProps.setProperty(
+ PhoenixRuntime.CURRENT_SCN_ATTRIB,
+ Long.toString(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP));
+ scnProps.remove(PhoenixRuntime.TENANT_ID_ATTRIB);
+ metaConnection = new PhoenixConnection(
+ ConnectionQueryServicesImpl.this, url, scnProps, newEmptyMetaData());
+ try {
+ metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_TABLE_METADATA);
+ } catch (NewerTableAlreadyExistsException ignore) {
+ // Ignore, as this will happen if the SYSTEM.TABLE already exists at this fixed timestamp.
+ // A TableAlreadyExistsException is not thrown, since the table only exists *after* this fixed timestamp.
+ }
+ try {
+ metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_SEQUENCE_METADATA);
+ } catch (NewerTableAlreadyExistsException ignore) {
+ // Ignore, as this will happen if the SYSTEM.SEQUENCE already exists at this fixed timestamp.
+ // A TableAlreadyExistsException is not thrown, since the table only exists *after* this fixed timestamp.
+ }
+ upgradeMetaDataTo3_0(url, props);
+ } catch (SQLException e) {
+ sqlE = e;
+ } finally {
+ try {
+ if (metaConnection != null) metaConnection.close();
+ } catch (SQLException e) {
+ if (sqlE != null) {
+ sqlE.setNextException(e);
+ } else {
+ sqlE = e;
+ }
+ } finally {
+ try {
+ if (sqlE != null) {
+ initializationException = sqlE;
+ throw sqlE;
+ }
+ } finally {
+ initialized = true;
+ }
+ }
+ }
- } finally {
- initialized = true;
}
+ return null;
}
- }
+ });
+ } catch (Exception e) {
+ Throwables.propagateIfInstanceOf(e, SQLException.class);
+ throw Throwables.propagate(e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/f50ba17a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixContextExecutor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixContextExecutor.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixContextExecutor.java
new file mode 100644
index 0000000..a725c2c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixContextExecutor.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.util.concurrent.Callable;
+
+import com.google.common.base.Throwables;
+
+/**
+ * Executes {@code Callable}s using a context classloader that is set up to load classes from
+ * Phoenix.
+ * <p>
+ * Loading HBase configuration settings and endpoint coprocessor classes is done via the context
+ * classloader of the calling thread. When Phoenix is being run via a JDBC-enabled GUI, the
+ * driver is often loaded dynamically and executed via multiple threads, which makes it difficult
+ * or impossible to predict the state of the classloader hierarchy in the current thread. This
+ * class is intended to get around that, to ensure that the same classloader used to load Phoenix
+ * classes is set as the context classloader for specific calls.
+ */
+public class PhoenixContextExecutor {
+
+ /**
+ * Execute an operation (synchronously) using the context classloader used to load this class,
+ * instead of the currently-set context classloader of the current thread. This allows loading
+ * dynamically-loaded classes and configuration files using the same classloader used to
+ * load the rest of the JDBC driver.
+ * <p>
+ * The context classloader of the current thread is reset to its original value after the
+ * callable has been executed.
+ *
+ * @param target the callable to be executed
+ * @return the return value from the callable
+ */
+ public static <T> T call(Callable<T> target) throws Exception {
+ ClassLoader saveCcl = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(
+ PhoenixContextExecutor.class.getClassLoader());
+ return target.call();
+ } finally {
+ Thread.currentThread().setContextClassLoader(saveCcl);
+ }
+ }
+
+ /**
+ * Same as {@link #call(java.util.concurrent.Callable)}, but doesn't throw checked exceptions.
+ *
+ * @param target the callable to be executed
+ * @return the return value from the callable
+ * @throws RuntimeException if the callable throws; checked exceptions are wrapped, unchecked ones rethrown as-is
+ */
+ public static <T> T callWithoutPropagation(Callable<T> target) {
+ try {
+ return call(target);
+ } catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/f50ba17a/phoenix-core/src/test/java/org/apache/phoenix/util/PhoenixContextExecutorTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/PhoenixContextExecutorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/PhoenixContextExecutorTest.java
new file mode 100644
index 0000000..4c85c85
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/PhoenixContextExecutorTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.concurrent.Callable;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class PhoenixContextExecutorTest {
+ @Test
+ public void testCall() {
+ URLClassLoader customerClassLoader = new URLClassLoader(new URL[]{});
+ ClassLoader saveCcl = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(customerClassLoader);
+ try {
+ PhoenixContextExecutor.callWithoutPropagation(new Callable<Object>() {
+ @Override
+ public Object call() {
+ assertEquals(PhoenixContextExecutor.class.getClassLoader(),
+ Thread.currentThread().getContextClassLoader());
+ return null;
+ }
+ });
+ // The executor must put the caller's context classloader back once the callable returns
+ assertEquals(customerClassLoader, Thread.currentThread().getContextClassLoader());
+ } finally {
+ Thread.currentThread().setContextClassLoader(saveCcl);
+ }
+ }
+}