You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/10/05 09:35:30 UTC
git commit: Store more informations in peers table
Updated Branches:
refs/heads/trunk 8b00f3a25 -> d5ec013ce
Store more informations in peers table
patch by slebresne; reviewed by jbellis for CASSANDRA-4351
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d5ec013c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d5ec013c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d5ec013c
Branch: refs/heads/trunk
Commit: d5ec013cee4f3d923d9618694716a265ab04fe1b
Parents: 8b00f3a
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Fri Oct 5 09:34:51 2012 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Fri Oct 5 09:34:51 2012 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/config/CFMetaData.java | 12 +-
.../apache/cassandra/cql3/UntypedResultSet.java | 6 +
src/java/org/apache/cassandra/db/SystemTable.java | 134 +++++++--------
.../apache/cassandra/service/StorageService.java | 19 ++
5 files changed, 95 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5ec013c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 868183e..342135f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -20,6 +20,7 @@
* Support repairing only the local DC nodes (CASSANDRA-4747)
* Use rpc_address for binary protocol and change default port (CASSANRA-4751)
* Fix use of collections in prepared statements (CASSANDRA-4739)
+ * Store more information into peers table (CASSANDRA-4351)
1.2-beta1
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5ec013c/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 6abeb33..ef25d2a 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -158,13 +158,19 @@ public final class CFMetaData
+ "AND COMMENT='hints awaiting delivery'");
public static final CFMetaData PeersCf = compile(12, "CREATE TABLE " + SystemTable.PEERS_CF + " ("
- + "token_bytes blob PRIMARY KEY,"
- + "peer inet"
+ + "peer inet PRIMARY KEY,"
+ + "ring_id uuid,"
+ + "tokens set<blob>,"
+ + "schema_version uuid,"
+ + "release_version text,"
+ + "rpc_address inet,"
+ + "data_center text,"
+ + "rack text"
+ ") WITH COMMENT='known peers in the cluster'");
public static final CFMetaData LocalCf = compile(13, "CREATE TABLE " + SystemTable.LOCAL_CF + " ("
+ "key text PRIMARY KEY,"
- + "token_bytes blob,"
+ + "tokens set<blob>,"
+ "cluster_name text,"
+ "gossip_generation int,"
+ "bootstrapped text,"
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5ec013c/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UntypedResultSet.java b/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
index 203e4c1..ca3acf5 100644
--- a/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
+++ b/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import com.google.common.collect.AbstractIterator;
@@ -130,6 +131,11 @@ public class UntypedResultSet implements Iterable<UntypedResultSet.Row>
return DateType.instance.compose(data.get(column));
}
+ public <T> Set<T> getSet(String column, AbstractType<T> type)
+ {
+ return SetType.getInstance(type).compose(data.get(column));
+ }
+
@Override
public String toString()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5ec013c/src/java/org/apache/cassandra/db/SystemTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemTable.java b/src/java/org/apache/cassandra/db/SystemTable.java
index ead325f..e2ff161 100644
--- a/src/java/org/apache/cassandra/db/SystemTable.java
+++ b/src/java/org/apache/cassandra/db/SystemTable.java
@@ -25,7 +25,8 @@ import java.util.*;
import java.util.concurrent.ExecutionException;
import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
+import com.google.common.collect.SetMultimap;
+import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -158,9 +159,9 @@ public class SystemTable
}
// serialize the old token as a collection of (one )tokens.
Token token = StorageService.getPartitioner().getTokenFactory().fromByteArray(oldColumns.next().value());
- String tokenBytes = ByteBufferUtil.bytesToHex(serializeTokens(Collections.singleton(token)));
+ String tokenBytes = serializeTokens(Collections.singleton(token));
// (assume that any node getting upgraded was bootstrapped, since that was stored in a separate row for no particular reason)
- String req = "INSERT INTO system.%s (key, cluster_name, token_bytes, bootstrapped) VALUES ('%s', '%s', '%s', '%s')";
+ String req = "INSERT INTO system.%s (key, cluster_name, tokens, bootstrapped) VALUES ('%s', '%s', '%s', '%s')";
processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, clusterName, tokenBytes, BootstrapState.COMPLETED.name()));
oldStatusCfs.truncate();
@@ -185,14 +186,43 @@ public class SystemTable
return;
}
- IPartitioner p = StorageService.getPartitioner();
- for (Token token : tokens)
+ String req = "INSERT INTO system.%s (peer, tokens) VALUES ('%s', %s)";
+ processInternal(String.format(req, PEERS_CF, ep.getHostAddress(), serializeTokens(tokens)));
+ forceBlockingFlush(PEERS_CF);
+ }
+
+ public static synchronized void updatePeerInfo(InetAddress ep, String columnName, String value)
+ {
+ if (ep.equals(FBUtilities.getBroadcastAddress()))
+ return;
+
+ String req = "INSERT INTO system.%s (peer, %s) VALUES ('%s', '%s')";
+ processInternal(String.format(req, PEERS_CF, columnName, ep.getHostAddress(), value));
+ }
+
+ private static String serializeTokens(Collection<Token> tokens)
+ {
+ Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory();
+ StringBuilder sb = new StringBuilder();
+ sb.append("{");
+ Iterator<Token> iter = tokens.iterator();
+ while (iter.hasNext())
{
- String req = "INSERT INTO system.%s (token_bytes, peer) VALUES ('%s', '%s')";
- String tokenBytes = ByteBufferUtil.bytesToHex(p.getTokenFactory().toByteArray(token));
- processInternal(String.format(req, PEERS_CF, tokenBytes, ep.getHostAddress()));
+ sb.append("'").append(ByteBufferUtil.bytesToHex(factory.toByteArray(iter.next()))).append("'");
+ if (iter.hasNext())
+ sb.append(",");
}
- forceBlockingFlush(PEERS_CF);
+ sb.append("}");
+ return sb.toString();
+ }
+
+ private static Collection<Token> deserializeTokens(Collection<ByteBuffer> tokensBytes)
+ {
+ Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory();
+ List<Token> tokens = new ArrayList<Token>(tokensBytes.size());
+ for (ByteBuffer tk : tokensBytes)
+ tokens.add(factory.fromByteArray(tk));
+ return tokens;
}
/**
@@ -200,13 +230,15 @@ public class SystemTable
*/
public static synchronized void removeTokens(Collection<Token> tokens)
{
- IPartitioner p = StorageService.getPartitioner();
-
- for (Token token : tokens)
+ Set<Token> tokenSet = new HashSet<Token>(tokens);
+ for (Map.Entry<InetAddress, Collection<Token>> entry : loadTokens().asMap().entrySet())
{
- String req = "DELETE FROM system.%s WHERE token_bytes = '%s'";
- String tokenBytes = ByteBufferUtil.bytesToHex(p.getTokenFactory().toByteArray(token));
- processInternal(String.format(req, PEERS_CF, tokenBytes));
+ Set<Token> toRemove = Sets.intersection(tokenSet, ((Set<Token>)entry.getValue())).immutableCopy();
+ if (toRemove.isEmpty())
+ continue;
+
+ String req = "UPDATE system.%s SET tokens = tokens - %s WHERE peer = '%s'";
+ processInternal(String.format(req, PEERS_CF, serializeTokens(toRemove), entry.getKey()));
}
forceBlockingFlush(PEERS_CF);
}
@@ -216,9 +248,8 @@ public class SystemTable
*/
public static synchronized void updateTokens(Collection<Token> tokens)
{
- String req = "INSERT INTO system.%s (key, token_bytes) VALUES ('%s', '%s')";
- String tokenBytes = ByteBufferUtil.bytesToHex(serializeTokens(tokens));
- processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, tokenBytes));
+ String req = "INSERT INTO system.%s (key, tokens) VALUES ('%s', %s)";
+ processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, serializeTokens(tokens)));
forceBlockingFlush(LOCAL_CF);
}
@@ -238,53 +269,6 @@ public class SystemTable
return tokens;
}
- /** Serialize a collection of tokens to bytes */
- private static ByteBuffer serializeTokens(Collection<Token> tokens)
- {
- // Guesstimate the total number of bytes needed
- int estCapacity = (tokens.size() * 16) + (tokens.size() * 2);
- ByteBuffer toks = ByteBuffer.allocate(estCapacity);
- IPartitioner p = StorageService.getPartitioner();
-
- for (Token token : tokens)
- {
- ByteBuffer tokenBytes = p.getTokenFactory().toByteArray(token);
-
- // If we blow the buffer, grow it by double
- if (toks.remaining() < (2 + tokenBytes.remaining()))
- {
- estCapacity = estCapacity * 2;
- ByteBuffer newToks = ByteBuffer.allocate(estCapacity);
- toks.flip();
- newToks.put(toks);
- toks = newToks;
- }
-
- toks.putShort((short)tokenBytes.remaining());
- toks.put(tokenBytes);
- }
-
- toks.flip();
- return toks;
- }
-
- private static Collection<Token> deserializeTokens(ByteBuffer tokenBytes)
- {
- List<Token> tokens = new ArrayList<Token>();
- IPartitioner p = StorageService.getPartitioner();
-
- while(tokenBytes.hasRemaining())
- {
- short len = tokenBytes.getShort();
- ByteBuffer dup = tokenBytes.slice();
- dup.limit(len);
- tokenBytes.position(tokenBytes.position() + len);
- tokens.add(p.getTokenFactory().fromByteArray(dup));
- }
-
- return tokens;
- }
-
private static void forceBlockingFlush(String cfname)
{
try
@@ -305,13 +289,15 @@ public class SystemTable
* Return a map of stored tokens to IP addresses
*
*/
- public static Multimap<InetAddress, Token> loadTokens()
+ public static SetMultimap<InetAddress, Token> loadTokens()
{
- IPartitioner p = StorageService.getPartitioner();
-
- Multimap<InetAddress, Token> tokenMap = HashMultimap.create();
- for (UntypedResultSet.Row row : processInternal("SELECT * FROM system." + PEERS_CF))
- tokenMap.put(row.getInetAddress("peer"), p.getTokenFactory().fromByteArray(row.getBytes("token_bytes")));
+ SetMultimap<InetAddress, Token> tokenMap = HashMultimap.create();
+ for (UntypedResultSet.Row row : processInternal("SELECT peer, tokens FROM system." + PEERS_CF))
+ {
+ InetAddress peer = row.getInetAddress("peer");
+ if (row.has("tokens"))
+ tokenMap.putAll(peer, deserializeTokens(row.getSet("tokens", BytesType.instance)));
+ }
return tokenMap;
}
@@ -361,11 +347,11 @@ public class SystemTable
public static Collection<Token> getSavedTokens()
{
- String req = "SELECT token_bytes FROM system.%s WHERE key='%s'";
+ String req = "SELECT tokens FROM system.%s WHERE key='%s'";
UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
- return result.isEmpty() || !result.one().has("token_bytes")
+ return result.isEmpty() || !result.one().has("tokens")
? Collections.<Token>emptyList()
- : deserializeTokens(result.one().getBytes("token_bytes"));
+ : deserializeTokens(result.one().<ByteBuffer>getSet("tokens", BytesType.instance));
}
public static int incrementAndGetGeneration()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5ec013c/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 01b9e80..47c4c92 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1120,6 +1120,25 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
handleStateMoving(endpoint, pieces);
else if (moveName.equals(VersionedValue.STATUS_RELOCATING))
handleStateRelocating(endpoint, pieces);
+ break;
+ case RELEASE_VERSION:
+ SystemTable.updatePeerInfo(endpoint, "release_version", value.value);
+ break;
+ case DC:
+ SystemTable.updatePeerInfo(endpoint, "data_center", value.value);
+ break;
+ case RACK:
+ SystemTable.updatePeerInfo(endpoint, "rack", value.value);
+ break;
+ case RPC_ADDRESS:
+ SystemTable.updatePeerInfo(endpoint, "rpc_address", value.value);
+ break;
+ case SCHEMA:
+ SystemTable.updatePeerInfo(endpoint, "schema_version", value.value);
+ break;
+ case HOST_ID:
+ SystemTable.updatePeerInfo(endpoint, "ring_id", value.value);
+ break;
}
}