You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/10/05 09:35:30 UTC

git commit: Store more information in peers table

Updated Branches:
  refs/heads/trunk 8b00f3a25 -> d5ec013ce


Store more information in peers table

patch by slebresne; reviewed by jbellis for CASSANDRA-4351


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d5ec013c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d5ec013c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d5ec013c

Branch: refs/heads/trunk
Commit: d5ec013cee4f3d923d9618694716a265ab04fe1b
Parents: 8b00f3a
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Fri Oct 5 09:34:51 2012 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Fri Oct 5 09:34:51 2012 +0200

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../org/apache/cassandra/config/CFMetaData.java    |   12 +-
 .../apache/cassandra/cql3/UntypedResultSet.java    |    6 +
 src/java/org/apache/cassandra/db/SystemTable.java  |  134 +++++++--------
 .../apache/cassandra/service/StorageService.java   |   19 ++
 5 files changed, 95 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5ec013c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 868183e..342135f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -20,6 +20,7 @@
  * Support repairing only the local DC nodes (CASSANDRA-4747)
  * Use rpc_address for binary protocol and change default port (CASSANRA-4751)
  * Fix use of collections in prepared statements (CASSANDRA-4739)
+ * Store more information into peers table (CASSANDRA-4351)
 
 
 1.2-beta1

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5ec013c/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 6abeb33..ef25d2a 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -158,13 +158,19 @@ public final class CFMetaData
                                                          + "AND COMMENT='hints awaiting delivery'");
 
     public static final CFMetaData PeersCf = compile(12, "CREATE TABLE " + SystemTable.PEERS_CF + " ("
-                                                         + "token_bytes blob PRIMARY KEY,"
-                                                         + "peer inet"
+                                                         + "peer inet PRIMARY KEY,"
+                                                         + "ring_id uuid,"
+                                                         + "tokens set<blob>,"
+                                                         + "schema_version uuid,"
+                                                         + "release_version text,"
+                                                         + "rpc_address inet,"
+                                                         + "data_center text,"
+                                                         + "rack text"
                                                          + ") WITH COMMENT='known peers in the cluster'");
 
     public static final CFMetaData LocalCf = compile(13, "CREATE TABLE " + SystemTable.LOCAL_CF + " ("
                                                          + "key text PRIMARY KEY,"
-                                                         + "token_bytes blob,"
+                                                         + "tokens set<blob>,"
                                                          + "cluster_name text,"
                                                          + "gossip_generation int,"
                                                          + "bootstrapped text,"

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5ec013c/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UntypedResultSet.java b/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
index 203e4c1..ca3acf5 100644
--- a/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
+++ b/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 
 import com.google.common.collect.AbstractIterator;
@@ -130,6 +131,11 @@ public class UntypedResultSet implements Iterable<UntypedResultSet.Row>
             return DateType.instance.compose(data.get(column));
         }
 
+        public <T> Set<T> getSet(String column, AbstractType<T> type)
+        {
+            return SetType.getInstance(type).compose(data.get(column));
+        }
+
         @Override
         public String toString()
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5ec013c/src/java/org/apache/cassandra/db/SystemTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemTable.java b/src/java/org/apache/cassandra/db/SystemTable.java
index ead325f..e2ff161 100644
--- a/src/java/org/apache/cassandra/db/SystemTable.java
+++ b/src/java/org/apache/cassandra/db/SystemTable.java
@@ -25,7 +25,8 @@ import java.util.*;
 import java.util.concurrent.ExecutionException;
 
 import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
+import com.google.common.collect.SetMultimap;
+import com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -158,9 +159,9 @@ public class SystemTable
             }
             // serialize the old token as a collection of (one )tokens.
             Token token = StorageService.getPartitioner().getTokenFactory().fromByteArray(oldColumns.next().value());
-            String tokenBytes = ByteBufferUtil.bytesToHex(serializeTokens(Collections.singleton(token)));
+            String tokenBytes = serializeTokens(Collections.singleton(token));
             // (assume that any node getting upgraded was bootstrapped, since that was stored in a separate row for no particular reason)
-            String req = "INSERT INTO system.%s (key, cluster_name, token_bytes, bootstrapped) VALUES ('%s', '%s', '%s', '%s')";
+            String req = "INSERT INTO system.%s (key, cluster_name, tokens, bootstrapped) VALUES ('%s', '%s', '%s', '%s')";
             processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, clusterName, tokenBytes, BootstrapState.COMPLETED.name()));
 
             oldStatusCfs.truncate();
@@ -185,14 +186,43 @@ public class SystemTable
             return;
         }
 
-        IPartitioner p = StorageService.getPartitioner();
-        for (Token token : tokens)
+        String req = "INSERT INTO system.%s (peer, tokens) VALUES ('%s', %s)";
+        processInternal(String.format(req, PEERS_CF, ep.getHostAddress(), serializeTokens(tokens)));
+        forceBlockingFlush(PEERS_CF);
+    }
+
+    public static synchronized void updatePeerInfo(InetAddress ep, String columnName, String value)
+    {
+        if (ep.equals(FBUtilities.getBroadcastAddress()))
+            return;
+
+        String req = "INSERT INTO system.%s (peer, %s) VALUES ('%s', '%s')";
+        processInternal(String.format(req, PEERS_CF, columnName, ep.getHostAddress(), value));
+    }
+
+    private static String serializeTokens(Collection<Token> tokens)
+    {
+        Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory();
+        StringBuilder sb = new StringBuilder();
+        sb.append("{");
+        Iterator<Token> iter = tokens.iterator();
+        while (iter.hasNext())
         {
-            String req = "INSERT INTO system.%s (token_bytes, peer) VALUES ('%s', '%s')";
-            String tokenBytes = ByteBufferUtil.bytesToHex(p.getTokenFactory().toByteArray(token));
-            processInternal(String.format(req, PEERS_CF, tokenBytes, ep.getHostAddress()));
+            sb.append("'").append(ByteBufferUtil.bytesToHex(factory.toByteArray(iter.next()))).append("'");
+            if (iter.hasNext())
+                sb.append(",");
         }
-        forceBlockingFlush(PEERS_CF);
+        sb.append("}");
+        return sb.toString();
+    }
+
+    private static Collection<Token> deserializeTokens(Collection<ByteBuffer> tokensBytes)
+    {
+        Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory();
+        List<Token> tokens = new ArrayList<Token>(tokensBytes.size());
+        for (ByteBuffer tk : tokensBytes)
+            tokens.add(factory.fromByteArray(tk));
+        return tokens;
     }
 
     /**
@@ -200,13 +230,15 @@ public class SystemTable
      */
     public static synchronized void removeTokens(Collection<Token> tokens)
     {
-        IPartitioner p = StorageService.getPartitioner();
-
-        for (Token token : tokens)
+        Set<Token> tokenSet = new HashSet<Token>(tokens);
+        for (Map.Entry<InetAddress, Collection<Token>> entry : loadTokens().asMap().entrySet())
         {
-            String req = "DELETE FROM system.%s WHERE token_bytes = '%s'";
-            String tokenBytes = ByteBufferUtil.bytesToHex(p.getTokenFactory().toByteArray(token));
-            processInternal(String.format(req, PEERS_CF, tokenBytes));
+            Set<Token> toRemove = Sets.intersection(tokenSet, ((Set<Token>)entry.getValue())).immutableCopy();
+            if (toRemove.isEmpty())
+                continue;
+
+            String req = "UPDATE system.%s SET tokens = tokens - %s WHERE peer = '%s'";
+            processInternal(String.format(req, PEERS_CF, serializeTokens(toRemove), entry.getKey()));
         }
         forceBlockingFlush(PEERS_CF);
     }
@@ -216,9 +248,8 @@ public class SystemTable
     */
     public static synchronized void updateTokens(Collection<Token> tokens)
     {
-        String req = "INSERT INTO system.%s (key, token_bytes) VALUES ('%s', '%s')";
-        String tokenBytes = ByteBufferUtil.bytesToHex(serializeTokens(tokens));
-        processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, tokenBytes));
+        String req = "INSERT INTO system.%s (key, tokens) VALUES ('%s', %s)";
+        processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, serializeTokens(tokens)));
         forceBlockingFlush(LOCAL_CF);
     }
 
@@ -238,53 +269,6 @@ public class SystemTable
         return tokens;
     }
 
-    /** Serialize a collection of tokens to bytes */
-    private static ByteBuffer serializeTokens(Collection<Token> tokens)
-    {
-        // Guesstimate the total number of bytes needed
-        int estCapacity = (tokens.size() * 16) + (tokens.size() * 2);
-        ByteBuffer toks = ByteBuffer.allocate(estCapacity);
-        IPartitioner p = StorageService.getPartitioner();
-
-        for (Token token : tokens)
-        {
-            ByteBuffer tokenBytes = p.getTokenFactory().toByteArray(token);
-
-            // If we blow the buffer, grow it by double
-            if (toks.remaining() < (2 + tokenBytes.remaining()))
-            {
-                estCapacity = estCapacity * 2;
-                ByteBuffer newToks = ByteBuffer.allocate(estCapacity);
-                toks.flip();
-                newToks.put(toks);
-                toks = newToks;
-            }
-
-            toks.putShort((short)tokenBytes.remaining());
-            toks.put(tokenBytes);
-        }
-
-        toks.flip();
-        return toks;
-    }
-
-    private static Collection<Token> deserializeTokens(ByteBuffer tokenBytes)
-    {
-        List<Token> tokens = new ArrayList<Token>();
-        IPartitioner p = StorageService.getPartitioner();
-
-        while(tokenBytes.hasRemaining())
-        {
-            short len = tokenBytes.getShort();
-            ByteBuffer dup = tokenBytes.slice();
-            dup.limit(len);
-            tokenBytes.position(tokenBytes.position() + len);
-            tokens.add(p.getTokenFactory().fromByteArray(dup));
-        }
-
-        return tokens;
-    }
-
     private static void forceBlockingFlush(String cfname)
     {
         try
@@ -305,13 +289,15 @@ public class SystemTable
      * Return a map of stored tokens to IP addresses
      *
      */
-    public static Multimap<InetAddress, Token> loadTokens()
+    public static SetMultimap<InetAddress, Token> loadTokens()
     {
-        IPartitioner p = StorageService.getPartitioner();
-
-        Multimap<InetAddress, Token> tokenMap = HashMultimap.create();
-        for (UntypedResultSet.Row row : processInternal("SELECT * FROM system." + PEERS_CF))
-            tokenMap.put(row.getInetAddress("peer"), p.getTokenFactory().fromByteArray(row.getBytes("token_bytes")));
+        SetMultimap<InetAddress, Token> tokenMap = HashMultimap.create();
+        for (UntypedResultSet.Row row : processInternal("SELECT peer, tokens FROM system." + PEERS_CF))
+        {
+            InetAddress peer = row.getInetAddress("peer");
+            if (row.has("tokens"))
+                tokenMap.putAll(peer, deserializeTokens(row.getSet("tokens", BytesType.instance)));
+        }
 
         return tokenMap;
     }
@@ -361,11 +347,11 @@ public class SystemTable
 
     public static Collection<Token> getSavedTokens()
     {
-        String req = "SELECT token_bytes FROM system.%s WHERE key='%s'";
+        String req = "SELECT tokens FROM system.%s WHERE key='%s'";
         UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
-        return result.isEmpty() || !result.one().has("token_bytes")
+        return result.isEmpty() || !result.one().has("tokens")
              ? Collections.<Token>emptyList()
-             : deserializeTokens(result.one().getBytes("token_bytes"));
+             : deserializeTokens(result.one().<ByteBuffer>getSet("tokens", BytesType.instance));
     }
 
     public static int incrementAndGetGeneration()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5ec013c/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 01b9e80..47c4c92 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1120,6 +1120,25 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
                     handleStateMoving(endpoint, pieces);
                 else if (moveName.equals(VersionedValue.STATUS_RELOCATING))
                     handleStateRelocating(endpoint, pieces);
+                break;
+            case RELEASE_VERSION:
+                SystemTable.updatePeerInfo(endpoint, "release_version", value.value);
+                break;
+            case DC:
+                SystemTable.updatePeerInfo(endpoint, "data_center", value.value);
+                break;
+            case RACK:
+                SystemTable.updatePeerInfo(endpoint, "rack", value.value);
+                break;
+            case RPC_ADDRESS:
+                SystemTable.updatePeerInfo(endpoint, "rpc_address", value.value);
+                break;
+            case SCHEMA:
+                SystemTable.updatePeerInfo(endpoint, "schema_version", value.value);
+                break;
+            case HOST_ID:
+                SystemTable.updatePeerInfo(endpoint, "ring_id", value.value);
+                break;
         }
     }