You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by th...@apache.org on 2016/04/14 20:09:43 UTC
lucene-solr:branch_5x: SOLR-8908: Fixed OnReconnect listener
management in ZkController to allow for de-registering listeners.
Repository: lucene-solr
Updated Branches:
refs/heads/branch_5x c17ff70ba -> 34b99fdb1
SOLR-8908: Fixed OnReconnect listener management in ZkController to allow for de-registering listeners.
Here's what this commit includes:
* Added the removeOnReconnectListener method to ZkController to allow OnReconnect listener implementations to de-register; avoids a memory leak
* Updated ZkIndexSchemaReader to add a CloseHook to the SolrCore it supports to de-register as an OnReconnect listener
* Added unit test to verify that after reloading and deleting a SolrCore in managed schema mode, the associated ZkIndexSchemaReader gets de-registered correctly
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/34b99fdb
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/34b99fdb
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/34b99fdb
Branch: refs/heads/branch_5x
Commit: 34b99fdb1301b48a5e07f0678924fe5bf9033c12
Parents: c17ff70
Author: Timothy Potter <th...@gmail.com>
Authored: Thu Apr 14 10:17:42 2016 -0700
Committer: Timothy Potter <th...@gmail.com>
Committed: Thu Apr 14 11:09:20 2016 -0700
----------------------------------------------------------------------
solr/CHANGES.txt | 2 +
.../org/apache/solr/cloud/ZkController.java | 66 +++++++-
.../solr/schema/ManagedIndexSchemaFactory.java | 4 +-
.../apache/solr/schema/ZkIndexSchemaReader.java | 56 ++++++-
.../apache/solr/cloud/CollectionReloadTest.java | 85 +---------
.../solr/cloud/DistributedVersionInfoTest.java | 16 --
.../apache/solr/cloud/HttpPartitionTest.java | 16 --
.../cloud/TestOnReconnectListenerSupport.java | 156 +++++++++++++++++++
.../apache/solr/common/cloud/OnReconnect.java | 9 +-
.../cloud/AbstractFullDistribZkTestBase.java | 75 +++++++++
10 files changed, 360 insertions(+), 125 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/34b99fdb/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index e160b9b..b2bbc15 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -53,6 +53,8 @@ Bug Fixes
* SOLR-8835: JSON Facet API: fix faceting exception on multi-valued numeric fields that
have docValues. (yonik)
+* SOLR-8908: Fix to OnReconnect listener registration to allow listeners to deregister, such
+ as when a core is reloaded or deleted to avoid a memory leak. (Timothy Potter)
======================= 5.5.0 =======================
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/34b99fdb/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index e1d4e83..a5da683 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -184,7 +184,8 @@ public final class ZkController {
private boolean zkRunOnly = Boolean.getBoolean("zkRunOnly"); // expert
// keeps track of a list of objects that need to know a new ZooKeeper session was created after expiration occurred
- private List<OnReconnect> reconnectListeners = new ArrayList<OnReconnect>();
+ // ref is held as a HashSet since we clone the set before notifying to avoid synchronizing too long
+ private HashSet<OnReconnect> reconnectListeners = new HashSet<OnReconnect>();
private class RegisterCoreAsync implements Callable {
@@ -205,6 +206,22 @@ public final class ZkController {
}
}
+ // notifies registered listeners after the ZK reconnect in the background
+ private class OnReconnectNotifyAsync implements Callable {
+
+ private final OnReconnect listener;
+
+ OnReconnectNotifyAsync(OnReconnect listener) {
+ this.listener = listener;
+ }
+
+ @Override
+ public Object call() throws Exception {
+ listener.command();
+ return null;
+ }
+ }
+
public ZkController(final CoreContainer cc, String zkServerAddress, int zkClientConnectTimeout, CloudConfig cloudConfig, final CurrentCoreDescriptorProvider registerOnReconnect)
throws InterruptedException, TimeoutException, IOException {
@@ -291,8 +308,8 @@ public final class ZkController {
List<CoreDescriptor> descriptors = registerOnReconnect.getCurrentDescriptors();
// re register all descriptors
+ ExecutorService executorService = (cc != null) ? cc.getCoreZkRegisterExecutorService() : null;
if (descriptors != null) {
- ExecutorService executorService = (cc != null) ? cc.getCoreZkRegisterExecutorService() : null;
for (CoreDescriptor descriptor : descriptors) {
// TODO: we need to think carefully about what happens when it
// was
@@ -315,17 +332,23 @@ public final class ZkController {
}
// notify any other objects that need to know when the session was re-connected
+ HashSet<OnReconnect> clonedListeners;
synchronized (reconnectListeners) {
- for (OnReconnect listener : reconnectListeners) {
- try {
+ clonedListeners = (HashSet<OnReconnect>)reconnectListeners.clone();
+ }
+ // the OnReconnect operation can be expensive per listener, so do that async in the background
+ for (OnReconnect listener : clonedListeners) {
+ try {
+ if (executorService != null) {
+ executorService.submit(new OnReconnectNotifyAsync(listener));
+ } else {
listener.command();
- } catch (Exception exc) {
- // not much we can do here other than warn in the log
- log.warn("Error when notifying OnReconnect listener " + listener + " after session re-connected.", exc);
}
+ } catch (Exception exc) {
+ // not much we can do here other than warn in the log
+ log.warn("Error when notifying OnReconnect listener " + listener + " after session re-connected.", exc);
}
}
-
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
@@ -2224,11 +2247,38 @@ public final class ZkController {
if (listener != null) {
synchronized (reconnectListeners) {
reconnectListeners.add(listener);
+ log.info("Added new OnReconnect listener "+listener);
}
}
}
/**
+ * Removes a previously registered OnReconnect listener, such as when a core is removed or reloaded.
+ */
+ public void removeOnReconnectListener(OnReconnect listener) {
+ if (listener != null) {
+ boolean wasRemoved;
+ synchronized (reconnectListeners) {
+ wasRemoved = reconnectListeners.remove(listener);
+ }
+ if (wasRemoved) {
+ log.info("Removed OnReconnect listener "+listener);
+ } else {
+ log.warn("Was asked to remove OnReconnect listener "+listener+
+ ", but remove operation did not find it in the list of registered listeners.");
+ }
+ }
+ }
+
+ Set<OnReconnect> getCurrentOnReconnectListeners() {
+ HashSet<OnReconnect> clonedListeners;
+ synchronized (reconnectListeners) {
+ clonedListeners = (HashSet<OnReconnect>)reconnectListeners.clone();
+ }
+ return clonedListeners;
+ }
+
+ /**
* Persists a config file to ZooKeeper using optimistic concurrency.
*
* @return true on success
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/34b99fdb/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchemaFactory.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchemaFactory.java b/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchemaFactory.java
index 8ea007f..60a7750 100644
--- a/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchemaFactory.java
+++ b/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchemaFactory.java
@@ -30,6 +30,8 @@ import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCmdExecutor;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.CloseHook;
+import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrConfig;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrResourceLoader;
@@ -373,7 +375,7 @@ public class ManagedIndexSchemaFactory extends IndexSchemaFactory implements Sol
public void inform(SolrCore core) {
this.core = core;
if (loader instanceof ZkSolrResourceLoader) {
- this.zkIndexSchemaReader = new ZkIndexSchemaReader(this);
+ this.zkIndexSchemaReader = new ZkIndexSchemaReader(this, core);
ZkSolrResourceLoader zkLoader = (ZkSolrResourceLoader)loader;
zkLoader.setZkIndexSchemaReader(this.zkIndexSchemaReader);
} else {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/34b99fdb/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java b/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java
index db1bad8..25cf158 100644
--- a/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java
+++ b/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java
@@ -20,6 +20,9 @@ import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.OnReconnect;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZooKeeperException;
+import org.apache.solr.core.CloseHook;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.SolrCore;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
@@ -38,13 +41,34 @@ public class ZkIndexSchemaReader implements OnReconnect {
private final ManagedIndexSchemaFactory managedIndexSchemaFactory;
private SolrZkClient zkClient;
private String managedSchemaPath;
+ private final String uniqueCoreId; // used in equals impl to uniquely identify the core that we're dependent on
+ private boolean isRemoved = false;
- public ZkIndexSchemaReader(ManagedIndexSchemaFactory managedIndexSchemaFactory) {
+ public ZkIndexSchemaReader(ManagedIndexSchemaFactory managedIndexSchemaFactory, SolrCore solrCore) {
this.managedIndexSchemaFactory = managedIndexSchemaFactory;
ZkSolrResourceLoader zkLoader = (ZkSolrResourceLoader)managedIndexSchemaFactory.getResourceLoader();
this.zkClient = zkLoader.getZkController().getZkClient();
- managedSchemaPath = zkLoader.getConfigSetZkPath() + "/" + managedIndexSchemaFactory.getManagedSchemaResourceName();
+ this.managedSchemaPath = zkLoader.getConfigSetZkPath() + "/" + managedIndexSchemaFactory.getManagedSchemaResourceName();
+ this.uniqueCoreId = solrCore.getName()+":"+solrCore.getStartNanoTime();
+
+ // register a CloseHook for the core this reader is linked to, so that we can de-register the listener
+ solrCore.addCloseHook(new CloseHook() {
+ @Override
+ public void preClose(SolrCore core) {
+ CoreContainer cc = core.getCoreDescriptor().getCoreContainer();
+ if (cc.isZooKeeperAware()) {
+ log.info("Removing ZkIndexSchemaReader OnReconnect listener as core "+core.getName()+" is shutting down.");
+ ZkIndexSchemaReader.this.isRemoved = true;
+ cc.getZkController().removeOnReconnectListener(ZkIndexSchemaReader.this);
+ }
+ }
+
+ @Override
+ public void postClose(SolrCore core) {}
+ });
+
createSchemaWatcher();
+
zkLoader.getZkController().addOnReconnectListener(this);
}
@@ -59,6 +83,11 @@ public class ZkIndexSchemaReader implements OnReconnect {
zkClient.exists(managedSchemaPath, new Watcher() {
@Override
public void process(WatchedEvent event) {
+
+ if (ZkIndexSchemaReader.this.isRemoved) {
+ return; // the core for this reader has already been removed, don't process this event
+ }
+
// session events are not change events, and do not remove the watcher
if (Event.EventType.None.equals(event.getType())) {
return;
@@ -135,4 +164,27 @@ public class ZkIndexSchemaReader implements OnReconnect {
log.error("Failed to update managed-schema watcher after session expiration due to: "+exc, exc);
}
}
+
+ public String getUniqueCoreId() {
+ return uniqueCoreId;
+ }
+
+ public String toString() {
+ return "ZkIndexSchemaReader: "+managedSchemaPath+", uniqueCoreId: "+uniqueCoreId;
+ }
+
+ public int hashCode() {
+ return managedSchemaPath.hashCode()+uniqueCoreId.hashCode();
+ }
+
+ // We need the uniqueCoreId which is core name + start time nanos to be the tie breaker
+ // as there can be multiple ZkIndexSchemaReader instances active for the same core after
+ // a reload (one is initializing and the other is being shut down)
+ public boolean equals(Object other) {
+ if (other == null) return false;
+ if (other == this) return true;
+ if (!(other instanceof ZkIndexSchemaReader)) return false;
+ ZkIndexSchemaReader that = (ZkIndexSchemaReader)other;
+ return this.managedSchemaPath.equals(that.managedSchemaPath) && this.uniqueCoreId.equals(that.uniqueCoreId);
+ }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/34b99fdb/solr/core/src/test/org/apache/solr/cloud/CollectionReloadTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/CollectionReloadTest.java b/solr/core/src/test/org/apache/solr/cloud/CollectionReloadTest.java
index b6eb5e2..701fc6e 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CollectionReloadTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CollectionReloadTest.java
@@ -16,26 +16,16 @@
*/
package org.apache.solr.cloud;
-import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
-import org.apache.solr.client.solrj.request.CoreAdminRequest;
-import org.apache.solr.client.solrj.request.QueryRequest;
-import org.apache.solr.client.solrj.response.CollectionAdminResponse;
-import org.apache.solr.client.solrj.response.CoreAdminResponse;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.params.CollectionParams;
-import org.apache.solr.common.params.ModifiableSolrParams;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,32 +55,13 @@ public class CollectionReloadTest extends AbstractFullDistribZkTestBase {
createCollectionRetry(testCollectionName, 1, 1, 1);
cloudClient.setDefaultCollection(testCollectionName);
- Replica leader = null;
- String replicaState = null;
- int timeoutSecs = 30;
- long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutSecs, TimeUnit.SECONDS);
- while (System.nanoTime() < timeout) {
- Replica tmp = null;
- try {
- tmp = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, shardId);
- } catch (Exception exc) {}
- if (tmp != null && "active".equals(tmp.getStr(ZkStateReader.STATE_PROP))) {
- leader = tmp;
- replicaState = "active";
- break;
- }
- Thread.sleep(1000);
- }
- assertNotNull("Could not find active leader for " + shardId + " of " +
- testCollectionName + " after "+timeoutSecs+" secs; clusterState: " +
- printClusterStateInfo(testCollectionName), leader);
+ Replica leader = getShardLeader(testCollectionName, shardId, 30 /* timeout secs */);
// reload collection and wait to see the core report it has been reloaded
boolean wasReloaded = reloadCollection(leader, testCollectionName);
assertTrue("Collection '"+testCollectionName+"' failed to reload within a reasonable amount of time!",
wasReloaded);
-
// cause session loss
chaosMonkey.expireSession(getJettyOnPort(getReplicaPort(leader)));
@@ -99,8 +70,9 @@ public class CollectionReloadTest extends AbstractFullDistribZkTestBase {
Thread.sleep(15000);
// wait up to 15 seconds to see the replica in the active state
- timeoutSecs = 15;
- timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutSecs, TimeUnit.SECONDS);
+ String replicaState = null;
+ int timeoutSecs = 15;
+ long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutSecs, TimeUnit.SECONDS);
while (System.nanoTime() < timeout) {
// state of leader should be active after session loss recovery - see SOLR-7338
cloudClient.getZkStateReader().updateClusterState();
@@ -126,53 +98,4 @@ public class CollectionReloadTest extends AbstractFullDistribZkTestBase {
log.info("testReloadedLeaderStateAfterZkSessionLoss succeeded ... shutting down now!");
}
-
- protected boolean reloadCollection(Replica replica, String testCollectionName) throws Exception {
- ZkCoreNodeProps coreProps = new ZkCoreNodeProps(replica);
- String coreName = coreProps.getCoreName();
- boolean reloadedOk = false;
- try (HttpSolrClient client = new HttpSolrClient(coreProps.getBaseUrl())) {
- CoreAdminResponse statusResp = CoreAdminRequest.getStatus(coreName, client);
- long leaderCoreStartTime = statusResp.getStartTime(coreName).getTime();
-
- Thread.sleep(1000);
-
- // send reload command for the collection
- log.info("Sending RELOAD command for "+testCollectionName);
- ModifiableSolrParams params = new ModifiableSolrParams();
- params.set("action", CollectionParams.CollectionAction.RELOAD.toString());
- params.set("name", testCollectionName);
- QueryRequest request = new QueryRequest(params);
- request.setPath("/admin/collections");
- client.request(request);
- Thread.sleep(2000); // reload can take a short while
-
- // verify reload is done, waiting up to 30 seconds for slow test environments
- long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS);
- while (System.nanoTime() < timeout) {
- statusResp = CoreAdminRequest.getStatus(coreName, client);
- long startTimeAfterReload = statusResp.getStartTime(coreName).getTime();
- if (startTimeAfterReload > leaderCoreStartTime) {
- reloadedOk = true;
- break;
- }
- // else ... still waiting to see the reloaded core report a later start time
- Thread.sleep(1000);
- }
- }
- return reloadedOk;
- }
-
- private void createCollectionRetry(String testCollectionName, int numShards, int replicationFactor, int maxShardsPerNode)
- throws SolrServerException, IOException {
- CollectionAdminResponse resp = createCollection(testCollectionName, numShards, replicationFactor, maxShardsPerNode);
- if (resp.getResponse().get("failure") != null) {
- CollectionAdminRequest.Delete req = new CollectionAdminRequest.Delete();
- req.setCollectionName(testCollectionName);
- req.process(cloudClient);
- resp = createCollection(testCollectionName, numShards, replicationFactor, maxShardsPerNode);
- if (resp.getResponse().get("failure") != null)
- fail("Could not create " + testCollectionName);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/34b99fdb/solr/core/src/test/org/apache/solr/cloud/DistributedVersionInfoTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/DistributedVersionInfoTest.java b/solr/core/src/test/org/apache/solr/cloud/DistributedVersionInfoTest.java
index 898f38a..67cf15f 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DistributedVersionInfoTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DistributedVersionInfoTest.java
@@ -291,22 +291,6 @@ public class DistributedVersionInfoTest extends AbstractFullDistribZkTestBase {
return vers.longValue();
}
- private void createCollectionRetry(String testCollectionName, int numShards, int replicationFactor, int maxShardsPerNode)
- throws SolrServerException, IOException {
- CollectionAdminResponse resp = createCollection(testCollectionName, numShards, replicationFactor, maxShardsPerNode);
- if (resp.getResponse().get("failure") != null) {
- CollectionAdminRequest.Delete req = new CollectionAdminRequest.Delete();
- req.setCollectionName(testCollectionName);
- req.process(cloudClient);
-
- resp = createCollection(testCollectionName, numShards, replicationFactor, maxShardsPerNode);
-
- if (resp.getResponse().get("failure") != null) {
- fail("Could not create " + testCollectionName);
- }
- }
- }
-
protected void assertDocsExistInAllReplicas(List<Replica> notLeaders,
String testCollectionName,
int firstDocId,
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/34b99fdb/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
index 1dd6db4..bdd15fb 100644
--- a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
@@ -428,22 +428,6 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
}
}
- private void createCollectionRetry(String testCollectionName, int numShards, int replicationFactor, int maxShardsPerNode)
- throws SolrServerException, IOException {
- CollectionAdminResponse resp = createCollection(testCollectionName, numShards, replicationFactor, maxShardsPerNode);
- if (resp.getResponse().get("failure") != null) {
- CollectionAdminRequest.Delete req = new CollectionAdminRequest.Delete();
- req.setCollectionName(testCollectionName);
- req.process(cloudClient);
-
- resp = createCollection(testCollectionName, numShards, replicationFactor, maxShardsPerNode);
-
- if (resp.getResponse().get("failure") != null) {
- fail("Could not create " + testCollectionName);
- }
- }
- }
-
// test inspired by SOLR-6511
protected void testLeaderZkSessionLoss() throws Exception {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/34b99fdb/solr/core/src/test/org/apache/solr/cloud/TestOnReconnectListenerSupport.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestOnReconnectListenerSupport.java b/solr/core/src/test/org/apache/solr/cloud/TestOnReconnectListenerSupport.java
new file mode 100644
index 0000000..4ce93b3
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/TestOnReconnectListenerSupport.java
@@ -0,0 +1,156 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.lang.invoke.MethodHandles;
+import java.util.Set;
+
+import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.common.cloud.OnReconnect;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.schema.ZkIndexSchemaReader;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+
+@SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776")
+public class TestOnReconnectListenerSupport extends AbstractFullDistribZkTestBase {
+
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ public TestOnReconnectListenerSupport() {
+ super();
+ sliceCount = 2;
+ fixShardCount(3);
+ }
+
+ @BeforeClass
+ public static void initSysProperties() {
+ System.setProperty("managed.schema.mutable", "false");
+ System.setProperty("enable.update.log", "true");
+ }
+
+ @Override
+ protected String getCloudSolrConfig() {
+ return "solrconfig-managed-schema.xml";
+ }
+
+ @Test
+ public void test() throws Exception {
+ waitForThingsToLevelOut(30000);
+
+ String testCollectionName = "c8n_onreconnect_1x1";
+ String shardId = "shard1";
+ createCollectionRetry(testCollectionName, 1, 1, 1);
+ cloudClient.setDefaultCollection(testCollectionName);
+
+ Replica leader = getShardLeader(testCollectionName, shardId, 30 /* timeout secs */);
+ JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(leader));
+
+ // get the ZkController for the node hosting the leader
+ CoreContainer cores = leaderJetty.getCoreContainer();
+ ZkController zkController = cores.getZkController();
+ assertNotNull("ZkController is null", zkController);
+
+ String leaderCoreName = leader.getStr(CORE_NAME_PROP);
+ String leaderCoreId;
+ try (SolrCore leaderCore = cores.getCore(leaderCoreName)) {
+ assertNotNull("SolrCore for "+leaderCoreName+" not found!", leaderCore);
+ leaderCoreId = leaderCore.getName()+":"+leaderCore.getStartNanoTime();
+ }
+
+ // verify the ZkIndexSchemaReader is a registered OnReconnect listener
+ Set<OnReconnect> listeners = zkController.getCurrentOnReconnectListeners();
+ assertNotNull("ZkController returned null OnReconnect listeners", listeners);
+ ZkIndexSchemaReader expectedListener = null;
+ for (OnReconnect listener : listeners) {
+ if (listener instanceof ZkIndexSchemaReader) {
+ ZkIndexSchemaReader reader = (ZkIndexSchemaReader)listener;
+ if (leaderCoreId.equals(reader.getUniqueCoreId())) {
+ expectedListener = reader;
+ break;
+ }
+ }
+ }
+ assertNotNull("ZkIndexSchemaReader for core " + leaderCoreName +
+ " not registered as an OnReconnect listener and should be", expectedListener);
+
+ // reload the collection
+ boolean wasReloaded = reloadCollection(leader, testCollectionName);
+ assertTrue("Collection '" + testCollectionName + "' failed to reload within a reasonable amount of time!",
+ wasReloaded);
+
+ // after reload, the new core should be registered as an OnReconnect listener and the old should not be
+ String reloadedLeaderCoreId;
+ try (SolrCore leaderCore = cores.getCore(leaderCoreName)) {
+ reloadedLeaderCoreId = leaderCore.getName()+":"+leaderCore.getStartNanoTime();
+ }
+
+ // they shouldn't be equal after reload
+ assertTrue(!leaderCoreId.equals(reloadedLeaderCoreId));
+
+ listeners = zkController.getCurrentOnReconnectListeners();
+ assertNotNull("ZkController returned null OnReconnect listeners", listeners);
+
+ expectedListener = null; // reset
+ for (OnReconnect listener : listeners) {
+ if (listener instanceof ZkIndexSchemaReader) {
+ ZkIndexSchemaReader reader = (ZkIndexSchemaReader)listener;
+ if (leaderCoreId.equals(reader.getUniqueCoreId())) {
+ fail("Previous core "+leaderCoreId+
+ " should no longer be a registered OnReconnect listener! Current listeners: "+listeners);
+ } else if (reloadedLeaderCoreId.equals(reader.getUniqueCoreId())) {
+ expectedListener = reader;
+ break;
+ }
+ }
+ }
+
+ assertNotNull("ZkIndexSchemaReader for core "+reloadedLeaderCoreId+
+ " not registered as an OnReconnect listener and should be", expectedListener);
+
+ // try to clean up
+ try {
+ new CollectionAdminRequest.Delete()
+ .setCollectionName(testCollectionName).process(cloudClient);
+ } catch (Exception e) {
+ // don't fail the test
+ log.warn("Could not delete collection {} after test completed", testCollectionName);
+ }
+
+ listeners = zkController.getCurrentOnReconnectListeners();
+ for (OnReconnect listener : listeners) {
+ if (listener instanceof ZkIndexSchemaReader) {
+ ZkIndexSchemaReader reader = (ZkIndexSchemaReader)listener;
+ if (reloadedLeaderCoreId.equals(reader.getUniqueCoreId())) {
+ fail("Previous core "+reloadedLeaderCoreId+
+ " should no longer be a registered OnReconnect listener after collection delete!");
+ }
+ }
+ }
+
+ log.info("TestOnReconnectListenerSupport succeeded ... shutting down now!");
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/34b99fdb/solr/solrj/src/java/org/apache/solr/common/cloud/OnReconnect.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/OnReconnect.java b/solr/solrj/src/java/org/apache/solr/common/cloud/OnReconnect.java
index 4f6b2c6..46aed08 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/OnReconnect.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/OnReconnect.java
@@ -16,6 +16,13 @@
*/
package org.apache.solr.common.cloud;
+/**
+ * Implementations are expected to implement a correct hashCode and equals
+ * method needed to uniquely identify the listener as listeners are managed
+ * in a Set. In addition, your listener implementation should call
+ * org.apache.solr.cloud.ZkController#removeOnReconnectListener(OnReconnect)
+ * when it no longer needs to be notified of ZK reconnection events.
+ */
public interface OnReconnect {
- public void command();
+ void command();
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/34b99fdb/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
----------------------------------------------------------------------
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
index 043416c..af375b8 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
@@ -51,9 +51,11 @@ import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
+import org.apache.solr.client.solrj.response.CoreAdminResponse;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.RequestStatusState;
import org.apache.solr.common.SolrDocument;
@@ -1788,6 +1790,43 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
createCollection(collectionInfos, collName, props, client);
}
+ protected void createCollectionRetry(String testCollectionName, int numShards, int replicationFactor, int maxShardsPerNode)
+ throws SolrServerException, IOException {
+ CollectionAdminResponse resp = createCollection(testCollectionName, numShards, replicationFactor, maxShardsPerNode);
+ if (resp.getResponse().get("failure") != null) {
+ CollectionAdminRequest.Delete req = new CollectionAdminRequest.Delete();
+ req.setCollectionName(testCollectionName);
+ req.process(cloudClient);
+
+ resp = createCollection(testCollectionName, numShards, replicationFactor, maxShardsPerNode);
+
+ if (resp.getResponse().get("failure") != null) {
+ fail("Could not create " + testCollectionName);
+ }
+ }
+ }
+
+ protected Replica getShardLeader(String testCollectionName, String shardId, int timeoutSecs) throws Exception {
+ Replica leader = null;
+ long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutSecs, TimeUnit.SECONDS);
+ while (System.nanoTime() < timeout) {
+ Replica tmp = null;
+ try {
+ tmp = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, shardId);
+ } catch (Exception exc) {}
+ if (tmp != null && "active".equals(tmp.getStr(ZkStateReader.STATE_PROP))) {
+ leader = tmp;
+ break;
+ }
+ Thread.sleep(1000);
+ }
+ assertNotNull("Could not find active leader for " + shardId + " of " +
+ testCollectionName + " after "+timeoutSecs+" secs; clusterState: " +
+ printClusterStateInfo(testCollectionName), leader);
+
+ return leader;
+ }
+
protected List<Replica> ensureAllReplicasAreActive(String testCollectionName, String shardId, int shards, int rf, int maxWaitSecs) throws Exception {
final RTimer timer = new RTimer();
@@ -1875,6 +1914,42 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
return cs;
}
+ protected boolean reloadCollection(Replica replica, String testCollectionName) throws Exception {
+ ZkCoreNodeProps coreProps = new ZkCoreNodeProps(replica);
+ String coreName = coreProps.getCoreName();
+ boolean reloadedOk = false;
+ try (HttpSolrClient client = new HttpSolrClient(coreProps.getBaseUrl())) {
+ CoreAdminResponse statusResp = CoreAdminRequest.getStatus(coreName, client);
+ long leaderCoreStartTime = statusResp.getStartTime(coreName).getTime();
+
+ Thread.sleep(1000);
+
+ // send reload command for the collection
+ log.info("Sending RELOAD command for "+testCollectionName);
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set("action", CollectionParams.CollectionAction.RELOAD.toString());
+ params.set("name", testCollectionName);
+ QueryRequest request = new QueryRequest(params);
+ request.setPath("/admin/collections");
+ client.request(request);
+ Thread.sleep(2000); // reload can take a short while
+
+ // verify reload is done, waiting up to 30 seconds for slow test environments
+ long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS);
+ while (System.nanoTime() < timeout) {
+ statusResp = CoreAdminRequest.getStatus(coreName, client);
+ long startTimeAfterReload = statusResp.getStartTime(coreName).getTime();
+ if (startTimeAfterReload > leaderCoreStartTime) {
+ reloadedOk = true;
+ break;
+ }
+ // else ... still waiting to see the reloaded core report a later start time
+ Thread.sleep(1000);
+ }
+ }
+ return reloadedOk;
+ }
+
static RequestStatusState getRequestStateAfterCompletion(String requestId, int waitForSeconds, SolrClient client)
throws IOException, SolrServerException {
RequestStatusState state = null;