You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by gd...@apache.org on 2011/01/24 21:05:34 UTC
svn commit: r1062953 - in /cassandra/branches/cassandra-0.7:
src/java/org/apache/cassandra/net/ test/unit/org/apache/cassandra/
test/unit/org/apache/cassandra/db/
test/unit/org/apache/cassandra/db/migration/
test/unit/org/apache/cassandra/gms/ test/uni...
Author: gdusbabek
Date: Mon Jan 24 20:05:25 2011
New Revision: 1062953
URL: http://svn.apache.org/viewvc?rev=1062953&view=rev
Log:
Serialization tests. patch by gdusbabek, reviewed by jbellis. CASSANDRA-1923
Added:
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/SerializationsTest.java
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/migration/
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/migration/SerializationsTest.java
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/gms/SerializationsTest.java
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/SerializationsTest.java
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/utils/SerializationsTest.java
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java?rev=1062953&r1=1062952&r2=1062953&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java Mon Jan 24 20:05:25 2011
@@ -30,14 +30,14 @@ import org.apache.cassandra.utils.FBUtil
public class Message
{
- private static MessageSerializer serializer_;
+ private static ICompactSerializer<Message> serializer_;
static
{
serializer_ = new MessageSerializer();
}
- public static MessageSerializer serializer()
+ public static ICompactSerializer<Message> serializer()
{
return serializer_;
}
@@ -121,25 +121,25 @@ public class Message
.append(separator);
return sbuf.toString();
}
-}
-
-class MessageSerializer implements ICompactSerializer<Message>
-{
- public void serialize(Message t, DataOutputStream dos) throws IOException
+
+ private static class MessageSerializer implements ICompactSerializer<Message>
{
- Header.serializer().serialize( t.header_, dos);
- byte[] bytes = t.getMessageBody();
- dos.writeInt(bytes.length);
- dos.write(bytes);
- }
-
- public Message deserialize(DataInputStream dis) throws IOException
- {
- Header header = Header.serializer().deserialize(dis);
- int size = dis.readInt();
- byte[] bytes = new byte[size];
- dis.readFully(bytes);
- // return new Message(header.getMessageId(), header.getFrom(), header.getMessageType(), header.getVerb(), new Object[]{bytes});
- return new Message(header, bytes);
+ public void serialize(Message t, DataOutputStream dos) throws IOException
+ {
+ Header.serializer().serialize( t.header_, dos);
+ byte[] bytes = t.getMessageBody();
+ dos.writeInt(bytes.length);
+ dos.write(bytes);
+ }
+
+ public Message deserialize(DataInputStream dis) throws IOException
+ {
+ Header header = Header.serializer().deserialize(dis);
+ int size = dis.readInt();
+ byte[] bytes = new byte[size];
+ dis.readFully(bytes);
+ // return new Message(header.getMessageId(), header.getFrom(), header.getMessageType(), header.getVerb(), new Object[]{bytes});
+ return new Message(header, bytes);
+ }
}
}
Added: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/AbstractSerializationsTester.java?rev=1062953&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/AbstractSerializationsTester.java (added)
+++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/AbstractSerializationsTester.java Mon Jan 24 20:05:25 2011
@@ -0,0 +1,29 @@
+package org.apache.cassandra;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+public class AbstractSerializationsTester extends SchemaLoader
+{
+ protected static final String CUR_VER = System.getProperty("cassandra.version", "0.7");
+
+ protected static final boolean EXECUTE_WRITES = new Boolean(System.getProperty("cassandra.test-serialization-writes", "False")).booleanValue();
+
+ protected static DataInputStream getInput(String name) throws IOException
+ {
+ File f = new File("test/data/serialization/" + CUR_VER + "/" + name);
+ assert f.exists();
+ return new DataInputStream(new FileInputStream(f));
+ }
+
+ protected static DataOutputStream getOutput(String name) throws IOException
+ {
+ File f = new File("test/data/serialization/" + CUR_VER + "/" + name);
+ f.getParentFile().mkdirs();
+ return new DataOutputStream(new FileOutputStream(f));
+ }
+}
Added: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/SerializationsTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/SerializationsTest.java?rev=1062953&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/SerializationsTest.java (added)
+++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/SerializationsTest.java Mon Jan 24 20:05:25 2011
@@ -0,0 +1,315 @@
+package org.apache.cassandra.db;
+
+import org.apache.cassandra.AbstractSerializationsTester;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.SliceRange;
+import org.junit.Test;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+
+public class SerializationsTest extends AbstractSerializationsTester
+{
+ private void testRangeSliceCommandWrite() throws IOException
+ {
+ ByteBuffer startCol = ByteBuffer.wrap("Start".getBytes());
+ ByteBuffer stopCol = ByteBuffer.wrap("Stop".getBytes());
+ ByteBuffer emptyCol = ByteBuffer.wrap("".getBytes());
+ SlicePredicate namesPred = new SlicePredicate();
+ namesPred.column_names = Statics.NamedCols;
+ SliceRange emptySliceRange = new SliceRange(emptyCol, emptyCol, false, 100);
+ SliceRange nonEmptySliceRange = new SliceRange(startCol, stopCol, true, 100);
+ SlicePredicate emptyRangePred = new SlicePredicate();
+ emptyRangePred.slice_range = emptySliceRange;
+ SlicePredicate nonEmptyRangePred = new SlicePredicate();
+ nonEmptyRangePred.slice_range = nonEmptySliceRange;
+ IPartitioner part = StorageService.getPartitioner();
+ AbstractBounds bounds = new Range(part.getRandomToken(), part.getRandomToken());
+
+ Message namesCmd = new RangeSliceCommand(Statics.KS, "Standard1", null, namesPred, bounds, 100).getMessage();
+ Message emptyRangeCmd = new RangeSliceCommand(Statics.KS, "Standard1", null, emptyRangePred, bounds, 100).getMessage();
+ Message regRangeCmd = new RangeSliceCommand(Statics.KS, "Standard1", null, nonEmptyRangePred, bounds, 100).getMessage();
+ Message namesCmdSup = new RangeSliceCommand(Statics.KS, "Super1", Statics.SC, namesPred, bounds, 100).getMessage();
+ Message emptyRangeCmdSup = new RangeSliceCommand(Statics.KS, "Super1", Statics.SC, emptyRangePred, bounds, 100).getMessage();
+ Message regRangeCmdSup = new RangeSliceCommand(Statics.KS, "Super1", Statics.SC, nonEmptyRangePred, bounds, 100).getMessage();
+
+ DataOutputStream dout = getOutput("db.RangeSliceCommand.bin");
+
+ Message.serializer().serialize(namesCmd, dout);
+ Message.serializer().serialize(emptyRangeCmd, dout);
+ Message.serializer().serialize(regRangeCmd, dout);
+ Message.serializer().serialize(namesCmdSup, dout);
+ Message.serializer().serialize(emptyRangeCmdSup, dout);
+ Message.serializer().serialize(regRangeCmdSup, dout);
+ dout.close();
+ }
+
+ @Test
+ public void testRangeSliceCommandRead() throws IOException
+ {
+ if (EXECUTE_WRITES)
+ testRangeSliceCommandWrite();
+
+ DataInputStream in = getInput("db.RangeSliceCommand.bin");
+ for (int i = 0; i < 6; i++)
+ {
+ Message msg = Message.serializer().deserialize(in);
+ RangeSliceCommand cmd = RangeSliceCommand.read(msg);
+ }
+ in.close();
+ }
+
+ private void testSliceByNamesReadCommandWrite() throws IOException
+ {
+ SliceByNamesReadCommand standardCmd = new SliceByNamesReadCommand(Statics.KS, Statics.Key, Statics.StandardPath, Statics.NamedCols);
+ SliceByNamesReadCommand superCmd = new SliceByNamesReadCommand(Statics.KS, Statics.Key, Statics.SuperPath, Statics.NamedCols);
+
+ DataOutputStream out = getOutput("db.SliceByNamesReadCommand.bin");
+ SliceByNamesReadCommand.serializer().serialize(standardCmd, out);
+ SliceByNamesReadCommand.serializer().serialize(superCmd, out);
+ ReadCommand.serializer().serialize(standardCmd, out);
+ ReadCommand.serializer().serialize(superCmd, out);
+ Message.serializer().serialize(standardCmd.makeReadMessage(), out);
+ Message.serializer().serialize(superCmd.makeReadMessage(), out);
+ out.close();
+ }
+
+ @Test
+ public void testSliceByNamesReadCommandRead() throws IOException
+ {
+ if (EXECUTE_WRITES)
+ testSliceByNamesReadCommandWrite();
+
+ DataInputStream in = getInput("db.SliceByNamesReadCommand.bin");
+ assert SliceByNamesReadCommand.serializer().deserialize(in) != null;
+ assert SliceByNamesReadCommand.serializer().deserialize(in) != null;
+ assert ReadCommand.serializer().deserialize(in) != null;
+ assert ReadCommand.serializer().deserialize(in) != null;
+ assert Message.serializer().deserialize(in) != null;
+ assert Message.serializer().deserialize(in) != null;
+ in.close();
+ }
+
+ private void testSliceFromReadCommandWrite() throws IOException
+ {
+ SliceFromReadCommand standardCmd = new SliceFromReadCommand(Statics.KS, Statics.Key, Statics.StandardPath, Statics.Start, Statics.Stop, true, 100);
+ SliceFromReadCommand superCmd = new SliceFromReadCommand(Statics.KS, Statics.Key, Statics.SuperPath, Statics.Start, Statics.Stop, true, 100);
+ DataOutputStream out = getOutput("db.SliceFromReadCommand.bin");
+ SliceFromReadCommand.serializer().serialize(standardCmd, out);
+ SliceFromReadCommand.serializer().serialize(superCmd, out);
+ ReadCommand.serializer().serialize(standardCmd, out);
+ ReadCommand.serializer().serialize(superCmd, out);
+ Message.serializer().serialize(standardCmd.makeReadMessage(), out);
+ Message.serializer().serialize(superCmd.makeReadMessage(), out);
+ out.close();
+ }
+
+ @Test
+ public void testSliceFromReadCommandRead() throws IOException
+ {
+ if (EXECUTE_WRITES)
+ testSliceFromReadCommandWrite();
+
+ DataInputStream in = getInput("db.SliceFromReadCommand.bin");
+ assert SliceFromReadCommand.serializer().deserialize(in) != null;
+ assert SliceFromReadCommand.serializer().deserialize(in) != null;
+ assert ReadCommand.serializer().deserialize(in) != null;
+ assert ReadCommand.serializer().deserialize(in) != null;
+ assert Message.serializer().deserialize(in) != null;
+ assert Message.serializer().deserialize(in) != null;
+ in.close();
+ }
+
+ private void testRowWrite() throws IOException
+ {
+ DataOutputStream out = getOutput("db.Row.bin");
+ Row.serializer().serialize(Statics.StandardRow, out);
+ Row.serializer().serialize(Statics.SuperRow, out);
+ Row.serializer().serialize(Statics.NullRow, out);
+ out.close();
+ }
+
+ @Test
+ public void testRowRead() throws IOException
+ {
+ if (EXECUTE_WRITES)
+ testRowWrite();
+
+ DataInputStream in = getInput("db.Row.bin");
+ assert Row.serializer().deserialize(in) != null;
+ assert Row.serializer().deserialize(in) != null;
+ assert Row.serializer().deserialize(in) != null;
+ in.close();
+ }
+
+ private void restRowMutationWrite() throws IOException
+ {
+ RowMutation emptyRm = new RowMutation(Statics.KS, Statics.Key);
+ RowMutation standardRowRm = new RowMutation(Statics.KS, Statics.StandardRow);
+ RowMutation superRowRm = new RowMutation(Statics.KS, Statics.SuperRow);
+ RowMutation standardRm = new RowMutation(Statics.KS, Statics.Key);
+ standardRm.add(Statics.StandardCf);
+ RowMutation superRm = new RowMutation(Statics.KS, Statics.Key);
+ superRm.add(Statics.SuperCf);
+ Map<Integer, ColumnFamily> mods = new HashMap<Integer, ColumnFamily>();
+ mods.put(Statics.StandardCf.metadata().cfId, Statics.StandardCf);
+ mods.put(Statics.SuperCf.metadata().cfId, Statics.SuperCf);
+ RowMutation mixedRm = new RowMutation(Statics.KS, Statics.Key, mods);
+
+ DataOutputStream out = getOutput("db.RowMutation.bin");
+ RowMutation.serializer().serialize(emptyRm, out);
+ RowMutation.serializer().serialize(standardRowRm, out);
+ RowMutation.serializer().serialize(superRowRm, out);
+ RowMutation.serializer().serialize(standardRm, out);
+ RowMutation.serializer().serialize(superRm, out);
+ RowMutation.serializer().serialize(mixedRm, out);
+ Message.serializer().serialize(emptyRm.makeRowMutationMessage(), out);
+ Message.serializer().serialize(standardRowRm.makeRowMutationMessage(), out);
+ Message.serializer().serialize(superRowRm.makeRowMutationMessage(), out);
+ Message.serializer().serialize(standardRm.makeRowMutationMessage(), out);
+ Message.serializer().serialize(superRm.makeRowMutationMessage(), out);
+ Message.serializer().serialize(mixedRm.makeRowMutationMessage(), out);
+ out.close();
+ }
+
+ @Test
+ public void testRowMutationRead() throws IOException
+ {
+ if (EXECUTE_WRITES)
+ restRowMutationWrite();
+
+ DataInputStream in = getInput("db.RowMutation.bin");
+ assert RowMutation.serializer().deserialize(in) != null;
+ assert RowMutation.serializer().deserialize(in) != null;
+ assert RowMutation.serializer().deserialize(in) != null;
+ assert RowMutation.serializer().deserialize(in) != null;
+ assert RowMutation.serializer().deserialize(in) != null;
+ assert RowMutation.serializer().deserialize(in) != null;
+ assert Message.serializer().deserialize(in) != null;
+ assert Message.serializer().deserialize(in) != null;
+ assert Message.serializer().deserialize(in) != null;
+ assert Message.serializer().deserialize(in) != null;
+ assert Message.serializer().deserialize(in) != null;
+ assert Message.serializer().deserialize(in) != null;
+ in.close();
+ }
+
+ public void testTruncateWrite() throws IOException
+ {
+ Truncation tr = new Truncation(Statics.KS, "Doesn't Really Matter");
+ TruncateResponse aff = new TruncateResponse(Statics.KS, "Doesn't Matter Either", true);
+ TruncateResponse neg = new TruncateResponse(Statics.KS, "Still Doesn't Matter", false);
+ DataOutputStream out = getOutput("db.Truncation.bin");
+ Truncation.serializer().serialize(tr, out);
+ TruncateResponse.serializer().serialize(aff, out);
+ TruncateResponse.serializer().serialize(neg, out);
+ Message.serializer().serialize(tr.makeTruncationMessage(), out);
+ Message.serializer().serialize(TruncateResponse.makeTruncateResponseMessage(tr.makeTruncationMessage(), aff), out);
+ Message.serializer().serialize(TruncateResponse.makeTruncateResponseMessage(tr.makeTruncationMessage(), neg), out);
+ // todo: notice how CF names weren't validated.
+ out.close();
+ }
+
+ @Test
+ public void testTruncateRead() throws IOException
+ {
+ if (EXECUTE_WRITES)
+ testTruncateWrite();
+
+ DataInputStream in = getInput("db.Truncation.bin");
+ assert Truncation.serializer().deserialize(in) != null;
+ assert TruncateResponse.serializer().deserialize(in) != null;
+ assert TruncateResponse.serializer().deserialize(in) != null;
+ assert Message.serializer().deserialize(in) != null;
+ assert Message.serializer().deserialize(in) != null;
+ assert Message.serializer().deserialize(in) != null;
+ in.close();
+ }
+
+ private void testWriteResponseWrite() throws IOException
+ {
+ WriteResponse aff = new WriteResponse(Statics.KS, Statics.Key, true);
+ WriteResponse neg = new WriteResponse(Statics.KS, Statics.Key, false);
+ DataOutputStream out = getOutput("db.WriteResponse.bin");
+ WriteResponse.serializer().serialize(aff, out);
+ WriteResponse.serializer().serialize(neg, out);
+ out.close();
+ }
+
+ @Test
+ public void testWriteResponseRead() throws IOException
+ {
+ if (EXECUTE_WRITES)
+ testWriteResponseWrite();
+
+ DataInputStream in = getInput("db.WriteResponse.bin");
+ assert WriteResponse.serializer().deserialize(in) != null;
+ assert WriteResponse.serializer().deserialize(in) != null;
+ in.close();
+ }
+
+ private static ByteBuffer bb(String s) {
+ return ByteBuffer.wrap(s.getBytes());
+ }
+
+ private static class Statics
+ {
+ private static final String KS = "Keyspace1";
+ private static final ByteBuffer Key = ByteBuffer.wrap("Key01".getBytes());
+ private static final List<ByteBuffer> NamedCols = new ArrayList<ByteBuffer>()
+ {{
+ add(ByteBuffer.wrap("AAA".getBytes()));
+ add(ByteBuffer.wrap("BBB".getBytes()));
+ add(ByteBuffer.wrap("CCC".getBytes()));
+ }};
+ private static final ByteBuffer SC = ByteBuffer.wrap("SCName".getBytes());
+ private static final QueryPath StandardPath = new QueryPath("Standard1");
+ private static final QueryPath SuperPath = new QueryPath("Super1", SC);
+ private static final ByteBuffer Start = ByteBuffer.wrap("Start".getBytes());
+ private static final ByteBuffer Stop = ByteBuffer.wrap("Stop".getBytes());
+
+ private static final ColumnFamily StandardCf = ColumnFamily.create(Statics.KS, "Standard1");
+ private static final ColumnFamily SuperCf = ColumnFamily.create(Statics.KS, "Super1");
+
+ private static final SuperColumn SuperCol = new SuperColumn(Statics.SC, DatabaseDescriptor.getComparator(Statics.KS, "Super1"))
+ {{
+ addColumn(new Column(bb("aaaa")));
+ addColumn(new Column(bb("bbbb"), bb("bbbbb-value")));
+ addColumn(new Column(bb("cccc"), bb("ccccc-value"), 1000L));
+ addColumn(new DeletedColumn(bb("dddd"), 500, 1000));
+ addColumn(new DeletedColumn(bb("eeee"), bb("eeee-value"), 1001));
+ addColumn(new ExpiringColumn(bb("ffff"), bb("ffff-value"), 2000, 1000));
+ addColumn(new ExpiringColumn(bb("gggg"), bb("gggg-value"), 2001, 1000, 2002));
+ }};
+
+ private static final Row StandardRow = new Row(Util.dk("key0"), Statics.StandardCf);
+ private static final Row SuperRow = new Row(Util.dk("key1"), Statics.SuperCf);
+ private static final Row NullRow = new Row(Util.dk("key2"), null);
+
+ static {
+ StandardCf.addColumn(new Column(bb("aaaa")));
+ StandardCf.addColumn(new Column(bb("bbbb"), bb("bbbbb-value")));
+ StandardCf.addColumn(new Column(bb("cccc"), bb("ccccc-value"), 1000L));
+ StandardCf.addColumn(new DeletedColumn(bb("dddd"), 500, 1000));
+ StandardCf.addColumn(new DeletedColumn(bb("eeee"), bb("eeee-value"), 1001));
+ StandardCf.addColumn(new ExpiringColumn(bb("ffff"), bb("ffff-value"), 2000, 1000));
+ StandardCf.addColumn(new ExpiringColumn(bb("gggg"), bb("gggg-value"), 2001, 1000, 2002));
+
+ SuperCf.addColumn(Statics.SuperCol);
+ }
+ }
+}
Added: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/migration/SerializationsTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/migration/SerializationsTest.java?rev=1062953&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/migration/SerializationsTest.java (added)
+++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/migration/SerializationsTest.java Mon Jan 24 20:05:25 2011
@@ -0,0 +1,59 @@
+package org.apache.cassandra.db.migration;
+
+import org.apache.cassandra.AbstractSerializationsTester;
+import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.io.SerDeUtils;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+import org.apache.commons.codec.binary.Base64;
+import org.junit.Test;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+public class SerializationsTest extends AbstractSerializationsTester
+{
+ private static final int ksCount = 5;
+
+ private void testWrite() throws IOException, ConfigurationException
+ {
+ for (int i = 0; i < ksCount; i++)
+ {
+ String tableName = "Keyspace" + (i + 1);
+ KSMetaData ksm = DatabaseDescriptor.getKSMetaData(tableName);
+ UUID uuid = UUIDGen.makeType1UUIDFromHost(FBUtilities.getLocalAddress());
+ DatabaseDescriptor.clearTableDefinition(ksm, uuid);
+ Migration m = new AddKeyspace(ksm);
+ ByteBuffer bytes = m.serialize();
+
+ DataOutputStream out = getOutput("db.migration." + tableName + ".bin");
+ out.writeUTF(new String(Base64.encodeBase64(bytes.array())));
+ out.close();
+ }
+ }
+
+ @Test
+ public void testRead() throws IOException, ConfigurationException
+ {
+ if (AbstractSerializationsTester.EXECUTE_WRITES)
+ testWrite();
+
+ for (int i = 0; i < ksCount; i++)
+ {
+ String tableName = "Keyspace" + (i + 1);
+ DataInputStream in = getInput("db.migration." + tableName + ".bin");
+ byte[] raw = Base64.decodeBase64(in.readUTF().getBytes());
+ org.apache.cassandra.db.migration.avro.Migration obj = new org.apache.cassandra.db.migration.avro.Migration();
+ SerDeUtils.deserializeWithSchema(ByteBuffer.wrap(raw), obj);
+ in.close();
+ }
+ }
+}
Added: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/gms/SerializationsTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/gms/SerializationsTest.java?rev=1062953&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/gms/SerializationsTest.java (added)
+++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/gms/SerializationsTest.java Mon Jan 24 20:05:25 2011
@@ -0,0 +1,94 @@
+package org.apache.cassandra.gms;
+
+import org.apache.cassandra.AbstractSerializationsTester;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.junit.Test;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SerializationsTest extends AbstractSerializationsTester
+{
+ private void testEndpointStateWrite() throws IOException
+ {
+ DataOutputStream out = getOutput("gms.EndpointState.bin");
+ HeartBeatState.serializer().serialize(Statics.HeartbeatSt, out);
+ EndpointState.serializer().serialize(Statics.EndpointSt, out);
+ VersionedValue.serializer.serialize(Statics.vv0, out);
+ VersionedValue.serializer.serialize(Statics.vv1, out);
+ out.close();
+ }
+
+ @Test
+ public void testEndpointStateRead() throws IOException
+ {
+ if (EXECUTE_WRITES)
+ testEndpointStateWrite();
+
+ DataInputStream in = getInput("gms.EndpointState.bin");
+ assert HeartBeatState.serializer().deserialize(in) != null;
+ assert EndpointState.serializer().deserialize(in) != null;
+ assert VersionedValue.serializer.deserialize(in) != null;
+ assert VersionedValue.serializer.deserialize(in) != null;
+ in.close();
+ }
+
+ private void testGossipDigestWrite() throws IOException
+ {
+ Map<InetAddress, EndpointState> states = new HashMap<InetAddress, EndpointState>();
+ states.put(InetAddress.getByName("127.0.0.1"), Statics.EndpointSt);
+ states.put(InetAddress.getByName("127.0.0.2"), Statics.EndpointSt);
+ GossipDigestAckMessage ack = new GossipDigestAckMessage(Statics.Digests, states);
+ GossipDigestAck2Message ack2 = new GossipDigestAck2Message(states);
+ GossipDigestSynMessage syn = new GossipDigestSynMessage("Not a real cluster name", Statics.Digests);
+
+ DataOutputStream out = getOutput("gms.Gossip.bin");
+ for (GossipDigest gd : Statics.Digests)
+ GossipDigest.serializer().serialize(gd, out);
+ GossipDigestAckMessage.serializer().serialize(ack, out);
+ GossipDigestAck2Message.serializer().serialize(ack2, out);
+ GossipDigestSynMessage.serializer().serialize(syn, out);
+ out.close();
+ }
+
+ @Test
+ public void testGossipDigestRead() throws IOException
+ {
+ if (EXECUTE_WRITES)
+ testGossipDigestWrite();
+
+ int count = 0;
+ DataInputStream in = getInput("gms.Gossip.bin");
+ while (count < Statics.Digests.size())
+ assert GossipDigestAck2Message.serializer().deserialize(in) != null;
+ assert GossipDigestAckMessage.serializer().deserialize(in) != null;
+ assert GossipDigestAck2Message.serializer().deserialize(in) != null;
+ assert GossipDigestSynMessage.serializer().deserialize(in) != null;
+ in.close();
+ }
+
+ private static class Statics
+ {
+ private static HeartBeatState HeartbeatSt = new HeartBeatState(101, 201);
+ private static EndpointState EndpointSt = new EndpointState(HeartbeatSt);
+ private static VersionedValue.VersionedValueFactory vvFact = new VersionedValue.VersionedValueFactory(StorageService.getPartitioner());
+ private static VersionedValue vv0 = vvFact.load(23d);
+ private static VersionedValue vv1 = vvFact.bootstrapping(StorageService.getPartitioner().getRandomToken());
+ private static List<GossipDigest> Digests = new ArrayList<GossipDigest>();
+
+ {
+ HeartbeatSt.updateHeartBeat();
+ EndpointSt.addApplicationState(ApplicationState.LOAD, vv0);
+ EndpointSt.addApplicationState(ApplicationState.STATUS, vv1);
+ for (int i = 0; i < 100; i++)
+ Digests.add(new GossipDigest(FBUtilities.getLocalAddress(), 100 + i, 1000 + 2 * i));
+ }
+ }
+}
Added: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/SerializationsTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/SerializationsTest.java?rev=1062953&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/SerializationsTest.java (added)
+++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/SerializationsTest.java Mon Jan 24 20:05:25 2011
@@ -0,0 +1,80 @@
+package org.apache.cassandra.service;
+
+import org.apache.cassandra.AbstractSerializationsTester;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.RandomPartitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
+import org.junit.Test;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class SerializationsTest extends AbstractSerializationsTester
+{
+ private void testTreeRequestWrite() throws IOException
+ {
+ DataOutputStream out = getOutput("service.TreeRequest.bin");
+ AntiEntropyService.TreeRequestVerbHandler.SERIALIZER.serialize(Statics.req, out);
+ Message.serializer().serialize(AntiEntropyService.TreeRequestVerbHandler.makeVerb(Statics.req), out);
+ out.close();
+ }
+
+ @Test
+ public void testTreeRequestRead() throws IOException
+ {
+ if (EXECUTE_WRITES)
+ testTreeRequestWrite();
+
+ DataInputStream in = getInput("service.TreeRequest.bin");
+ assert AntiEntropyService.TreeRequestVerbHandler.SERIALIZER.deserialize(in) != null;
+ assert Message.serializer().deserialize(in) != null;
+ in.close();
+ }
+
+ private void testTreeResponseWrite() throws IOException
+ {
+ AntiEntropyService.Validator v0 = new AntiEntropyService.Validator(Statics.req);
+ IPartitioner part = new RandomPartitioner();
+ MerkleTree mt = new MerkleTree(part, MerkleTree.RECOMMENDED_DEPTH, Integer.MAX_VALUE);
+ List<Token> tokens = new ArrayList<Token>();
+ for (int i = 0; i < 10; i++)
+ {
+ Token t = part.getRandomToken();
+ tokens.add(t);
+ mt.split(t);
+ }
+ AntiEntropyService.Validator v1 = new AntiEntropyService.Validator(Statics.req, mt);
+ DataOutputStream out = getOutput("service.TreeResponse.bin");
+ AntiEntropyService.TreeResponseVerbHandler.SERIALIZER.serialize(v0, out);
+ AntiEntropyService.TreeResponseVerbHandler.SERIALIZER.serialize(v1, out);
+ Message.serializer().serialize(AntiEntropyService.TreeResponseVerbHandler.makeVerb(FBUtilities.getLocalAddress(), v0), out);
+ Message.serializer().serialize(AntiEntropyService.TreeResponseVerbHandler.makeVerb(FBUtilities.getLocalAddress(), v1), out);
+ out.close();
+ }
+
+ @Test
+ public void testTreeResponseRead() throws IOException
+ {
+ if (EXECUTE_WRITES)
+ testTreeResponseWrite();
+
+ DataInputStream in = getInput("service.TreeResponse.bin");
+ assert AntiEntropyService.TreeResponseVerbHandler.SERIALIZER.deserialize(in) != null;
+ assert AntiEntropyService.TreeResponseVerbHandler.SERIALIZER.deserialize(in) != null;
+ assert Message.serializer().deserialize(in) != null;
+ assert Message.serializer().deserialize(in) != null;
+ in.close();
+ }
+
+ private static class Statics
+ {
+ private static final AntiEntropyService.CFPair pair = new AntiEntropyService.CFPair("Keyspace1", "Standard1");
+ private static final AntiEntropyService.TreeRequest req = new AntiEntropyService.TreeRequest("sessionId", FBUtilities.getLocalAddress(), pair);
+ }
+}
Added: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/streaming/SerializationsTest.java?rev=1062953&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/streaming/SerializationsTest.java (added)
+++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/streaming/SerializationsTest.java Mon Jan 24 20:05:25 2011
@@ -0,0 +1,183 @@
+package org.apache.cassandra.streaming;
+
+import org.apache.cassandra.AbstractSerializationsTester;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.dht.BigIntegerToken;
+import org.apache.cassandra.dht.BytesToken;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+import org.junit.Test;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+public class SerializationsTest extends AbstractSerializationsTester
+{
+ private void testPendingFileWrite() throws IOException
+ {
+ // make sure to test serializing null and a pf with no sstable.
+ PendingFile normal = makePendingFile(true, "fake_component", 100);
+ PendingFile noSections = makePendingFile(true, "not_real", 0);
+ PendingFile noSST = makePendingFile(false, "also_fake", 100);
+
+ DataOutputStream out = getOutput("streaming.PendingFile.bin");
+ PendingFile.serializer().serialize(normal, out);
+ PendingFile.serializer().serialize(noSections, out);
+ PendingFile.serializer().serialize(noSST, out);
+ PendingFile.serializer().serialize(null, out);
+ out.close();
+ }
+
+ @Test
+ public void testPendingFileRead() throws IOException
+ {
+ if (EXECUTE_WRITES)
+ testPendingFileWrite();
+
+ DataInputStream in = getInput("streaming.PendingFile.bin");
+ assert PendingFile.serializer().deserialize(in) != null;
+ assert PendingFile.serializer().deserialize(in) != null;
+ assert PendingFile.serializer().deserialize(in) != null;
+ assert PendingFile.serializer().deserialize(in) == null;
+ in.close();
+ }
+
+ private void testStreamHeaderWrite() throws IOException
+ {
+ StreamHeader sh0 = new StreamHeader("Keyspace1", 123L, makePendingFile(true, "zz", 100));
+ StreamHeader sh1 = new StreamHeader("Keyspace1", 124L, makePendingFile(false, "zz", 100));
+ Collection<PendingFile> files = new ArrayList<PendingFile>();
+ for (int i = 0; i < 50; i++)
+ files.add(makePendingFile(i % 2 == 0, "aa", 100));
+ StreamHeader sh2 = new StreamHeader("Keyspace1", 125L, makePendingFile(true, "bb", 100), files);
+ StreamHeader sh3 = new StreamHeader("Keyspace1", 125L, null, files);
+ StreamHeader sh4 = new StreamHeader("Keyspace1", 125L, makePendingFile(true, "bb", 100), new ArrayList<PendingFile>());
+
+ DataOutputStream out = getOutput("streaming.StreamHeader.bin");
+ StreamHeader.serializer().serialize(sh0, out);
+ StreamHeader.serializer().serialize(sh1, out);
+ StreamHeader.serializer().serialize(sh2, out);
+ StreamHeader.serializer().serialize(sh3, out);
+ StreamHeader.serializer().serialize(sh4, out);
+ out.close();
+ }
+
+ @Test
+ public void testStreamHeaderRead() throws IOException
+ {
+ if (EXECUTE_WRITES)
+ testStreamHeaderWrite();
+
+ DataInputStream in = getInput("streaming.StreamHeader.bin");
+ assert StreamHeader.serializer().deserialize(in) != null;
+ assert StreamHeader.serializer().deserialize(in) != null;
+ assert StreamHeader.serializer().deserialize(in) != null;
+ assert StreamHeader.serializer().deserialize(in) != null;
+ assert StreamHeader.serializer().deserialize(in) != null;
+ in.close();
+ }
+
+ private void testStreamReplyWrite() throws IOException
+ {
+ StreamReply rep = new StreamReply("this is a file", 123L, StreamReply.Status.FILE_FINISHED);
+ DataOutputStream out = getOutput("streaming.StreamReply.bin");
+ StreamReply.serializer.serialize(rep, out);
+ Message.serializer().serialize(rep.createMessage(), out);
+ out.close();
+ }
+
+ @Test
+ public void testStreamReplyRead() throws IOException
+ {
+ if (EXECUTE_WRITES)
+ testStreamReplyWrite();
+
+ DataInputStream in = getInput("streaming.StreamReply.bin");
+ assert StreamReply.serializer.deserialize(in) != null;
+ assert Message.serializer().deserialize(in) != null;
+ in.close();
+ }
+
+ private static PendingFile makePendingFile(boolean sst, String comp, int numSecs)
+ {
+ Descriptor desc = new Descriptor("z", new File("path/doesn't/matter"), "Keyspace1", "Standard1", 23, false);
+ List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>();
+ for (int i = 0; i < numSecs; i++)
+ sections.add(new Pair<Long, Long>(new Long(i), new Long(i * i)));
+ return new PendingFile(sst ? makeSSTable() : null, desc, comp, sections);
+ }
+
+ private void testStreamRequestMessageWrite() throws IOException
+ {
+ Collection<Range> ranges = new ArrayList<Range>();
+ for (int i = 0; i < 5; i++)
+ ranges.add(new Range(new BytesToken(ByteBuffer.wrap(Integer.toString(10*i).getBytes())), new BytesToken(ByteBuffer.wrap(Integer.toString(10*i+5).getBytes()))));
+ StreamRequestMessage msg0 = new StreamRequestMessage(FBUtilities.getLocalAddress(), ranges, "Keyspace1", 123L);
+ StreamRequestMessage msg1 = new StreamRequestMessage(FBUtilities.getLocalAddress(), makePendingFile(true, "aa", 100), 124L);
+ StreamRequestMessage msg2 = new StreamRequestMessage(FBUtilities.getLocalAddress(), makePendingFile(false, "aa", 100), 124L);
+
+ DataOutputStream out = getOutput("streaming.StreamRequestMessage.bin");
+ StreamRequestMessage.serializer().serialize(msg0, out);
+ StreamRequestMessage.serializer().serialize(msg1, out);
+ StreamRequestMessage.serializer().serialize(msg2, out);
+ Message.serializer().serialize(msg0.makeMessage(), out);
+ Message.serializer().serialize(msg1.makeMessage(), out);
+ Message.serializer().serialize(msg2.makeMessage(), out);
+ out.close();
+ }
+
+ @Test
+ public void testStreamRequestMessageRead() throws IOException
+ {
+ if (EXECUTE_WRITES)
+ testStreamRequestMessageWrite();
+
+ DataInputStream in = getInput("streaming.StreamRequestMessage.bin");
+ assert StreamRequestMessage.serializer().deserialize(in) != null;
+ assert StreamRequestMessage.serializer().deserialize(in) != null;
+ assert StreamRequestMessage.serializer().deserialize(in) != null;
+ assert Message.serializer().deserialize(in) != null;
+ assert Message.serializer().deserialize(in) != null;
+ assert Message.serializer().deserialize(in) != null;
+ in.close();
+ }
+
+ private static SSTable makeSSTable()
+ {
+ Table t = Table.open("Keyspace1");
+ for (int i = 0; i < 100; i++)
+ {
+ RowMutation rm = new RowMutation(t.name, ByteBuffer.wrap(Long.toString(System.nanoTime()).getBytes()));
+ rm.add(new QueryPath("Standard1", null, ByteBuffer.wrap("cola".getBytes())), ByteBuffer.wrap("value".getBytes()), 0);
+ try
+ {
+ rm.apply();
+ }
+ catch (IOException ex)
+ {
+ throw new RuntimeException(ex);
+ }
+ }
+ try
+ {
+ t.getColumnFamilyStore("Standard1").forceBlockingFlush();
+ return t.getColumnFamilyStore("Standard1").getSSTables().iterator().next();
+ }
+ catch (Exception any)
+ {
+ throw new RuntimeException(any);
+ }
+ }
+}
Added: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/utils/SerializationsTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/utils/SerializationsTest.java?rev=1062953&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/utils/SerializationsTest.java (added)
+++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/utils/SerializationsTest.java Mon Jan 24 20:05:25 2011
@@ -0,0 +1,96 @@
+package org.apache.cassandra.utils;
+
+import org.apache.cassandra.AbstractSerializationsTester;
+import org.apache.cassandra.service.StorageService;
+import org.junit.Test;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class SerializationsTest extends AbstractSerializationsTester
+{
+
+ private void testBloomFilterWrite() throws IOException
+ {
+ BloomFilter bf = BloomFilter.getFilter(1000000, 0.0001);
+ for (int i = 0; i < 100; i++)
+ bf.add(StorageService.getPartitioner().getTokenFactory().toByteArray(StorageService.getPartitioner().getRandomToken()));
+ DataOutputStream out = getOutput("utils.BloomFilter.bin");
+ BloomFilter.serializer().serialize(bf, out);
+ out.close();
+ }
+
+ @Test
+ public void testBloomFilterRead() throws IOException
+ {
+ if (EXECUTE_WRITES)
+ testBloomFilterWrite();
+
+ DataInputStream in = getInput("utils.BloomFilter.bin");
+ assert BloomFilter.serializer().deserialize(in) != null;
+ in.close();
+ }
+
+ private void testLegacyBloomFilterWrite() throws IOException
+ {
+ LegacyBloomFilter a = LegacyBloomFilter.getFilter(1000000, 1000);
+ LegacyBloomFilter b = LegacyBloomFilter.getFilter(1000000, 0.0001);
+ for (int i = 0; i < 100; i++)
+ {
+ ByteBuffer key = StorageService.getPartitioner().getTokenFactory().toByteArray(StorageService.getPartitioner().getRandomToken());
+ a.add(key);
+ b.add(key);
+ }
+ DataOutputStream out = getOutput("utils.LegacyBloomFilter.bin");
+ LegacyBloomFilter.serializer().serialize(a, out);
+ LegacyBloomFilter.serializer().serialize(b, out);
+ out.close();
+ }
+
+ @Test
+ public void testLegacyBloomFilterRead() throws IOException
+ {
+ if (EXECUTE_WRITES)
+ testLegacyBloomFilterWrite();
+
+ DataInputStream in = getInput("utils.LegacyBloomFilter.bin");
+ assert LegacyBloomFilter.serializer().deserialize(in) != null;
+ in.close();
+ }
+
+ private void testEstimatedHistogramWrite() throws IOException
+ {
+ EstimatedHistogram hist0 = new EstimatedHistogram();
+ EstimatedHistogram hist1 = new EstimatedHistogram(5000);
+ long[] offsets = new long[1000];
+ long[] data = new long[offsets.length + 1];
+ for (int i = 0; i < offsets.length; i++)
+ {
+ offsets[i] = i;
+ data[i] = 10 * i;
+ }
+ data[offsets.length] = 100000;
+ EstimatedHistogram hist2 = new EstimatedHistogram(offsets, data);
+
+ DataOutputStream out = getOutput("utils.EstimatedHistogram.bin");
+ EstimatedHistogram.serializer.serialize(hist0, out);
+ EstimatedHistogram.serializer.serialize(hist1, out);
+ EstimatedHistogram.serializer.serialize(hist2, out);
+ out.close();
+ }
+
+ @Test
+ public void testEstimatedHistogramRead() throws IOException
+ {
+ if (EXECUTE_WRITES)
+ testEstimatedHistogramWrite();
+
+ DataInputStream in = getInput("utils.EstimatedHistogram.bin");
+ assert EstimatedHistogram.serializer.deserialize(in) != null;
+ assert EstimatedHistogram.serializer.deserialize(in) != null;
+ assert EstimatedHistogram.serializer.deserialize(in) != null;
+ in.close();
+ }
+}