You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by is...@apache.org on 2019/03/08 08:34:32 UTC
[lucene-solr] branch branch_6x updated: SOLR-10506: Fix memory leak (upon collection reload or ZooKeeper session expiry) in ZkIndexSchemaReader. (Torsten Bøgh Köster, Christine Poerschke, Jörg Rathlev, Mike Drob)
This is an automated email from the ASF dual-hosted git repository.
ishan pushed a commit to branch branch_6x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/branch_6x by this push:
new 23dea2c SOLR-10506: Fix memory leak (upon collection reload or ZooKeeper session expiry) in ZkIndexSchemaReader. (Torsten Bøgh Köster, Christine Poerschke, Jörg Rathlev, Mike Drob)
23dea2c is described below
commit 23dea2ca7038f435b109aaedac91d8d1335349b8
Author: Christine Poerschke <cp...@apache.org>
AuthorDate: Tue Jun 27 14:04:58 2017 +0100
SOLR-10506: Fix memory leak (upon collection reload or ZooKeeper session expiry) in ZkIndexSchemaReader.
(Torsten Bøgh Köster, Christine Poerschke, Jörg Rathlev, Mike Drob)
---
solr/CHANGES.txt | 5 +
.../apache/solr/schema/ZkIndexSchemaReader.java | 105 ++++++++++++++-------
.../org/apache/solr/schema/SchemaWatcherTest.java | 56 +++++++++++
3 files changed, 130 insertions(+), 36 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index f548c3e..3770134 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -32,6 +32,7 @@ Jetty 9.3.14.v20161028
Detailed Change List
----------------------
+<<<<<<< HEAD
Upgrade Notes
----------------------
@@ -220,6 +221,10 @@ Other Changes
* SOLR-11122: Creating a core should write a core.properties file first and clean up on failure
(Erick Erickson)
+=======
+* SOLR-10506: Fix memory leak (upon collection reload or ZooKeeper session expiry) in ZkIndexSchemaReader.
+ (Torsten Bøgh Köster, Christine Poerschke, Jörg Rathlev, Mike Drob)
+>>>>>>> 386b22203e... SOLR-10506: Fix memory leak (upon collection reload or ZooKeeper session expiry) in ZkIndexSchemaReader.
================== 6.6.5 ==================
diff --git a/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java b/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java
index e719404..d83762a 100644
--- a/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java
+++ b/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java
@@ -42,7 +42,7 @@ public class ZkIndexSchemaReader implements OnReconnect {
private SolrZkClient zkClient;
private String managedSchemaPath;
private final String uniqueCoreId; // used in equals impl to uniquely identify the core that we're dependent on
- private boolean isRemoved = false;
+ private SchemaWatcher schemaWatcher;
public ZkIndexSchemaReader(ManagedIndexSchemaFactory managedIndexSchemaFactory, SolrCore solrCore) {
this.managedIndexSchemaFactory = managedIndexSchemaFactory;
@@ -58,16 +58,20 @@ public class ZkIndexSchemaReader implements OnReconnect {
CoreContainer cc = core.getCoreContainer();
if (cc.isZooKeeperAware()) {
log.debug("Removing ZkIndexSchemaReader OnReconnect listener as core "+core.getName()+" is shutting down.");
- ZkIndexSchemaReader.this.isRemoved = true;
cc.getZkController().removeOnReconnectListener(ZkIndexSchemaReader.this);
}
}
@Override
- public void postClose(SolrCore core) {}
+ public void postClose(SolrCore core) {
+ // The watcher is still registered with Zookeeper, and holds a
+ // reference to the schema reader, which indirectly references the
+ // SolrCore and would prevent it from being garbage collected.
+ schemaWatcher.discardReaderReference();
+ }
});
- createSchemaWatcher();
+ this.schemaWatcher = createSchemaWatcher();
zkLoader.getZkController().addOnReconnectListener(this);
}
@@ -76,39 +80,17 @@ public class ZkIndexSchemaReader implements OnReconnect {
return managedIndexSchemaFactory.getSchemaUpdateLock();
}
- public void createSchemaWatcher() {
+ /**
+ * Creates a schema watcher and returns it for controlling purposes.
+ *
+ * @return the registered {@linkplain SchemaWatcher}.
+ */
+ public SchemaWatcher createSchemaWatcher() {
log.info("Creating ZooKeeper watch for the managed schema at " + managedSchemaPath);
+ SchemaWatcher watcher = new SchemaWatcher(this);
try {
- zkClient.exists(managedSchemaPath, new Watcher() {
- @Override
- public void process(WatchedEvent event) {
-
- if (ZkIndexSchemaReader.this.isRemoved) {
- return; // the core for this reader has already been removed, don't process this event
- }
-
- // session events are not change events, and do not remove the watcher
- if (Event.EventType.None.equals(event.getType())) {
- return;
- }
- log.info("A schema change: {}, has occurred - updating schema from ZooKeeper ...", event);
- try {
- updateSchema(this, -1);
- } catch (KeeperException e) {
- if (e.code() == KeeperException.Code.SESSIONEXPIRED || e.code() == KeeperException.Code.CONNECTIONLOSS) {
- log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
- return;
- }
- log.error("", e);
- throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "", e);
- } catch (InterruptedException e) {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- log.warn("", e);
- }
- }
- }, true);
+ zkClient.exists(managedSchemaPath, watcher, true);
} catch (KeeperException e) {
final String msg = "Error creating ZooKeeper watch for the managed schema";
log.error(msg, e);
@@ -118,6 +100,56 @@ public class ZkIndexSchemaReader implements OnReconnect {
Thread.currentThread().interrupt();
log.warn("", e);
}
+
+ return watcher;
+ }
+
+ /**
+ * Watches for schema changes and triggers updates in the {@linkplain ZkIndexSchemaReader}.
+ */
+ public static class SchemaWatcher implements Watcher {
+
+ private ZkIndexSchemaReader schemaReader;
+
+ public SchemaWatcher(ZkIndexSchemaReader reader) {
+ this.schemaReader = reader;
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+ ZkIndexSchemaReader indexSchemaReader = schemaReader;
+
+ if (indexSchemaReader == null) {
+ return; // the core for this reader has already been removed, don't process this event
+ }
+
+ // session events are not change events, and do not remove the watcher
+ if (Event.EventType.None.equals(event.getType())) {
+ return;
+ }
+ log.info("A schema change: {}, has occurred - updating schema from ZooKeeper ...", event);
+ try {
+ indexSchemaReader.updateSchema(this, -1);
+ } catch (KeeperException e) {
+ if (e.code() == KeeperException.Code.SESSIONEXPIRED || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+ log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+ return;
+ }
+ log.error("", e);
+ throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "", e);
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ log.warn("", e);
+ }
+ }
+
+ /**
+ * Discard the reference to the {@code ZkIndexSchemaReader}.
+ */
+ public void discardReaderReference() {
+ schemaReader = null;
+ }
}
public ManagedIndexSchema refreshSchemaFromZk(int expectedZkVersion) throws KeeperException, InterruptedException {
@@ -125,7 +157,8 @@ public class ZkIndexSchemaReader implements OnReconnect {
return managedIndexSchemaFactory.getSchema();
}
- private void updateSchema(Watcher watcher, int expectedZkVersion) throws KeeperException, InterruptedException {
+ // package visibility for test purposes
+ void updateSchema(Watcher watcher, int expectedZkVersion) throws KeeperException, InterruptedException {
Stat stat = new Stat();
synchronized (getSchemaUpdateLock()) {
final ManagedIndexSchema oldSchema = managedIndexSchemaFactory.getSchema();
@@ -157,7 +190,7 @@ public class ZkIndexSchemaReader implements OnReconnect {
public void command() {
try {
// setup a new watcher to get notified when the managed schema changes
- createSchemaWatcher();
+ schemaWatcher = createSchemaWatcher();
// force update now as the schema may have changed while our zk session was expired
updateSchema(null, -1);
} catch (Exception exc) {
diff --git a/solr/core/src/test/org/apache/solr/schema/SchemaWatcherTest.java b/solr/core/src/test/org/apache/solr/schema/SchemaWatcherTest.java
new file mode 100644
index 0000000..4d46aad
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/schema/SchemaWatcherTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.schema;
+
+import org.apache.solr.schema.ZkIndexSchemaReader.SchemaWatcher;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+
+public class SchemaWatcherTest {
+
+ private ZkIndexSchemaReader mockSchemaReader;
+ private SchemaWatcher schemaWatcher;
+
+ @Before
+ public void setUp() throws Exception {
+ mockSchemaReader = mock(ZkIndexSchemaReader.class);
+ schemaWatcher = new SchemaWatcher(mockSchemaReader);
+ }
+
+ @Test
+ public void testProcess() throws Exception {
+ schemaWatcher.process(new WatchedEvent(EventType.NodeDataChanged, KeeperState.SyncConnected, "/test"));
+ verify(mockSchemaReader).updateSchema(schemaWatcher, -1);
+ }
+
+ @Test
+ public void testDiscardReaderReference() throws Exception {
+ schemaWatcher.discardReaderReference();
+
+ schemaWatcher.process(new WatchedEvent(EventType.NodeDataChanged, KeeperState.SyncConnected, "/test"));
+ // after discardReaderReference, SchemaWatcher should no longer hold a ref to the reader
+ verifyZeroInteractions(mockSchemaReader);
+ }
+}