You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by gd...@apache.org on 2011/01/24 21:05:34 UTC

svn commit: r1062953 - in /cassandra/branches/cassandra-0.7: src/java/org/apache/cassandra/net/ test/unit/org/apache/cassandra/ test/unit/org/apache/cassandra/db/ test/unit/org/apache/cassandra/db/migration/ test/unit/org/apache/cassandra/gms/ test/uni...

Author: gdusbabek
Date: Mon Jan 24 20:05:25 2011
New Revision: 1062953

URL: http://svn.apache.org/viewvc?rev=1062953&view=rev
Log:
Serialization tests. patch by gdusbabek, reviewed by jbellis. CASSANDRA-1923

Added:
    cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
    cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/SerializationsTest.java
    cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/migration/
    cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/migration/SerializationsTest.java
    cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/gms/SerializationsTest.java
    cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/SerializationsTest.java
    cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
    cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/utils/SerializationsTest.java
Modified:
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java?rev=1062953&r1=1062952&r2=1062953&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java Mon Jan 24 20:05:25 2011
@@ -30,14 +30,14 @@ import org.apache.cassandra.utils.FBUtil
 
 public class Message
 {
-    private static MessageSerializer serializer_;
+    private static ICompactSerializer<Message> serializer_;
 
     static
     {
         serializer_ = new MessageSerializer();        
     }
     
-    public static MessageSerializer serializer()
+    public static ICompactSerializer<Message> serializer()
     {
         return serializer_;
     }
@@ -121,25 +121,25 @@ public class Message
         	.append(separator);
         return sbuf.toString();
     }
-}
-
-class MessageSerializer implements ICompactSerializer<Message>
-{
-    public void serialize(Message t, DataOutputStream dos) throws IOException
+    
+    private static class MessageSerializer implements ICompactSerializer<Message>
     {
-        Header.serializer().serialize( t.header_, dos);
-        byte[] bytes = t.getMessageBody();
-        dos.writeInt(bytes.length);
-        dos.write(bytes);
-    }
-
-    public Message deserialize(DataInputStream dis) throws IOException
-    {
-        Header header = Header.serializer().deserialize(dis);
-        int size = dis.readInt();
-        byte[] bytes = new byte[size];
-        dis.readFully(bytes);
-        // return new Message(header.getMessageId(), header.getFrom(), header.getMessageType(), header.getVerb(), new Object[]{bytes});
-        return new Message(header, bytes);
+        public void serialize(Message t, DataOutputStream dos) throws IOException
+        {
+            Header.serializer().serialize( t.header_, dos);
+            byte[] bytes = t.getMessageBody();
+            dos.writeInt(bytes.length);
+            dos.write(bytes);
+        }
+    
+        public Message deserialize(DataInputStream dis) throws IOException
+        {
+            Header header = Header.serializer().deserialize(dis);
+            int size = dis.readInt();
+            byte[] bytes = new byte[size];
+            dis.readFully(bytes);
+            // return new Message(header.getMessageId(), header.getFrom(), header.getMessageType(), header.getVerb(), new Object[]{bytes});
+            return new Message(header, bytes);
+        }
     }
 }

Added: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/AbstractSerializationsTester.java?rev=1062953&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/AbstractSerializationsTester.java (added)
+++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/AbstractSerializationsTester.java Mon Jan 24 20:05:25 2011
@@ -0,0 +1,29 @@
+package org.apache.cassandra;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+public class AbstractSerializationsTester extends SchemaLoader
+{
+    protected static final String CUR_VER = System.getProperty("cassandra.version", "0.7");
+    
+    protected static final boolean EXECUTE_WRITES = new Boolean(System.getProperty("cassandra.test-serialization-writes", "False")).booleanValue();
+    
+    protected static DataInputStream getInput(String name) throws IOException
+    {
+        File f = new File("test/data/serialization/" + CUR_VER + "/" + name);
+        assert f.exists();
+        return new DataInputStream(new FileInputStream(f));
+    }
+    
+    protected static DataOutputStream getOutput(String name) throws IOException
+    {
+        File f = new File("test/data/serialization/" + CUR_VER + "/" + name);
+        f.getParentFile().mkdirs();
+        return new DataOutputStream(new FileOutputStream(f));
+    }
+}

Added: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/SerializationsTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/SerializationsTest.java?rev=1062953&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/SerializationsTest.java (added)
+++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/SerializationsTest.java Mon Jan 24 20:05:25 2011
@@ -0,0 +1,315 @@
+package org.apache.cassandra.db;
+
+import org.apache.cassandra.AbstractSerializationsTester;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.SliceRange;
+import org.junit.Test;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+
+public class SerializationsTest extends AbstractSerializationsTester
+{
+    private void testRangeSliceCommandWrite() throws IOException
+    {
+        ByteBuffer startCol = ByteBuffer.wrap("Start".getBytes());
+        ByteBuffer stopCol = ByteBuffer.wrap("Stop".getBytes());
+        ByteBuffer emptyCol = ByteBuffer.wrap("".getBytes());
+        SlicePredicate namesPred = new SlicePredicate();
+        namesPred.column_names = Statics.NamedCols;
+        SliceRange emptySliceRange = new SliceRange(emptyCol, emptyCol, false, 100); 
+        SliceRange nonEmptySliceRange = new SliceRange(startCol, stopCol, true, 100);
+        SlicePredicate emptyRangePred = new SlicePredicate();
+        emptyRangePred.slice_range = emptySliceRange;
+        SlicePredicate nonEmptyRangePred = new SlicePredicate();
+        nonEmptyRangePred.slice_range = nonEmptySliceRange;
+        IPartitioner part = StorageService.getPartitioner();
+        AbstractBounds bounds = new Range(part.getRandomToken(), part.getRandomToken());
+        
+        Message namesCmd = new RangeSliceCommand(Statics.KS, "Standard1", null, namesPred, bounds, 100).getMessage();
+        Message emptyRangeCmd = new RangeSliceCommand(Statics.KS, "Standard1", null, emptyRangePred, bounds, 100).getMessage();
+        Message regRangeCmd = new RangeSliceCommand(Statics.KS, "Standard1", null,  nonEmptyRangePred, bounds, 100).getMessage();
+        Message namesCmdSup = new RangeSliceCommand(Statics.KS, "Super1", Statics.SC, namesPred, bounds, 100).getMessage();
+        Message emptyRangeCmdSup = new RangeSliceCommand(Statics.KS, "Super1", Statics.SC, emptyRangePred, bounds, 100).getMessage();
+        Message regRangeCmdSup = new RangeSliceCommand(Statics.KS, "Super1", Statics.SC,  nonEmptyRangePred, bounds, 100).getMessage();
+        
+        DataOutputStream dout = getOutput("db.RangeSliceCommand.bin");
+        
+        Message.serializer().serialize(namesCmd, dout);
+        Message.serializer().serialize(emptyRangeCmd, dout);
+        Message.serializer().serialize(regRangeCmd, dout);
+        Message.serializer().serialize(namesCmdSup, dout);
+        Message.serializer().serialize(emptyRangeCmdSup, dout);
+        Message.serializer().serialize(regRangeCmdSup, dout);
+        dout.close();
+    }
+    
+    @Test
+    public void testRangeSliceCommandRead() throws IOException
+    {
+        if (EXECUTE_WRITES)
+            testRangeSliceCommandWrite();
+        
+        DataInputStream in = getInput("db.RangeSliceCommand.bin");
+        for (int i = 0; i < 6; i++)
+        {
+            Message msg = Message.serializer().deserialize(in);
+            RangeSliceCommand cmd = RangeSliceCommand.read(msg);
+        }
+        in.close();
+    }
+    
+    private void testSliceByNamesReadCommandWrite() throws IOException
+    {
+        SliceByNamesReadCommand standardCmd = new SliceByNamesReadCommand(Statics.KS, Statics.Key, Statics.StandardPath, Statics.NamedCols);
+        SliceByNamesReadCommand superCmd = new SliceByNamesReadCommand(Statics.KS, Statics.Key, Statics.SuperPath, Statics.NamedCols);
+        
+        DataOutputStream out = getOutput("db.SliceByNamesReadCommand.bin");
+        SliceByNamesReadCommand.serializer().serialize(standardCmd, out);
+        SliceByNamesReadCommand.serializer().serialize(superCmd, out);
+        ReadCommand.serializer().serialize(standardCmd, out);
+        ReadCommand.serializer().serialize(superCmd, out);
+        Message.serializer().serialize(standardCmd.makeReadMessage(), out);
+        Message.serializer().serialize(superCmd.makeReadMessage(), out);
+        out.close();
+    }
+    
+    @Test 
+    public void testSliceByNamesReadCommandRead() throws IOException
+    {
+        if (EXECUTE_WRITES)
+            testSliceByNamesReadCommandWrite();
+        
+        DataInputStream in = getInput("db.SliceByNamesReadCommand.bin");
+        assert SliceByNamesReadCommand.serializer().deserialize(in) != null;
+        assert SliceByNamesReadCommand.serializer().deserialize(in) != null;
+        assert ReadCommand.serializer().deserialize(in) != null;
+        assert ReadCommand.serializer().deserialize(in) != null;
+        assert Message.serializer().deserialize(in) != null;
+        assert Message.serializer().deserialize(in) != null;
+        in.close();
+    }
+    
+    private void testSliceFromReadCommandWrite() throws IOException
+    {
+        SliceFromReadCommand standardCmd = new SliceFromReadCommand(Statics.KS, Statics.Key, Statics.StandardPath, Statics.Start, Statics.Stop, true, 100);
+        SliceFromReadCommand superCmd = new SliceFromReadCommand(Statics.KS, Statics.Key, Statics.SuperPath, Statics.Start, Statics.Stop, true, 100);
+        DataOutputStream out = getOutput("db.SliceFromReadCommand.bin");
+        SliceFromReadCommand.serializer().serialize(standardCmd, out);
+        SliceFromReadCommand.serializer().serialize(superCmd, out);
+        ReadCommand.serializer().serialize(standardCmd, out);
+        ReadCommand.serializer().serialize(superCmd, out);
+        Message.serializer().serialize(standardCmd.makeReadMessage(), out);
+        Message.serializer().serialize(superCmd.makeReadMessage(), out);
+        out.close();
+    }
+    
+    @Test
+    public void testSliceFromReadCommandRead() throws IOException
+    {
+        if (EXECUTE_WRITES)
+            testSliceFromReadCommandWrite();
+        
+        DataInputStream in = getInput("db.SliceFromReadCommand.bin");
+        assert SliceFromReadCommand.serializer().deserialize(in) != null;
+        assert SliceFromReadCommand.serializer().deserialize(in) != null;
+        assert ReadCommand.serializer().deserialize(in) != null;
+        assert ReadCommand.serializer().deserialize(in) != null;
+        assert Message.serializer().deserialize(in) != null;
+        assert Message.serializer().deserialize(in) != null;
+        in.close();
+    }
+    
+    private void testRowWrite() throws IOException
+    {
+        DataOutputStream out = getOutput("db.Row.bin");
+        Row.serializer().serialize(Statics.StandardRow, out);
+        Row.serializer().serialize(Statics.SuperRow, out);
+        Row.serializer().serialize(Statics.NullRow, out);
+        out.close();
+    }
+    
+    @Test
+    public void testRowRead() throws IOException
+    {
+        if (EXECUTE_WRITES)
+            testRowWrite();
+        
+        DataInputStream in = getInput("db.Row.bin");
+        assert Row.serializer().deserialize(in) != null;
+        assert Row.serializer().deserialize(in) != null;
+        assert Row.serializer().deserialize(in) != null;
+        in.close();
+    }
+    
+    private void restRowMutationWrite() throws IOException
+    {
+        RowMutation emptyRm = new RowMutation(Statics.KS, Statics.Key);
+        RowMutation standardRowRm = new RowMutation(Statics.KS, Statics.StandardRow);
+        RowMutation superRowRm = new RowMutation(Statics.KS, Statics.SuperRow);
+        RowMutation standardRm = new RowMutation(Statics.KS, Statics.Key);
+        standardRm.add(Statics.StandardCf);
+        RowMutation superRm = new RowMutation(Statics.KS, Statics.Key);
+        superRm.add(Statics.SuperCf);
+        Map<Integer, ColumnFamily> mods = new HashMap<Integer, ColumnFamily>();
+        mods.put(Statics.StandardCf.metadata().cfId, Statics.StandardCf);
+        mods.put(Statics.SuperCf.metadata().cfId, Statics.SuperCf);
+        RowMutation mixedRm = new RowMutation(Statics.KS, Statics.Key, mods);
+        
+        DataOutputStream out = getOutput("db.RowMutation.bin");
+        RowMutation.serializer().serialize(emptyRm, out);
+        RowMutation.serializer().serialize(standardRowRm, out);
+        RowMutation.serializer().serialize(superRowRm, out);
+        RowMutation.serializer().serialize(standardRm, out);
+        RowMutation.serializer().serialize(superRm, out);
+        RowMutation.serializer().serialize(mixedRm, out);
+        Message.serializer().serialize(emptyRm.makeRowMutationMessage(), out);
+        Message.serializer().serialize(standardRowRm.makeRowMutationMessage(), out);
+        Message.serializer().serialize(superRowRm.makeRowMutationMessage(), out);
+        Message.serializer().serialize(standardRm.makeRowMutationMessage(), out);
+        Message.serializer().serialize(superRm.makeRowMutationMessage(), out);
+        Message.serializer().serialize(mixedRm.makeRowMutationMessage(), out);
+        out.close(); 
+    }
+    
+    @Test
+    public void testRowMutationRead() throws IOException
+    {
+        if (EXECUTE_WRITES)
+            restRowMutationWrite();
+        
+        DataInputStream in = getInput("db.RowMutation.bin");
+        assert RowMutation.serializer().deserialize(in) != null;
+        assert RowMutation.serializer().deserialize(in) != null;
+        assert RowMutation.serializer().deserialize(in) != null;
+        assert RowMutation.serializer().deserialize(in) != null;
+        assert RowMutation.serializer().deserialize(in) != null;
+        assert RowMutation.serializer().deserialize(in) != null;
+        assert Message.serializer().deserialize(in) != null;
+        assert Message.serializer().deserialize(in) != null;
+        assert Message.serializer().deserialize(in) != null;
+        assert Message.serializer().deserialize(in) != null;
+        assert Message.serializer().deserialize(in) != null;
+        assert Message.serializer().deserialize(in) != null;
+        in.close();
+    }
+    
+    public void testTruncateWrite() throws IOException
+    {
+        Truncation tr = new Truncation(Statics.KS, "Doesn't Really Matter");
+        TruncateResponse aff = new TruncateResponse(Statics.KS, "Doesn't Matter Either", true);
+        TruncateResponse neg = new TruncateResponse(Statics.KS, "Still Doesn't Matter", false);
+        DataOutputStream out = getOutput("db.Truncation.bin");
+        Truncation.serializer().serialize(tr, out);
+        TruncateResponse.serializer().serialize(aff, out);
+        TruncateResponse.serializer().serialize(neg, out);
+        Message.serializer().serialize(tr.makeTruncationMessage(), out);
+        Message.serializer().serialize(TruncateResponse.makeTruncateResponseMessage(tr.makeTruncationMessage(), aff), out);
+        Message.serializer().serialize(TruncateResponse.makeTruncateResponseMessage(tr.makeTruncationMessage(), neg), out);
+        // todo: notice how CF names weren't validated.
+        out.close();
+    }
+    
+    @Test
+    public void testTruncateRead() throws IOException
+    {
+        if (EXECUTE_WRITES)
+            testTruncateWrite();
+        
+        DataInputStream in = getInput("db.Truncation.bin");
+        assert Truncation.serializer().deserialize(in) != null;
+        assert TruncateResponse.serializer().deserialize(in) != null;
+        assert TruncateResponse.serializer().deserialize(in) != null;
+        assert Message.serializer().deserialize(in) != null;
+        assert Message.serializer().deserialize(in) != null;
+        assert Message.serializer().deserialize(in) != null;
+        in.close();
+    }
+    
+    private void testWriteResponseWrite() throws IOException
+    {
+        WriteResponse aff = new WriteResponse(Statics.KS, Statics.Key, true);
+        WriteResponse neg = new WriteResponse(Statics.KS, Statics.Key, false);
+        DataOutputStream out = getOutput("db.WriteResponse.bin");
+        WriteResponse.serializer().serialize(aff, out);
+        WriteResponse.serializer().serialize(neg, out);
+        out.close();
+    }
+    
+    @Test
+    public void testWriteResponseRead() throws IOException
+    {
+        if (EXECUTE_WRITES)
+            testWriteResponseWrite();
+        
+        DataInputStream in = getInput("db.WriteResponse.bin");
+        assert WriteResponse.serializer().deserialize(in) != null;
+        assert WriteResponse.serializer().deserialize(in) != null;
+        in.close();
+    }
+    
+    private static ByteBuffer bb(String s) {
+        return ByteBuffer.wrap(s.getBytes());
+    }
+    
+    private static class Statics 
+    {
+        private static final String KS = "Keyspace1";
+        private static final ByteBuffer Key = ByteBuffer.wrap("Key01".getBytes());
+        private static final List<ByteBuffer> NamedCols = new ArrayList<ByteBuffer>() 
+        {{
+            add(ByteBuffer.wrap("AAA".getBytes()));     
+            add(ByteBuffer.wrap("BBB".getBytes()));     
+            add(ByteBuffer.wrap("CCC".getBytes()));     
+        }};
+        private static final ByteBuffer SC = ByteBuffer.wrap("SCName".getBytes());
+        private static final QueryPath StandardPath = new QueryPath("Standard1");
+        private static final QueryPath SuperPath = new QueryPath("Super1", SC);
+        private static final ByteBuffer Start = ByteBuffer.wrap("Start".getBytes());
+        private static final ByteBuffer Stop = ByteBuffer.wrap("Stop".getBytes());
+        
+        private static final ColumnFamily StandardCf = ColumnFamily.create(Statics.KS, "Standard1");
+        private static final ColumnFamily SuperCf = ColumnFamily.create(Statics.KS, "Super1");
+        
+        private static final SuperColumn SuperCol = new SuperColumn(Statics.SC, DatabaseDescriptor.getComparator(Statics.KS, "Super1"))
+        {{
+            addColumn(new Column(bb("aaaa")));
+            addColumn(new Column(bb("bbbb"), bb("bbbbb-value")));
+            addColumn(new Column(bb("cccc"), bb("ccccc-value"), 1000L));
+            addColumn(new DeletedColumn(bb("dddd"), 500, 1000));
+            addColumn(new DeletedColumn(bb("eeee"), bb("eeee-value"), 1001));
+            addColumn(new ExpiringColumn(bb("ffff"), bb("ffff-value"), 2000, 1000));
+            addColumn(new ExpiringColumn(bb("gggg"), bb("gggg-value"), 2001, 1000, 2002));
+        }};
+        
+        private static final Row StandardRow = new Row(Util.dk("key0"), Statics.StandardCf);
+        private static final Row SuperRow = new Row(Util.dk("key1"), Statics.SuperCf);
+        private static final Row NullRow = new Row(Util.dk("key2"), null);
+        
+        static {
+            StandardCf.addColumn(new Column(bb("aaaa")));
+            StandardCf.addColumn(new Column(bb("bbbb"), bb("bbbbb-value")));
+            StandardCf.addColumn(new Column(bb("cccc"), bb("ccccc-value"), 1000L));
+            StandardCf.addColumn(new DeletedColumn(bb("dddd"), 500, 1000));
+            StandardCf.addColumn(new DeletedColumn(bb("eeee"), bb("eeee-value"), 1001));
+            StandardCf.addColumn(new ExpiringColumn(bb("ffff"), bb("ffff-value"), 2000, 1000));
+            StandardCf.addColumn(new ExpiringColumn(bb("gggg"), bb("gggg-value"), 2001, 1000, 2002));
+            
+            SuperCf.addColumn(Statics.SuperCol);
+        }
+    }
+}

Added: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/migration/SerializationsTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/migration/SerializationsTest.java?rev=1062953&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/migration/SerializationsTest.java (added)
+++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/migration/SerializationsTest.java Mon Jan 24 20:05:25 2011
@@ -0,0 +1,59 @@
+package org.apache.cassandra.db.migration;
+
+import org.apache.cassandra.AbstractSerializationsTester;
+import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.io.SerDeUtils;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+import org.apache.commons.codec.binary.Base64;
+import org.junit.Test;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+public class SerializationsTest extends AbstractSerializationsTester
+{
+    private static final int ksCount = 5;
+    
+    private void testWrite() throws IOException, ConfigurationException
+    {
+        for (int i = 0; i < ksCount; i++)
+        {
+            String tableName = "Keyspace" + (i + 1);
+            KSMetaData ksm = DatabaseDescriptor.getKSMetaData(tableName);
+            UUID uuid = UUIDGen.makeType1UUIDFromHost(FBUtilities.getLocalAddress());
+            DatabaseDescriptor.clearTableDefinition(ksm, uuid);
+            Migration m = new AddKeyspace(ksm);
+            ByteBuffer bytes = m.serialize();
+            
+            DataOutputStream out = getOutput("db.migration." + tableName + ".bin");
+            out.writeUTF(new String(Base64.encodeBase64(bytes.array())));
+            out.close();
+        }
+    }
+    
+    @Test
+    public void testRead() throws IOException, ConfigurationException
+    {
+        if (AbstractSerializationsTester.EXECUTE_WRITES)
+            testWrite();
+        
+        for (int i = 0; i < ksCount; i++)
+        {
+            String tableName = "Keyspace" + (i + 1);
+            DataInputStream in = getInput("db.migration." + tableName + ".bin");
+            byte[] raw = Base64.decodeBase64(in.readUTF().getBytes());
+            org.apache.cassandra.db.migration.avro.Migration obj = new org.apache.cassandra.db.migration.avro.Migration();
+            SerDeUtils.deserializeWithSchema(ByteBuffer.wrap(raw), obj);
+            in.close();
+        }
+    }
+}

Added: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/gms/SerializationsTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/gms/SerializationsTest.java?rev=1062953&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/gms/SerializationsTest.java (added)
+++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/gms/SerializationsTest.java Mon Jan 24 20:05:25 2011
@@ -0,0 +1,94 @@
+package org.apache.cassandra.gms;
+
+import org.apache.cassandra.AbstractSerializationsTester;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.junit.Test;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SerializationsTest extends AbstractSerializationsTester
+{
+    private void testEndpointStateWrite() throws IOException 
+    {
+        DataOutputStream out = getOutput("gms.EndpointState.bin");
+        HeartBeatState.serializer().serialize(Statics.HeartbeatSt, out);
+        EndpointState.serializer().serialize(Statics.EndpointSt, out);
+        VersionedValue.serializer.serialize(Statics.vv0, out);
+        VersionedValue.serializer.serialize(Statics.vv1, out);
+        out.close();
+    }
+    
+    @Test
+    public void testEndpointStateRead() throws IOException
+    {
+        if (EXECUTE_WRITES)
+            testEndpointStateWrite();
+        
+        DataInputStream in = getInput("gms.EndpointState.bin");
+        assert HeartBeatState.serializer().deserialize(in) != null;
+        assert EndpointState.serializer().deserialize(in) != null;
+        assert VersionedValue.serializer.deserialize(in) != null;
+        assert VersionedValue.serializer.deserialize(in) != null;
+        in.close();
+    }
+     
+    private void testGossipDigestWrite() throws IOException
+    {
+        Map<InetAddress, EndpointState> states = new HashMap<InetAddress, EndpointState>();
+        states.put(InetAddress.getByName("127.0.0.1"), Statics.EndpointSt);
+        states.put(InetAddress.getByName("127.0.0.2"), Statics.EndpointSt);
+        GossipDigestAckMessage ack = new GossipDigestAckMessage(Statics.Digests, states);
+        GossipDigestAck2Message ack2 = new GossipDigestAck2Message(states);
+        GossipDigestSynMessage syn = new GossipDigestSynMessage("Not a real cluster name", Statics.Digests);
+        
+        DataOutputStream out = getOutput("gms.Gossip.bin");
+        for (GossipDigest gd : Statics.Digests)
+            GossipDigest.serializer().serialize(gd, out);
+        GossipDigestAckMessage.serializer().serialize(ack, out);
+        GossipDigestAck2Message.serializer().serialize(ack2, out);
+        GossipDigestSynMessage.serializer().serialize(syn, out);
+        out.close();
+    }
+    
+    @Test
+    public void testGossipDigestRead() throws IOException
+    {
+        if (EXECUTE_WRITES)
+            testGossipDigestWrite();
+        
+        int count = 0;
+        DataInputStream in = getInput("gms.Gossip.bin");
+        while (count < Statics.Digests.size())
+            assert GossipDigestAck2Message.serializer().deserialize(in) != null;
+        assert GossipDigestAckMessage.serializer().deserialize(in) != null;
+        assert GossipDigestAck2Message.serializer().deserialize(in) != null;
+        assert GossipDigestSynMessage.serializer().deserialize(in) != null;
+        in.close();
+    }
+    
+    private static class Statics
+    {
+        private static HeartBeatState HeartbeatSt = new HeartBeatState(101, 201);
+        private static EndpointState EndpointSt = new EndpointState(HeartbeatSt);
+        private static VersionedValue.VersionedValueFactory vvFact = new VersionedValue.VersionedValueFactory(StorageService.getPartitioner());
+        private static VersionedValue vv0 = vvFact.load(23d);
+        private static VersionedValue vv1 = vvFact.bootstrapping(StorageService.getPartitioner().getRandomToken());
+        private static List<GossipDigest> Digests = new ArrayList<GossipDigest>();
+        
+        {
+            HeartbeatSt.updateHeartBeat();
+            EndpointSt.addApplicationState(ApplicationState.LOAD, vv0);
+            EndpointSt.addApplicationState(ApplicationState.STATUS, vv1);
+            for (int i = 0; i < 100; i++)
+                Digests.add(new GossipDigest(FBUtilities.getLocalAddress(), 100 + i, 1000 + 2 * i));
+        }
+    }
+}

Added: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/SerializationsTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/SerializationsTest.java?rev=1062953&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/SerializationsTest.java (added)
+++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/SerializationsTest.java Mon Jan 24 20:05:25 2011
@@ -0,0 +1,80 @@
+package org.apache.cassandra.service;
+
+import org.apache.cassandra.AbstractSerializationsTester;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.RandomPartitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
+import org.junit.Test;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class SerializationsTest extends AbstractSerializationsTester
+{
+    private void testTreeRequestWrite() throws IOException
+    {
+        DataOutputStream out = getOutput("service.TreeRequest.bin");
+        AntiEntropyService.TreeRequestVerbHandler.SERIALIZER.serialize(Statics.req, out);
+        Message.serializer().serialize(AntiEntropyService.TreeRequestVerbHandler.makeVerb(Statics.req), out);
+        out.close();
+    }
+    
+    @Test
+    public void testTreeRequestRead() throws IOException
+    {
+        if (EXECUTE_WRITES)
+            testTreeRequestWrite();
+        
+        DataInputStream in = getInput("service.TreeRequest.bin");
+        assert AntiEntropyService.TreeRequestVerbHandler.SERIALIZER.deserialize(in) != null;
+        assert Message.serializer().deserialize(in) != null;
+        in.close();
+    }
+    
+    private void testTreeResponseWrite() throws IOException
+    {
+        AntiEntropyService.Validator v0 = new AntiEntropyService.Validator(Statics.req);
+        IPartitioner part = new RandomPartitioner();
+        MerkleTree mt = new MerkleTree(part, MerkleTree.RECOMMENDED_DEPTH, Integer.MAX_VALUE);
+        List<Token> tokens = new ArrayList<Token>();
+        for (int i = 0; i < 10; i++)
+        {
+            Token t = part.getRandomToken();
+            tokens.add(t);
+            mt.split(t);
+        }
+        AntiEntropyService.Validator v1 = new AntiEntropyService.Validator(Statics.req, mt);
+        DataOutputStream out = getOutput("service.TreeResponse.bin");
+        AntiEntropyService.TreeResponseVerbHandler.SERIALIZER.serialize(v0, out);
+        AntiEntropyService.TreeResponseVerbHandler.SERIALIZER.serialize(v1, out);
+        Message.serializer().serialize(AntiEntropyService.TreeResponseVerbHandler.makeVerb(FBUtilities.getLocalAddress(), v0), out);
+        Message.serializer().serialize(AntiEntropyService.TreeResponseVerbHandler.makeVerb(FBUtilities.getLocalAddress(), v1), out);
+        out.close();
+    }
+    
+    @Test
+    public void testTreeResponseRead() throws IOException
+    {
+        if (EXECUTE_WRITES)
+            testTreeResponseWrite();
+        
+        DataInputStream in = getInput("service.TreeResponse.bin");
+        assert AntiEntropyService.TreeResponseVerbHandler.SERIALIZER.deserialize(in) != null;
+        assert AntiEntropyService.TreeResponseVerbHandler.SERIALIZER.deserialize(in) != null;
+        assert Message.serializer().deserialize(in) != null;
+        assert Message.serializer().deserialize(in) != null;
+        in.close();
+    }
+    
+    private static class Statics
+    {
+        private static final AntiEntropyService.CFPair pair = new AntiEntropyService.CFPair("Keyspace1", "Standard1");
+        private static final AntiEntropyService.TreeRequest req = new AntiEntropyService.TreeRequest("sessionId", FBUtilities.getLocalAddress(), pair);
+    }
+}

Added: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/streaming/SerializationsTest.java?rev=1062953&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/streaming/SerializationsTest.java (added)
+++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/streaming/SerializationsTest.java Mon Jan 24 20:05:25 2011
@@ -0,0 +1,183 @@
+package org.apache.cassandra.streaming;
+
+import org.apache.cassandra.AbstractSerializationsTester;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.dht.BigIntegerToken;
+import org.apache.cassandra.dht.BytesToken;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+import org.junit.Test;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+public class SerializationsTest extends AbstractSerializationsTester
+{
+    private void testPendingFileWrite() throws IOException
+    {
+        // make sure to test serializing null and a pf with no sstable.
+        PendingFile normal = makePendingFile(true, "fake_component", 100);
+        PendingFile noSections = makePendingFile(true, "not_real", 0);
+        PendingFile noSST = makePendingFile(false, "also_fake", 100);
+        
+        DataOutputStream out = getOutput("streaming.PendingFile.bin");
+        PendingFile.serializer().serialize(normal, out);
+        PendingFile.serializer().serialize(noSections, out);
+        PendingFile.serializer().serialize(noSST, out);
+        PendingFile.serializer().serialize(null, out);
+        out.close();
+    }
+    
+    @Test
+    public void testPendingFileRead() throws IOException
+    {
+        if (EXECUTE_WRITES)
+            testPendingFileWrite();
+        
+        DataInputStream in = getInput("streaming.PendingFile.bin");
+        assert PendingFile.serializer().deserialize(in) != null;
+        assert PendingFile.serializer().deserialize(in) != null;
+        assert PendingFile.serializer().deserialize(in) != null;
+        assert PendingFile.serializer().deserialize(in) == null;
+        in.close();
+    }
+    
+    private void testStreamHeaderWrite() throws IOException
+    {
+        StreamHeader sh0 = new StreamHeader("Keyspace1", 123L, makePendingFile(true, "zz", 100));
+        StreamHeader sh1 = new StreamHeader("Keyspace1", 124L, makePendingFile(false, "zz", 100));
+        Collection<PendingFile> files = new ArrayList<PendingFile>();
+        for (int i = 0; i < 50; i++)
+            files.add(makePendingFile(i % 2 == 0, "aa", 100));
+        StreamHeader sh2 = new StreamHeader("Keyspace1", 125L, makePendingFile(true, "bb", 100), files);
+        StreamHeader sh3 = new StreamHeader("Keyspace1", 125L, null, files);
+        StreamHeader sh4 = new StreamHeader("Keyspace1", 125L, makePendingFile(true, "bb", 100), new ArrayList<PendingFile>());
+        
+        DataOutputStream out = getOutput("streaming.StreamHeader.bin");
+        StreamHeader.serializer().serialize(sh0, out);
+        StreamHeader.serializer().serialize(sh1, out);
+        StreamHeader.serializer().serialize(sh2, out);
+        StreamHeader.serializer().serialize(sh3, out);
+        StreamHeader.serializer().serialize(sh4, out);
+        out.close();
+    }
+    
+    @Test
+    public void testStreamHeaderRead() throws IOException
+    {
+        if (EXECUTE_WRITES)
+            testStreamHeaderWrite();
+        
+        DataInputStream in = getInput("streaming.StreamHeader.bin");
+        assert StreamHeader.serializer().deserialize(in) != null;
+        assert StreamHeader.serializer().deserialize(in) != null;
+        assert StreamHeader.serializer().deserialize(in) != null;
+        assert StreamHeader.serializer().deserialize(in) != null;
+        assert StreamHeader.serializer().deserialize(in) != null;
+        in.close();
+    }
+    
+    private void testStreamReplyWrite() throws IOException
+    {
+        StreamReply rep = new StreamReply("this is a file", 123L, StreamReply.Status.FILE_FINISHED);
+        DataOutputStream out = getOutput("streaming.StreamReply.bin");
+        StreamReply.serializer.serialize(rep, out);
+        Message.serializer().serialize(rep.createMessage(), out);
+        out.close();
+    }
+    
+    @Test
+    public void testStreamReplyRead() throws IOException
+    {
+        if (EXECUTE_WRITES)
+            testStreamReplyWrite();
+        
+        DataInputStream in = getInput("streaming.StreamReply.bin");
+        assert StreamReply.serializer.deserialize(in) != null;
+        assert Message.serializer().deserialize(in) != null;
+        in.close();
+    }
+    
+    private static PendingFile makePendingFile(boolean sst, String comp, int numSecs)
+    {
+        Descriptor desc = new Descriptor("z", new File("path/doesn't/matter"), "Keyspace1", "Standard1", 23, false);
+        List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>();
+        for (int i = 0; i < numSecs; i++)
+            sections.add(new Pair<Long, Long>(new Long(i), new Long(i * i)));
+        return new PendingFile(sst ? makeSSTable() : null, desc, comp, sections);
+    }
+    
+    private void testStreamRequestMessageWrite() throws IOException
+    {
+        Collection<Range> ranges = new ArrayList<Range>();
+        for (int i = 0; i < 5; i++)
+            ranges.add(new Range(new BytesToken(ByteBuffer.wrap(Integer.toString(10*i).getBytes())), new BytesToken(ByteBuffer.wrap(Integer.toString(10*i+5).getBytes()))));
+        StreamRequestMessage msg0 = new StreamRequestMessage(FBUtilities.getLocalAddress(), ranges, "Keyspace1", 123L);
+        StreamRequestMessage msg1 = new StreamRequestMessage(FBUtilities.getLocalAddress(), makePendingFile(true, "aa", 100), 124L);
+        StreamRequestMessage msg2 = new StreamRequestMessage(FBUtilities.getLocalAddress(), makePendingFile(false, "aa", 100), 124L);
+        
+        DataOutputStream out = getOutput("streaming.StreamRequestMessage.bin");
+        StreamRequestMessage.serializer().serialize(msg0, out);
+        StreamRequestMessage.serializer().serialize(msg1, out);
+        StreamRequestMessage.serializer().serialize(msg2, out);
+        Message.serializer().serialize(msg0.makeMessage(), out);
+        Message.serializer().serialize(msg1.makeMessage(), out);
+        Message.serializer().serialize(msg2.makeMessage(), out);
+        out.close();
+    }
+    
+    @Test
+    public void testStreamRequestMessageRead() throws IOException
+    {
+        if (EXECUTE_WRITES)
+            testStreamRequestMessageWrite();
+        
+        DataInputStream in = getInput("streaming.StreamRequestMessage.bin");
+        assert StreamRequestMessage.serializer().deserialize(in) != null;
+        assert StreamRequestMessage.serializer().deserialize(in) != null;
+        assert StreamRequestMessage.serializer().deserialize(in) != null;
+        assert Message.serializer().deserialize(in) != null;
+        assert Message.serializer().deserialize(in) != null;
+        assert Message.serializer().deserialize(in) != null;
+        in.close();
+    }
+    
+    private static SSTable makeSSTable()
+    {
+        Table t = Table.open("Keyspace1");
+        for (int i = 0; i < 100; i++)
+        {
+            RowMutation rm = new RowMutation(t.name, ByteBuffer.wrap(Long.toString(System.nanoTime()).getBytes()));
+            rm.add(new QueryPath("Standard1", null, ByteBuffer.wrap("cola".getBytes())), ByteBuffer.wrap("value".getBytes()), 0);
+            try
+            {
+                rm.apply();
+            }
+            catch (IOException ex) 
+            {
+                throw new RuntimeException(ex);
+            }
+        }
+        try
+        {
+            t.getColumnFamilyStore("Standard1").forceBlockingFlush();
+            return t.getColumnFamilyStore("Standard1").getSSTables().iterator().next();
+        }
+        catch (Exception any)
+        {
+            throw new RuntimeException(any);
+        }
+    }
+}

Added: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/utils/SerializationsTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/utils/SerializationsTest.java?rev=1062953&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/utils/SerializationsTest.java (added)
+++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/utils/SerializationsTest.java Mon Jan 24 20:05:25 2011
@@ -0,0 +1,96 @@
+package org.apache.cassandra.utils;
+
+import org.apache.cassandra.AbstractSerializationsTester;
+import org.apache.cassandra.service.StorageService;
+import org.junit.Test;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class SerializationsTest extends AbstractSerializationsTester
+{
+    
+    private void testBloomFilterWrite() throws IOException
+    {
+        BloomFilter bf = BloomFilter.getFilter(1000000, 0.0001);
+        for (int i = 0; i < 100; i++)
+            bf.add(StorageService.getPartitioner().getTokenFactory().toByteArray(StorageService.getPartitioner().getRandomToken()));
+        DataOutputStream out = getOutput("utils.BloomFilter.bin");
+        BloomFilter.serializer().serialize(bf, out);
+        out.close();
+    }
+    
+    @Test
+    public void testBloomFilterRead() throws IOException
+    {
+        if (EXECUTE_WRITES)
+            testBloomFilterWrite();
+        
+        DataInputStream in = getInput("utils.BloomFilter.bin");
+        assert BloomFilter.serializer().deserialize(in) != null;
+        in.close();
+    }
+    
+    private void testLegacyBloomFilterWrite() throws IOException
+    {
+        LegacyBloomFilter a = LegacyBloomFilter.getFilter(1000000, 1000);
+        LegacyBloomFilter b = LegacyBloomFilter.getFilter(1000000, 0.0001);
+        for (int i = 0; i < 100; i++)
+        {
+            ByteBuffer key = StorageService.getPartitioner().getTokenFactory().toByteArray(StorageService.getPartitioner().getRandomToken()); 
+            a.add(key);
+            b.add(key);
+        }
+        DataOutputStream out = getOutput("utils.LegacyBloomFilter.bin");
+        LegacyBloomFilter.serializer().serialize(a, out);
+        LegacyBloomFilter.serializer().serialize(b, out);
+        out.close();
+    }
+    
+    @Test
+    public void testLegacyBloomFilterRead() throws IOException
+    {
+        if (EXECUTE_WRITES)
+            testLegacyBloomFilterWrite();
+        
+        DataInputStream in = getInput("utils.LegacyBloomFilter.bin");
+        assert LegacyBloomFilter.serializer().deserialize(in) != null;
+        in.close();
+    }
+    
+    private void testEstimatedHistogramWrite() throws IOException
+    {
+        EstimatedHistogram hist0 = new EstimatedHistogram();
+        EstimatedHistogram hist1 = new EstimatedHistogram(5000);
+        long[] offsets = new long[1000];
+        long[] data = new long[offsets.length + 1];
+        for (int i = 0; i < offsets.length; i++)
+        {
+            offsets[i] = i;
+            data[i] = 10 * i;
+        }
+        data[offsets.length] = 100000;
+        EstimatedHistogram hist2 = new EstimatedHistogram(offsets, data);
+        
+        DataOutputStream out = getOutput("utils.EstimatedHistogram.bin");
+        EstimatedHistogram.serializer.serialize(hist0, out);
+        EstimatedHistogram.serializer.serialize(hist1, out);
+        EstimatedHistogram.serializer.serialize(hist2, out);
+        out.close();
+    }
+    
+    @Test
+    public void testEstimatedHistogramRead() throws IOException
+    {
+        if (EXECUTE_WRITES)
+            testEstimatedHistogramWrite();
+        
+        DataInputStream in = getInput("utils.EstimatedHistogram.bin");
+        assert EstimatedHistogram.serializer.deserialize(in) != null;
+        assert EstimatedHistogram.serializer.deserialize(in) != null;
+        assert EstimatedHistogram.serializer.deserialize(in) != null;
+        in.close();
+    }
+}