You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by th...@apache.org on 2016/04/14 20:43:41 UTC

lucene-solr:branch_5_5: SOLR-8908: Fixed OnReconnect listener management in ZkController to allow for de-registering listeners.

Repository: lucene-solr
Updated Branches:
  refs/heads/branch_5_5 4e7b578d2 -> 14ec48a59


SOLR-8908: Fixed OnReconnect listener management in ZkController to allow for de-registering listeners.

Here's what this commit includes:
* Added the removeOnReconnectListener method to ZkController to allow OnReconnect listener implementations to de-register; avoids a memory leak
* Updated ZkIndexSchemaReader to add a CloseHook to the SolrCore it supports to de-register as an OnReconnect listener
* Added unit test to verify that after reloading and deleting a SolrCore in managed schema mode, the associated ZkIndexSchemaReader gets de-registered correctly


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/14ec48a5
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/14ec48a5
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/14ec48a5

Branch: refs/heads/branch_5_5
Commit: 14ec48a59412e6822f74716b7307c0872c148431
Parents: 4e7b578
Author: Timothy Potter <th...@gmail.com>
Authored: Thu Apr 14 10:17:42 2016 -0700
Committer: Timothy Potter <th...@gmail.com>
Committed: Thu Apr 14 11:15:06 2016 -0700

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   2 +
 .../org/apache/solr/cloud/ZkController.java     |  66 +++++++-
 .../solr/schema/ManagedIndexSchemaFactory.java  |   4 +-
 .../apache/solr/schema/ZkIndexSchemaReader.java |  56 ++++++-
 .../apache/solr/cloud/CollectionReloadTest.java |  85 +---------
 .../solr/cloud/DistributedVersionInfoTest.java  |  16 --
 .../apache/solr/cloud/HttpPartitionTest.java    |  16 --
 .../cloud/TestOnReconnectListenerSupport.java   | 156 +++++++++++++++++++
 .../apache/solr/common/cloud/OnReconnect.java   |   9 +-
 .../cloud/AbstractFullDistribZkTestBase.java    |  75 +++++++++
 10 files changed, 360 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/14ec48a5/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 57ac048..286a2da 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -48,6 +48,8 @@ Bug Fixes
 * SOLR-8835: JSON Facet API: fix faceting exception on multi-valued numeric fields that
   have docValues. (yonik)
 
+* SOLR-8908: Fix to OnReconnect listener registration to allow listeners to deregister, such
+  as when a core is reloaded or deleted to avoid a memory leak. (Timothy Potter)
 
 ======================= 5.5.0 =======================
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/14ec48a5/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index e1d4e83..a5da683 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -184,7 +184,8 @@ public final class ZkController {
   private boolean zkRunOnly = Boolean.getBoolean("zkRunOnly"); // expert
 
   // keeps track of a list of objects that need to know a new ZooKeeper session was created after expiration occurred
-  private List<OnReconnect> reconnectListeners = new ArrayList<OnReconnect>();
+  // ref is held as a HashSet since we clone the set before notifying to avoid synchronizing too long
+  private HashSet<OnReconnect> reconnectListeners = new HashSet<OnReconnect>();
 
   private class RegisterCoreAsync implements Callable {
 
@@ -205,6 +206,22 @@ public final class ZkController {
     }
   }
 
+  // notifies registered listeners after the ZK reconnect in the background
+  private class OnReconnectNotifyAsync implements Callable {
+
+    private final OnReconnect listener;
+
+    OnReconnectNotifyAsync(OnReconnect listener) {
+      this.listener = listener;
+    }
+
+    @Override
+    public Object call() throws Exception {
+      listener.command();
+      return null;
+    }
+  }
+
   public ZkController(final CoreContainer cc, String zkServerAddress, int zkClientConnectTimeout, CloudConfig cloudConfig, final CurrentCoreDescriptorProvider registerOnReconnect)
       throws InterruptedException, TimeoutException, IOException {
 
@@ -291,8 +308,8 @@ public final class ZkController {
 
               List<CoreDescriptor> descriptors = registerOnReconnect.getCurrentDescriptors();
               // re register all descriptors
+              ExecutorService executorService = (cc != null) ? cc.getCoreZkRegisterExecutorService() : null;
               if (descriptors != null) {
-                ExecutorService executorService = (cc != null) ? cc.getCoreZkRegisterExecutorService() : null;
                 for (CoreDescriptor descriptor : descriptors) {
                   // TODO: we need to think carefully about what happens when it
                   // was
@@ -315,17 +332,23 @@ public final class ZkController {
               }
 
               // notify any other objects that need to know when the session was re-connected
+              HashSet<OnReconnect> clonedListeners;
               synchronized (reconnectListeners) {
-                for (OnReconnect listener : reconnectListeners) {
-                  try {
+                clonedListeners = (HashSet<OnReconnect>)reconnectListeners.clone();
+              }
+              // the OnReconnect operation can be expensive per listener, so do that async in the background
+              for (OnReconnect listener : clonedListeners) {
+                try {
+                  if (executorService != null) {
+                    executorService.submit(new OnReconnectNotifyAsync(listener));
+                  } else {
                     listener.command();
-                  } catch (Exception exc) {
-                    // not much we can do here other than warn in the log
-                    log.warn("Error when notifying OnReconnect listener " + listener + " after session re-connected.", exc);
                   }
+                } catch (Exception exc) {
+                  // not much we can do here other than warn in the log
+                  log.warn("Error when notifying OnReconnect listener " + listener + " after session re-connected.", exc);
                 }
               }
-
             } catch (InterruptedException e) {
               // Restore the interrupted status
               Thread.currentThread().interrupt();
@@ -2224,11 +2247,38 @@ public final class ZkController {
     if (listener != null) {
       synchronized (reconnectListeners) {
         reconnectListeners.add(listener);
+        log.info("Added new OnReconnect listener "+listener);
       }
     }
   }
 
   /**
+   * Removed a previously registered OnReconnect listener, such as when a core is removed or reloaded.
+   */
+  public void removeOnReconnectListener(OnReconnect listener) {
+    if (listener != null) {
+      boolean wasRemoved;
+      synchronized (reconnectListeners) {
+        wasRemoved = reconnectListeners.remove(listener);
+      }
+      if (wasRemoved) {
+        log.info("Removed OnReconnect listener "+listener);
+      } else {
+        log.warn("Was asked to remove OnReconnect listener "+listener+
+            ", but remove operation did not find it in the list of registered listeners.");
+      }
+    }
+  }
+
+  Set<OnReconnect> getCurrentOnReconnectListeners() {
+    HashSet<OnReconnect> clonedListeners;
+    synchronized (reconnectListeners) {
+      clonedListeners = (HashSet<OnReconnect>)reconnectListeners.clone();
+    }
+    return clonedListeners;
+  }
+
+  /**
    * Persists a config file to ZooKeeper using optimistic concurrency.
    *
    * @return true on success

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/14ec48a5/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchemaFactory.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchemaFactory.java b/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchemaFactory.java
index 8ea007f..60a7750 100644
--- a/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchemaFactory.java
+++ b/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchemaFactory.java
@@ -30,6 +30,8 @@ import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkCmdExecutor;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.CloseHook;
+import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.SolrConfig;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.core.SolrResourceLoader;
@@ -373,7 +375,7 @@ public class ManagedIndexSchemaFactory extends IndexSchemaFactory implements Sol
   public void inform(SolrCore core) {
     this.core = core;
     if (loader instanceof ZkSolrResourceLoader) {
-      this.zkIndexSchemaReader = new ZkIndexSchemaReader(this);
+      this.zkIndexSchemaReader = new ZkIndexSchemaReader(this, core);
       ZkSolrResourceLoader zkLoader = (ZkSolrResourceLoader)loader;
       zkLoader.setZkIndexSchemaReader(this.zkIndexSchemaReader);
     } else {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/14ec48a5/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java b/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java
index db1bad8..25cf158 100644
--- a/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java
+++ b/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java
@@ -20,6 +20,9 @@ import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.OnReconnect;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZooKeeperException;
+import org.apache.solr.core.CloseHook;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.SolrCore;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
@@ -38,13 +41,34 @@ public class ZkIndexSchemaReader implements OnReconnect {
   private final ManagedIndexSchemaFactory managedIndexSchemaFactory;
   private SolrZkClient zkClient;
   private String managedSchemaPath;
+  private final String uniqueCoreId; // used in equals impl to uniquely identify the core that we're dependent on
+  private boolean isRemoved = false;
 
-  public ZkIndexSchemaReader(ManagedIndexSchemaFactory managedIndexSchemaFactory) {
+  public ZkIndexSchemaReader(ManagedIndexSchemaFactory managedIndexSchemaFactory, SolrCore solrCore) {
     this.managedIndexSchemaFactory = managedIndexSchemaFactory;
     ZkSolrResourceLoader zkLoader = (ZkSolrResourceLoader)managedIndexSchemaFactory.getResourceLoader();
     this.zkClient = zkLoader.getZkController().getZkClient();
-    managedSchemaPath = zkLoader.getConfigSetZkPath() + "/" + managedIndexSchemaFactory.getManagedSchemaResourceName();
+    this.managedSchemaPath = zkLoader.getConfigSetZkPath() + "/" + managedIndexSchemaFactory.getManagedSchemaResourceName();
+    this.uniqueCoreId = solrCore.getName()+":"+solrCore.getStartNanoTime();
+
+    // register a CloseHook for the core this reader is linked to, so that we can de-register the listener
+    solrCore.addCloseHook(new CloseHook() {
+      @Override
+      public void preClose(SolrCore core) {
+        CoreContainer cc = core.getCoreDescriptor().getCoreContainer();
+        if (cc.isZooKeeperAware()) {
+          log.info("Removing ZkIndexSchemaReader OnReconnect listener as core "+core.getName()+" is shutting down.");
+          ZkIndexSchemaReader.this.isRemoved = true;
+          cc.getZkController().removeOnReconnectListener(ZkIndexSchemaReader.this);
+        }
+      }
+
+      @Override
+      public void postClose(SolrCore core) {}
+    });
+
     createSchemaWatcher();
+
     zkLoader.getZkController().addOnReconnectListener(this);
   }
 
@@ -59,6 +83,11 @@ public class ZkIndexSchemaReader implements OnReconnect {
       zkClient.exists(managedSchemaPath, new Watcher() {
         @Override
         public void process(WatchedEvent event) {
+
+          if (ZkIndexSchemaReader.this.isRemoved) {
+            return; // the core for this reader has already been removed, don't process this event
+          }
+
           // session events are not change events, and do not remove the watcher
           if (Event.EventType.None.equals(event.getType())) {
             return;
@@ -135,4 +164,27 @@ public class ZkIndexSchemaReader implements OnReconnect {
       log.error("Failed to update managed-schema watcher after session expiration due to: "+exc, exc);
     }
   }
+
+  public String getUniqueCoreId() {
+    return uniqueCoreId;
+  }
+
+  public String toString() {
+    return "ZkIndexSchemaReader: "+managedSchemaPath+", uniqueCoreId: "+uniqueCoreId;
+  }
+
+  public int hashCode() {
+    return managedSchemaPath.hashCode()+uniqueCoreId.hashCode();
+  }
+
+  // We need the uniqueCoreId which is core name + start time nanos to be the tie breaker
+  // as there can be multiple ZkIndexSchemaReader instances active for the same core after
+  // a reload (one is initializing and the other is being shutdown)
+  public boolean equals(Object other) {
+    if (other == null) return false;
+    if (other == this) return true;
+    if (!(other instanceof ZkIndexSchemaReader)) return false;
+    ZkIndexSchemaReader that = (ZkIndexSchemaReader)other;
+    return this.managedSchemaPath.equals(that.managedSchemaPath) && this.uniqueCoreId.equals(that.uniqueCoreId);
+  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/14ec48a5/solr/core/src/test/org/apache/solr/cloud/CollectionReloadTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/CollectionReloadTest.java b/solr/core/src/test/org/apache/solr/cloud/CollectionReloadTest.java
index b6eb5e2..701fc6e 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CollectionReloadTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CollectionReloadTest.java
@@ -16,26 +16,16 @@
  */
 package org.apache.solr.cloud;
 
-import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
-import org.apache.solr.client.solrj.request.CoreAdminRequest;
-import org.apache.solr.client.solrj.request.QueryRequest;
-import org.apache.solr.client.solrj.response.CollectionAdminResponse;
-import org.apache.solr.client.solrj.response.CoreAdminResponse;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.params.CollectionParams;
-import org.apache.solr.common.params.ModifiableSolrParams;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -65,32 +55,13 @@ public class CollectionReloadTest extends AbstractFullDistribZkTestBase {
     createCollectionRetry(testCollectionName, 1, 1, 1);
     cloudClient.setDefaultCollection(testCollectionName);
 
-    Replica leader = null;
-    String replicaState = null;
-    int timeoutSecs = 30;
-    long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutSecs, TimeUnit.SECONDS);
-    while (System.nanoTime() < timeout) {
-      Replica tmp = null;
-      try {
-        tmp = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, shardId);
-      } catch (Exception exc) {}
-      if (tmp != null && "active".equals(tmp.getStr(ZkStateReader.STATE_PROP))) {
-        leader = tmp;
-        replicaState = "active";
-        break;
-      }
-      Thread.sleep(1000);
-    }
-    assertNotNull("Could not find active leader for " + shardId + " of " +
-        testCollectionName + " after "+timeoutSecs+" secs; clusterState: " +
-        printClusterStateInfo(testCollectionName), leader);
+    Replica leader = getShardLeader(testCollectionName, shardId, 30 /* timeout secs */);
 
     // reload collection and wait to see the core report it has been reloaded
     boolean wasReloaded = reloadCollection(leader, testCollectionName);
     assertTrue("Collection '"+testCollectionName+"' failed to reload within a reasonable amount of time!",
         wasReloaded);
 
-
     // cause session loss
     chaosMonkey.expireSession(getJettyOnPort(getReplicaPort(leader)));
 
@@ -99,8 +70,9 @@ public class CollectionReloadTest extends AbstractFullDistribZkTestBase {
     Thread.sleep(15000);
 
     // wait up to 15 seconds to see the replica in the active state
-    timeoutSecs = 15;
-    timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutSecs, TimeUnit.SECONDS);
+    String replicaState = null;
+    int timeoutSecs = 15;
+    long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutSecs, TimeUnit.SECONDS);
     while (System.nanoTime() < timeout) {
       // state of leader should be active after session loss recovery - see SOLR-7338
       cloudClient.getZkStateReader().updateClusterState();
@@ -126,53 +98,4 @@ public class CollectionReloadTest extends AbstractFullDistribZkTestBase {
 
     log.info("testReloadedLeaderStateAfterZkSessionLoss succeeded ... shutting down now!");
   }
-
-  protected boolean reloadCollection(Replica replica, String testCollectionName) throws Exception {
-    ZkCoreNodeProps coreProps = new ZkCoreNodeProps(replica);
-    String coreName = coreProps.getCoreName();
-    boolean reloadedOk = false;
-    try (HttpSolrClient client = new HttpSolrClient(coreProps.getBaseUrl())) {
-      CoreAdminResponse statusResp = CoreAdminRequest.getStatus(coreName, client);
-      long leaderCoreStartTime = statusResp.getStartTime(coreName).getTime();
-
-      Thread.sleep(1000);
-
-      // send reload command for the collection
-      log.info("Sending RELOAD command for "+testCollectionName);
-      ModifiableSolrParams params = new ModifiableSolrParams();
-      params.set("action", CollectionParams.CollectionAction.RELOAD.toString());
-      params.set("name", testCollectionName);
-      QueryRequest request = new QueryRequest(params);
-      request.setPath("/admin/collections");
-      client.request(request);
-      Thread.sleep(2000); // reload can take a short while
-
-      // verify reload is done, waiting up to 30 seconds for slow test environments
-      long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS);
-      while (System.nanoTime() < timeout) {
-        statusResp = CoreAdminRequest.getStatus(coreName, client);
-        long startTimeAfterReload = statusResp.getStartTime(coreName).getTime();
-        if (startTimeAfterReload > leaderCoreStartTime) {
-          reloadedOk = true;
-          break;
-        }
-        // else ... still waiting to see the reloaded core report a later start time
-        Thread.sleep(1000);
-      }
-    }
-    return reloadedOk;
-  }
-
-  private void createCollectionRetry(String testCollectionName, int numShards, int replicationFactor, int maxShardsPerNode)
-      throws SolrServerException, IOException {
-    CollectionAdminResponse resp = createCollection(testCollectionName, numShards, replicationFactor, maxShardsPerNode);
-    if (resp.getResponse().get("failure") != null) {
-      CollectionAdminRequest.Delete req = new CollectionAdminRequest.Delete();
-      req.setCollectionName(testCollectionName);
-      req.process(cloudClient);
-      resp = createCollection(testCollectionName, numShards, replicationFactor, maxShardsPerNode);
-      if (resp.getResponse().get("failure") != null)
-        fail("Could not create " + testCollectionName);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/14ec48a5/solr/core/src/test/org/apache/solr/cloud/DistributedVersionInfoTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/DistributedVersionInfoTest.java b/solr/core/src/test/org/apache/solr/cloud/DistributedVersionInfoTest.java
index 898f38a..67cf15f 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DistributedVersionInfoTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DistributedVersionInfoTest.java
@@ -291,22 +291,6 @@ public class DistributedVersionInfoTest extends AbstractFullDistribZkTestBase {
     return vers.longValue();
   }
 
-  private void createCollectionRetry(String testCollectionName, int numShards, int replicationFactor, int maxShardsPerNode)
-      throws SolrServerException, IOException {
-    CollectionAdminResponse resp = createCollection(testCollectionName, numShards, replicationFactor, maxShardsPerNode);
-    if (resp.getResponse().get("failure") != null) {
-      CollectionAdminRequest.Delete req = new CollectionAdminRequest.Delete();
-      req.setCollectionName(testCollectionName);
-      req.process(cloudClient);
-
-      resp = createCollection(testCollectionName, numShards, replicationFactor, maxShardsPerNode);
-
-      if (resp.getResponse().get("failure") != null) {
-        fail("Could not create " + testCollectionName);
-      }
-    }
-  }
-
   protected void assertDocsExistInAllReplicas(List<Replica> notLeaders,
                                               String testCollectionName,
                                               int firstDocId,

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/14ec48a5/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
index 1dd6db4..bdd15fb 100644
--- a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
@@ -428,22 +428,6 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
     }
   }
 
-  private void createCollectionRetry(String testCollectionName, int numShards, int replicationFactor, int maxShardsPerNode)
-      throws SolrServerException, IOException {
-    CollectionAdminResponse resp = createCollection(testCollectionName, numShards, replicationFactor, maxShardsPerNode);
-    if (resp.getResponse().get("failure") != null) {
-      CollectionAdminRequest.Delete req = new CollectionAdminRequest.Delete();
-      req.setCollectionName(testCollectionName);
-      req.process(cloudClient);
-      
-      resp = createCollection(testCollectionName, numShards, replicationFactor, maxShardsPerNode);
-      
-      if (resp.getResponse().get("failure") != null) {
-        fail("Could not create " + testCollectionName);
-      }
-    }
-  }
-
   // test inspired by SOLR-6511
   protected void testLeaderZkSessionLoss() throws Exception {
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/14ec48a5/solr/core/src/test/org/apache/solr/cloud/TestOnReconnectListenerSupport.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestOnReconnectListenerSupport.java b/solr/core/src/test/org/apache/solr/cloud/TestOnReconnectListenerSupport.java
new file mode 100644
index 0000000..4ce93b3
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/TestOnReconnectListenerSupport.java
@@ -0,0 +1,156 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.lang.invoke.MethodHandles;
+import java.util.Set;
+
+import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.common.cloud.OnReconnect;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.schema.ZkIndexSchemaReader;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+
+@SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776")
+public class TestOnReconnectListenerSupport extends AbstractFullDistribZkTestBase {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  public TestOnReconnectListenerSupport() {
+    super();
+    sliceCount = 2;
+    fixShardCount(3);
+  }
+
+  @BeforeClass
+  public static void initSysProperties() {
+    System.setProperty("managed.schema.mutable", "false");
+    System.setProperty("enable.update.log", "true");
+  }
+
+  @Override
+  protected String getCloudSolrConfig() {
+    return "solrconfig-managed-schema.xml";
+  }
+
+  @Test
+  public void test() throws Exception {
+    waitForThingsToLevelOut(30000);
+
+    String testCollectionName = "c8n_onreconnect_1x1";
+    String shardId = "shard1";
+    createCollectionRetry(testCollectionName, 1, 1, 1);
+    cloudClient.setDefaultCollection(testCollectionName);
+
+    Replica leader = getShardLeader(testCollectionName, shardId, 30 /* timeout secs */);
+    JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(leader));
+
+    // get the ZkController for the node hosting the leader
+    CoreContainer cores = leaderJetty.getCoreContainer();
+    ZkController zkController = cores.getZkController();
+    assertNotNull("ZkController is null", zkController);
+
+    String leaderCoreName = leader.getStr(CORE_NAME_PROP);
+    String leaderCoreId;
+    try (SolrCore leaderCore = cores.getCore(leaderCoreName)) {
+      assertNotNull("SolrCore for "+leaderCoreName+" not found!", leaderCore);
+      leaderCoreId = leaderCore.getName()+":"+leaderCore.getStartNanoTime();
+    }
+
+    // verify the ZkIndexSchemaReader is a registered OnReconnect listener
+    Set<OnReconnect> listeners = zkController.getCurrentOnReconnectListeners();
+    assertNotNull("ZkController returned null OnReconnect listeners", listeners);
+    ZkIndexSchemaReader expectedListener = null;
+    for (OnReconnect listener : listeners) {
+      if (listener instanceof ZkIndexSchemaReader) {
+        ZkIndexSchemaReader reader = (ZkIndexSchemaReader)listener;
+        if (leaderCoreId.equals(reader.getUniqueCoreId())) {
+          expectedListener = reader;
+          break;
+        }
+      }
+    }
+    assertNotNull("ZkIndexSchemaReader for core " + leaderCoreName +
+        " not registered as an OnReconnect listener and should be", expectedListener);
+
+    // reload the collection
+    boolean wasReloaded = reloadCollection(leader, testCollectionName);
+    assertTrue("Collection '" + testCollectionName + "' failed to reload within a reasonable amount of time!",
+        wasReloaded);
+
+    // after reload, the new core should be registered as an OnReconnect listener and the old should not be
+    String reloadedLeaderCoreId;
+    try (SolrCore leaderCore = cores.getCore(leaderCoreName)) {
+      reloadedLeaderCoreId = leaderCore.getName()+":"+leaderCore.getStartNanoTime();
+    }
+
+    // they shouldn't be equal after reload
+    assertTrue(!leaderCoreId.equals(reloadedLeaderCoreId));
+
+    listeners = zkController.getCurrentOnReconnectListeners();
+    assertNotNull("ZkController returned null OnReconnect listeners", listeners);
+
+    expectedListener = null; // reset
+    for (OnReconnect listener : listeners) {
+      if (listener instanceof ZkIndexSchemaReader) {
+        ZkIndexSchemaReader reader = (ZkIndexSchemaReader)listener;
+        if (leaderCoreId.equals(reader.getUniqueCoreId())) {
+          fail("Previous core "+leaderCoreId+
+              " should no longer be a registered OnReconnect listener! Current listeners: "+listeners);
+        } else if (reloadedLeaderCoreId.equals(reader.getUniqueCoreId())) {
+          expectedListener = reader;
+          break;
+        }
+      }
+    }
+
+    assertNotNull("ZkIndexSchemaReader for core "+reloadedLeaderCoreId+
+        " not registered as an OnReconnect listener and should be", expectedListener);
+
+    // try to clean up
+    try {
+      new CollectionAdminRequest.Delete()
+          .setCollectionName(testCollectionName).process(cloudClient);
+    } catch (Exception e) {
+      // don't fail the test
+      log.warn("Could not delete collection {} after test completed", testCollectionName);
+    }
+
+    listeners = zkController.getCurrentOnReconnectListeners();
+    for (OnReconnect listener : listeners) {
+      if (listener instanceof ZkIndexSchemaReader) {
+        ZkIndexSchemaReader reader = (ZkIndexSchemaReader)listener;
+        if (reloadedLeaderCoreId.equals(reader.getUniqueCoreId())) {
+          fail("Previous core "+reloadedLeaderCoreId+
+              " should no longer be a registered OnReconnect listener after collection delete!");
+        }
+      }
+    }
+
+    log.info("TestOnReconnectListenerSupport succeeded ... shutting down now!");
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/14ec48a5/solr/solrj/src/java/org/apache/solr/common/cloud/OnReconnect.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/OnReconnect.java b/solr/solrj/src/java/org/apache/solr/common/cloud/OnReconnect.java
index 4f6b2c6..46aed08 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/OnReconnect.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/OnReconnect.java
@@ -16,6 +16,13 @@
  */
 package org.apache.solr.common.cloud;
 
+/**
+ * Implementations are expected to implement a correct hashCode and equals
+ * method needed to uniquely identify the listener as listeners are managed
+ * in a Set. In addition, your listener implementation should call
+ * org.apache.solr.cloud.ZkController#removeOnReconnectListener(OnReconnect)
+ * when it no longer needs to be notified of ZK reconnection events.
+ */
 public interface OnReconnect {
-  public void command();
+  void command();
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/14ec48a5/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
----------------------------------------------------------------------
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
index 043416c..af375b8 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
@@ -51,9 +51,11 @@ import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.CoreAdminRequest;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.client.solrj.response.CollectionAdminResponse;
+import org.apache.solr.client.solrj.response.CoreAdminResponse;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.client.solrj.response.RequestStatusState;
 import org.apache.solr.common.SolrDocument;
@@ -1788,6 +1790,43 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
     createCollection(collectionInfos, collName, props, client);
   }
 
+  protected void createCollectionRetry(String testCollectionName, int numShards, int replicationFactor, int maxShardsPerNode)
+      throws SolrServerException, IOException {
+    CollectionAdminResponse resp = createCollection(testCollectionName, numShards, replicationFactor, maxShardsPerNode);
+    if (resp.getResponse().get("failure") != null) {
+      CollectionAdminRequest.Delete req = new CollectionAdminRequest.Delete();
+      req.setCollectionName(testCollectionName);
+      req.process(cloudClient);
+
+      resp = createCollection(testCollectionName, numShards, replicationFactor, maxShardsPerNode);
+
+      if (resp.getResponse().get("failure") != null) {
+        fail("Could not create " + testCollectionName);
+      }
+    }
+  }
+
+  protected Replica getShardLeader(String testCollectionName, String shardId, int timeoutSecs) throws Exception {
+    Replica leader = null;
+    long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutSecs, TimeUnit.SECONDS);
+    while (System.nanoTime() < timeout) {
+      Replica tmp = null;
+      try {
+        tmp = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, shardId);
+      } catch (Exception exc) {}
+      if (tmp != null && "active".equals(tmp.getStr(ZkStateReader.STATE_PROP))) {
+        leader = tmp;
+        break;
+      }
+      Thread.sleep(1000);
+    }
+    assertNotNull("Could not find active leader for " + shardId + " of " +
+        testCollectionName + " after "+timeoutSecs+" secs; clusterState: " +
+        printClusterStateInfo(testCollectionName), leader);
+
+    return leader;
+  }
+
   protected List<Replica> ensureAllReplicasAreActive(String testCollectionName, String shardId, int shards, int rf, int maxWaitSecs) throws Exception {
     final RTimer timer = new RTimer();
 
@@ -1875,6 +1914,42 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
     return cs;
   }
 
+  protected boolean reloadCollection(Replica replica, String testCollectionName) throws Exception {
+    ZkCoreNodeProps coreProps = new ZkCoreNodeProps(replica);
+    String coreName = coreProps.getCoreName();
+    boolean reloadedOk = false;
+    try (HttpSolrClient client = new HttpSolrClient(coreProps.getBaseUrl())) {
+      CoreAdminResponse statusResp = CoreAdminRequest.getStatus(coreName, client);
+      long leaderCoreStartTime = statusResp.getStartTime(coreName).getTime();
+
+      Thread.sleep(1000);
+
+      // send reload command for the collection
+      log.info("Sending RELOAD command for "+testCollectionName);
+      ModifiableSolrParams params = new ModifiableSolrParams();
+      params.set("action", CollectionParams.CollectionAction.RELOAD.toString());
+      params.set("name", testCollectionName);
+      QueryRequest request = new QueryRequest(params);
+      request.setPath("/admin/collections");
+      client.request(request);
+      Thread.sleep(2000); // reload can take a short while
+
+      // verify reload is done, waiting up to 30 seconds for slow test environments
+      long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS);
+      while (System.nanoTime() < timeout) {
+        statusResp = CoreAdminRequest.getStatus(coreName, client);
+        long startTimeAfterReload = statusResp.getStartTime(coreName).getTime();
+        if (startTimeAfterReload > leaderCoreStartTime) {
+          reloadedOk = true;
+          break;
+        }
+        // else ... still waiting to see the reloaded core report a later start time
+        Thread.sleep(1000);
+      }
+    }
+    return reloadedOk;
+  }
+
   static RequestStatusState getRequestStateAfterCompletion(String requestId, int waitForSeconds, SolrClient client)
       throws IOException, SolrServerException {
     RequestStatusState state = null;