You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2020/05/05 17:15:48 UTC
[lucene-solr] 01/01: SOLR-14423: Initial patch. Use SolrClientCache
instance managed by CoreContainer as much as possible (there are a few
places in streaming and sql client classes that can't use it).
This is an automated email from the ASF dual-hosted git repository.
ab pushed a commit to branch jira/solr-14423
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit 2aee6672a8f79faba7f02a545058ad9098ba4175
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Tue May 5 19:13:52 2020 +0200
SOLR-14423: Initial patch. Use SolrClientCache instance managed by CoreContainer as much as
possible (there are a few places in streaming and sql client classes that can't use it).
This also adds a general-purpose ObjectCache to CoreContainer.
---
.../org/apache/solr/handler/AnalyticsHandler.java | 6 -
.../java/org/apache/solr/cloud/ZkController.java | 5 +-
.../api/collections/ReindexCollectionCmd.java | 6 +-
.../java/org/apache/solr/core/CoreContainer.java | 20 ++
.../java/org/apache/solr/handler/GraphHandler.java | 5 +-
.../org/apache/solr/handler/StreamHandler.java | 27 +-
.../org/apache/solr/handler/admin/ColStatus.java | 5 +-
.../solr/handler/admin/CollectionsHandler.java | 2 +-
.../apache/solr/handler/sql/CalciteSolrDriver.java | 361 ++++++++++++++++++++-
.../org/apache/solr/handler/sql/SolrSchema.java | 54 +--
.../org/apache/solr/handler/sql/SolrTable.java | 2 +-
.../solr/metrics/reporters/solr/SolrReporter.java | 21 +-
.../metrics/reporters/solr/SolrShardReporter.java | 2 +-
.../org/apache/solr/search/join/XCJFQuery.java | 2 +-
.../client/solrj/impl/SolrClientCloudManager.java | 18 +-
.../apache/solr/common/util/MapBackedCache.java | 4 +
16 files changed, 467 insertions(+), 73 deletions(-)
diff --git a/solr/contrib/analytics/src/java/org/apache/solr/handler/AnalyticsHandler.java b/solr/contrib/analytics/src/java/org/apache/solr/handler/AnalyticsHandler.java
index b289d1c..9ffbaf4 100644
--- a/solr/contrib/analytics/src/java/org/apache/solr/handler/AnalyticsHandler.java
+++ b/solr/contrib/analytics/src/java/org/apache/solr/handler/AnalyticsHandler.java
@@ -28,8 +28,6 @@ import org.apache.solr.analytics.AnalyticsRequestParser;
import org.apache.solr.analytics.ExpressionFactory;
import org.apache.solr.analytics.TimeExceededStubException;
import org.apache.solr.analytics.stream.AnalyticsShardResponseParser;
-import org.apache.solr.client.solrj.io.ModelCache;
-import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.SolrParams;
@@ -61,9 +59,6 @@ public class AnalyticsHandler extends RequestHandlerBase implements SolrCoreAwar
public static final String NAME = "/analytics";
private IndexSchema indexSchema;
- static SolrClientCache clientCache = new SolrClientCache();
- static ModelCache modelCache = null;
-
@Override
public PermissionNameProvider.Name getPermissionName(AuthorizationContext request) {
return PermissionNameProvider.Name.READ_PERM;
@@ -72,7 +67,6 @@ public class AnalyticsHandler extends RequestHandlerBase implements SolrCoreAwar
@Override
public void inform(SolrCore core) {
core.registerResponseWriter(AnalyticsShardResponseWriter.NAME, new AnalyticsShardResponseWriter());
-
indexSchema = core.getLatestSchema();
AnalyticsRequestParser.init();
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index e60f2e4..ecbb781 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -755,7 +755,10 @@ public class ZkController implements Closeable {
cloudSolrClient = new CloudSolrClient.Builder(new ZkClientClusterStateProvider(zkStateReader)).withSocketTimeout(30000).withConnectionTimeout(15000)
.withHttpClient(cc.getUpdateShardHandler().getDefaultHttpClient())
.withConnectionTimeout(15000).withSocketTimeout(30000).build();
- cloudManager = new SolrClientCloudManager(new ZkDistributedQueueFactory(zkClient), cloudSolrClient);
+ cloudManager = new SolrClientCloudManager(
+ new ZkDistributedQueueFactory(zkClient),
+ cloudSolrClient,
+ cc.getObjectCache());
cloudManager.getClusterStateProvider().connect();
}
return cloudManager;
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java
index c362dd3..f70fc5c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java
@@ -38,7 +38,6 @@ import org.apache.solr.client.solrj.cloud.DistribStateManager;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
-import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
@@ -163,7 +162,6 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
Stream.of(Cmd.values()).collect(Collectors.toMap(Cmd::toLower, Function.identity())));
}
- private SolrClientCache solrClientCache;
private String zkHost;
public ReindexCollectionCmd(OverseerCollectionMessageHandler ocmh) {
@@ -268,7 +266,6 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
Exception exc = null;
boolean createdTarget = false;
try {
- solrClientCache = new SolrClientCache(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
zkHost = ocmh.zkStateReader.getZkClient().getZkServerAddress();
// set the running flag
reindexingState.clear();
@@ -504,7 +501,6 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
exc = e;
aborted = true;
} finally {
- solrClientCache.close();
if (aborted) {
cleanup(collection, targetCollection, chkCollection, daemonUrl, targetCollection, createdTarget);
if (exc != null) {
@@ -550,7 +546,7 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
}
private long getNumberOfDocs(String collection) {
- CloudSolrClient solrClient = solrClientCache.getCloudSolrClient(zkHost);
+ CloudSolrClient solrClient = ocmh.overseer.getCoreContainer().getSolrClientCache().getCloudSolrClient(zkHost);
try {
ModifiableSolrParams params = new ModifiableSolrParams();
params.add(CommonParams.Q, "*:*");
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index f332837..62c6a10 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -61,6 +61,7 @@ import org.apache.solr.client.solrj.impl.SolrHttpClientBuilder;
import org.apache.solr.client.solrj.impl.SolrHttpClientContextBuilder;
import org.apache.solr.client.solrj.impl.SolrHttpClientContextBuilder.AuthSchemeRegistryProvider;
import org.apache.solr.client.solrj.impl.SolrHttpClientContextBuilder.CredentialsProviderProvider;
+import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.util.SolrIdentifierValidator;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.Overseer;
@@ -76,6 +77,7 @@ import org.apache.solr.common.cloud.Replica.State;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.IOUtils;
+import org.apache.solr.common.util.ObjectCache;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.DirectoryFactory.DirContext;
@@ -228,6 +230,10 @@ public class CoreContainer {
protected volatile AutoscalingHistoryHandler autoscalingHistoryHandler;
+ private volatile SolrClientCache solrClientCache;
+
+ private volatile ObjectCache objectCache = new ObjectCache();
+
private PackageStoreAPI packageStoreAPI;
private PackageLoader packageLoader;
@@ -576,6 +582,15 @@ public class CoreContainer {
public PackageStoreAPI getPackageStoreAPI() {
return packageStoreAPI;
}
+
+ public SolrClientCache getSolrClientCache() {
+ return solrClientCache;
+ }
+
+ public ObjectCache getObjectCache() {
+ return objectCache;
+ }
+
//-------------------------------------------------------------------
// Initialization / Cleanup
//-------------------------------------------------------------------
@@ -636,6 +651,8 @@ public class CoreContainer {
updateShardHandler = new UpdateShardHandler(cfg.getUpdateShardHandlerConfig());
updateShardHandler.initializeMetrics(solrMetricsContext, "updateShardHandler");
+ solrClientCache = new SolrClientCache(updateShardHandler.getDefaultHttpClient());
+
solrCores.load(loader);
@@ -1017,6 +1034,9 @@ public class CoreContainer {
} catch (Exception e) {
log.warn("Error shutting down CoreAdminHandler. Continuing to close CoreContainer.", e);
}
+ if (solrClientCache != null) {
+ solrClientCache.close();
+ }
} finally {
try {
diff --git a/solr/core/src/java/org/apache/solr/handler/GraphHandler.java b/solr/core/src/java/org/apache/solr/handler/GraphHandler.java
index bed4086..4d6f539 100644
--- a/solr/core/src/java/org/apache/solr/handler/GraphHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/GraphHandler.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.graph.Traversal;
@@ -83,6 +84,7 @@ public class GraphHandler extends RequestHandlerBase implements SolrCoreAware, P
private StreamFactory streamFactory = new DefaultStreamFactory();
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private String coreName;
+ private SolrClientCache solrClientCache;
@Override
public PermissionNameProvider.Name getPermissionName(AuthorizationContext request) {
@@ -93,6 +95,7 @@ public class GraphHandler extends RequestHandlerBase implements SolrCoreAware, P
String defaultCollection;
String defaultZkhost;
CoreContainer coreContainer = core.getCoreContainer();
+ this.solrClientCache = coreContainer.getSolrClientCache();
this.coreName = core.getName();
if(coreContainer.isZooKeeperAware()) {
@@ -147,7 +150,7 @@ public class GraphHandler extends RequestHandlerBase implements SolrCoreAware, P
}
StreamContext context = new StreamContext();
- context.setSolrClientCache(StreamHandler.clientCache);
+ context.setSolrClientCache(solrClientCache);
context.put("core", this.coreName);
Traversal traversal = new Traversal();
context.put("traversal", traversal);
diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
index bd76ae9..2507692 100644
--- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
@@ -88,12 +88,12 @@ import static org.apache.solr.common.params.CommonParams.ID;
*/
public class StreamHandler extends RequestHandlerBase implements SolrCoreAware, PermissionNameProvider {
- static SolrClientCache clientCache = new SolrClientCache();
- static ModelCache modelCache = null;
- static ConcurrentMap objectCache = new ConcurrentHashMap();
+ private ModelCache modelCache = null;
+ private ConcurrentMap objectCache = new ConcurrentHashMap();
private SolrDefaultStreamFactory streamFactory = new SolrDefaultStreamFactory();
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private String coreName;
+ private SolrClientCache solrClientCache;
private Map<String, DaemonStream> daemons = Collections.synchronizedMap(new HashMap());
@Override
@@ -101,14 +101,15 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
return PermissionNameProvider.Name.READ_PERM;
}
- public static SolrClientCache getClientCache() {
- return clientCache;
+ public SolrClientCache getClientCache() {
+ return solrClientCache;
}
public void inform(SolrCore core) {
String defaultCollection;
String defaultZkhost;
CoreContainer coreContainer = core.getCoreContainer();
+ this.solrClientCache = coreContainer.getSolrClientCache();
this.coreName = core.getName();
if (coreContainer.isZooKeeperAware()) {
@@ -118,24 +119,12 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
streamFactory.withDefaultZkHost(defaultZkhost);
modelCache = new ModelCache(250,
defaultZkhost,
- clientCache);
+ solrClientCache);
}
streamFactory.withSolrResourceLoader(core.getResourceLoader());
// This pulls all the overrides and additions from the config
addExpressiblePlugins(streamFactory, core);
-
- core.addCloseHook(new CloseHook() {
- @Override
- public void preClose(SolrCore core) {
- // To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- public void postClose(SolrCore core) {
- clientCache.close();
- }
- });
}
public static void addExpressiblePlugins(StreamFactory streamFactory, SolrCore core) {
@@ -226,7 +215,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
context.put("shards", getCollectionShards(params));
context.workerID = worker;
context.numWorkers = numWorkers;
- context.setSolrClientCache(clientCache);
+ context.setSolrClientCache(solrClientCache);
context.setModelCache(modelCache);
context.setObjectCache(objectCache);
context.put("core", this.coreName);
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java b/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java
index 609b39b..aee0097 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java
@@ -27,7 +27,6 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
-import org.apache.http.client.HttpClient;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.io.SolrClientCache;
@@ -66,9 +65,9 @@ public class ColStatus {
public static final String RAW_SIZE_SAMPLING_PERCENT_PROP = SegmentsInfoRequestHandler.RAW_SIZE_SAMPLING_PERCENT_PARAM;
public static final String SEGMENTS_PROP = "segments";
- public ColStatus(HttpClient httpClient, ClusterState clusterState, ZkNodeProps props) {
+ public ColStatus(SolrClientCache solrClientCache, ClusterState clusterState, ZkNodeProps props) {
this.props = props;
- this.solrClientCache = new SolrClientCache(httpClient);
+ this.solrClientCache = solrClientCache;
this.clusterState = clusterState;
}
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index d63b48f..203efe5 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -529,7 +529,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
if (props.containsKey(CoreAdminParams.NAME) && !props.containsKey(COLLECTION_PROP)) {
props.put(COLLECTION_PROP, props.get(CoreAdminParams.NAME));
}
- new ColStatus(h.coreContainer.getUpdateShardHandler().getDefaultHttpClient(),
+ new ColStatus(h.coreContainer.getSolrClientCache(),
h.coreContainer.getZkController().getZkStateReader().getClusterState(), new ZkNodeProps(props))
.getColStatus(rsp.getValues());
return null;
diff --git a/solr/core/src/java/org/apache/solr/handler/sql/CalciteSolrDriver.java b/solr/core/src/java/org/apache/solr/handler/sql/CalciteSolrDriver.java
index 3a7640d..4ebe993 100644
--- a/solr/core/src/java/org/apache/solr/handler/sql/CalciteSolrDriver.java
+++ b/solr/core/src/java/org/apache/solr/handler/sql/CalciteSolrDriver.java
@@ -16,13 +16,35 @@
*/
package org.apache.solr.handler.sql;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.config.CalciteConnectionConfig;
import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.jdbc.CalcitePrepare;
import org.apache.calcite.jdbc.Driver;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.linq4j.Queryable;
+import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.schema.SchemaPlus;
+import java.lang.reflect.Type;
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.CallableStatement;
+import java.sql.Clob;
import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.NClob;
+import java.sql.PreparedStatement;
+import java.sql.SQLClientInfoException;
import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.SQLXML;
+import java.sql.Savepoint;
+import java.sql.Statement;
+import java.sql.Struct;
+import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.Executor;
/**
* JDBC driver for Calcite Solr.
@@ -59,11 +81,346 @@ public class CalciteSolrDriver extends Driver {
if(schemaName == null) {
throw new SQLException("zk must be set");
}
- rootSchema.add(schemaName, new SolrSchema(info));
+ SolrSchema solrSchema = new SolrSchema(info);
+ rootSchema.add(schemaName, solrSchema);
// Set the default schema
calciteConnection.setSchema(schemaName);
- return connection;
+ return new SolrCalciteConnectionWrapper(calciteConnection, solrSchema);
+ }
+
+ // This wrapper exists solely so that closing the connection also invokes
+ // SolrSchema.close(), which in turn closes the schema's own SolrClientCache.
+ private static final class SolrCalciteConnectionWrapper implements CalciteConnection {
+
+ private final CalciteConnection delegate;
+ private final SolrSchema schema;
+
+ SolrCalciteConnectionWrapper(CalciteConnection delegate, SolrSchema schema) {
+ this.delegate = delegate;
+ this.schema = schema;
+ }
+
+ @Override
+ public SchemaPlus getRootSchema() {
+ return delegate.getRootSchema();
+ }
+
+ @Override
+ public JavaTypeFactory getTypeFactory() {
+ return delegate.getTypeFactory();
+ }
+
+ @Override
+ public Properties getProperties() {
+ return delegate.getProperties();
+ }
+
+ @Override
+ public Statement createStatement() throws SQLException {
+ return delegate.createStatement();
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql) throws SQLException {
+ return delegate.prepareStatement(sql);
+ }
+
+ @Override
+ public CallableStatement prepareCall(String sql) throws SQLException {
+ return delegate.prepareCall(sql);
+ }
+
+ @Override
+ public String nativeSQL(String sql) throws SQLException {
+ return delegate.nativeSQL(sql);
+ }
+
+ @Override
+ public void setAutoCommit(boolean autoCommit) throws SQLException {
+ delegate.setAutoCommit(autoCommit);
+ }
+
+ @Override
+ public boolean getAutoCommit() throws SQLException {
+ return delegate.getAutoCommit();
+ }
+
+ @Override
+ public void commit() throws SQLException {
+ delegate.commit();
+ }
+
+ @Override
+ public void rollback() throws SQLException {
+ delegate.rollback();
+ }
+
+ @Override
+ public void close() throws SQLException {
+ schema.close();
+ delegate.close();
+ }
+
+ @Override
+ public boolean isClosed() throws SQLException {
+ return delegate.isClosed();
+ }
+
+ @Override
+ public DatabaseMetaData getMetaData() throws SQLException {
+ return delegate.getMetaData();
+ }
+
+ @Override
+ public void setReadOnly(boolean readOnly) throws SQLException {
+ delegate.setReadOnly(readOnly);
+ }
+
+ @Override
+ public boolean isReadOnly() throws SQLException {
+ return delegate.isReadOnly();
+ }
+
+ @Override
+ public void setCatalog(String catalog) throws SQLException {
+ delegate.setCatalog(catalog);
+ }
+
+ @Override
+ public String getCatalog() throws SQLException {
+ return delegate.getCatalog();
+ }
+
+ @Override
+ public void setTransactionIsolation(int level) throws SQLException {
+ delegate.setTransactionIsolation(level);
+ }
+
+ @Override
+ public int getTransactionIsolation() throws SQLException {
+ return delegate.getTransactionIsolation();
+ }
+
+ @Override
+ public SQLWarning getWarnings() throws SQLException {
+ return delegate.getWarnings();
+ }
+
+ @Override
+ public void clearWarnings() throws SQLException {
+ delegate.clearWarnings();
+ }
+
+ @Override
+ public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
+ return delegate.createStatement(resultSetType, resultSetConcurrency);
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
+ return delegate.prepareStatement(sql, resultSetType, resultSetConcurrency);
+ }
+
+ @Override
+ public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
+ return delegate.prepareCall(sql, resultSetType, resultSetConcurrency);
+ }
+
+ @Override
+ public Map<String, Class<?>> getTypeMap() throws SQLException {
+ return delegate.getTypeMap();
+ }
+
+ @Override
+ public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
+ delegate.setTypeMap(map);
+ }
+
+ @Override
+ public void setHoldability(int holdability) throws SQLException {
+ delegate.setHoldability(holdability);
+ }
+
+ @Override
+ public int getHoldability() throws SQLException {
+ return delegate.getHoldability();
+ }
+
+ @Override
+ public Savepoint setSavepoint() throws SQLException {
+ return delegate.setSavepoint();
+ }
+
+ @Override
+ public Savepoint setSavepoint(String name) throws SQLException {
+ return delegate.setSavepoint(name);
+ }
+
+ @Override
+ public void rollback(Savepoint savepoint) throws SQLException {
+ delegate.rollback(savepoint);
+ }
+
+ @Override
+ public void releaseSavepoint(Savepoint savepoint) throws SQLException {
+ delegate.releaseSavepoint(savepoint);
+ }
+
+ @Override
+ public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+ return delegate.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+ return delegate.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
+ }
+
+ @Override
+ public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+ return delegate.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
+ return delegate.prepareStatement(sql, autoGeneratedKeys);
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
+ return delegate.prepareStatement(sql, columnIndexes);
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
+ return delegate.prepareStatement(sql, columnNames);
+ }
+
+ @Override
+ public Clob createClob() throws SQLException {
+ return delegate.createClob();
+ }
+
+ @Override
+ public Blob createBlob() throws SQLException {
+ return delegate.createBlob();
+ }
+
+ @Override
+ public NClob createNClob() throws SQLException {
+ return delegate.createNClob();
+ }
+
+ @Override
+ public SQLXML createSQLXML() throws SQLException {
+ return delegate.createSQLXML();
+ }
+
+ @Override
+ public boolean isValid(int timeout) throws SQLException {
+ return delegate.isValid(timeout);
+ }
+
+ @Override
+ public void setClientInfo(String name, String value) throws SQLClientInfoException {
+ delegate.setClientInfo(name, value);
+ }
+
+ @Override
+ public void setClientInfo(Properties properties) throws SQLClientInfoException {
+ delegate.setClientInfo(properties);
+ }
+
+ @Override
+ public String getClientInfo(String name) throws SQLException {
+ return delegate.getClientInfo(name);
+ }
+
+ @Override
+ public Properties getClientInfo() throws SQLException {
+ return delegate.getClientInfo();
+ }
+
+ @Override
+ public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
+ return delegate.createArrayOf(typeName, elements);
+ }
+
+ @Override
+ public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
+ return delegate.createStruct(typeName, attributes);
+ }
+
+ @Override
+ public void setSchema(String schema) throws SQLException {
+ delegate.setSchema(schema);
+ }
+
+ @Override
+ public String getSchema() throws SQLException {
+ return delegate.getSchema();
+ }
+
+ @Override
+ public void abort(Executor executor) throws SQLException {
+ delegate.abort(executor);
+ }
+
+ @Override
+ public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
+ delegate.setNetworkTimeout(executor, milliseconds);
+ }
+
+ @Override
+ public int getNetworkTimeout() throws SQLException {
+ return delegate.getNetworkTimeout();
+ }
+
+ @Override
+ public CalciteConnectionConfig config() {
+ return delegate.config();
+ }
+
+ @Override
+ public CalcitePrepare.Context createPrepareContext() {
+ return delegate.createPrepareContext();
+ }
+
+ @Override
+ public <T> T unwrap(Class<T> iface) throws SQLException {
+ return delegate.unwrap(iface);
+ }
+
+ @Override
+ public boolean isWrapperFor(Class<?> iface) throws SQLException {
+ return delegate.isWrapperFor(iface);
+ }
+
+ @Override
+ public <T> Queryable<T> createQuery(Expression expression, Class<T> aClass) {
+ return delegate.createQuery(expression, aClass);
+ }
+
+ @Override
+ public <T> Queryable<T> createQuery(Expression expression, Type type) {
+ return delegate.createQuery(expression, type);
+ }
+
+ @Override
+ public <T> T execute(Expression expression, Class<T> aClass) {
+ return delegate.execute(expression, aClass);
+ }
+
+ @Override
+ public <T> T execute(Expression expression, Type type) {
+ return delegate.execute(expression, type);
+ }
+
+ @Override
+ public <T> Enumerator<T> executeQuery(Queryable<T> queryable) {
+ return delegate.executeQuery(queryable);
+ }
}
}
diff --git a/solr/core/src/java/org/apache/solr/handler/sql/SolrSchema.java b/solr/core/src/java/org/apache/solr/handler/sql/SolrSchema.java
index b608442..0d7466f 100644
--- a/solr/core/src/java/org/apache/solr/handler/sql/SolrSchema.java
+++ b/solr/core/src/java/org/apache/solr/handler/sql/SolrSchema.java
@@ -16,10 +16,9 @@
*/
package org.apache.solr.handler.sql;
+import java.io.Closeable;
import java.io.IOException;
-import java.util.Collections;
import java.util.Map;
-import java.util.Optional;
import java.util.Properties;
import java.util.Set;
@@ -33,6 +32,7 @@ import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.request.LukeRequest;
import org.apache.solr.client.solrj.response.LukeResponse;
import org.apache.solr.common.cloud.Aliases;
@@ -41,47 +41,53 @@ import org.apache.solr.common.cloud.ZkStateReader;
import com.google.common.collect.ImmutableMap;
-class SolrSchema extends AbstractSchema {
+class SolrSchema extends AbstractSchema implements Closeable {
final Properties properties;
+ final SolrClientCache solrClientCache = new SolrClientCache();
SolrSchema(Properties properties) {
super();
this.properties = properties;
}
+ public SolrClientCache getSolrClientCache() {
+ return solrClientCache;
+ }
+
+ @Override
+ public void close() {
+ solrClientCache.close();
+ }
+
@Override
protected Map<String, Table> getTableMap() {
String zk = this.properties.getProperty("zk");
- try(CloudSolrClient cloudSolrClient = new CloudSolrClient.Builder(Collections.singletonList(zk), Optional.empty()).withSocketTimeout(30000).withConnectionTimeout(15000).build()) {
- cloudSolrClient.connect();
- ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
- ClusterState clusterState = zkStateReader.getClusterState();
+ CloudSolrClient cloudSolrClient = solrClientCache.getCloudSolrClient(zk);
+ ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
+ ClusterState clusterState = zkStateReader.getClusterState();
- final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
+ final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
- Set<String> collections = clusterState.getCollectionsMap().keySet();
- for (String collection : collections) {
- builder.put(collection, new SolrTable(this, collection));
- }
+ Set<String> collections = clusterState.getCollectionsMap().keySet();
+ for (String collection : collections) {
+ builder.put(collection, new SolrTable(this, collection));
+ }
- Aliases aliases = zkStateReader.getAliases();
- for (String alias : aliases.getCollectionAliasListMap().keySet()) {
- // don't create duplicate entries
- if (!collections.contains(alias)) {
- builder.put(alias, new SolrTable(this, alias));
- }
+ Aliases aliases = zkStateReader.getAliases();
+ for (String alias : aliases.getCollectionAliasListMap().keySet()) {
+ // don't create duplicate entries
+ if (!collections.contains(alias)) {
+ builder.put(alias, new SolrTable(this, alias));
}
-
- return builder.build();
- } catch (IOException e) {
- throw new RuntimeException(e);
}
+
+ return builder.build();
}
private Map<String, LukeResponse.FieldInfo> getFieldInfo(String collection) {
String zk = this.properties.getProperty("zk");
- try(CloudSolrClient cloudSolrClient = new CloudSolrClient.Builder(Collections.singletonList(zk), Optional.empty()).withSocketTimeout(30000).withConnectionTimeout(15000).build()) {
- cloudSolrClient.connect();
+ CloudSolrClient cloudSolrClient = solrClientCache.getCloudSolrClient(zk);
+ try {
LukeRequest lukeRequest = new LukeRequest();
lukeRequest.setNumTerms(0);
LukeResponse lukeResponse = lukeRequest.process(cloudSolrClient, collection);
diff --git a/solr/core/src/java/org/apache/solr/handler/sql/SolrTable.java b/solr/core/src/java/org/apache/solr/handler/sql/SolrTable.java
index 46b09d2..f610bb5 100644
--- a/solr/core/src/java/org/apache/solr/handler/sql/SolrTable.java
+++ b/solr/core/src/java/org/apache/solr/handler/sql/SolrTable.java
@@ -160,7 +160,7 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
}
StreamContext streamContext = new StreamContext();
- streamContext.setSolrClientCache(StreamHandler.getClientCache());
+ streamContext.setSolrClientCache(schema.getSolrClientCache());
tupleStream.setStreamContext(streamContext);
final TupleStream finalStream = tupleStream;
diff --git a/solr/core/src/java/org/apache/solr/metrics/reporters/solr/SolrReporter.java b/solr/core/src/java/org/apache/solr/metrics/reporters/solr/SolrReporter.java
index 8df6817..713137c 100644
--- a/solr/core/src/java/org/apache/solr/metrics/reporters/solr/SolrReporter.java
+++ b/solr/core/src/java/org/apache/solr/metrics/reporters/solr/SolrReporter.java
@@ -259,8 +259,13 @@ public class SolrReporter extends ScheduledReporter {
* @return configured instance of reporter
*/
public SolrReporter build(HttpClient client, Supplier<String> urlProvider) {
- return new SolrReporter(client, urlProvider, metricManager, reports, handler, reporterId, rateUnit, durationUnit,
- params, skipHistograms, skipAggregateValues, cloudClient, compact);
+ return new SolrReporter(new SolrClientCache(client), urlProvider, metricManager, reports, handler, reporterId, rateUnit, durationUnit,
+ params, skipHistograms, skipAggregateValues, cloudClient, compact, true);
+ }
+
+ public SolrReporter build(SolrClientCache solrClientCache, Supplier<String> urlProvider) {
+ return new SolrReporter(solrClientCache, urlProvider, metricManager, reports, handler, reporterId, rateUnit, durationUnit,
+ params, skipHistograms, skipAggregateValues, cloudClient, compact, false);
}
}
@@ -269,6 +274,7 @@ public class SolrReporter extends ScheduledReporter {
private String handler;
private Supplier<String> urlProvider;
private SolrClientCache clientCache;
+ private boolean closeClientCache;
private List<CompiledReport> compiledReports;
private SolrMetricManager metricManager;
private boolean skipHistograms;
@@ -306,11 +312,11 @@ public class SolrReporter extends ScheduledReporter {
// We delegate to registries anyway, so having a dummy registry is harmless.
private static final MetricRegistry dummyRegistry = new MetricRegistry();
- public SolrReporter(HttpClient httpClient, Supplier<String> urlProvider, SolrMetricManager metricManager,
+ public SolrReporter(SolrClientCache solrClientCache, Supplier<String> urlProvider, SolrMetricManager metricManager,
List<Report> metrics, String handler,
String reporterId, TimeUnit rateUnit, TimeUnit durationUnit,
SolrParams params, boolean skipHistograms, boolean skipAggregateValues,
- boolean cloudClient, boolean compact) {
+ boolean cloudClient, boolean compact, boolean closeClientCache) {
super(dummyRegistry, "solr-reporter", MetricFilter.ALL, rateUnit, durationUnit, null, true);
this.metricManager = metricManager;
@@ -320,7 +326,8 @@ public class SolrReporter extends ScheduledReporter {
handler = MetricsCollectorHandler.HANDLER_PATH;
}
this.handler = handler;
- this.clientCache = new SolrClientCache(httpClient);
+ this.clientCache = solrClientCache;
+ this.closeClientCache = closeClientCache;
this.compiledReports = new ArrayList<>();
metrics.forEach(report -> {
MetricFilter filter = new SolrMetricManager.RegexFilter(report.metricFilters);
@@ -347,7 +354,9 @@ public class SolrReporter extends ScheduledReporter {
@Override
public void close() {
- clientCache.close();
+ if (closeClientCache) {
+ clientCache.close();
+ }
super.close();
}
diff --git a/solr/core/src/java/org/apache/solr/metrics/reporters/solr/SolrShardReporter.java b/solr/core/src/java/org/apache/solr/metrics/reporters/solr/SolrShardReporter.java
index 8609a23..7472af9 100644
--- a/solr/core/src/java/org/apache/solr/metrics/reporters/solr/SolrShardReporter.java
+++ b/solr/core/src/java/org/apache/solr/metrics/reporters/solr/SolrShardReporter.java
@@ -154,7 +154,7 @@ public class SolrShardReporter extends SolrCoreReporter {
.cloudClient(false) // we want to send reports specifically to a selected leader instance
.skipAggregateValues(true) // we don't want to transport details of aggregates
.skipHistograms(true) // we don't want to transport histograms
- .build(core.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient(), new LeaderUrlSupplier(core));
+ .build(core.getCoreContainer().getSolrClientCache(), new LeaderUrlSupplier(core));
reporter.start(period, TimeUnit.SECONDS);
}
diff --git a/solr/core/src/java/org/apache/solr/search/join/XCJFQuery.java b/solr/core/src/java/org/apache/solr/search/join/XCJFQuery.java
index f5c464b..32e7c51 100644
--- a/solr/core/src/java/org/apache/solr/search/join/XCJFQuery.java
+++ b/solr/core/src/java/org/apache/solr/search/join/XCJFQuery.java
@@ -261,7 +261,7 @@ public class XCJFQuery extends Query {
}
private DocSet getDocSet() throws IOException {
- SolrClientCache solrClientCache = new SolrClientCache();
+ SolrClientCache solrClientCache = searcher.getCore().getCoreContainer().getSolrClientCache();
TupleStream solrStream;
if (zkHost != null || solrUrl == null) {
solrStream = createCloudSolrStream(solrClientCache);
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientCloudManager.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientCloudManager.java
index fcefc2f..7aacd42 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientCloudManager.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientCloudManager.java
@@ -61,22 +61,36 @@ public class SolrClientCloudManager implements SolrCloudManager {
private final ZkStateReader zkStateReader;
private final SolrZkClient zkClient;
private final ObjectCache objectCache;
+ private final boolean closeObjectCache;
private volatile boolean isClosed;
public SolrClientCloudManager(DistributedQueueFactory queueFactory, CloudSolrClient solrClient) {
+ this(queueFactory, solrClient, null);
+ }
+
+ public SolrClientCloudManager(DistributedQueueFactory queueFactory, CloudSolrClient solrClient,
+ ObjectCache objectCache) {
this.queueFactory = queueFactory;
this.solrClient = solrClient;
this.zkStateReader = solrClient.getZkStateReader();
this.zkClient = zkStateReader.getZkClient();
this.stateManager = new ZkDistribStateManager(zkClient);
this.isClosed = false;
- this.objectCache = new ObjectCache();
+ if (objectCache == null) {
+ this.objectCache = new ObjectCache();
+ closeObjectCache = true;
+ } else {
+ this.objectCache = objectCache;
+ this.closeObjectCache = false;
+ }
}
@Override
public void close() {
isClosed = true;
- IOUtils.closeQuietly(objectCache);
+ if (closeObjectCache) {
+ IOUtils.closeQuietly(objectCache);
+ }
}
@Override
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/MapBackedCache.java b/solr/solrj/src/java/org/apache/solr/common/util/MapBackedCache.java
index 552dba6..3efdfe3 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/MapBackedCache.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/MapBackedCache.java
@@ -30,6 +30,10 @@ public class MapBackedCache<K, V> implements Cache<K, V> {
this.map = map;
}
+ public Map<K, V> asMap() {
+ return map;
+ }
+
@Override
public V put(K key, V val) {
return map.put(key, val);