You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2012/08/04 23:26:15 UTC

svn commit: r1369470 [23/23] - in /lucene/dev/branches/pforcodec_3892: ./ dev-tools/ dev-tools/eclipse/ dev-tools/maven/ dev-tools/scripts/ lucene/ lucene/analysis/ lucene/analysis/common/ lucene/analysis/common/src/java/org/tartarus/snowball/ext/ luce...

Modified: lucene/dev/branches/pforcodec_3892/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/pforcodec_3892/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java?rev=1369470&r1=1369469&r2=1369470&view=diff
==============================================================================
--- lucene/dev/branches/pforcodec_3892/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java (original)
+++ lucene/dev/branches/pforcodec_3892/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java Sat Aug  4 21:26:10 2012
@@ -47,6 +47,8 @@ class ConnectionManager implements Watch
 
   private OnReconnect onReconnect;
 
+  private volatile boolean isClosed = false;
+
   public ConnectionManager(String name, SolrZkClient client, String zkServerAddress, int zkClientTimeout, ZkClientConnectionStrategy strat, OnReconnect onConnect) {
     this.name = name;
     this.client = client;
@@ -68,6 +70,8 @@ class ConnectionManager implements Watch
       log.info("Watcher " + this + " name:" + name + " got event " + event
           + " path:" + event.getPath() + " type:" + event.getType());
     }
+    
+    checkClosed();
 
     state = event.getState();
     if (state == KeeperState.SyncConnected) {
@@ -81,11 +85,18 @@ class ConnectionManager implements Watch
         connectionStrategy.reconnect(zkServerAddress, zkClientTimeout, this,
             new ZkClientConnectionStrategy.ZkUpdate() {
               @Override
-              public void update(SolrZooKeeper keeper)
-                  throws InterruptedException, TimeoutException {
+              public void update(SolrZooKeeper keeper) throws TimeoutException {
                 synchronized (connectionStrategy) {
-                  waitForConnected(SolrZkClient.DEFAULT_CLIENT_CONNECT_TIMEOUT);
-                  client.updateKeeper(keeper);
+                  checkClosed();
+                  try {
+                    waitForConnected(SolrZkClient.DEFAULT_CLIENT_CONNECT_TIMEOUT);
+                    checkClosed();
+                    client.updateKeeper(keeper);
+                  } catch (InterruptedException e) {
+                    // we must have been asked to stop
+                    throw new RuntimeException("Giving up on connecting - we were interrupted");
+                  }
+                  checkClosed();
                   if (onReconnect != null) {
                     onReconnect.command();
                   }
@@ -95,6 +106,7 @@ class ConnectionManager implements Watch
                 }
                 
               }
+
             });
       } catch (Exception e) {
         SolrException.log(log, "", e);
@@ -109,7 +121,13 @@ class ConnectionManager implements Watch
   }
 
   public synchronized boolean isConnected() {
-    return connected;
+    return !isClosed && connected;
+  }
+  
+  // we use a volatile rather than sync
+  // to avoid deadlock on shutdown
+  public void close() {
+    this.isClosed = true;
   }
 
   public synchronized KeeperState state() {
@@ -122,12 +140,20 @@ class ConnectionManager implements Watch
     long left = waitForConnection;
     while (!connected && left > 0) {
       wait(left);
+      checkClosed();
       left = expire - System.currentTimeMillis();
     }
     if (!connected) {
       throw new TimeoutException("Could not connect to ZooKeeper " + zkServerAddress + " within " + waitForConnection + " ms");
     }
   }
+  
+  private synchronized void checkClosed() {
+    if (isClosed) {
+      log.info("Not acting because I am closed");
+      return;
+    }
+  }
 
   public synchronized void waitForDisconnected(long timeout)
       throws InterruptedException, TimeoutException {

Modified: lucene/dev/branches/pforcodec_3892/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/pforcodec_3892/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java?rev=1369470&r1=1369469&r2=1369470&view=diff
==============================================================================
--- lucene/dev/branches/pforcodec_3892/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java (original)
+++ lucene/dev/branches/pforcodec_3892/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java Sat Aug  4 21:26:10 2012
@@ -654,7 +654,11 @@ public class SolrZkClient {
   public void close() throws InterruptedException {
     if (isClosed) return; // it's okay if we over close - same as solrcore
     isClosed = true;
-    keeper.close();
+    try {
+      keeper.close();
+    } finally {
+      connManager.close();
+    }
     numCloses.incrementAndGet();
   }
 

Modified: lucene/dev/branches/pforcodec_3892/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/pforcodec_3892/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java?rev=1369470&r1=1369469&r2=1369470&view=diff
==============================================================================
--- lucene/dev/branches/pforcodec_3892/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java (original)
+++ lucene/dev/branches/pforcodec_3892/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java Sat Aug  4 21:26:10 2012
@@ -68,7 +68,7 @@ public class ZkStateReader {
   public static final String DOWN = "down";
   public static final String SYNC = "sync";
   
-  private volatile CloudState cloudState;
+  private volatile ClusterState clusterState;
 
   private static final long SOLRCLOUD_UPDATE_DELAY = Long.parseLong(System.getProperty("solrcloud.update.delay", "5000"));
 
@@ -120,7 +120,7 @@ public class ZkStateReader {
   }
   private ScheduledExecutorService updateCloudExecutor = Executors.newScheduledThreadPool(1, new ZKTF());
 
-  private boolean cloudStateUpdateScheduled;
+  private boolean clusterStateUpdateScheduled;
 
   private SolrZkClient zkClient;
   
@@ -158,13 +158,13 @@ public class ZkStateReader {
   }
   
   // load and publish a new CollectionInfo
-  public void updateCloudState(boolean immediate) throws KeeperException, InterruptedException {
-    updateCloudState(immediate, false);
+  public void updateClusterState(boolean immediate) throws KeeperException, InterruptedException {
+    updateClusterState(immediate, false);
   }
   
   // load and publish a new CollectionInfo
   public void updateLiveNodes() throws KeeperException, InterruptedException {
-    updateCloudState(true, true);
+    updateClusterState(true, true);
   }
   
   public synchronized void createClusterStateWatchersAndUpdate() throws KeeperException,
@@ -185,21 +185,21 @@ public class ZkStateReader {
           if (EventType.None.equals(event.getType())) {
             return;
           }
-          log.info("A cluster state change has occurred");
+          log.info("A cluster state change has occurred - updating...");
           try {
             
             // delayed approach
-            // ZkStateReader.this.updateCloudState(false, false);
+            // ZkStateReader.this.updateClusterState(false, false);
             synchronized (ZkStateReader.this.getUpdateLock()) {
               // remake watch
               final Watcher thisWatch = this;
               byte[] data = zkClient.getData(CLUSTER_STATE, thisWatch, null,
                   true);
               
-              CloudState clusterState = CloudState.load(data,
-                  ZkStateReader.this.cloudState.getLiveNodes());
+              ClusterState clusterState = ClusterState.load(data,
+                  ZkStateReader.this.clusterState.getLiveNodes());
               // update volatile
-              cloudState = clusterState;
+              ZkStateReader.this.clusterState = clusterState;
             }
           } catch (KeeperException e) {
             if (e.code() == KeeperException.Code.SESSIONEXPIRED
@@ -236,15 +236,15 @@ public class ZkStateReader {
               log.info("Updating live nodes");
               try {
                 // delayed approach
-                // ZkStateReader.this.updateCloudState(false, true);
+                // ZkStateReader.this.updateClusterState(false, true);
                 synchronized (ZkStateReader.this.getUpdateLock()) {
                   List<String> liveNodes = zkClient.getChildren(
                       LIVE_NODES_ZKNODE, this, true);
                   Set<String> liveNodesSet = new HashSet<String>();
                   liveNodesSet.addAll(liveNodes);
-                  CloudState clusterState = new CloudState(liveNodesSet,
-                      ZkStateReader.this.cloudState.getCollectionStates());
-                  ZkStateReader.this.cloudState = clusterState;
+                  ClusterState clusterState = new ClusterState(liveNodesSet,
+                      ZkStateReader.this.clusterState.getCollectionStates());
+                  ZkStateReader.this.clusterState = clusterState;
                 }
               } catch (KeeperException e) {
                 if (e.code() == KeeperException.Code.SESSIONEXPIRED
@@ -267,51 +267,52 @@ public class ZkStateReader {
     
       Set<String> liveNodeSet = new HashSet<String>();
       liveNodeSet.addAll(liveNodes);
-      CloudState clusterState = CloudState.load(zkClient, liveNodeSet);
-      this.cloudState = clusterState;
+      ClusterState clusterState = ClusterState.load(zkClient, liveNodeSet);
+      this.clusterState = clusterState;
     }
   }
   
   
   // load and publish a new CollectionInfo
-  private synchronized void updateCloudState(boolean immediate,
+  private synchronized void updateClusterState(boolean immediate,
       final boolean onlyLiveNodes) throws KeeperException,
       InterruptedException {
     // build immutable CloudInfo
     
     if (immediate) {
-      CloudState clusterState;
+      ClusterState clusterState;
       synchronized (getUpdateLock()) {
-      List<String> liveNodes = zkClient.getChildren(LIVE_NODES_ZKNODE, null, true);
-      Set<String> liveNodesSet = new HashSet<String>();
-      liveNodesSet.addAll(liveNodes);
-      
+        List<String> liveNodes = zkClient.getChildren(LIVE_NODES_ZKNODE, null,
+            true);
+        Set<String> liveNodesSet = new HashSet<String>();
+        liveNodesSet.addAll(liveNodes);
+        
         if (!onlyLiveNodes) {
           log.info("Updating cloud state from ZooKeeper... ");
           
-          clusterState = CloudState.load(zkClient, liveNodesSet);
+          clusterState = ClusterState.load(zkClient, liveNodesSet);
         } else {
           log.info("Updating live nodes from ZooKeeper... ");
-          clusterState = new CloudState(liveNodesSet,
-              ZkStateReader.this.cloudState.getCollectionStates());
+          clusterState = new ClusterState(liveNodesSet,
+              ZkStateReader.this.clusterState.getCollectionStates());
         }
       }
 
-      this.cloudState = clusterState;
+      this.clusterState = clusterState;
     } else {
-      if (cloudStateUpdateScheduled) {
+      if (clusterStateUpdateScheduled) {
         log.info("Cloud state update for ZooKeeper already scheduled");
         return;
       }
       log.info("Scheduling cloud state update from ZooKeeper...");
-      cloudStateUpdateScheduled = true;
+      clusterStateUpdateScheduled = true;
       updateCloudExecutor.schedule(new Runnable() {
         
         public void run() {
           log.info("Updating cluster state from ZooKeeper...");
           synchronized (getUpdateLock()) {
-            cloudStateUpdateScheduled = false;
-            CloudState clusterState;
+            clusterStateUpdateScheduled = false;
+            ClusterState clusterState;
             try {
               List<String> liveNodes = zkClient.getChildren(LIVE_NODES_ZKNODE,
                   null, true);
@@ -321,13 +322,13 @@ public class ZkStateReader {
               if (!onlyLiveNodes) {
                 log.info("Updating cloud state from ZooKeeper... ");
                 
-                clusterState = CloudState.load(zkClient, liveNodesSet);
+                clusterState = ClusterState.load(zkClient, liveNodesSet);
               } else {
                 log.info("Updating live nodes from ZooKeeper... ");
-                clusterState = new CloudState(liveNodesSet, ZkStateReader.this.cloudState.getCollectionStates());
+                clusterState = new ClusterState(liveNodesSet, ZkStateReader.this.clusterState.getCollectionStates());
               }
               
-              ZkStateReader.this.cloudState = clusterState;
+              ZkStateReader.this.clusterState = clusterState;
               
             } catch (KeeperException e) {
               if (e.code() == KeeperException.Code.SESSIONEXPIRED
@@ -346,7 +347,7 @@ public class ZkStateReader {
                   SolrException.ErrorCode.SERVER_ERROR, "", e);
             } 
             // update volatile
-            ZkStateReader.this.cloudState = cloudState;
+            ZkStateReader.this.clusterState = clusterState;
           }
         }
       }, SOLRCLOUD_UPDATE_DELAY, TimeUnit.MILLISECONDS);
@@ -357,8 +358,8 @@ public class ZkStateReader {
   /**
    * @return information about the cluster from ZooKeeper
    */
-  public CloudState getCloudState() {
-    return cloudState;
+  public ClusterState getClusterState() {
+    return clusterState;
   }
   
   public Object getUpdateLock() {
@@ -411,9 +412,8 @@ public class ZkStateReader {
   public ZkNodeProps getLeaderProps(String collection, String shard, int timeout) throws InterruptedException {
     long timeoutAt = System.currentTimeMillis() + timeout;
     while (System.currentTimeMillis() < timeoutAt) {
-      if (cloudState != null) {
-        final CloudState currentState = cloudState;      
-        final ZkNodeProps nodeProps = currentState.getLeader(collection, shard);
+      if (clusterState != null) {    
+        final ZkNodeProps nodeProps = clusterState.getLeader(collection, shard);
         if (nodeProps != null) {
           return nodeProps;
         }
@@ -452,15 +452,15 @@ public class ZkStateReader {
   
   public List<ZkCoreNodeProps> getReplicaProps(String collection,
       String shardId, String thisNodeName, String coreName, String mustMatchStateFilter, String mustNotMatchStateFilter) {
-    CloudState cloudState = this.cloudState;
-    if (cloudState == null) {
+    ClusterState clusterState = this.clusterState;
+    if (clusterState == null) {
       return null;
     }
-    Map<String,Slice> slices = cloudState.getSlices(collection);
+    Map<String,Slice> slices = clusterState.getSlices(collection);
     if (slices == null) {
       throw new ZooKeeperException(ErrorCode.BAD_REQUEST,
           "Could not find collection in zk: " + collection + " "
-              + cloudState.getCollections());
+              + clusterState.getCollections());
     }
     
     Slice replicas = slices.get(shardId);
@@ -474,7 +474,7 @@ public class ZkStateReader {
     for (Entry<String,ZkNodeProps> entry : shardMap.entrySet()) {
       ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(entry.getValue());
       String coreNodeName = nodeProps.getNodeName() + "_" + nodeProps.getCoreName();
-      if (cloudState.liveNodesContain(nodeProps.getNodeName()) && !coreNodeName.equals(filterNodeName)) {
+      if (clusterState.liveNodesContain(nodeProps.getNodeName()) && !coreNodeName.equals(filterNodeName)) {
         if (mustMatchStateFilter == null || mustMatchStateFilter.equals(nodeProps.getState())) {
           if (mustNotMatchStateFilter == null || !mustNotMatchStateFilter.equals(nodeProps.getState())) {
             nodes.add(nodeProps);

Modified: lucene/dev/branches/pforcodec_3892/solr/solrj/src/java/org/apache/solr/common/util/Hash.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/pforcodec_3892/solr/solrj/src/java/org/apache/solr/common/util/Hash.java?rev=1369470&r1=1369469&r2=1369470&view=diff
==============================================================================
--- lucene/dev/branches/pforcodec_3892/solr/solrj/src/java/org/apache/solr/common/util/Hash.java (original)
+++ lucene/dev/branches/pforcodec_3892/solr/solrj/src/java/org/apache/solr/common/util/Hash.java Sat Aug  4 21:26:10 2012
@@ -291,4 +291,132 @@ public class Hash {
     return h1;
   }
 
+
+
+  /** Returns the MurmurHash3_x86_32 hash of the UTF-8 bytes of the String without actually encoding
+   * the string to a temporary buffer.  This is more than 2x faster than hashing the result
+   * of String.getBytes().
+   */
+  public static int murmurhash3_x86_32(CharSequence data, int offset, int len, int seed) {
+
+    final int c1 = 0xcc9e2d51;
+    final int c2 = 0x1b873593;
+
+    int h1 = seed;
+
+    int pos = offset;
+    int end = offset + len;
+    int k1 = 0;
+    int k2 = 0;
+    int shift = 0;
+    int bits = 0;
+    int nBytes = 0;   // length in UTF8 bytes
+
+
+    while (pos < end) {
+      int code = data.charAt(pos++);
+      if (code < 0x80) {
+        k2 = code;
+        bits = 8;
+
+        /***
+         // optimized ascii implementation (currently slower!!! code size?)
+         if (shift == 24) {
+         k1 = k1 | (code << 24);
+
+         k1 *= c1;
+         k1 = (k1 << 15) | (k1 >>> 17);  // ROTL32(k1,15);
+         k1 *= c2;
+
+         h1 ^= k1;
+         h1 = (h1 << 13) | (h1 >>> 19);  // ROTL32(h1,13);
+         h1 = h1*5+0xe6546b64;
+
+         shift = 0;
+         nBytes += 4;
+         k1 = 0;
+         } else {
+         k1 |= code << shift;
+         shift += 8;
+         }
+         continue;
+         ***/
+
+      }
+      else if (code < 0x800) {
+        k2 = (0xC0 | (code >> 6))
+            | ((0x80 | (code & 0x3F)) << 8);
+        bits = 16;
+      }
+      else if (code < 0xD800 || code > 0xDFFF || pos>=end) {
+        // we check for pos>=end to encode an unpaired surrogate as 3 bytes.
+        k2 = (0xE0 | (code >> 12))
+            | ((0x80 | ((code >> 6) & 0x3F)) << 8)
+            | ((0x80 | (code & 0x3F)) << 16);
+        bits = 24;
+      } else {
+        // surrogate pair
+        // int utf32 = pos < end ? (int) data.charAt(pos++) : 0;
+        int utf32 = (int) data.charAt(pos++);
+        utf32 = ((code - 0xD7C0) << 10) + (utf32 & 0x3FF);
+        k2 = (0xff & (0xF0 | (utf32 >> 18)))
+            | ((0x80 | ((utf32 >> 12) & 0x3F))) << 8
+            | ((0x80 | ((utf32 >> 6) & 0x3F))) << 16
+            |  (0x80 | (utf32 & 0x3F)) << 24;
+        bits = 32;
+      }
+
+
+      k1 |= k2 << shift;
+
+      // int used_bits = 32 - shift;  // how many bits of k2 were used in k1.
+      // int unused_bits = bits - used_bits; //  (bits-(32-shift)) == bits+shift-32  == bits-newshift
+
+      shift += bits;
+      if (shift >= 32) {
+        // mix after we have a complete word
+
+        k1 *= c1;
+        k1 = (k1 << 15) | (k1 >>> 17);  // ROTL32(k1,15);
+        k1 *= c2;
+
+        h1 ^= k1;
+        h1 = (h1 << 13) | (h1 >>> 19);  // ROTL32(h1,13);
+        h1 = h1*5+0xe6546b64;
+
+        shift -= 32;
+        // unfortunately, java won't let you shift 32 bits off, so we need to check for 0
+        if (shift != 0) {
+          k1 = k2 >>> (bits-shift);   // bits used == bits - newshift
+        } else {
+          k1 = 0;
+        }
+        nBytes += 4;
+      }
+
+    } // inner
+
+    // handle tail
+    if (shift > 0) {
+      nBytes += shift >> 3;
+      k1 *= c1;
+      k1 = (k1 << 15) | (k1 >>> 17);  // ROTL32(k1,15);
+      k1 *= c2;
+      h1 ^= k1;
+    }
+
+    // finalization
+    h1 ^= nBytes;
+
+    // fmix(h1);
+    h1 ^= h1 >>> 16;
+    h1 *= 0x85ebca6b;
+    h1 ^= h1 >>> 13;
+    h1 *= 0xc2b2ae35;
+    h1 ^= h1 >>> 16;
+
+    return h1;
+  }
+
+
 }