You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/09/17 15:22:44 UTC
[lucene-solr] 10/18: @845 Non fair locks,
too many max connections per host for http2 client.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit d3561af65783cdf8dd331d3a6b0df3f6bf33df5c
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Wed Sep 16 21:45:06 2020 -0500
@845 Non fair locks, too many max connections per host for http2 client.
---
.../client/solrj/embedded/JettySolrRunner.java | 1 +
.../src/java/org/apache/solr/core/PluginBag.java | 10 +-
.../org/apache/solr/update/CdcrTransactionLog.java | 15 ++-
.../java/org/apache/solr/update/CommitTracker.java | 43 ++++++---
.../org/apache/solr/update/HdfsTransactionLog.java | 5 +-
.../org/apache/solr/update/TransactionLog.java | 103 ++++++++++++++++-----
.../org/apache/solr/update/UpdateShardHandler.java | 3 +-
.../solr/client/solrj/impl/Http2SolrClient.java | 2 +-
.../apache/solr/common/cloud/ZkStateReader.java | 23 +++--
9 files changed, 149 insertions(+), 56 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
index 0679ff5..68285e8 100644
--- a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
+++ b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
@@ -341,6 +341,7 @@ public class JettySolrRunner implements Closeable {
HTTP2ServerConnectionFactory http2ConnectionFactory = new HTTP2ServerConnectionFactory(configuration);
http2ConnectionFactory.setMaxConcurrentStreams(256);
+
http2ConnectionFactory.setInputBufferSize(16384);
ALPNServerConnectionFactory alpn = new ALPNServerConnectionFactory(
diff --git a/solr/core/src/java/org/apache/solr/core/PluginBag.java b/solr/core/src/java/org/apache/solr/core/PluginBag.java
index 5cb2c540..4f04b9c 100644
--- a/solr/core/src/java/org/apache/solr/core/PluginBag.java
+++ b/solr/core/src/java/org/apache/solr/core/PluginBag.java
@@ -86,7 +86,7 @@ public class PluginBag<T> implements AutoCloseable {
// TODO: since reads will dominate writes, we could also think about creating a new instance of a map each time it changes.
// Not sure how much benefit this would have over ConcurrentHashMap though
// We could also perhaps make this constructor into a factory method to return different implementations depending on thread safety needs.
- this.registry = new ConcurrentHashMap<>(64, 0.75f, 4);
+ this.registry = new ConcurrentHashMap<>(128, 0.75f, 3);
this.immutableRegistry = Collections.unmodifiableMap(registry);
meta = SolrConfig.classVsSolrPluginInfo.get(klass.getName());
if (meta == null) {
@@ -364,10 +364,10 @@ public class PluginBag<T> implements AutoCloseable {
@Override
public void close() {
try (ParWork worker = new ParWork(this)) {
- for (Map.Entry<String,PluginHolder<T>> e : registry.entrySet()) {
- worker.collect(e.getValue());
- }
- worker.addCollect();
+ registry.forEach((s, tPluginHolder) -> {
+ worker.collect(tPluginHolder);
+ });
+
}
}
diff --git a/solr/core/src/java/org/apache/solr/update/CdcrTransactionLog.java b/solr/core/src/java/org/apache/solr/update/CdcrTransactionLog.java
index a41d7eb..416e459 100644
--- a/solr/core/src/java/org/apache/solr/update/CdcrTransactionLog.java
+++ b/solr/core/src/java/org/apache/solr/update/CdcrTransactionLog.java
@@ -202,13 +202,16 @@ public class CdcrTransactionLog extends TransactionLog {
codec.writePrimitive(false);
}
- synchronized (fosLock) {
+ fosLock.lock();
+ try {
long pos = fos.size(); // if we had flushed, this should be equal to channel.position()
assert pos != 0;
out.writeAll(fos);
endRecord(pos);
// fos.flushBuffer(); // flush later
return pos;
+ } finally {
+ fosLock.unlock();
}
} catch (IOException e) {
@@ -236,12 +239,15 @@ public class CdcrTransactionLog extends TransactionLog {
} else {
codec.writePrimitive(false);
}
- synchronized (fosLock) {
+ fosLock.lock();
+ try {
long pos = fos.size(); // if we had flushed, this should be equal to channel.position()
out.writeAll(fos);
endRecord(pos);
// fos.flushBuffer(); // flush later
return pos;
+ } finally {
+ fosLock.unlock();
}
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
@@ -251,7 +257,8 @@ public class CdcrTransactionLog extends TransactionLog {
@Override
public long writeCommit(CommitUpdateCommand cmd) {
LogCodec codec = new LogCodec(resolver);
- synchronized (fosLock) {
+ fosLock.lock();
+ try {
try {
long pos = fos.size(); // if we had flushed, this should be equal to channel.position()
@@ -278,6 +285,8 @@ public class CdcrTransactionLog extends TransactionLog {
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
}
+ } finally {
+ fosLock.unlock();
}
}
diff --git a/solr/core/src/java/org/apache/solr/update/CommitTracker.java b/solr/core/src/java/org/apache/solr/update/CommitTracker.java
index 1db1f74..48664e4 100644
--- a/solr/core/src/java/org/apache/solr/update/CommitTracker.java
+++ b/solr/core/src/java/org/apache/solr/update/CommitTracker.java
@@ -38,6 +38,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
/**
* Helper class for tracking autoCommit state.
@@ -62,6 +63,8 @@ public final class CommitTracker implements Runnable, Closeable {
private long timeUpperBound;
private long tLogFileSizeUpperBound;
+ private ReentrantLock lock = new ReentrantLock(true);
+
// note: can't use ExecutorsUtil because it doesn't have a *scheduled* ExecutorService.
// Not a big deal but it means we must take care of MDC logging here.
private final ScheduledThreadPoolExecutor scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, new SolrNamedThreadFactory("commitScheduler"));
@@ -105,15 +108,21 @@ public final class CommitTracker implements Runnable, Closeable {
return openSearcher;
}
- public synchronized void close() {
- this.closed = true;
+ public void close() {
+
+ lock.lock();
try {
- pending.cancel(true);
- } catch (NullPointerException e) {
- // okay
+ this.closed = true;
+ try {
+ pending.cancel(true);
+ } catch (NullPointerException e) {
+ // okay
+ }
+ pending = null;
+ ParWork.close(scheduler);
+ } finally {
+ lock.unlock();
}
- pending = null;
- ParWork.close(scheduler);
assert ObjectReleaseTracker.release(this);
}
@@ -123,13 +132,16 @@ public final class CommitTracker implements Runnable, Closeable {
}
public void cancelPendingCommit() {
- synchronized (this) {
+ lock.lock();
+ try {
if (pending != null) {
boolean canceled = pending.cancel(false);
if (canceled) {
pending = null;
}
}
+ } finally {
+ lock.unlock();
}
}
@@ -143,7 +155,8 @@ public final class CommitTracker implements Runnable, Closeable {
private void _scheduleCommitWithin(long commitMaxTime) {
if (commitMaxTime <= 0) return;
- synchronized (this) {
+ lock.lock();
+ try {
if (pending != null && pending.getDelay(TimeUnit.MILLISECONDS) <= commitMaxTime) {
// There is already a pending commit that will happen first, so
// nothing else to do here.
@@ -172,6 +185,8 @@ public final class CommitTracker implements Runnable, Closeable {
if (!closed) {
pending = scheduler.schedule(this, commitMaxTime, TimeUnit.MILLISECONDS);
}
+ } finally {
+ lock.unlock();
}
}
@@ -248,21 +263,27 @@ public final class CommitTracker implements Runnable, Closeable {
/** Inform tracker that a rollback has occurred, cancel any pending commits */
public void didRollback() {
- synchronized (this) {
+ lock.lock();
+ try {
if (pending != null) {
pending.cancel(false);
pending = null; // let it start another one
}
docsSinceCommit.set(0);
+ } finally {
+ lock.unlock();
}
}
/** This is the worker part for the ScheduledFuture **/
@Override
public void run() {
- synchronized (this) {
+ lock.lock();
+ try {
// log.info("###start commit. pending=null");
pending = null; // allow a new commit to be scheduled
+ } finally {
+ lock.unlock();
}
MDCLoggingContext.setCore(core);
diff --git a/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java b/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
index cae45f6..48fef21 100644
--- a/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
+++ b/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
@@ -265,7 +265,8 @@ public class HdfsTransactionLog extends TransactionLog {
}
private void doCloseOutput() throws IOException {
- synchronized (fosLock) {
+ fosLock.lock();
+ try {
if (fos == null) return;
if (debug) {
log.debug("Closing output for {}", tlogFile);
@@ -273,6 +274,8 @@ public class HdfsTransactionLog extends TransactionLog {
fos.flushBuffer();
finalLogSize = fos.size();
fos = null;
+ } finally {
+ fosLock.unlock();
}
tlogOutStream.hflush();
diff --git a/solr/core/src/java/org/apache/solr/update/TransactionLog.java b/solr/core/src/java/org/apache/solr/update/TransactionLog.java
index a7f5acb..ed69d85 100644
--- a/solr/core/src/java/org/apache/solr/update/TransactionLog.java
+++ b/solr/core/src/java/org/apache/solr/update/TransactionLog.java
@@ -35,6 +35,7 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.solr.common.util.FastInputStream;
import org.apache.solr.common.util.FastOutputStream;
@@ -77,7 +78,7 @@ public class TransactionLog implements Closeable {
FileChannel channel;
OutputStream os;
FastOutputStream fos; // all accesses to this stream should be synchronized on "this" (The TransactionLog)
- final Object fosLock = new Object();
+ final ReentrantLock fosLock = new ReentrantLock(true);
volatile int numRecords;
boolean isBuffer;
@@ -239,9 +240,12 @@ public class TransactionLog implements Closeable {
public boolean endsWithCommit() throws IOException {
long size;
- synchronized (fosLock) {
+ fosLock.lock();
+ try {
fos.flush();
size = fos.size();
+ } finally {
+ fosLock.unlock();
}
// the end of the file should have the end message (added during a commit) plus a 4 byte size
@@ -326,12 +330,15 @@ public class TransactionLog implements Closeable {
// rollback() is the only function that can reset to zero, and it blocks updates.
if (fos.size() != 0) return;
- synchronized (fosLock) {
+ fosLock.lock();
+ try {
if (fos.size() != 0) return; // check again while synchronized
if (optional != null) {
addGlobalStrings(optional.getFieldNames());
}
writeLogHeader(codec);
+ } finally {
+ fosLock.unlock();
}
}
@@ -390,7 +397,8 @@ public class TransactionLog implements Closeable {
}
- synchronized (fosLock) {
+ fosLock.lock();
+ try {
lastAddSize = (int) out.size();
long pos = fos.size(); // if we had flushed, this should be equal to channel.position()
@@ -407,6 +415,8 @@ public class TransactionLog implements Closeable {
endRecord(pos);
// fos.flushBuffer(); // flush later
return pos;
+ } finally {
+ fosLock.unlock();
}
} catch (IOException e) {
@@ -430,13 +440,16 @@ public class TransactionLog implements Closeable {
codec.writeLong(cmd.getVersion());
codec.writeByteArray(br.bytes, br.offset, br.length);
- synchronized (fosLock) {
+ fosLock.lock();
+ try {
long pos = fos.size(); // if we had flushed, this should be equal to channel.position()
assert pos != 0;
out.writeAll(fos);
endRecord(pos);
// fos.flushBuffer(); // flush later
return pos;
+ } finally {
+ fosLock.unlock();
}
} catch (IOException e) {
@@ -457,12 +470,15 @@ public class TransactionLog implements Closeable {
codec.writeLong(cmd.getVersion());
codec.writeStr(cmd.query);
- synchronized (fosLock) {
+ fosLock.lock();
+ try {
long pos = fos.size(); // if we had flushed, this should be equal to channel.position()
out.writeAll(fos);
endRecord(pos);
// fos.flushBuffer(); // flush later
return pos;
+ } finally {
+ fosLock.unlock();
}
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
@@ -473,7 +489,8 @@ public class TransactionLog implements Closeable {
public long writeCommit(CommitUpdateCommand cmd) {
LogCodec codec = new LogCodec(resolver);
- synchronized (fosLock) {
+ fosLock.lock();
+ try {
try {
long pos = fos.size(); // if we had flushed, this should be equal to channel.position()
@@ -496,6 +513,8 @@ public class TransactionLog implements Closeable {
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
}
+ } finally {
+ fosLock.unlock();
}
}
@@ -509,7 +528,8 @@ public class TransactionLog implements Closeable {
try {
// make sure any unflushed buffer has been flushed
- synchronized (fosLock) {
+ fosLock.lock();
+ try {
// TODO: optimize this by keeping track of what we have flushed up to
fos.flushBuffer();
/***
@@ -518,6 +538,8 @@ public class TransactionLog implements Closeable {
throw new RuntimeException("ERROR" + "###flushBuffer to " + fos.size() + " raf.length()=" + raf.length() + " pos="+pos);
}
***/
+ } finally {
+ fosLock.unlock();
}
ChannelFastInputStream fis = new ChannelFastInputStream(channel, pos);
@@ -548,8 +570,11 @@ public class TransactionLog implements Closeable {
/** returns the current position in the log file */
public long position() {
- synchronized (fosLock) {
+ fosLock.lock();
+ try {
return fos.size();
+ } finally {
+ fosLock.unlock();
}
}
@@ -561,8 +586,11 @@ public class TransactionLog implements Closeable {
public void finish(UpdateLog.SyncLevel syncLevel) {
if (syncLevel == UpdateLog.SyncLevel.NONE) return;
try {
- synchronized (fosLock) {
+ fosLock.lock();
+ try {
fos.flushBuffer();
+ } finally {
+ fosLock.unlock();
}
if (syncLevel == UpdateLog.SyncLevel.FSYNC) {
@@ -583,9 +611,12 @@ public class TransactionLog implements Closeable {
log.debug("Closing tlog {}", this);
}
- synchronized (fosLock) {
+ fosLock.lock();
+ try {
fos.flush();
fos.close();
+ } finally {
+ fosLock.unlock();
}
if (deleteOnClose) {
@@ -665,7 +696,8 @@ public class TransactionLog implements Closeable {
*/
public Object next() throws IOException, InterruptedException {
long pos;
- synchronized (fosLock) {
+ fosLock.lock();
+ try {
pos = fis.position();
if (trace) {
log.trace("Reading log record. pos={} currentSize={}", pos, fos.size());
@@ -696,6 +728,8 @@ public class TransactionLog implements Closeable {
assert size == fis.position() - pos - 4;
return o;
+ } finally {
+ fosLock.unlock();
}
}
@@ -737,7 +771,8 @@ public class TransactionLog implements Closeable {
@Override
public Object next() throws IOException, InterruptedException {
- synchronized (fosLock) {
+ fosLock.lock();
+ try {
if (versionToPos == null) {
versionToPos = new TreeMap<>();
Object o;
@@ -767,6 +802,8 @@ public class TransactionLog implements Closeable {
if (pos != currentPos()) fis.seek(pos);
return super.next();
}
+ } finally {
+ fosLock.unlock();
}
}
}
@@ -808,7 +845,8 @@ public class TransactionLog implements Closeable {
incref();
long sz;
- synchronized (fosLock) {
+ fosLock.lock();
+ try {
fos.flushBuffer();
sz = fos.size();
assert sz == channel.size();
@@ -820,6 +858,8 @@ public class TransactionLog implements Closeable {
fis.seek(prevPos);
nextLength = fis.readInt();
}
+ } finally {
+ fosLock.unlock();
}
}
@@ -830,7 +870,8 @@ public class TransactionLog implements Closeable {
*/
public Object next() throws IOException {
Object o = null;
- synchronized (fosLock) {
+ fosLock.lock();
+ try {
if (prevPos <= 0) return null;
long endOfThisRecord = prevPos;
@@ -862,6 +903,8 @@ public class TransactionLog implements Closeable {
o = codec.readVal(fis);
// assert fis.position() == prevPos + 4 + thisLength; // this is only true if we read all the data (and we currently skip reading SolrInputDocument
+ } finally {
+ fosLock.unlock();
}
return o;
}
@@ -877,8 +920,11 @@ public class TransactionLog implements Closeable {
@Override
public String toString() {
- synchronized (fosLock) {
+ fosLock.lock();
+ try {
return "LogReader{" + "file=" + tlogFile + ", position=" + fis.position() + ", end=" + fos.size() + "}";
+ } finally {
+ fosLock.unlock();
}
}
@@ -898,14 +944,18 @@ public class TransactionLog implements Closeable {
@Override
public int readWrappedStream(byte[] target, int offset, int len) throws IOException {
ByteBuffer bb = ByteBuffer.wrap(target, offset, len);
- synchronized (fosLock) {
+ fosLock.lock();
+ try {
int ret = ch.read(bb, readFromStream);
return ret;
+ } finally {
+ fosLock.unlock();
}
}
public void seek(long position) throws IOException {
- synchronized (fosLock) {
+ fosLock.lock();
+ try {
if (position <= readFromStream && position >= getBufferPos()) {
// seek within buffer
pos = (int) (position - getBufferPos());
@@ -916,26 +966,37 @@ public class TransactionLog implements Closeable {
end = pos = 0;
}
assert position() == position;
+ } finally {
+ fosLock.unlock();
}
}
/** where is the start of the buffer relative to the whole file */
public long getBufferPos() {
- synchronized (fosLock) {
+ fosLock.lock();
+ try {
return readFromStream - end;
+ } finally {
+ fosLock.unlock();
}
}
public int getBufferSize() {
- synchronized (fosLock) {
+ fosLock.lock();
+ try {
return buf.length;
+ } finally {
+ fosLock.unlock();
}
}
@Override
public void close() throws IOException {
- synchronized (fosLock) {
+ fosLock.lock();
+ try {
ch.close();
+ } finally {
+ fosLock.unlock();
}
}
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
index 089c719..29906b1 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
@@ -104,8 +104,7 @@ public class UpdateShardHandler implements SolrInfoBean {
if (cfg != null) {
updateOnlyClientBuilder
.connectionTimeout(cfg.getDistributedConnectionTimeout())
- .idleTimeout(cfg.getDistributedSocketTimeout())
- .maxConnectionsPerHost(cfg.getMaxUpdateConnectionsPerHost());
+ .idleTimeout(cfg.getDistributedSocketTimeout());
}
updateOnlyClient = updateOnlyClientBuilder.markInternalRequest().build();
updateOnlyClient.enableCloseLock();
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index 38959f7..ef0ca18 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -933,7 +933,7 @@ public class Http2SolrClient extends SolrClient {
private SSLConfig sslConfig = defaultSSLConfig;
private Integer idleTimeout = Integer.getInteger("solr.http2solrclient.default.idletimeout", 30000);
private Integer connectionTimeout;
- private Integer maxConnectionsPerHost;
+ private Integer maxConnectionsPerHost = 4;
private boolean useHttp1_1 = Boolean.getBoolean("solr.http1");
protected String baseSolrUrl;
protected Map<String,String> headers = new ConcurrentHashMap<>();
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 20e5c38..d534f5e 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -199,10 +199,10 @@ public class ZkStateReader implements SolrCloseable {
private final Runnable securityNodeListener;
- private final ConcurrentHashMap<String, CollectionWatch<DocCollectionWatcher>> collectionWatches = new ConcurrentHashMap<>(16, 0.75f, 5);
+ private final ConcurrentHashMap<String, CollectionWatch<DocCollectionWatcher>> collectionWatches = new ConcurrentHashMap<>(32, 0.75f, 3);
// named this observers so there's less confusion between CollectionPropsWatcher map and the PropsWatcher map.
- private final ConcurrentHashMap<String, CollectionPropsWatcher> collectionPropsObservers = new ConcurrentHashMap<>(16, 0.75f, 5);
+ private final ConcurrentHashMap<String, CollectionPropsWatcher> collectionPropsObservers = new ConcurrentHashMap<>(32, 0.75f, 3);
private Set<CloudCollectionsListener> cloudCollectionsListeners = ConcurrentHashMap.newKeySet();
@@ -1677,16 +1677,15 @@ public class ZkStateReader implements SolrCloseable {
*/
public void registerDocCollectionWatcher(String collection, DocCollectionWatcher stateWatcher) {
AtomicBoolean watchSet = new AtomicBoolean(false);
- synchronized (collectionWatches) {
- collectionWatches.compute(collection, (k, v) -> {
- if (v == null) {
- v = new CollectionWatch<>();
- watchSet.set(true);
- }
- v.stateWatchers.add(stateWatcher);
- return v;
- });
- }
+
+ collectionWatches.compute(collection, (k, v) -> {
+ if (v == null) {
+ v = new CollectionWatch<>();
+ watchSet.set(true);
+ }
+ v.stateWatchers.add(stateWatcher);
+ return v;
+ });
if (watchSet.get()) {
new StateWatcher(collection).refreshAndWatch();