You are viewing a plain text version of this content. The canonical link for it is here.
Posted to solr-commits@lucene.apache.org by ma...@apache.org on 2010/02/12 18:32:24 UTC
svn commit: r909510 - in
/lucene/solr/branches/cloud/src/java/org/apache/solr:
cloud/ZkController.java cloud/ZkStateReader.java core/CoreContainer.java
Author: markrmiller
Date: Fri Feb 12 17:32:23 2010
New Revision: 909510
URL: http://svn.apache.org/viewvc?rev=909510&view=rev
Log:
factor out ZkStateReader
Added:
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkStateReader.java
Modified:
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java
lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreContainer.java
Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java?rev=909510&r1=909509&r2=909510&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java Fri Feb 12 17:32:23 2010
@@ -20,10 +20,12 @@
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
-import java.util.*;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -55,7 +57,6 @@
static final String NEWL = System.getProperty("line.separator");
- private static final long CLOUD_UPDATE_DELAY = Long.parseLong(System.getProperty("CLOUD_UPDATE_DELAY", "5000"));
private final static Pattern URL_POST = Pattern.compile("https?://(.*)");
private final static Pattern URL_PREFIX = Pattern.compile("(https?://).*");
@@ -73,8 +74,8 @@
public final static String CONFIGNAME_PROP="configName";
private SolrZkClient zkClient;
-
- private volatile CloudState cloudState;
+
+ private ZkStateReader zkStateReader;
private String zkServerAddress;
@@ -85,9 +86,7 @@
private String hostName;
- private ScheduledExecutorService updateCloudExecutor = Executors.newScheduledThreadPool(1);
- private boolean cloudStateUpdateScheduled;
private boolean readonly; // temporary hack to enable reuse in SolrJ client
@@ -110,7 +109,7 @@
this.localHostContext = localHostContext;
this.localHost = localHost;
this.readonly = localHostPort==null;
- cloudState = new CloudState(new HashSet<String>(0), new HashMap<String,Map<String,Slice>>(0));
+
zkClient = new SolrZkClient(zkServerAddress, zkClientTimeout, zkClientConnectTimeout,
// on reconnect, reload cloud info
new OnReconnect() {
@@ -119,7 +118,7 @@
try {
// nocommit: recreate watches ????
createEphemeralLiveNode();
- updateCloudState(false);
+ zkStateReader.updateCloudState(false);
} catch (KeeperException e) {
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
@@ -138,7 +137,7 @@
}
});
-
+ zkStateReader = new ZkStateReader(zkClient);
init();
}
@@ -209,7 +208,7 @@
* @return information about the cluster from ZooKeeper
*/
public CloudState getCloudState() {
- return cloudState;
+ return zkStateReader.getCloudState();
}
/**
@@ -323,7 +322,7 @@
try {
log.info("Updating live nodes:" + zkClient);
try {
- updateLiveNodes();
+ zkStateReader.updateLiveNodes();
} finally {
// remake watch
zkClient.getChildren(event.getPath(), this);
@@ -382,76 +381,7 @@
return hostName + ":" + localHostPort + "_"+ localHostContext;
}
- // load and publish a new CollectionInfo
- public void updateCloudState(boolean immediate) throws KeeperException, InterruptedException,
- IOException {
- updateCloudState(immediate, false);
- }
-
- // load and publish a new CollectionInfo
- private void updateLiveNodes() throws KeeperException, InterruptedException,
- IOException {
- updateCloudState(true, true);
- }
-
- // load and publish a new CollectionInfo
- private synchronized void updateCloudState(boolean immediate, final boolean onlyLiveNodes) throws KeeperException, InterruptedException,
- IOException {
- // TODO: - incremental update rather than reread everything
-
- // build immutable CloudInfo
-
- if(immediate) {
- if(!onlyLiveNodes) {
- log.info("Updating cloud state from ZooKeeper... ");
- } else {
- log.info("Updating live nodes from ZooKeeper... ");
- }
- CloudState cloudState;
- cloudState = CloudState.buildCloudState(zkClient, this.cloudState, onlyLiveNodes);
- // update volatile
- this.cloudState = cloudState;
- } else {
- if(cloudStateUpdateScheduled) {
- log.info("Cloud state update for ZooKeeper already scheduled");
- return;
- }
- log.info("Scheduling cloud state update from ZooKeeper...");
- cloudStateUpdateScheduled = true;
- updateCloudExecutor.schedule(new Runnable() {
-
- public void run() {
- log.info("Updating cloud state from ZooKeeper...");
- synchronized (ZkController.this) {
- cloudStateUpdateScheduled = false;
- CloudState cloudState;
- try {
- cloudState = CloudState.buildCloudState(zkClient,
- ZkController.this.cloudState, onlyLiveNodes);
- } catch (KeeperException e) {
- log.error("", e);
- throw new ZooKeeperException(
- SolrException.ErrorCode.SERVER_ERROR, "", e);
- } catch (InterruptedException e) {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- log.error("", e);
- throw new ZooKeeperException(
- SolrException.ErrorCode.SERVER_ERROR, "", e);
- } catch (IOException e) {
- log.error("", e);
- throw new ZooKeeperException(
- SolrException.ErrorCode.SERVER_ERROR, "", e);
- }
- // update volatile
- ZkController.this.cloudState = cloudState;
- }
- }
- }, CLOUD_UPDATE_DELAY, TimeUnit.MILLISECONDS);
- }
-
- }
/**
* @param path
@@ -634,9 +564,9 @@
public void process(WatchedEvent event) {
try {
log.info("Detected a new or removed collection");
- synchronized (ZkController.this) {
+ synchronized (zkStateReader.getUpdateLock()) {
addShardZkNodeWatches();
- updateCloudState(false);
+ zkStateReader.updateCloudState(false);
}
// re-watch
zkClient.getChildren(event.getPath(), this);
@@ -666,9 +596,9 @@
}
log.info("Notified of CloudState change");
try {
- synchronized (ZkController.this) {
+ synchronized (zkStateReader.getUpdateLock()) {
addShardZkNodeWatches();
- updateCloudState(false);
+ zkStateReader.updateCloudState(false);
}
zkClient.exists(COLLECTIONS_ZKNODE, this);
} catch (KeeperException e) {
@@ -704,7 +634,7 @@
log.info("Detected changed ShardId in collection:" + collection);
try {
addShardsWatches(collection);
- updateCloudState(false);
+ zkStateReader.updateCloudState(false);
} catch (KeeperException e) {
log.error("", e);
throw new ZooKeeperException(
@@ -768,7 +698,7 @@
public void process(WatchedEvent event) {
log.info("Detected a shard change under ShardId:" + shardId + " in collection:" + collection);
try {
- updateCloudState(false);
+ zkStateReader.updateCloudState(false);
} catch (KeeperException e) {
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
@@ -889,5 +819,9 @@
}
}
+
+ public ZkStateReader getZkStateReader() {
+ return zkStateReader;
+ }
}
Added: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkStateReader.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkStateReader.java?rev=909510&view=auto
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkStateReader.java (added)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkStateReader.java Fri Feb 12 17:32:23 2010
@@ -0,0 +1,162 @@
+package org.apache.solr.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.solr.cloud.SolrZkClient.OnReconnect;
+import org.apache.solr.common.SolrException;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZkStateReader {
+ private static Logger log = LoggerFactory.getLogger(ZkStateReader.class);
+
+ private volatile CloudState cloudState = new CloudState(new HashSet<String>(0), new HashMap<String,Map<String,Slice>>(0));
+
+ private static final long CLOUD_UPDATE_DELAY = Long.parseLong(System.getProperty("CLOUD_UPDATE_DELAY", "5000"));
+
+ private ScheduledExecutorService updateCloudExecutor = Executors.newScheduledThreadPool(1);
+
+ private boolean cloudStateUpdateScheduled;
+
+ private SolrZkClient zkClient;
+
+ public ZkStateReader(SolrZkClient zkClient) {
+ this.zkClient = zkClient;
+ }
+
+ public ZkStateReader(String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout) throws InterruptedException, TimeoutException, IOException {
+ zkClient = new SolrZkClient(zkServerAddress, zkClientTimeout, zkClientConnectTimeout,
+ // on reconnect, reload cloud info
+ new OnReconnect() {
+
+ public void command() {
+ try {
+ // nocommit: recreate watches ????
+ updateCloudState(false);
+ } catch (KeeperException e) {
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (IOException e) {
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ }
+
+ }
+ });
+ }
+
+ // load and publish a new CollectionInfo
+ public void updateCloudState(boolean immediate) throws KeeperException, InterruptedException,
+ IOException {
+ updateCloudState(immediate, false);
+ }
+
+ // load and publish a new CollectionInfo
+ public void updateLiveNodes() throws KeeperException, InterruptedException,
+ IOException {
+ updateCloudState(true, true);
+ }
+
+ // load and publish a new CollectionInfo
+ private synchronized void updateCloudState(boolean immediate, final boolean onlyLiveNodes) throws KeeperException, InterruptedException,
+ IOException {
+
+ // TODO: - incremental update rather than reread everything
+
+ // build immutable CloudInfo
+
+ if(immediate) {
+ if(!onlyLiveNodes) {
+ log.info("Updating cloud state from ZooKeeper... ");
+ } else {
+ log.info("Updating live nodes from ZooKeeper... ");
+ }
+ CloudState cloudState;
+ cloudState = CloudState.buildCloudState(zkClient, this.cloudState, onlyLiveNodes);
+ // update volatile
+ this.cloudState = cloudState;
+ } else {
+ if(cloudStateUpdateScheduled) {
+ log.info("Cloud state update for ZooKeeper already scheduled");
+ return;
+ }
+ log.info("Scheduling cloud state update from ZooKeeper...");
+ cloudStateUpdateScheduled = true;
+ updateCloudExecutor.schedule(new Runnable() {
+
+ public void run() {
+ log.info("Updating cloud state from ZooKeeper...");
+ synchronized (getUpdateLock()) {
+ cloudStateUpdateScheduled = false;
+ CloudState cloudState;
+ try {
+ cloudState = CloudState.buildCloudState(zkClient,
+ ZkStateReader.this.cloudState, onlyLiveNodes);
+ } catch (KeeperException e) {
+ log.error("", e);
+ throw new ZooKeeperException(
+ SolrException.ErrorCode.SERVER_ERROR, "", e);
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ log.error("", e);
+ throw new ZooKeeperException(
+ SolrException.ErrorCode.SERVER_ERROR, "", e);
+ } catch (IOException e) {
+ log.error("", e);
+ throw new ZooKeeperException(
+ SolrException.ErrorCode.SERVER_ERROR, "", e);
+ }
+ // update volatile
+ ZkStateReader.this.cloudState = cloudState;
+ }
+ }
+ }, CLOUD_UPDATE_DELAY, TimeUnit.MILLISECONDS);
+ }
+
+ }
+
+ /**
+ * @return information about the cluster from ZooKeeper
+ */
+ public CloudState getCloudState() {
+ return cloudState;
+ }
+
+ public Object getUpdateLock() {
+ return this;
+ }
+}
Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreContainer.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreContainer.java?rev=909510&r1=909509&r2=909510&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreContainer.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreContainer.java Fri Feb 12 17:32:23 2010
@@ -429,9 +429,9 @@
if(zkController != null) {
try {
- synchronized (zkController) {
+ synchronized (zkController.getZkStateReader().getUpdateLock()) {
zkController.addShardZkNodeWatches();
- zkController.updateCloudState(true);
+ zkController.getZkStateReader().updateCloudState(true);
}
} catch (InterruptedException e) {
// Restore the interrupted status