You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/08/12 14:34:43 UTC
[lucene-solr] branch reference_impl_dev updated: @531 Reach for the sky!
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/reference_impl_dev by this push:
new 21f49f0 @531 Reach for the sky!
21f49f0 is described below
commit 21f49f022266c09952bcff3eb8947c97855ead67
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Wed Aug 12 09:34:21 2020 -0500
@531 Reach for the sky!
---
.../java/org/apache/solr/core/CoreContainer.java | 27 ++++--
.../org/apache/solr/update/UpdateShardHandler.java | 10 +-
.../solr/core/TestImplicitCoreProperties.java | 7 +-
.../src/java/org/apache/solr/common/ParWork.java | 10 +-
.../org/apache/solr/common/ParWorkExecService.java | 102 +++++----------------
.../org/apache/solr/common/ParWorkExecutor.java | 4 +-
.../apache/solr/common/util/ValidatingJsonMap.java | 18 ++--
.../apache/solr/common/util/JsonValidatorTest.java | 2 +-
8 files changed, 73 insertions(+), 107 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 883c148..282ac68 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -79,6 +79,7 @@ import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Replica.State;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.CloseTracker;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.ObjectCache;
@@ -186,6 +187,8 @@ public class CoreContainer implements Closeable {
protected volatile Properties containerProperties;
+ private final CloseTracker closeTracker;
+
private volatile ConfigSetService coreConfigService;
protected volatile ZkContainer zkSys = null;
@@ -324,7 +327,8 @@ public class CoreContainer implements Closeable {
this(null, config, locator, asyncSolrCoreLoad);
}
public CoreContainer(SolrZkClient zkClient, NodeConfig config, CoresLocator locator, boolean asyncSolrCoreLoad) {
- ObjectReleaseTracker.track(this);
+ assert ObjectReleaseTracker.track(this);
+ closeTracker = new CloseTracker();
this.containerProperties = new Properties(config.getSolrProperties());
String zkHost = System.getProperty("zkHost");
if (!StringUtils.isEmpty(zkHost)) {
@@ -596,6 +600,7 @@ public class CoreContainer implements Closeable {
* @lucene.experimental
*/
protected CoreContainer(Object testConstructor) {
+ closeTracker = new CloseTracker();
solrHome = null;
loader = null;
coresLocator = null;
@@ -1044,9 +1049,7 @@ public class CoreContainer implements Closeable {
@Override
public void close() throws IOException {
-// if (this.isShutDown) {
-// return;
-// }
+ closeTracker.close();
log.info("Closing CoreContainer");
// must do before isShutDown=true
if (isZooKeeperAware()) {
@@ -1182,13 +1185,17 @@ public class CoreContainer implements Closeable {
// we must cancel without holding the cores sync
// make sure we wait for any recoveries to stop
- for (SolrCore core : cores) {
- try {
- core.getSolrCoreState().cancelRecovery(true, true);
- } catch (Exception e) {
- SolrZkClient.checkInterrupted(e);
- SolrException.log(log, "Error canceling recovery for core", e);
+ try (ParWork work = new ParWork(this, true)) {
+ for (SolrCore core : cores) {
+ work.collect(() -> {
+ try {
+ core.getSolrCoreState().cancelRecovery(true, true);
+ } catch (Exception e) {
+ SolrException.log(log, "Error canceling recovery for core", e);
+ }
+ });
}
+ work.addCollect("cancelCoreRecoveries");
}
}
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
index e354a46..14e3338 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
@@ -34,6 +34,7 @@ import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.common.ParWork;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.CloseTracker;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.common.util.SolrNamedThreadFactory;
@@ -57,6 +58,8 @@ public class UpdateShardHandler implements SolrInfoBean {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private final CloseTracker closeTracker;
+
private final Http2SolrClient updateOnlyClient;
private final CloseableHttpClient defaultClient;
@@ -77,7 +80,8 @@ public class UpdateShardHandler implements SolrInfoBean {
private int connectionTimeout = HttpClientUtil.DEFAULT_CONNECT_TIMEOUT;
public UpdateShardHandler(UpdateShardHandlerConfig cfg) {
- ObjectReleaseTracker.track(this);
+ assert ObjectReleaseTracker.track(this);
+ closeTracker = new CloseTracker();
defaultConnectionManager = new InstrumentedPoolingHttpClientConnectionManager(HttpClientUtil.getSocketFactoryRegistryProvider().getSocketFactoryRegistry());
ModifiableSolrParams clientParams = new ModifiableSolrParams();
if (cfg != null ) {
@@ -208,6 +212,7 @@ public class UpdateShardHandler implements SolrInfoBean {
}
public void close() {
+ closeTracker.close();
if (recoveryExecutor != null) {
recoveryExecutor.shutdownNow();
}
@@ -220,7 +225,6 @@ public class UpdateShardHandler implements SolrInfoBean {
});
closer.addCollect("recoveryExecutor");
-
closer.collect(updateOnlyClient);
closer.collect(defaultConnectionManager);
closer.collect(() -> {
@@ -229,7 +233,7 @@ public class UpdateShardHandler implements SolrInfoBean {
});
closer.addCollect("updateshardhandlerClients");
}
- ObjectReleaseTracker.release(this);
+ assert ObjectReleaseTracker.release(this);
}
@VisibleForTesting
diff --git a/solr/core/src/test/org/apache/solr/core/TestImplicitCoreProperties.java b/solr/core/src/test/org/apache/solr/core/TestImplicitCoreProperties.java
index 07e8a13..24f71a3 100644
--- a/solr/core/src/test/org/apache/solr/core/TestImplicitCoreProperties.java
+++ b/solr/core/src/test/org/apache/solr/core/TestImplicitCoreProperties.java
@@ -34,9 +34,10 @@ public class TestImplicitCoreProperties extends SolrTestCaseJ4 {
@AfterClass
public static void teardownContainer() {
- if (cc != null) {
- cc.shutdown();
- }
+ // no, we don't own this
+// if (cc != null) {
+// cc.shutdown();
+// }
cc = null;
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWork.java b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
index 118bd99..bc4abd2 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWork.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
@@ -40,6 +40,7 @@ import java.util.Set;
import java.util.Timer;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -168,7 +169,7 @@ public class ParWork implements Closeable {
}
- private List<WorkUnit> workUnits = new CopyOnWriteArrayList();
+ private List<WorkUnit> workUnits = Collections.synchronizedList(new ArrayList<>());
private final TimeTracker tracker;
@@ -278,8 +279,11 @@ public class ParWork implements Closeable {
log.info("No work collected to submit");
return;
}
- add(label, collectSet);
- collectSet.clear();
+ try {
+ add(label, collectSet);
+ } finally {
+ collectSet.clear();
+ }
}
// add a unit of work
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
index 83744c8..04624c1 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
@@ -95,49 +95,19 @@ public class ParWorkExecService implements ExecutorService {
return CompletableFuture.completedFuture(callable.call());
}
} else {
- return service.submit(callable);
+ return service.submit(new Callable<T>() {
+ @Override
+ public T call() throws Exception {
+ try {
+ return callable.call();
+ } finally {
+ available.release();
+ }
+ }
+ });
}
Future<T> future = service.submit(callable);
- return new Future<T>() {
- @Override
- public boolean cancel(boolean b) {
- return future.cancel(b);
- }
-
- @Override
- public boolean isCancelled() {
- return future.isCancelled();
- }
-
- @Override
- public boolean isDone() {
- return future.isDone();
- }
-
- @Override
- public T get() throws InterruptedException, ExecutionException {
- T ret;
- try {
- ret = future.get();
- } finally {
- available.release();
- }
-
- return ret;
- }
-
- @Override
- public T get(long l, TimeUnit timeUnit)
- throws InterruptedException, ExecutionException, TimeoutException {
- T ret;
- try {
- ret = future.get(l, timeUnit);
- } finally {
- available.release();
- }
- return ret;
- }
- };
+ return future;
} catch (Exception e) {
ParWork.propegateInterrupt(e);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
@@ -186,49 +156,18 @@ public class ParWorkExecService implements ExecutorService {
} else {
return service.submit(runnable);
}
- Future<?> future = service.submit(runnable);
-
- return new Future<>() {
- @Override
- public boolean cancel(boolean b) {
- return future.cancel(b);
- }
-
- @Override
- public boolean isCancelled() {
- return future.isCancelled();
- }
-
- @Override
- public boolean isDone() {
- return future.isDone();
- }
-
+ Future<?> future = service.submit(new Runnable() {
@Override
- public Object get() throws InterruptedException, ExecutionException {
- Object ret;
+ public void run() {
try {
- ret = future.get();
+ runnable.run();
} finally {
available.release();
}
-
- return ret;
}
+ });
- @Override
- public Object get(long l, TimeUnit timeUnit)
- throws InterruptedException, ExecutionException, TimeoutException {
- Object ret;
- try {
- ret = future.get(l, timeUnit);
- } finally {
- available.release();
- }
- return ret;
- }
-
- };
+ return future;
} catch (Exception e) {
ParWork.propegateInterrupt(e);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
@@ -244,15 +183,20 @@ public class ParWorkExecService implements ExecutorService {
for (Callable c : collection) {
futures.add(submit(c));
}
-
+ Exception exception = null;
for (Future<T> future : futures) {
try {
future.get();
} catch (ExecutionException e) {
log.error("invokeAll execution exception", e);
- //throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ if (exception == null) {
+ exception = e;
+ } else {
+ exception.addSuppressed(e);
+ }
}
}
+ if (exception != null) throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, exception);
return futures;
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
index 6f8ffdb..0cbccd8 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
@@ -44,8 +44,8 @@ public class ParWorkExecutor extends ExecutorUtil.MDCAwareThreadPoolExecutor {
@Override
public Thread newThread(Runnable r) {
- Thread t = new Thread(group, r,
- name + threadNumber.getAndIncrement(), 0) {
+ Thread t = new Thread(group,
+ name + threadNumber.getAndIncrement()) {
public void run() {
try {
r.run();
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/ValidatingJsonMap.java b/solr/solrj/src/java/org/apache/solr/common/util/ValidatingJsonMap.java
index 2e113d0..ddcd731 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/ValidatingJsonMap.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/ValidatingJsonMap.java
@@ -285,12 +285,18 @@ public class ValidatingJsonMap implements Map<String, Object>, NavigableObject {
map.putAll(includedMap);
}
if (maxDepth > 0) {
- ParWork.getExecutor().submit(() -> {
- map.entrySet().parallelStream()
- .filter(e -> e.getValue() instanceof Map)
- .map(Map.Entry::getValue)
- .forEach(m -> handleIncludes((ValidatingJsonMap) m, loc, maxDepth - 1));
- });
+ Set<Entry<String,Object>> entrySet = map.entrySet();
+ try (ParWork work = new ParWork("includes")) {
+ for (Entry<String,Object> entry : entrySet) {
+ Object v = entry.getValue();
+ if (v instanceof Map) {
+ work.collect(() -> {
+ handleIncludes((ValidatingJsonMap) v, loc, maxDepth - 1);
+ });
+ }
+ }
+ work.addCollect("includes");
+ }
}
}
diff --git a/solr/solrj/src/test/org/apache/solr/common/util/JsonValidatorTest.java b/solr/solrj/src/test/org/apache/solr/common/util/JsonValidatorTest.java
index 8c06266..42f1ad5 100644
--- a/solr/solrj/src/test/org/apache/solr/common/util/JsonValidatorTest.java
+++ b/solr/solrj/src/test/org/apache/solr/common/util/JsonValidatorTest.java
@@ -59,7 +59,7 @@ public class JsonValidatorTest extends SolrTestCaseJ4 {
assertNotNull(toJSONString(errs), errs);
assertTrue(toJSONString(errs), errs.get(0).contains("expected"));
errs = validator.validateJson(Utils.fromJSONString("{x:y, collections: [ c1 , c2]}"));
- assertTrue(toJSONString(errs), StrUtils.join(errs, '|').contains("Unknown"));
+ assertTrue(toJSONString(errs), StrUtils.join(errs, '|').contains("Missing required attribute"));
errs = validator.validateJson(Utils.fromJSONString("{name : x, collections: [ 1 , 2]}"));
assertFalse(toJSONString(errs), errs.isEmpty());
assertTrue(toJSONString(errs), errs.get(0).contains("expected"));