You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/08/12 14:34:43 UTC

[lucene-solr] branch reference_impl_dev updated: @531 Reach for the sky!

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/reference_impl_dev by this push:
     new 21f49f0  @531 Reach for the sky!
21f49f0 is described below

commit 21f49f022266c09952bcff3eb8947c97855ead67
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Wed Aug 12 09:34:21 2020 -0500

    @531 Reach for the sky!
---
 .../java/org/apache/solr/core/CoreContainer.java   |  27 ++++--
 .../org/apache/solr/update/UpdateShardHandler.java |  10 +-
 .../solr/core/TestImplicitCoreProperties.java      |   7 +-
 .../src/java/org/apache/solr/common/ParWork.java   |  10 +-
 .../org/apache/solr/common/ParWorkExecService.java | 102 +++++----------------
 .../org/apache/solr/common/ParWorkExecutor.java    |   4 +-
 .../apache/solr/common/util/ValidatingJsonMap.java |  18 ++--
 .../apache/solr/common/util/JsonValidatorTest.java |   2 +-
 8 files changed, 73 insertions(+), 107 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 883c148..282ac68 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -79,6 +79,7 @@ import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Replica.State;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.CloseTracker;
 import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.IOUtils;
 import org.apache.solr.common.util.ObjectCache;
@@ -186,6 +187,8 @@ public class CoreContainer implements Closeable {
 
   protected volatile Properties containerProperties;
 
+  private final CloseTracker closeTracker;
+
   private volatile ConfigSetService coreConfigService;
 
   protected volatile ZkContainer zkSys = null;
@@ -324,7 +327,8 @@ public class CoreContainer implements Closeable {
     this(null, config, locator, asyncSolrCoreLoad);
   }
   public CoreContainer(SolrZkClient zkClient, NodeConfig config, CoresLocator locator, boolean asyncSolrCoreLoad) {
-    ObjectReleaseTracker.track(this);
+    assert ObjectReleaseTracker.track(this);
+    closeTracker = new CloseTracker();
     this.containerProperties = new Properties(config.getSolrProperties());
     String zkHost = System.getProperty("zkHost");
     if (!StringUtils.isEmpty(zkHost)) {
@@ -596,6 +600,7 @@ public class CoreContainer implements Closeable {
    * @lucene.experimental
    */
   protected CoreContainer(Object testConstructor) {
+    closeTracker = new CloseTracker();
     solrHome = null;
     loader = null;
     coresLocator = null;
@@ -1044,9 +1049,7 @@ public class CoreContainer implements Closeable {
 
   @Override
   public void close() throws IOException {
-//    if (this.isShutDown) {
-//      return;
-//    }
+    closeTracker.close();
     log.info("Closing CoreContainer");
     // must do before isShutDown=true
     if (isZooKeeperAware()) {
@@ -1182,13 +1185,17 @@ public class CoreContainer implements Closeable {
 
     // we must cancel without holding the cores sync
     // make sure we wait for any recoveries to stop
-    for (SolrCore core : cores) {
-      try {
-        core.getSolrCoreState().cancelRecovery(true, true);
-      } catch (Exception e) {
-        SolrZkClient.checkInterrupted(e);
-        SolrException.log(log, "Error canceling recovery for core", e);
+    try (ParWork work = new ParWork(this, true)) {
+      for (SolrCore core : cores) {
+        work.collect(() -> {
+          try {
+            core.getSolrCoreState().cancelRecovery(true, true);
+          } catch (Exception e) {
+            SolrException.log(log, "Error canceling recovery for core", e);
+          }
+        });
       }
+      work.addCollect("cancelCoreRecoveries");
     }
   }
 
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
index e354a46..14e3338 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
@@ -34,6 +34,7 @@ import org.apache.solr.client.solrj.impl.HttpClientUtil;
 import org.apache.solr.common.ParWork;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.CloseTracker;
 import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.ObjectReleaseTracker;
 import org.apache.solr.common.util.SolrNamedThreadFactory;
@@ -57,6 +58,8 @@ public class UpdateShardHandler implements SolrInfoBean {
   
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
+  private final CloseTracker closeTracker;
+
   private final Http2SolrClient updateOnlyClient;
 
   private final CloseableHttpClient defaultClient;
@@ -77,7 +80,8 @@ public class UpdateShardHandler implements SolrInfoBean {
   private int connectionTimeout = HttpClientUtil.DEFAULT_CONNECT_TIMEOUT;
 
   public UpdateShardHandler(UpdateShardHandlerConfig cfg) {
-    ObjectReleaseTracker.track(this);
+    assert ObjectReleaseTracker.track(this);
+    closeTracker = new CloseTracker();
     defaultConnectionManager = new InstrumentedPoolingHttpClientConnectionManager(HttpClientUtil.getSocketFactoryRegistryProvider().getSocketFactoryRegistry());
     ModifiableSolrParams clientParams = new ModifiableSolrParams();
     if (cfg != null ) {
@@ -208,6 +212,7 @@ public class UpdateShardHandler implements SolrInfoBean {
   }
 
   public void close() {
+    closeTracker.close();
     if (recoveryExecutor != null) {
       recoveryExecutor.shutdownNow();
     }
@@ -220,7 +225,6 @@ public class UpdateShardHandler implements SolrInfoBean {
       });
       closer.addCollect("recoveryExecutor");
 
-
       closer.collect(updateOnlyClient);
       closer.collect(defaultConnectionManager);
       closer.collect(() -> {
@@ -229,7 +233,7 @@ public class UpdateShardHandler implements SolrInfoBean {
       });
       closer.addCollect("updateshardhandlerClients");
     }
-    ObjectReleaseTracker.release(this);
+    assert ObjectReleaseTracker.release(this);
   }
 
   @VisibleForTesting
diff --git a/solr/core/src/test/org/apache/solr/core/TestImplicitCoreProperties.java b/solr/core/src/test/org/apache/solr/core/TestImplicitCoreProperties.java
index 07e8a13..24f71a3 100644
--- a/solr/core/src/test/org/apache/solr/core/TestImplicitCoreProperties.java
+++ b/solr/core/src/test/org/apache/solr/core/TestImplicitCoreProperties.java
@@ -34,9 +34,10 @@ public class TestImplicitCoreProperties extends SolrTestCaseJ4 {
 
   @AfterClass
   public static void teardownContainer() {
-    if (cc != null) {
-      cc.shutdown();
-    }
+  // no, we don't own this
+//    if (cc != null) {
+//      cc.shutdown();
+//    }
     cc = null;
   }
 
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWork.java b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
index 118bd99..bc4abd2 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWork.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
@@ -40,6 +40,7 @@ import java.util.Set;
 import java.util.Timer;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -168,7 +169,7 @@ public class ParWork implements Closeable {
 
   }
 
-  private List<WorkUnit> workUnits = new CopyOnWriteArrayList();
+  private List<WorkUnit> workUnits = Collections.synchronizedList(new ArrayList<>());
 
   private final TimeTracker tracker;
 
@@ -278,8 +279,11 @@ public class ParWork implements Closeable {
       log.info("No work collected to submit");
       return;
     }
-    add(label, collectSet);
-    collectSet.clear();
+    try {
+      add(label, collectSet);
+    } finally {
+      collectSet.clear();
+    }
   }
 
   // add a unit of work
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
index 83744c8..04624c1 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
@@ -95,49 +95,19 @@ public class ParWorkExecService implements ExecutorService {
           return CompletableFuture.completedFuture(callable.call());
         }
       } else {
-        return service.submit(callable);
+        return service.submit(new Callable<T>() {
+          @Override
+          public T call() throws Exception {
+            try {
+              return callable.call();
+            } finally {
+              available.release();
+            }
+          }
+        });
       }
       Future<T> future = service.submit(callable);
-      return new Future<T>() {
-        @Override
-        public boolean cancel(boolean b) {
-          return future.cancel(b);
-        }
-
-        @Override
-        public boolean isCancelled() {
-          return future.isCancelled();
-        }
-
-        @Override
-        public boolean isDone() {
-          return future.isDone();
-        }
-
-        @Override
-        public T get() throws InterruptedException, ExecutionException {
-          T ret;
-          try {
-            ret = future.get();
-          } finally {
-            available.release();
-          }
-
-          return ret;
-        }
-
-        @Override
-        public T get(long l, TimeUnit timeUnit)
-            throws InterruptedException, ExecutionException, TimeoutException {
-          T ret;
-          try {
-            ret = future.get(l, timeUnit);
-          } finally {
-            available.release();
-          }
-          return ret;
-        }
-      };
+      return future;
     } catch (Exception e) {
       ParWork.propegateInterrupt(e);
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
@@ -186,49 +156,18 @@ public class ParWorkExecService implements ExecutorService {
       } else {
         return service.submit(runnable);
       }
-      Future<?> future = service.submit(runnable);
-
-      return new Future<>() {
-        @Override
-        public boolean cancel(boolean b) {
-          return future.cancel(b);
-        }
-
-        @Override
-        public boolean isCancelled() {
-          return future.isCancelled();
-        }
-
-        @Override
-        public boolean isDone() {
-          return future.isDone();
-        }
-
+      Future<?> future = service.submit(new Runnable() {
         @Override
-        public Object get() throws InterruptedException, ExecutionException {
-          Object ret;
+        public void run() {
           try {
-            ret = future.get();
+            runnable.run();
           } finally {
             available.release();
           }
-
-          return ret;
         }
+      });
 
-        @Override
-        public Object get(long l, TimeUnit timeUnit)
-            throws InterruptedException, ExecutionException, TimeoutException {
-          Object ret;
-          try {
-            ret = future.get(l, timeUnit);
-          } finally {
-            available.release();
-          }
-          return ret;
-        }
-
-      };
+      return future;
     } catch (Exception e) {
       ParWork.propegateInterrupt(e);
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
@@ -244,15 +183,20 @@ public class ParWorkExecService implements ExecutorService {
     for (Callable c : collection) {
       futures.add(submit(c));
     }
-
+    Exception exception = null;
     for (Future<T> future : futures) {
       try {
         future.get();
       } catch (ExecutionException e) {
         log.error("invokeAll execution exception", e);
-        //throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+        if (exception == null) {
+          exception = e;
+        } else {
+          exception.addSuppressed(e);
+        }
       }
     }
+    if (exception != null) throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, exception);
     return futures;
   }
 
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
index 6f8ffdb..0cbccd8 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
@@ -44,8 +44,8 @@ public class ParWorkExecutor extends ExecutorUtil.MDCAwareThreadPoolExecutor {
 
           @Override
           public Thread newThread(Runnable r) {
-            Thread t = new Thread(group, r,
-                name + threadNumber.getAndIncrement(), 0) {
+            Thread t = new Thread(group,
+                name + threadNumber.getAndIncrement()) {
               public void run() {
                 try {
                   r.run();
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/ValidatingJsonMap.java b/solr/solrj/src/java/org/apache/solr/common/util/ValidatingJsonMap.java
index 2e113d0..ddcd731 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/ValidatingJsonMap.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/ValidatingJsonMap.java
@@ -285,12 +285,18 @@ public class ValidatingJsonMap implements Map<String, Object>, NavigableObject {
       map.putAll(includedMap);
     }
     if (maxDepth > 0) {
-      ParWork.getExecutor().submit(() -> {
-        map.entrySet().parallelStream()
-            .filter(e -> e.getValue() instanceof Map)
-            .map(Map.Entry::getValue)
-            .forEach(m -> handleIncludes((ValidatingJsonMap) m, loc, maxDepth - 1));
-      });
+      Set<Entry<String,Object>> entrySet = map.entrySet();
+      try (ParWork work = new ParWork("includes")) {
+        for (Entry<String,Object> entry : entrySet) {
+          Object v = entry.getValue();
+          if (v instanceof  Map) {
+            work.collect(() -> {
+              handleIncludes((ValidatingJsonMap) v, loc, maxDepth - 1);
+            });
+          }
+        }
+        work.addCollect("includes");
+      }
     }
   }
 
diff --git a/solr/solrj/src/test/org/apache/solr/common/util/JsonValidatorTest.java b/solr/solrj/src/test/org/apache/solr/common/util/JsonValidatorTest.java
index 8c06266..42f1ad5 100644
--- a/solr/solrj/src/test/org/apache/solr/common/util/JsonValidatorTest.java
+++ b/solr/solrj/src/test/org/apache/solr/common/util/JsonValidatorTest.java
@@ -59,7 +59,7 @@ public class JsonValidatorTest extends SolrTestCaseJ4  {
     assertNotNull(toJSONString(errs), errs);
     assertTrue(toJSONString(errs), errs.get(0).contains("expected"));
     errs = validator.validateJson(Utils.fromJSONString("{x:y, collections: [ c1 , c2]}"));
-    assertTrue(toJSONString(errs), StrUtils.join(errs, '|').contains("Unknown"));
+    assertTrue(toJSONString(errs), StrUtils.join(errs, '|').contains("Missing required attribute"));
     errs = validator.validateJson(Utils.fromJSONString("{name : x, collections: [ 1 , 2]}"));
     assertFalse(toJSONString(errs), errs.isEmpty());
     assertTrue(toJSONString(errs), errs.get(0).contains("expected"));