You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by is...@apache.org on 2019/03/08 08:34:32 UTC

[lucene-solr] branch branch_6x updated: SOLR-10506: Fix memory leak (upon collection reload or ZooKeeper session expiry) in ZkIndexSchemaReader. (Torsten Bøgh Köster, Christine Poerschke, Jörg Rathlev, Mike Drob)

This is an automated email from the ASF dual-hosted git repository.

ishan pushed a commit to branch branch_6x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/branch_6x by this push:
     new 23dea2c  SOLR-10506: Fix memory leak (upon collection reload or ZooKeeper session expiry) in ZkIndexSchemaReader. (Torsten Bøgh Köster, Christine Poerschke, Jörg Rathlev, Mike Drob)
23dea2c is described below

commit 23dea2ca7038f435b109aaedac91d8d1335349b8
Author: Christine Poerschke <cp...@apache.org>
AuthorDate: Tue Jun 27 14:04:58 2017 +0100

    SOLR-10506: Fix memory leak (upon collection reload or ZooKeeper session expiry) in ZkIndexSchemaReader.
    (Torsten Bøgh Köster, Christine Poerschke, Jörg Rathlev, Mike Drob)
---
 solr/CHANGES.txt                                   |   5 +
 .../apache/solr/schema/ZkIndexSchemaReader.java    | 105 ++++++++++++++-------
 .../org/apache/solr/schema/SchemaWatcherTest.java  |  56 +++++++++++
 3 files changed, 130 insertions(+), 36 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index f548c3e..3770134 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -32,6 +32,7 @@ Jetty 9.3.14.v20161028
 Detailed Change List
 ----------------------
 
+<<<<<<< HEAD
 Upgrade Notes
 ----------------------
 
@@ -220,6 +221,10 @@ Other Changes
 
  * SOLR-11122: Creating a core should write a core.properties file first and clean up on failure
    (Erick Erickson)
+=======
+* SOLR-10506: Fix memory leak (upon collection reload or ZooKeeper session expiry) in ZkIndexSchemaReader.
+  (Torsten Bøgh Köster, Christine Poerschke, Jörg Rathlev, Mike Drob)
+>>>>>>> 386b22203e... SOLR-10506: Fix memory leak (upon collection reload or ZooKeeper session expiry) in ZkIndexSchemaReader.
 
 ==================  6.6.5 ==================
 
diff --git a/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java b/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java
index e719404..d83762a 100644
--- a/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java
+++ b/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java
@@ -42,7 +42,7 @@ public class ZkIndexSchemaReader implements OnReconnect {
   private SolrZkClient zkClient;
   private String managedSchemaPath;
   private final String uniqueCoreId; // used in equals impl to uniquely identify the core that we're dependent on
-  private boolean isRemoved = false;
+  private SchemaWatcher schemaWatcher;
 
   public ZkIndexSchemaReader(ManagedIndexSchemaFactory managedIndexSchemaFactory, SolrCore solrCore) {
     this.managedIndexSchemaFactory = managedIndexSchemaFactory;
@@ -58,16 +58,20 @@ public class ZkIndexSchemaReader implements OnReconnect {
         CoreContainer cc = core.getCoreContainer();
         if (cc.isZooKeeperAware()) {
           log.debug("Removing ZkIndexSchemaReader OnReconnect listener as core "+core.getName()+" is shutting down.");
-          ZkIndexSchemaReader.this.isRemoved = true;
           cc.getZkController().removeOnReconnectListener(ZkIndexSchemaReader.this);
         }
       }
 
       @Override
-      public void postClose(SolrCore core) {}
+      public void postClose(SolrCore core) {
+        // The watcher is still registered with Zookeeper, and holds a
+        // reference to the schema reader, which indirectly references the
+        // SolrCore and would prevent it from being garbage collected.
+        schemaWatcher.discardReaderReference();
+      }
     });
 
-    createSchemaWatcher();
+    this.schemaWatcher = createSchemaWatcher();
 
     zkLoader.getZkController().addOnReconnectListener(this);
   }
@@ -76,39 +80,17 @@ public class ZkIndexSchemaReader implements OnReconnect {
     return managedIndexSchemaFactory.getSchemaUpdateLock(); 
   }
 
-  public void createSchemaWatcher() {
+  /**
+   * Creates a schema watcher and returns it for controlling purposes.
+   * 
+   * @return the registered {@linkplain SchemaWatcher}.
+   */
+  public SchemaWatcher createSchemaWatcher() {
     log.info("Creating ZooKeeper watch for the managed schema at " + managedSchemaPath);
 
+    SchemaWatcher watcher = new SchemaWatcher(this);
     try {
-      zkClient.exists(managedSchemaPath, new Watcher() {
-        @Override
-        public void process(WatchedEvent event) {
-
-          if (ZkIndexSchemaReader.this.isRemoved) {
-            return; // the core for this reader has already been removed, don't process this event
-          }
-
-          // session events are not change events, and do not remove the watcher
-          if (Event.EventType.None.equals(event.getType())) {
-            return;
-          }
-          log.info("A schema change: {}, has occurred - updating schema from ZooKeeper ...", event);
-          try {
-            updateSchema(this, -1);
-          } catch (KeeperException e) {
-            if (e.code() == KeeperException.Code.SESSIONEXPIRED || e.code() == KeeperException.Code.CONNECTIONLOSS) {
-              log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
-              return;
-            }
-            log.error("", e);
-            throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "", e);
-          } catch (InterruptedException e) {
-            // Restore the interrupted status
-            Thread.currentThread().interrupt();
-            log.warn("", e);
-          }
-        }
-      }, true);
+      zkClient.exists(managedSchemaPath, watcher, true);
     } catch (KeeperException e) {
       final String msg = "Error creating ZooKeeper watch for the managed schema";
       log.error(msg, e);
@@ -118,6 +100,56 @@ public class ZkIndexSchemaReader implements OnReconnect {
       Thread.currentThread().interrupt();
       log.warn("", e);
     }
+    
+    return watcher;
+  }
+  
+  /**
+   * Watches for schema changes and triggers updates in the {@linkplain ZkIndexSchemaReader}.
+   */
+  public static class SchemaWatcher implements Watcher {
+
+    private ZkIndexSchemaReader schemaReader;
+
+    public SchemaWatcher(ZkIndexSchemaReader reader) {
+      this.schemaReader = reader;
+    }
+
+    @Override
+    public void process(WatchedEvent event) {
+      ZkIndexSchemaReader indexSchemaReader = schemaReader;
+
+      if (indexSchemaReader == null) {
+        return; // the core for this reader has already been removed, don't process this event
+      }
+
+      // session events are not change events, and do not remove the watcher
+      if (Event.EventType.None.equals(event.getType())) {
+        return;
+      }
+      log.info("A schema change: {}, has occurred - updating schema from ZooKeeper ...", event);
+      try {
+        indexSchemaReader.updateSchema(this, -1);
+      } catch (KeeperException e) {
+        if (e.code() == KeeperException.Code.SESSIONEXPIRED || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+          log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+          return;
+        }
+        log.error("", e);
+        throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "", e);
+      } catch (InterruptedException e) {
+        // Restore the interrupted status
+        Thread.currentThread().interrupt();
+        log.warn("", e);
+      }
+    }
+
+    /**
+     * Discard the reference to the {@code ZkIndexSchemaReader}.
+     */
+    public void discardReaderReference() {
+      schemaReader = null;
+    }
   }
 
   public ManagedIndexSchema refreshSchemaFromZk(int expectedZkVersion) throws KeeperException, InterruptedException {
@@ -125,7 +157,8 @@ public class ZkIndexSchemaReader implements OnReconnect {
     return managedIndexSchemaFactory.getSchema();
   }
 
-  private void updateSchema(Watcher watcher, int expectedZkVersion) throws KeeperException, InterruptedException {
+  // package visibility for test purposes
+  void updateSchema(Watcher watcher, int expectedZkVersion) throws KeeperException, InterruptedException {
     Stat stat = new Stat();
     synchronized (getSchemaUpdateLock()) {
       final ManagedIndexSchema oldSchema = managedIndexSchemaFactory.getSchema();
@@ -157,7 +190,7 @@ public class ZkIndexSchemaReader implements OnReconnect {
   public void command() {
     try {
       // setup a new watcher to get notified when the managed schema changes
-      createSchemaWatcher();
+      schemaWatcher = createSchemaWatcher();
       // force update now as the schema may have changed while our zk session was expired
       updateSchema(null, -1);
     } catch (Exception exc) {
diff --git a/solr/core/src/test/org/apache/solr/schema/SchemaWatcherTest.java b/solr/core/src/test/org/apache/solr/schema/SchemaWatcherTest.java
new file mode 100644
index 0000000..4d46aad
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/schema/SchemaWatcherTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.schema;
+
+import org.apache.solr.schema.ZkIndexSchemaReader.SchemaWatcher;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+
+public class SchemaWatcherTest {
+
+  private ZkIndexSchemaReader mockSchemaReader;
+  private SchemaWatcher schemaWatcher;
+
+  @Before
+  public void setUp() throws Exception {
+    mockSchemaReader = mock(ZkIndexSchemaReader.class);
+    schemaWatcher = new SchemaWatcher(mockSchemaReader);
+  }
+
+  @Test
+  public void testProcess() throws Exception {
+    schemaWatcher.process(new WatchedEvent(EventType.NodeDataChanged, KeeperState.SyncConnected, "/test"));
+    verify(mockSchemaReader).updateSchema(schemaWatcher, -1);
+  }
+
+  @Test
+  public void testDiscardReaderReference() throws Exception {
+    schemaWatcher.discardReaderReference();
+
+    schemaWatcher.process(new WatchedEvent(EventType.NodeDataChanged, KeeperState.SyncConnected, "/test"));
+    // after discardReaderReference, SchemaWatcher should no longer hold a ref to the reader
+    verifyZeroInteractions(mockSchemaReader);
+  }
+}