You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2012/08/04 23:26:15 UTC
svn commit: r1369470 [23/23] - in /lucene/dev/branches/pforcodec_3892: ./
dev-tools/ dev-tools/eclipse/ dev-tools/maven/ dev-tools/scripts/ lucene/
lucene/analysis/ lucene/analysis/common/
lucene/analysis/common/src/java/org/tartarus/snowball/ext/ luce...
Modified: lucene/dev/branches/pforcodec_3892/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/pforcodec_3892/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java?rev=1369470&r1=1369469&r2=1369470&view=diff
==============================================================================
--- lucene/dev/branches/pforcodec_3892/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java (original)
+++ lucene/dev/branches/pforcodec_3892/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java Sat Aug 4 21:26:10 2012
@@ -47,6 +47,8 @@ class ConnectionManager implements Watch
private OnReconnect onReconnect;
+ private volatile boolean isClosed = false;
+
public ConnectionManager(String name, SolrZkClient client, String zkServerAddress, int zkClientTimeout, ZkClientConnectionStrategy strat, OnReconnect onConnect) {
this.name = name;
this.client = client;
@@ -68,6 +70,8 @@ class ConnectionManager implements Watch
log.info("Watcher " + this + " name:" + name + " got event " + event
+ " path:" + event.getPath() + " type:" + event.getType());
}
+
+ checkClosed();
state = event.getState();
if (state == KeeperState.SyncConnected) {
@@ -81,11 +85,18 @@ class ConnectionManager implements Watch
connectionStrategy.reconnect(zkServerAddress, zkClientTimeout, this,
new ZkClientConnectionStrategy.ZkUpdate() {
@Override
- public void update(SolrZooKeeper keeper)
- throws InterruptedException, TimeoutException {
+ public void update(SolrZooKeeper keeper) throws TimeoutException {
synchronized (connectionStrategy) {
- waitForConnected(SolrZkClient.DEFAULT_CLIENT_CONNECT_TIMEOUT);
- client.updateKeeper(keeper);
+ checkClosed();
+ try {
+ waitForConnected(SolrZkClient.DEFAULT_CLIENT_CONNECT_TIMEOUT);
+ checkClosed();
+ client.updateKeeper(keeper);
+ } catch (InterruptedException e) {
+ // we must have been asked to stop
+ throw new RuntimeException("Giving up on connecting - we were interrupted");
+ }
+ checkClosed();
if (onReconnect != null) {
onReconnect.command();
}
@@ -95,6 +106,7 @@ class ConnectionManager implements Watch
}
}
+
});
} catch (Exception e) {
SolrException.log(log, "", e);
@@ -109,7 +121,13 @@ class ConnectionManager implements Watch
}
public synchronized boolean isConnected() {
- return connected;
+ return !isClosed && connected;
+ }
+
+ // we use a volatile rather than sync
+ // to avoid deadlock on shutdown
+ public void close() {
+ this.isClosed = true;
}
public synchronized KeeperState state() {
@@ -122,12 +140,20 @@ class ConnectionManager implements Watch
long left = waitForConnection;
while (!connected && left > 0) {
wait(left);
+ checkClosed();
left = expire - System.currentTimeMillis();
}
if (!connected) {
throw new TimeoutException("Could not connect to ZooKeeper " + zkServerAddress + " within " + waitForConnection + " ms");
}
}
+
+ private synchronized void checkClosed() { // logs when close() has set isClosed; returns normally either way
+ if (isClosed) {
+ log.info("Not acting because I am closed");
+ return; // NOTE(review): this only leaves checkClosed(); the caller keeps running after the call - confirm that is intended
+ }
+ }
public synchronized void waitForDisconnected(long timeout)
throws InterruptedException, TimeoutException {
Modified: lucene/dev/branches/pforcodec_3892/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/pforcodec_3892/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java?rev=1369470&r1=1369469&r2=1369470&view=diff
==============================================================================
--- lucene/dev/branches/pforcodec_3892/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java (original)
+++ lucene/dev/branches/pforcodec_3892/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java Sat Aug 4 21:26:10 2012
@@ -654,7 +654,11 @@ public class SolrZkClient {
public void close() throws InterruptedException {
if (isClosed) return; // it's okay if we over close - same as solrcore
isClosed = true;
- keeper.close();
+ try {
+ keeper.close();
+ } finally { // connManager.close() runs even when keeper.close() throws
+ connManager.close();
+ }
numCloses.incrementAndGet();
}
Modified: lucene/dev/branches/pforcodec_3892/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/pforcodec_3892/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java?rev=1369470&r1=1369469&r2=1369470&view=diff
==============================================================================
--- lucene/dev/branches/pforcodec_3892/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java (original)
+++ lucene/dev/branches/pforcodec_3892/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java Sat Aug 4 21:26:10 2012
@@ -68,7 +68,7 @@ public class ZkStateReader {
public static final String DOWN = "down";
public static final String SYNC = "sync";
- private volatile CloudState cloudState;
+ private volatile ClusterState clusterState;
private static final long SOLRCLOUD_UPDATE_DELAY = Long.parseLong(System.getProperty("solrcloud.update.delay", "5000"));
@@ -120,7 +120,7 @@ public class ZkStateReader {
}
private ScheduledExecutorService updateCloudExecutor = Executors.newScheduledThreadPool(1, new ZKTF());
- private boolean cloudStateUpdateScheduled;
+ private boolean clusterStateUpdateScheduled;
private SolrZkClient zkClient;
@@ -158,13 +158,13 @@ public class ZkStateReader {
}
// load and publish a new CollectionInfo
- public void updateCloudState(boolean immediate) throws KeeperException, InterruptedException {
- updateCloudState(immediate, false);
+ public void updateClusterState(boolean immediate) throws KeeperException, InterruptedException {
+ updateClusterState(immediate, false);
}
// load and publish a new CollectionInfo
public void updateLiveNodes() throws KeeperException, InterruptedException {
- updateCloudState(true, true);
+ updateClusterState(true, true);
}
public synchronized void createClusterStateWatchersAndUpdate() throws KeeperException,
@@ -185,21 +185,21 @@ public class ZkStateReader {
if (EventType.None.equals(event.getType())) {
return;
}
- log.info("A cluster state change has occurred");
+ log.info("A cluster state change has occurred - updating...");
try {
// delayed approach
- // ZkStateReader.this.updateCloudState(false, false);
+ // ZkStateReader.this.updateClusterState(false, false);
synchronized (ZkStateReader.this.getUpdateLock()) {
// remake watch
final Watcher thisWatch = this;
byte[] data = zkClient.getData(CLUSTER_STATE, thisWatch, null,
true);
- CloudState clusterState = CloudState.load(data,
- ZkStateReader.this.cloudState.getLiveNodes());
+ ClusterState clusterState = ClusterState.load(data,
+ ZkStateReader.this.clusterState.getLiveNodes());
// update volatile
- cloudState = clusterState;
+ ZkStateReader.this.clusterState = clusterState;
}
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.SESSIONEXPIRED
@@ -236,15 +236,15 @@ public class ZkStateReader {
log.info("Updating live nodes");
try {
// delayed approach
- // ZkStateReader.this.updateCloudState(false, true);
+ // ZkStateReader.this.updateClusterState(false, true);
synchronized (ZkStateReader.this.getUpdateLock()) {
List<String> liveNodes = zkClient.getChildren(
LIVE_NODES_ZKNODE, this, true);
Set<String> liveNodesSet = new HashSet<String>();
liveNodesSet.addAll(liveNodes);
- CloudState clusterState = new CloudState(liveNodesSet,
- ZkStateReader.this.cloudState.getCollectionStates());
- ZkStateReader.this.cloudState = clusterState;
+ ClusterState clusterState = new ClusterState(liveNodesSet,
+ ZkStateReader.this.clusterState.getCollectionStates());
+ ZkStateReader.this.clusterState = clusterState;
}
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.SESSIONEXPIRED
@@ -267,51 +267,52 @@ public class ZkStateReader {
Set<String> liveNodeSet = new HashSet<String>();
liveNodeSet.addAll(liveNodes);
- CloudState clusterState = CloudState.load(zkClient, liveNodeSet);
- this.cloudState = clusterState;
+ ClusterState clusterState = ClusterState.load(zkClient, liveNodeSet);
+ this.clusterState = clusterState;
}
}
// load and publish a new CollectionInfo
- private synchronized void updateCloudState(boolean immediate,
+ private synchronized void updateClusterState(boolean immediate,
final boolean onlyLiveNodes) throws KeeperException,
InterruptedException {
// build immutable CloudInfo
if (immediate) {
- CloudState clusterState;
+ ClusterState clusterState;
synchronized (getUpdateLock()) {
- List<String> liveNodes = zkClient.getChildren(LIVE_NODES_ZKNODE, null, true);
- Set<String> liveNodesSet = new HashSet<String>();
- liveNodesSet.addAll(liveNodes);
-
+ List<String> liveNodes = zkClient.getChildren(LIVE_NODES_ZKNODE, null,
+ true);
+ Set<String> liveNodesSet = new HashSet<String>();
+ liveNodesSet.addAll(liveNodes);
+
if (!onlyLiveNodes) {
log.info("Updating cloud state from ZooKeeper... ");
- clusterState = CloudState.load(zkClient, liveNodesSet);
+ clusterState = ClusterState.load(zkClient, liveNodesSet);
} else {
log.info("Updating live nodes from ZooKeeper... ");
- clusterState = new CloudState(liveNodesSet,
- ZkStateReader.this.cloudState.getCollectionStates());
+ clusterState = new ClusterState(liveNodesSet,
+ ZkStateReader.this.clusterState.getCollectionStates());
}
}
- this.cloudState = clusterState;
+ this.clusterState = clusterState;
} else {
- if (cloudStateUpdateScheduled) {
+ if (clusterStateUpdateScheduled) {
log.info("Cloud state update for ZooKeeper already scheduled");
return;
}
log.info("Scheduling cloud state update from ZooKeeper...");
- cloudStateUpdateScheduled = true;
+ clusterStateUpdateScheduled = true;
updateCloudExecutor.schedule(new Runnable() {
public void run() {
log.info("Updating cluster state from ZooKeeper...");
synchronized (getUpdateLock()) {
- cloudStateUpdateScheduled = false;
- CloudState clusterState;
+ clusterStateUpdateScheduled = false;
+ ClusterState clusterState;
try {
List<String> liveNodes = zkClient.getChildren(LIVE_NODES_ZKNODE,
null, true);
@@ -321,13 +322,13 @@ public class ZkStateReader {
if (!onlyLiveNodes) {
log.info("Updating cloud state from ZooKeeper... ");
- clusterState = CloudState.load(zkClient, liveNodesSet);
+ clusterState = ClusterState.load(zkClient, liveNodesSet);
} else {
log.info("Updating live nodes from ZooKeeper... ");
- clusterState = new CloudState(liveNodesSet, ZkStateReader.this.cloudState.getCollectionStates());
+ clusterState = new ClusterState(liveNodesSet, ZkStateReader.this.clusterState.getCollectionStates());
}
- ZkStateReader.this.cloudState = clusterState;
+ ZkStateReader.this.clusterState = clusterState;
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.SESSIONEXPIRED
@@ -346,7 +347,7 @@ public class ZkStateReader {
SolrException.ErrorCode.SERVER_ERROR, "", e);
}
// update volatile
- ZkStateReader.this.cloudState = cloudState;
+ ZkStateReader.this.clusterState = clusterState;
}
}
}, SOLRCLOUD_UPDATE_DELAY, TimeUnit.MILLISECONDS);
@@ -357,8 +358,8 @@ public class ZkStateReader {
/**
* @return information about the cluster from ZooKeeper
*/
- public CloudState getCloudState() {
- return cloudState;
+ public ClusterState getClusterState() {
+ return clusterState;
}
public Object getUpdateLock() {
@@ -411,9 +412,8 @@ public class ZkStateReader {
public ZkNodeProps getLeaderProps(String collection, String shard, int timeout) throws InterruptedException {
long timeoutAt = System.currentTimeMillis() + timeout;
while (System.currentTimeMillis() < timeoutAt) {
- if (cloudState != null) {
- final CloudState currentState = cloudState;
- final ZkNodeProps nodeProps = currentState.getLeader(collection, shard);
+ if (clusterState != null) {
+ final ZkNodeProps nodeProps = clusterState.getLeader(collection, shard);
if (nodeProps != null) {
return nodeProps;
}
@@ -452,15 +452,15 @@ public class ZkStateReader {
public List<ZkCoreNodeProps> getReplicaProps(String collection,
String shardId, String thisNodeName, String coreName, String mustMatchStateFilter, String mustNotMatchStateFilter) {
- CloudState cloudState = this.cloudState;
- if (cloudState == null) {
+ ClusterState clusterState = this.clusterState;
+ if (clusterState == null) {
return null;
}
- Map<String,Slice> slices = cloudState.getSlices(collection);
+ Map<String,Slice> slices = clusterState.getSlices(collection);
if (slices == null) {
throw new ZooKeeperException(ErrorCode.BAD_REQUEST,
"Could not find collection in zk: " + collection + " "
- + cloudState.getCollections());
+ + clusterState.getCollections());
}
Slice replicas = slices.get(shardId);
@@ -474,7 +474,7 @@ public class ZkStateReader {
for (Entry<String,ZkNodeProps> entry : shardMap.entrySet()) {
ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(entry.getValue());
String coreNodeName = nodeProps.getNodeName() + "_" + nodeProps.getCoreName();
- if (cloudState.liveNodesContain(nodeProps.getNodeName()) && !coreNodeName.equals(filterNodeName)) {
+ if (clusterState.liveNodesContain(nodeProps.getNodeName()) && !coreNodeName.equals(filterNodeName)) {
if (mustMatchStateFilter == null || mustMatchStateFilter.equals(nodeProps.getState())) {
if (mustNotMatchStateFilter == null || !mustNotMatchStateFilter.equals(nodeProps.getState())) {
nodes.add(nodeProps);
Modified: lucene/dev/branches/pforcodec_3892/solr/solrj/src/java/org/apache/solr/common/util/Hash.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/pforcodec_3892/solr/solrj/src/java/org/apache/solr/common/util/Hash.java?rev=1369470&r1=1369469&r2=1369470&view=diff
==============================================================================
--- lucene/dev/branches/pforcodec_3892/solr/solrj/src/java/org/apache/solr/common/util/Hash.java (original)
+++ lucene/dev/branches/pforcodec_3892/solr/solrj/src/java/org/apache/solr/common/util/Hash.java Sat Aug 4 21:26:10 2012
@@ -291,4 +291,132 @@ public class Hash {
return h1;
}
+
+
+ /** Returns the MurmurHash3_x86_32 hash of the UTF-8 bytes of the String without actually encoding
+ * the string to a temporary buffer. This is more than 2x faster than hashing the result
+ * of String.getBytes(). Hashes the chars data[offset, offset+len); len counts UTF-16 chars, not bytes; seed is the initial value of h1.
+ */
+ public static int murmurhash3_x86_32(CharSequence data, int offset, int len, int seed) {
+
+ final int c1 = 0xcc9e2d51;
+ final int c2 = 0x1b873593;
+
+ int h1 = seed;
+
+ int pos = offset;
+ int end = offset + len;
+ int k1 = 0; // 32-bit word being assembled; mixed into h1 once it is full
+ int k2 = 0; // UTF-8 bytes of the current code point, first byte in the low 8 bits
+ int shift = 0; // number of bits of k1 that are already occupied
+ int bits = 0; // number of bits of k2 that carry UTF-8 bytes (8, 16, 24 or 32)
+ int nBytes = 0; // length in UTF8 bytes
+
+
+ while (pos < end) {
+ int code = data.charAt(pos++);
+ if (code < 0x80) {
+ k2 = code;
+ bits = 8; // 1 UTF-8 byte
+
+ /***
+ // optimized ascii implementation (currently slower!!! code size?)
+ if (shift == 24) {
+ k1 = k1 | (code << 24);
+
+ k1 *= c1;
+ k1 = (k1 << 15) | (k1 >>> 17); // ROTL32(k1,15);
+ k1 *= c2;
+
+ h1 ^= k1;
+ h1 = (h1 << 13) | (h1 >>> 19); // ROTL32(h1,13);
+ h1 = h1*5+0xe6546b64;
+
+ shift = 0;
+ nBytes += 4;
+ k1 = 0;
+ } else {
+ k1 |= code << shift;
+ shift += 8;
+ }
+ continue;
+ ***/
+
+ }
+ else if (code < 0x800) {
+ k2 = (0xC0 | (code >> 6))
+ | ((0x80 | (code & 0x3F)) << 8);
+ bits = 16; // 2 UTF-8 bytes
+ }
+ else if (code < 0xD800 || code > 0xDFFF || pos>=end) {
+ // we check for pos>=end to encode an unpaired surrogate as 3 bytes.
+ k2 = (0xE0 | (code >> 12))
+ | ((0x80 | ((code >> 6) & 0x3F)) << 8)
+ | ((0x80 | (code & 0x3F)) << 16);
+ bits = 24; // 3 UTF-8 bytes
+ } else {
+ // surrogate pair - NOTE(review): any char in 0xD800..0xDFFF that is not the last char lands here; neither half of the pair is validated
+ // int utf32 = pos < end ? (int) data.charAt(pos++) : 0;
+ int utf32 = (int) data.charAt(pos++);
+ utf32 = ((code - 0xD7C0) << 10) + (utf32 & 0x3FF); // combine both chars into one code point
+ k2 = (0xff & (0xF0 | (utf32 >> 18)))
+ | ((0x80 | ((utf32 >> 12) & 0x3F))) << 8
+ | ((0x80 | ((utf32 >> 6) & 0x3F))) << 16
+ | (0x80 | (utf32 & 0x3F)) << 24;
+ bits = 32; // 4 UTF-8 bytes
+ }
+
+
+ k1 |= k2 << shift; // append the new bytes above the ones already buffered; bytes that do not fit are carried over below
+
+ // int used_bits = 32 - shift; // how many bits of k2 were used in k1.
+ // int unused_bits = bits - used_bits; // (bits-(32-shift)) == bits+shift-32 == bits-newshift
+
+ shift += bits;
+ if (shift >= 32) {
+ // mix after we have a complete word
+
+ k1 *= c1;
+ k1 = (k1 << 15) | (k1 >>> 17); // ROTL32(k1,15);
+ k1 *= c2;
+
+ h1 ^= k1;
+ h1 = (h1 << 13) | (h1 >>> 19); // ROTL32(h1,13);
+ h1 = h1*5+0xe6546b64;
+
+ shift -= 32; // bits of k2 that did not fit into the word just mixed
+ // unfortunately, java won't let you shift 32 bits off, so we need to check for 0
+ if (shift != 0) {
+ k1 = k2 >>> (bits-shift); // bits used == bits - newshift
+ } else {
+ k1 = 0;
+ }
+ nBytes += 4;
+ }
+
+ } // inner
+
+ // handle tail
+ if (shift > 0) {
+ nBytes += shift >> 3; // leftover bytes (shift only ever changes in multiples of 8)
+ k1 *= c1;
+ k1 = (k1 << 15) | (k1 >>> 17); // ROTL32(k1,15);
+ k1 *= c2;
+ h1 ^= k1;
+ }
+
+ // finalization
+ h1 ^= nBytes;
+
+ // fmix(h1);
+ h1 ^= h1 >>> 16;
+ h1 *= 0x85ebca6b;
+ h1 ^= h1 >>> 13;
+ h1 *= 0xc2b2ae35;
+ h1 ^= h1 >>> 16;
+
+ return h1;
+ }
+
+
}