Posted to commits@cassandra.apache.org by al...@apache.org on 2009/03/27 07:45:23 UTC
svn commit: r759033 [2/2] - in
/incubator/cassandra/trunk/src/org/apache/cassandra: concurrent/ config/
cql/common/ db/ dht/ gms/ io/ locator/ net/ test/ tools/ utils/
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java Fri Mar 27 06:45:19 2009
@@ -212,11 +212,13 @@
}
public static class BufferWriter extends Writer
- {
+ {
+ private int size_;
BufferWriter(String filename, int size) throws IOException
{
super(filename, size);
+ size_ = size;
}
@Override
@@ -268,6 +270,145 @@
}
}
+ public static class ConcurrentWriter extends AbstractWriter
+ {
+ private FileChannel fc_;
+
+ public ConcurrentWriter(String filename) throws IOException
+ {
+ super(filename);
+ RandomAccessFile raf = new RandomAccessFile(filename, "rw");
+ fc_ = raf.getChannel();
+ }
+
+ public long getCurrentPosition() throws IOException
+ {
+ return fc_.position();
+ }
+
+ public void seek(long position) throws IOException
+ {
+ fc_.position(position);
+ }
+
+ public void append(DataOutputBuffer buffer) throws IOException
+ {
+ int length = buffer.getLength();
+ ByteBuffer byteBuffer = ByteBuffer.allocateDirect(length);
+ byteBuffer.put(buffer.getData(), 0, length);
+ byteBuffer.flip();
+ fc_.write(byteBuffer);
+ }
+
+ public void append(DataOutputBuffer keyBuffer, DataOutputBuffer buffer) throws IOException
+ {
+ if ( keyBuffer == null || keyBuffer.getLength() == 0 )
+ throw new IllegalArgumentException("Key cannot be NULL or of zero length.");
+ int keyBufLength = keyBuffer.getLength();
+
+ /* Size allocated "int" for key length + key + "int" for data length + data */
+ int length = buffer.getLength();
+ ByteBuffer byteBuffer = ByteBuffer.allocateDirect( 4 + keyBufLength + 4 + length );
+ byteBuffer.putInt(keyBufLength);
+ byteBuffer.put(keyBuffer.getData(), 0, keyBufLength);
+ byteBuffer.putInt(length);
+ byteBuffer.put(buffer.getData(), 0, length);
+ byteBuffer.flip();
+ fc_.write(byteBuffer);
+ }
+
+ public void append(String key, DataOutputBuffer buffer) throws IOException
+ {
+ if ( key == null )
+ throw new IllegalArgumentException("Key cannot be NULL.");
+
+ int length = buffer.getLength();
+ /* Size allocated : utfPrefix_ + key length + "int" for data size + data */
+ ByteBuffer byteBuffer = ByteBuffer.allocateDirect( SequenceFile.utfPrefix_ + key.length() + 4 + length);
+ SequenceFile.writeUTF(byteBuffer, key);
+ byteBuffer.putInt(length);
+ byteBuffer.put(buffer.getData(), 0, length);
+ byteBuffer.flip();
+ fc_.write(byteBuffer);
+ }
+
+ public void append(String key, byte[] value) throws IOException
+ {
+ if ( key == null )
+ throw new IllegalArgumentException("Key cannot be NULL.");
+
+ /* Size allocated : utfPrefix_ + key length + "int" for data size + data */
+ ByteBuffer byteBuffer = ByteBuffer.allocateDirect(utfPrefix_ + key.length() + 4 + value.length);
+ SequenceFile.writeUTF(byteBuffer, key);
+ byteBuffer.putInt(value.length);
+ byteBuffer.put(value);
+ byteBuffer.flip();
+ fc_.write(byteBuffer);
+ }
+
+ public void append(String key, long value) throws IOException
+ {
+ if ( key == null )
+ throw new IllegalArgumentException("Key cannot be NULL.");
+
+ /* Size allocated : utfPrefix_ + key length + 8 bytes for a long */
+ ByteBuffer byteBuffer = ByteBuffer.allocateDirect(SequenceFile.utfPrefix_ + key.length() + 8);
+ SequenceFile.writeUTF(byteBuffer, key);
+ byteBuffer.putLong(value);
+ byteBuffer.flip();
+ fc_.write(byteBuffer);
+ }
+
+ /*
+ * Be extremely careful while using this API. This is currently
+ * used to write the commit log header in the commit logs.
+ * If not used carefully it could completely screw up reads
+ * of other key/value pairs that are written.
+ */
+ public long writeDirect(byte[] bytes) throws IOException
+ {
+ ByteBuffer byteBuffer = ByteBuffer.allocateDirect(bytes.length);
+ byteBuffer.put(bytes);
+ byteBuffer.flip();
+ fc_.write(byteBuffer);
+ return fc_.position();
+ }
+
+ public void writeLong(long value) throws IOException
+ {
+ ByteBuffer byteBuffer = ByteBuffer.allocateDirect(8);
+ byteBuffer.putLong(value);
+ byteBuffer.flip();
+ fc_.write(byteBuffer);
+ }
+
+ public void close() throws IOException
+ {
+ fc_.close();
+ }
+
+ public void close(byte[] footer, int size) throws IOException
+ {
+ /* Size is utfPrefix_ + marker length + "int" for size + footer data */
+ ByteBuffer byteBuffer = ByteBuffer.allocateDirect( utfPrefix_ + SequenceFile.marker_.length() + 4 + footer.length);
+ SequenceFile.writeUTF(byteBuffer, SequenceFile.marker_);
+ byteBuffer.putInt(size);
+ byteBuffer.put(footer);
+ byteBuffer.flip();
+ fc_.write(byteBuffer);
+ }
+
+ public String getFileName()
+ {
+ return filename_;
+ }
+
+ public long getFileSize() throws IOException
+ {
+ return fc_.size();
+ }
+ }
+
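For reference, the records produced by ConcurrentWriter.append(String key, DataOutputBuffer buffer) above are a modified UTF-8 key (two-byte length prefix, see utfPrefix_ below) followed by an int data length and the raw data bytes. A minimal standalone sketch of a reader for that layout, assuming ASCII keys so the custom writeUTF stays byte-compatible with java.io.DataInputStream.readUTF; the class name and file argument are made up:

    import java.io.DataInputStream;
    import java.io.FileInputStream;
    import java.io.IOException;

    // Sketch only, not part of this commit.
    public class RecordDump
    {
        public static void main(String[] args) throws IOException
        {
            DataInputStream in = new DataInputStream(new FileInputStream(args[0]));
            while (in.available() > 0)
            {
                String key = in.readUTF();   // 2-byte length prefix + key bytes
                int length = in.readInt();   // size of the serialized value
                byte[] data = new byte[length];
                in.readFully(data);
                System.out.println(key + " -> " + length + " bytes");
            }
            in.close();
        }
    }
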
public static class FastConcurrentWriter extends AbstractWriter
{
private FileChannel fc_;
@@ -420,8 +561,36 @@
public String getFileName()
{
return filename_;
+ }
+
+ /**
+ * Given the application key this method determines whether
+ * the key is in the block. Key comparisons differ based on the
+ * partitioning function. Under an OPHF the key is stored as is;
+ * under random partitioning the key used internally is hash(key):key.
+ * @param key the key we are looking for
+ * @param in DataInput stream in which we are looking for the key.
+ * @return true if the key is found and false otherwise.
+ * @throws IOException
+ */
+ protected boolean isKeyInBlock(String key, DataInput in) throws IOException
+ {
+ boolean bVal = false;
+ String keyInBlock = in.readUTF();
+ PartitionerType pType = StorageService.getPartitionerType();
+ switch ( pType )
+ {
+ case OPHF:
+ bVal = keyInBlock.equals(key);
+ break;
+
+ default:
+ bVal = keyInBlock.split(":")[0].equals(key);
+ break;
+ }
+ return bVal;
}
-
+
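A minimal sketch of the comparison isKeyInBlock performs under random partitioning, with a made-up on-disk value; note that only the hash portion is compared, which suggests callers pass the internal (hash) form of the key:

    // Sketch only, not part of this commit.
    public class KeyFormatSketch
    {
        public static void main(String[] args)
        {
            String keyInBlock = "123456789:userKey"; // hypothetical on-disk hash(key):key form
            String probe = "123456789";              // the internal (hash) form of the key
            System.out.println(keyInBlock.split(":")[0].equals(probe)); // prints true
        }
    }
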
/**
* Return the position of the given key from the block index.
* @param key the key whose offset is to be extracted from the current block index
@@ -449,7 +618,7 @@
for ( int i = 0; i < keys; ++i )
{
String keyInBlock = bufIn.readUTF();
- if ( keyInBlock.equals(key) )
+ if ( keyInBlock.equals(key) )
{
position = bufIn.readLong();
break;
@@ -497,9 +666,8 @@
/* Number of keys in the block. */
int keys = bufIn.readInt();
for ( int i = 0; i < keys; ++i )
- {
- String keyInBlock = bufIn.readUTF();
- if (keyInBlock.equals(key))
+ {
+ if ( isKeyInBlock(key, bufIn) )
{
long position = bufIn.readLong();
long dataSize = bufIn.readLong();
@@ -583,7 +751,7 @@
{
/* Go to the Block Index */
seek(section.end_);
- long position = getPositionFromBlockIndex(key);
+ long position = getPositionFromBlockIndex(key);
seek(position);
}
@@ -629,9 +797,9 @@
}
return totalBytesRead;
}
-
+
/**
- * Reads the column name indexes if present. If the
+ * Reads the column name indexes if present. If the
* indexes are based on time then skip over them.
* @param cfName
* @return
@@ -640,13 +808,13 @@
{
/* check if we have an index */
boolean hasColumnIndexes = file_.readBoolean();
- int totalBytesRead = 1;
+ int totalBytesRead = 1;
/* if we do then deserialize the index */
if(hasColumnIndexes)
- {
+ {
if ( DatabaseDescriptor.isTimeSortingEnabled(cfName) )
{
- /* read the index */
+ /* read the index */
totalBytesRead += IndexHelper.deserializeIndex(cfName, file_, columnIndexList);
}
else
@@ -658,6 +826,272 @@
}
/**
+ * This is useful in figuring out the key as stored in the system. If an OPHF
+ * is used then the "key" is the application-supplied key. If a random
+ * partitioning mechanism is used then the key is of the form
+ * hash:key, where the hash is used internally as the key.
+ *
+ * @param in the DataInput stream from which the key needs to be read
+ * @return the appropriate key based on partitioning type
+ * @throws IOException
+ */
+ protected String readKeyFromDisk(DataInput in) throws IOException
+ {
+ String keyInDisk = null;
+ PartitionerType pType = StorageService.getPartitionerType();
+ switch( pType )
+ {
+ case OPHF:
+ keyInDisk = in.readUTF();
+ break;
+
+ default:
+ keyInDisk = in.readUTF().split(":")[0];
+ break;
+ }
+ return keyInDisk;
+ }
+
+ /**
+ * This method dumps the next key/value into the DataOutputBuffer
+ * passed in. Always use this method to query for application
+ * specific data as it will have indexes.
+ *
+ * @param key key we are interested in.
+ * @param bufOut DataOutputBuffer that needs to be filled.
+ * @param cf name of the column family:column we want to read
+ * @param section region of the file that needs to be read
+ * @return total number of bytes read/considered
+ */
+ public long next(String key, DataOutputBuffer bufOut, String cf, Coordinate section) throws IOException
+ {
+ String[] values = RowMutation.getColumnAndColumnFamily(cf);
+ String columnFamilyName = values[0];
+ String columnName = (values.length == 1) ? null : values[1];
+
+ long bytesRead = -1L;
+ if ( isEOF() )
+ return bytesRead;
+ seekTo(key, section);
+ /* note the position where the key starts */
+ long startPosition = file_.getFilePointer();
+ String keyInDisk = readKeyFromDisk(file_);
+ if ( keyInDisk != null )
+ {
+ /*
+ * If key on disk is greater than requested key
+ * we can bail out since we exploit the property
+ * of the SSTable format.
+ */
+ if ( keyInDisk.compareTo(key) > 0 )
+ return bytesRead;
+
+ /*
+ * If we found the key then we populate the buffer that
+ * is passed in. If not then we skip over this key and
+ * position ourselves to read the next one.
+ */
+ int dataSize = file_.readInt();
+ if ( keyInDisk.equals(key) )
+ {
+ /* write the key into buffer */
+ bufOut.writeUTF( keyInDisk );
+
+ if(columnName == null)
+ {
+ int bytesSkipped = IndexHelper.skipBloomFilterAndIndex(file_);
+ /*
+ * read the correct number of bytes for the column family and
+ * write data into buffer. Subtract from dataSize the bloom
+ * filter size.
+ */
+ dataSize -= bytesSkipped;
+ /* write the data size */
+ bufOut.writeInt(dataSize);
+ /* write the data into buffer, except the boolean we have read */
+ bufOut.write(file_, dataSize);
+ }
+ else
+ {
+ /* Read the bloom filter for the column summarization */
+ long preBfPos = file_.getFilePointer();
+ BloomFilter bf = defreezeBloomFilter();
+ /* column does not exist in this file */
+ if ( !bf.isPresent(columnName) )
+ return bytesRead;
+ long postBfPos = file_.getFilePointer();
+ dataSize -= (postBfPos - preBfPos);
+
+ List<IndexHelper.ColumnIndexInfo> columnIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
+ /* Read the name indexes if present */
+ int totalBytesRead = handleColumnNameIndexes(columnFamilyName, columnIndexList);
+ dataSize -= totalBytesRead;
+
+ /* read the column family name */
+ String cfName = file_.readUTF();
+ dataSize -= (utfPrefix_ + cfName.length());
+
+ /* read if this cf is marked for delete */
+ boolean markedForDelete = file_.readBoolean();
+ dataSize -= 1;
+
+ /* read the total number of columns */
+ int totalNumCols = file_.readInt();
+ dataSize -= 4;
+
+ /* get the column range we have to read */
+ IndexHelper.ColumnIndexInfo cIndexInfo = new IndexHelper.ColumnNameIndexInfo(columnName);
+ IndexHelper.ColumnRange columnRange = IndexHelper.getColumnRangeFromNameIndex(cIndexInfo, columnIndexList, dataSize, totalNumCols);
+
+ Coordinate coordinate = columnRange.coordinate();
+ /* seek to the correct offset to the data, and calculate the data size */
+ file_.skipBytes((int)coordinate.start_);
+ dataSize = (int)(coordinate.end_ - coordinate.start_);
+
+ /*
+ * write the number of columns in the column family we are returning:
+ * dataSize that we are reading +
+ * length of column family name +
+ * one boolean for deleted or not +
+ * one int for number of columns
+ */
+ bufOut.writeInt(dataSize + utfPrefix_+cfName.length() + 4 + 1);
+ /* write the column family name */
+ bufOut.writeUTF(cfName);
+ /* write if this cf is marked for delete */
+ bufOut.writeBoolean(markedForDelete);
+ /* write number of columns */
+ bufOut.writeInt(columnRange.count());
+ /* now write the columns */
+ bufOut.write(file_, dataSize);
+ }
+ }
+ else
+ {
+ /* skip over data portion */
+ file_.seek(dataSize + file_.getFilePointer());
+ }
+
+ long endPosition = file_.getFilePointer();
+ bytesRead = endPosition - startPosition;
+ }
+
+ return bytesRead;
+ }
+
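A self-contained sketch, with made-up keys, of the early-exit property the bail-out comment in the method above relies on: keys are laid out in sorted order, so as soon as the key read from disk compares greater than the requested key, the scan can stop:

    import java.util.Arrays;
    import java.util.List;

    // Sketch only, not part of this commit.
    public class SortedScanSketch
    {
        public static void main(String[] args)
        {
            List<String> keysOnDisk = Arrays.asList("100", "250", "300", "470");
            String wanted = "260";
            for (String keyInDisk : keysOnDisk)
            {
                if (keyInDisk.compareTo(wanted) > 0)
                {
                    System.out.println("bail out at " + keyInDisk); // "300" sorts after "260"
                    return;
                }
                if (keyInDisk.equals(wanted))
                {
                    System.out.println("found " + wanted);
                    return;
                }
            }
        }
    }
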
+ /**
+ * This method dumps the next key/value into the DataOutputBuffer
+ * passed in. Always use this method to query for application
+ * specific data as it will have indexes.
+ *
+ * @param key key we are interested in.
+ * @param bufOut DataOutputBuffer that needs to be filled.
+ * @param cf name of the column family:column in our format.
+ * @param timeRange time range we are interested in.
+ * @param section region of the file that needs to be read
+ * @throws IOException
+ * @return number of bytes that were read.
+ */
+ public long next(String key, DataOutputBuffer bufOut, String cf, IndexHelper.TimeRange timeRange, Coordinate section) throws IOException
+ {
+ String[] values = RowMutation.getColumnAndColumnFamily(cf);
+ String columnFamilyName = values[0];
+ String columnName = (values.length == 1) ? null : values[1];
+
+ long bytesRead = -1L;
+ if ( isEOF() )
+ return bytesRead;
+ seekTo(key, section);
+ /* note the position where the key starts */
+ long startPosition = file_.getFilePointer();
+ String keyInDisk = readKeyFromDisk(file_);
+ if ( keyInDisk != null )
+ {
+ /*
+ * If key on disk is greater than requested key
+ * we can bail out since we exploit the property
+ * of the SSTable format.
+ */
+ if ( keyInDisk.compareTo(key) > 0 )
+ return bytesRead;
+
+ /*
+ * If we found the key then we populate the buffer that
+ * is passed in. If not then we skip over this key and
+ * position ourselves to read the next one.
+ */
+ int dataSize = file_.readInt();
+ if ( keyInDisk.equals(key) )
+ {
+ /* write the key into buffer */
+ bufOut.writeUTF( keyInDisk );
+
+ if(columnName == null)
+ {
+ int bytesSkipped = IndexHelper.skipBloomFilter(file_);
+ /*
+ * read the correct number of bytes for the column family and
+ * write data into buffer. Subtract from dataSize the bloom
+ * filter size.
+ */
+ dataSize -= bytesSkipped;
+ List<IndexHelper.ColumnIndexInfo> columnIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
+ /* Read the times indexes if present */
+ int totalBytesRead = handleColumnTimeIndexes(columnFamilyName, columnIndexList);
+ dataSize -= totalBytesRead;
+
+ /* read the column family name */
+ String cfName = file_.readUTF();
+ dataSize -= (utfPrefix_ + cfName.length());
+
+ /* read if this cf is marked for delete */
+ boolean markedForDelete = file_.readBoolean();
+ dataSize -= 1;
+
+ /* read the total number of columns */
+ int totalNumCols = file_.readInt();
+ dataSize -= 4;
+
+ /* get the column range we have to read */
+ IndexHelper.ColumnRange columnRange = IndexHelper.getColumnRangeFromTimeIndex(timeRange, columnIndexList, dataSize, totalNumCols);
+
+ Coordinate coordinate = columnRange.coordinate();
+ /* seek to the correct offset to the data, and calculate the data size */
+ file_.skipBytes((int)coordinate.start_);
+ dataSize = (int)(coordinate.end_ - coordinate.start_);
+
+ /*
+ * write the number of columns in the column family we are returning:
+ * dataSize that we are reading +
+ * length of column family name +
+ * one boolean for deleted or not +
+ * one int for number of columns
+ */
+ bufOut.writeInt(dataSize + utfPrefix_+cfName.length() + 4 + 1);
+ /* write the column family name */
+ bufOut.writeUTF(cfName);
+ /* write if this cf is marked for delete */
+ bufOut.writeBoolean(markedForDelete);
+ /* write number of columns */
+ bufOut.writeInt(columnRange.count());
+ /* now write the columns */
+ bufOut.write(file_, dataSize);
+ }
+ }
+ else
+ {
+ /* skip over data portion */
+ file_.seek(dataSize + file_.getFilePointer());
+ }
+
+ long endPosition = file_.getFilePointer();
+ bytesRead = endPosition - startPosition;
+ }
+
+ return bytesRead;
+ }
+
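The dataSize bookkeeping in the methods above (e.g. dataSize -= (utfPrefix_ + cfName.length())) assumes one byte per character, which holds for ASCII names under modified UTF-8. A quick standalone check using the standard-library writeUTF, which shares the format:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    // Sketch only, not part of this commit.
    public class UtfSizeSketch
    {
        public static void main(String[] args) throws IOException
        {
            String cfName = "Standard";
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            new DataOutputStream(bos).writeUTF(cfName);
            System.out.println(bos.size() == 2 + cfName.length()); // true for ASCII names
        }
    }
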
+ /**
* This method dumps the next key/value into the DataOutputStream
* passed in. Always use this method to query for application
* specific data as it will have indexes.
@@ -670,11 +1104,11 @@
* @return total number of bytes read/considered
*
*/
- public long next(String key, DataOutputBuffer bufOut, String columnFamilyName, List<String> columnNames, IndexHelper.TimeRange timeRange, Coordinate section) throws IOException
+ public long next(String key, DataOutputBuffer bufOut, String cf, List<String> columnNames, Coordinate section) throws IOException
{
- assert timeRange == null || columnNames == null; // at most one may be non-null
-
- List<String> cNames = columnNames == null ? null : new ArrayList<String>(columnNames);
+ String[] values = RowMutation.getColumnAndColumnFamily(cf);
+ String columnFamilyName = values[0];
+ List<String> cNames = new ArrayList<String>(columnNames);
long bytesRead = -1L;
if ( isEOF() )
@@ -682,8 +1116,8 @@
seekTo(key, section);
/* note the position where the key starts */
- long startPosition = file_.getFilePointer();
- String keyInDisk = file_.readUTF();
+ long startPosition = file_.getFilePointer();
+ String keyInDisk = readKeyFromDisk(file_);
if ( keyInDisk != null )
{
/*
@@ -737,9 +1171,7 @@
List<IndexHelper.ColumnIndexInfo> columnIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
/* read the column name indexes if present */
- int totalBytesRead = (timeRange == null)
- ? handleColumnNameIndexes(columnFamilyName, columnIndexList)
- : handleColumnTimeIndexes(columnFamilyName, columnIndexList);
+ int totalBytesRead = handleColumnNameIndexes(columnFamilyName, columnIndexList);
dataSize -= totalBytesRead;
/* read the column family name */
@@ -747,8 +1179,8 @@
dataSize -= (utfPrefix_ + cfName.length());
/* read if this cf is marked for delete */
- long markedForDeleteAt = file_.readLong();
- dataSize -= 8;
+ boolean markedForDelete = file_.readBoolean();
+ dataSize -= 1;
/* read the total number of columns */
int totalNumCols = file_.readInt();
@@ -758,15 +1190,7 @@
/* sort the required list of columns */
Collections.sort(cNames);
/* get the various column ranges we have to read */
- List<IndexHelper.ColumnRange> columnRanges;
- if (timeRange == null)
- {
- columnRanges = IndexHelper.getMultiColumnRangesFromNameIndex(cNames, columnIndexList, dataSize, totalNumCols);
- }
- else
- {
- columnRanges = Arrays.asList(IndexHelper.getColumnRangeFromTimeIndex(timeRange, columnIndexList, dataSize, totalNumCols));
- }
+ List<IndexHelper.ColumnRange> columnRanges = IndexHelper.getMultiColumnRangesFromNameIndex(cNames, columnIndexList, dataSize, totalNumCols);
/* calculate the data size */
int numColsReturned = 0;
@@ -789,7 +1213,7 @@
/* write the column family name */
bufOut.writeUTF(cfName);
/* write if this cf is marked for delete */
- bufOut.writeLong(markedForDeleteAt);
+ bufOut.writeBoolean(markedForDelete);
/* write number of columns */
bufOut.writeInt(numColsReturned);
int prevPosition = 0;
@@ -856,8 +1280,65 @@
bytesRead = -1L;
return bytesRead;
}
- }
+ /**
+ * This method dumps the next key/value into the DataOutputBuffer
+ * passed in.
+ *
+ * @param key - key we are interested in.
+ * @param bufOut - DataOutputBuffer that needs to be filled.
+ * @param section region of the file that needs to be read
+ * @return total number of bytes read/considered
+ */
+ public long next(String key, DataOutputBuffer bufOut, Coordinate section) throws IOException
+ {
+ long bytesRead = -1L;
+ if ( isEOF() )
+ return bytesRead;
+
+ seekTo(key, section);
+ /* note the position where the key starts */
+ long startPosition = file_.getFilePointer();
+ String keyInDisk = readKeyFromDisk(file_);
+ if ( keyInDisk != null )
+ {
+ /*
+ * If key on disk is greater than requested key
+ * we can bail out since we exploit the property
+ * of the SSTable format.
+ */
+ if ( keyInDisk.compareTo(key) > 0 )
+ return bytesRead;
+
+ /*
+ * If we found the key then we populate the buffer that
+ * is passed in. If not then we skip over this key and
+ * position ourselves to read the next one.
+ */
+ int dataSize = file_.readInt();
+ if ( keyInDisk.equals(key) )
+ {
+ /* write the key into buffer */
+ bufOut.writeUTF( keyInDisk );
+ /* write data size into buffer */
+ bufOut.writeInt(dataSize);
+ /* write the data into buffer */
+ bufOut.write(file_, dataSize);
+ }
+ else
+ {
+ /* skip over data portion */
+ file_.seek(dataSize + file_.getFilePointer());
+ }
+
+ long endPosition = file_.getFilePointer();
+ bytesRead = endPosition - startPosition;
+ }
+
+ return bytesRead;
+ }
+ }
+
public static class Reader extends AbstractReader
{
Reader(String filename) throws IOException
@@ -951,6 +1432,7 @@
}
private static Logger logger_ = Logger.getLogger( SequenceFile.class ) ;
+ public static final short utfPrefix_ = 2;
public static final String marker_ = "Bloom-Filter";
public static IFileWriter writer(String filename) throws IOException
@@ -968,6 +1450,11 @@
return new ChecksumWriter(filename, size);
}
+ public static IFileWriter concurrentWriter(String filename) throws IOException
+ {
+ return new ConcurrentWriter(filename);
+ }
+
public static IFileWriter fastWriter(String filename, int size) throws IOException
{
return new FastConcurrentWriter(filename, size);
@@ -988,6 +1475,11 @@
return new ChecksumReader(filename, size);
}
+ public static boolean readBoolean(ByteBuffer buffer)
+ {
+ return buffer.get() == 1;
+ }
+
/**
* Efficiently writes a UTF8 string to the buffer.
* Assuming all Strings that are passed in have length
@@ -1056,4 +1548,83 @@
buffer.put(bytearr, 0, utflen + 2);
}
+ /**
+ * Read a UTF8 string from a serialized buffer.
+ * @param in buffer from which a UTF8 string is read
+ * @return a Java String
+ */
+ protected static String readUTF(ByteBuffer in) throws IOException
+ {
+ int utflen = in.getShort();
+ byte[] bytearr = new byte[utflen];
+ char[] chararr = new char[utflen];
+
+ int c, char2, char3;
+ int count = 0;
+ int chararr_count = 0;
+
+ in.get(bytearr, 0, utflen);
+
+ while (count < utflen)
+ {
+ c = (int) bytearr[count] & 0xff;
+ if (c > 127)
+ break;
+ count++;
+ chararr[chararr_count++] = (char) c;
+ }
+
+ while (count < utflen)
+ {
+ c = (int) bytearr[count] & 0xff;
+ switch (c >> 4)
+ {
+ case 0:
+ case 1:
+ case 2:
+ case 3:
+ case 4:
+ case 5:
+ case 6:
+ case 7:
+ /* 0xxxxxxx */
+ count++;
+ chararr[chararr_count++] = (char) c;
+ break;
+ case 12:
+ case 13:
+ /* 110x xxxx 10xx xxxx */
+ count += 2;
+ if (count > utflen)
+ throw new UTFDataFormatException(
+ "malformed input: partial character at end");
+ char2 = (int) bytearr[count - 1];
+ if ((char2 & 0xC0) != 0x80)
+ throw new UTFDataFormatException(
+ "malformed input around byte " + count);
+ chararr[chararr_count++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
+ break;
+ case 14:
+ /* 1110 xxxx 10xx xxxx 10xx xxxx */
+ count += 3;
+ if (count > utflen)
+ throw new UTFDataFormatException(
+ "malformed input: partial character at end");
+ char2 = (int) bytearr[count - 2];
+ char3 = (int) bytearr[count - 1];
+ if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80))
+ throw new UTFDataFormatException(
+ "malformed input around byte " + (count - 1));
+ chararr[chararr_count++] = (char) (((c & 0x0F) << 12)
+ | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
+ break;
+ default:
+ /* 10xx xxxx, 1111 xxxx */
+ throw new UTFDataFormatException("malformed input around byte "
+ + count);
+ }
+ }
+ // The number of chars produced may be less than utflen
+ return new String(chararr, 0, chararr_count);
+ }
}
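The writeUTF/readUTF helpers above implement the modified UTF-8 wire format of java.io.DataOutput with a two-byte length prefix (utfPrefix_ = 2). A round-trip sketch using the standard-library equivalents, which share the encoding:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    // Sketch only, not part of this commit.
    public class ModifiedUtfRoundTrip
    {
        public static void main(String[] args) throws IOException
        {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            new DataOutputStream(bos).writeUTF("Bloom-Filter");
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
            System.out.println(in.readUTF()); // prints Bloom-Filter
        }
    }
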
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/locator/AbstractStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/locator/AbstractStrategy.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/locator/AbstractStrategy.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/locator/AbstractStrategy.java Fri Mar 27 06:45:19 2009
@@ -1,5 +1,6 @@
package org.apache.cassandra.locator;
+import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -7,12 +8,11 @@
import java.util.List;
import java.util.Map;
-import org.apache.log4j.Logger;
-
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.service.StorageService;
+import org.apache.log4j.Logger;
/**
* This class contains a helper method that will be used by
@@ -45,10 +45,10 @@
protected EndPoint getNextAvailableEndPoint(EndPoint startPoint, List<EndPoint> topN, List<EndPoint> liveNodes)
{
EndPoint endPoint = null;
- Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
- List tokens = new ArrayList(tokenToEndPointMap.keySet());
+ Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+ List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
Collections.sort(tokens);
- Token token = tokenMetadata_.getToken(startPoint);
+ BigInteger token = tokenMetadata_.getToken(startPoint);
int index = Collections.binarySearch(tokens, token);
if(index < 0)
{
@@ -76,7 +76,7 @@
* endpoint which is in the top N.
* Get the map of top N to the live nodes currently.
*/
- public Map<EndPoint, EndPoint> getHintedStorageEndPoints(Token token)
+ public Map<EndPoint, EndPoint> getHintedStorageEndPoints(BigInteger token)
{
List<EndPoint> liveList = new ArrayList<EndPoint>();
Map<EndPoint, EndPoint> map = new HashMap<EndPoint, EndPoint>();
@@ -107,6 +107,6 @@
return map;
}
- public abstract EndPoint[] getStorageEndPoints(Token token);
+ public abstract EndPoint[] getStorageEndPoints(BigInteger token);
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/locator/IReplicaPlacementStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/locator/IReplicaPlacementStrategy.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/locator/IReplicaPlacementStrategy.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/locator/IReplicaPlacementStrategy.java Fri Mar 27 06:45:19 2009
@@ -18,9 +18,9 @@
package org.apache.cassandra.locator;
+import java.math.BigInteger;
import java.util.Map;
-import org.apache.cassandra.dht.Token;
import org.apache.cassandra.net.EndPoint;
@@ -32,7 +32,8 @@
*/
public interface IReplicaPlacementStrategy
{
- public EndPoint[] getStorageEndPoints(Token token);
- public EndPoint[] getStorageEndPoints(Token token, Map<Token, EndPoint> tokenToEndPointMap);
- public Map<EndPoint, EndPoint> getHintedStorageEndPoints(Token token);
+ public EndPoint[] getStorageEndPoints(BigInteger token);
+ public Map<String, EndPoint[]> getStorageEndPoints(String[] keys);
+ public EndPoint[] getStorageEndPoints(BigInteger token, Map<BigInteger, EndPoint> tokenToEndPointMap);
+ public Map<EndPoint, EndPoint> getHintedStorageEndPoints(BigInteger token);
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackAwareStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackAwareStrategy.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackAwareStrategy.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackAwareStrategy.java Fri Mar 27 06:45:19 2009
@@ -1,13 +1,14 @@
package org.apache.cassandra.locator;
+import java.math.BigInteger;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.dht.Token;
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.LogUtil;
@@ -27,7 +28,7 @@
super(tokenMetadata);
}
- public EndPoint[] getStorageEndPoints(Token token)
+ public EndPoint[] getStorageEndPoints(BigInteger token)
{
int startIndex = 0 ;
List<EndPoint> list = new ArrayList<EndPoint>();
@@ -35,8 +36,8 @@
boolean bOtherRack = false;
int foundCount = 0;
int N = DatabaseDescriptor.getReplicationFactor();
- Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
- List tokens = new ArrayList(tokenToEndPointMap.keySet());
+ Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+ List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
Collections.sort(tokens);
int index = Collections.binarySearch(tokens, token);
if(index < 0)
@@ -106,8 +107,98 @@
retrofitPorts(list);
return list.toArray(new EndPoint[0]);
}
+
+ public Map<String, EndPoint[]> getStorageEndPoints(String[] keys)
+ {
+ Map<String, EndPoint[]> results = new HashMap<String, EndPoint[]>();
+ List<EndPoint> list = new ArrayList<EndPoint>();
+ int startIndex = 0 ;
+ int foundCount = 0;
+ boolean bDataCenter = false;
+ boolean bOtherRack = false;
+
+ Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+ int N = DatabaseDescriptor.getReplicationFactor();
+ List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
+ Collections.sort(tokens);
+
+ for ( String key : keys )
+ {
+ /* reset per-key state so endpoints do not accumulate across keys */
+ list = new ArrayList<EndPoint>();
+ foundCount = 0;
+ bDataCenter = false;
+ bOtherRack = false;
+ BigInteger token = StorageService.hash(key);
+ int index = Collections.binarySearch(tokens, token);
+ if(index < 0)
+ {
+ index = (index + 1) * (-1);
+ if (index >= tokens.size())
+ index = 0;
+ }
+ int totalNodes = tokens.size();
+ // Add the node at the index by default
+ list.add(tokenToEndPointMap.get(tokens.get(index)));
+ foundCount++;
+ if( N == 1 )
+ {
+ results.put( key, list.toArray(new EndPoint[0]) );
+ continue;
+ }
+ startIndex = (index + 1)%totalNodes;
+ IEndPointSnitch endPointSnitch = StorageService.instance().getEndPointSnitch();
+
+ for (int i = startIndex, count = 1; count < totalNodes && foundCount < N; ++count, i = (i+1)%totalNodes)
+ {
+ try
+ {
+ // First try to find one in a different data center
+ if(!endPointSnitch.isInSameDataCenter(tokenToEndPointMap.get(tokens.get(index)), tokenToEndPointMap.get(tokens.get(i))))
+ {
+ // If we have already found something in a diff datacenter no need to find another
+ if( !bDataCenter )
+ {
+ list.add(tokenToEndPointMap.get(tokens.get(i)));
+ bDataCenter = true;
+ foundCount++;
+ }
+ continue;
+ }
+ // Now try to find one on a different rack
+ if(!endPointSnitch.isOnSameRack(tokenToEndPointMap.get(tokens.get(index)), tokenToEndPointMap.get(tokens.get(i))) &&
+ endPointSnitch.isInSameDataCenter(tokenToEndPointMap.get(tokens.get(index)), tokenToEndPointMap.get(tokens.get(i))))
+ {
+ // If we have already found something in a diff rack no need to find another
+ if( !bOtherRack )
+ {
+ list.add(tokenToEndPointMap.get(tokens.get(i)));
+ bOtherRack = true;
+ foundCount++;
+ }
+ continue;
+ }
+ }
+ catch (UnknownHostException e)
+ {
+ logger_.debug(LogUtil.throwableToString(e));
+ }
- public EndPoint[] getStorageEndPoints(Token token, Map<Token, EndPoint> tokenToEndPointMap)
+ }
+ // If we found N number of nodes we are good. This loop will just exit. Otherwise just
+ // loop through the list and add until we have N nodes.
+ for (int i = startIndex, count = 1; count < totalNodes && foundCount < N; ++count, i = (i+1)%totalNodes)
+ {
+ if( ! list.contains(tokenToEndPointMap.get(tokens.get(i))))
+ {
+ list.add(tokenToEndPointMap.get(tokens.get(i)));
+ foundCount++;
+ continue;
+ }
+ }
+ retrofitPorts(list);
+ results.put(key, list.toArray(new EndPoint[0]));
+ }
+
+ return results;
+ }
+
+ public EndPoint[] getStorageEndPoints(BigInteger token, Map<BigInteger, EndPoint> tokenToEndPointMap)
{
throw new UnsupportedOperationException("This operation is not currently supported");
}
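Both placement strategies convert the negative return of Collections.binarySearch into a ring position with index = (index + 1) * (-1). A standalone sketch of that conversion with made-up tokens: binarySearch returns -(insertionPoint) - 1 when the token is absent, so the expression recovers the insertion point, i.e. the first token greater than the key's token, wrapping to 0 past the end:

    import java.math.BigInteger;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    // Sketch only, not part of this commit.
    public class RingIndexSketch
    {
        public static void main(String[] args)
        {
            List<BigInteger> tokens = Arrays.asList(
                BigInteger.valueOf(10), BigInteger.valueOf(20), BigInteger.valueOf(30));
            int index = Collections.binarySearch(tokens, BigInteger.valueOf(25)); // returns -3
            if (index < 0)
            {
                index = (index + 1) * (-1); // insertion point: 2
                if (index >= tokens.size())
                    index = 0;              // wrap around the ring
            }
            System.out.println(tokens.get(index)); // 30, the successor token
        }
    }
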
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackUnawareStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackUnawareStrategy.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackUnawareStrategy.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackUnawareStrategy.java Fri Mar 27 06:45:19 2009
@@ -1,14 +1,18 @@
package org.apache.cassandra.locator;
+import java.math.BigInteger;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.dht.Range;
import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.service.StorageService;
/**
@@ -21,24 +25,25 @@
{
/* Use this flag to check if initialization is in order. */
private AtomicBoolean initialized_ = new AtomicBoolean(false);
-
+ private Map<Range, List<EndPoint>> rangeToEndPointMap_;
+
public RackUnawareStrategy(TokenMetadata tokenMetadata)
{
super(tokenMetadata);
}
- public EndPoint[] getStorageEndPoints(Token token)
+ public EndPoint[] getStorageEndPoints(BigInteger token)
{
return getStorageEndPoints(token, tokenMetadata_.cloneTokenEndPointMap());
}
- public EndPoint[] getStorageEndPoints(Token token, Map<Token, EndPoint> tokenToEndPointMap)
+ public EndPoint[] getStorageEndPoints(BigInteger token, Map<BigInteger, EndPoint> tokenToEndPointMap)
{
int startIndex = 0 ;
List<EndPoint> list = new ArrayList<EndPoint>();
int foundCount = 0;
int N = DatabaseDescriptor.getReplicationFactor();
- List tokens = new ArrayList<Token>(tokenToEndPointMap.keySet());
+ List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
Collections.sort(tokens);
int index = Collections.binarySearch(tokens, token);
if(index < 0)
@@ -66,5 +71,84 @@
retrofitPorts(list);
return list.toArray(new EndPoint[0]);
}
-
+
+ private void doInitialization()
+ {
+ if ( !initialized_.get() )
+ {
+ /* construct the mapping from the ranges to the replicas responsible for them */
+ rangeToEndPointMap_ = StorageService.instance().getRangeToEndPointMap();
+ initialized_.set(true);
+ }
+ }
+
+ /**
+ * This method determines which range in the array actually contains
+ * the hash of the key.
+ * @param ranges the candidate ranges, in sorted order
+ * @param key the key whose hash we are locating
+ * @return the index of the range that contains hash(key)
+ */
+ private int findRangeIndexForKey(Range[] ranges, String key)
+ {
+ int index = 0;
+ BigInteger hash = StorageService.hash(key);
+ for ( int i = 0; i < ranges.length; ++i )
+ {
+ if ( ranges[i].contains(hash) )
+ {
+ index = i;
+ break;
+ }
+ }
+
+ return index;
+ }
+
+ public Map<String, EndPoint[]> getStorageEndPoints(String[] keys)
+ {
+ Arrays.sort(keys);
+ Range[] ranges = StorageService.instance().getAllRanges();
+
+ Map<String, EndPoint[]> results = new HashMap<String, EndPoint[]>();
+ List<EndPoint> list = new ArrayList<EndPoint>();
+ int startIndex = 0 ;
+ int foundCount = 0;
+
+ Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+ int N = DatabaseDescriptor.getReplicationFactor();
+ List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
+ Collections.sort(tokens);
+ for ( String key : keys )
+ {
+ /* reset per-key state so endpoints do not accumulate across keys */
+ list = new ArrayList<EndPoint>();
+ foundCount = 0;
+ BigInteger token = StorageService.hash(key);
+ int index = Collections.binarySearch(tokens, token);
+ if(index < 0)
+ {
+ index = (index + 1) * (-1);
+ if (index >= tokens.size())
+ index = 0;
+ }
+ int totalNodes = tokens.size();
+ // Add the node at the index by default
+ list.add(tokenToEndPointMap.get(tokens.get(index)));
+ foundCount++;
+ startIndex = (index + 1)%totalNodes;
+ // If we found N number of nodes we are good. This loop will just exit. Otherwise just
+ // loop through the list and add until we have N nodes.
+ for (int i = startIndex, count = 1; count < totalNodes && foundCount < N; ++count, i = (i+1)%totalNodes)
+ {
+ if( ! list.contains(tokenToEndPointMap.get(tokens.get(i))))
+ {
+ list.add(tokenToEndPointMap.get(tokens.get(i)));
+ foundCount++;
+ continue;
+ }
+ }
+ retrofitPorts(list);
+ results.put(key, list.toArray(new EndPoint[0]));
+ }
+
+ return results;
+ }
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/locator/TokenMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/locator/TokenMetadata.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/locator/TokenMetadata.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/locator/TokenMetadata.java Fri Mar 27 06:45:19 2009
@@ -18,13 +18,18 @@
package org.apache.cassandra.locator;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.CompactEndPointSerializationHelper;
import org.apache.cassandra.net.EndPoint;
@@ -34,19 +39,29 @@
public class TokenMetadata
{
- /* Maintains token to endpoint map of every node in the cluster. */
- private Map<Token, EndPoint> tokenToEndPointMap_ = new HashMap<Token, EndPoint>();
+ private static ICompactSerializer<TokenMetadata> serializer_ = new TokenMetadataSerializer();
+
+ public static ICompactSerializer<TokenMetadata> serializer()
+ {
+ return serializer_;
+ }
+
+ /* Maintains token to endpoint map of every node in the cluster. */
+ private Map<BigInteger, EndPoint> tokenToEndPointMap_ = new HashMap<BigInteger, EndPoint>();
/* Maintains a reverse index of endpoint to token in the cluster. */
- private Map<EndPoint, Token> endPointToTokenMap_ = new HashMap<EndPoint, Token>();
+ private Map<EndPoint, BigInteger> endPointToTokenMap_ = new HashMap<EndPoint, BigInteger>();
/* Use this lock for manipulating the token map */
- private final ReadWriteLock lock_ = new ReentrantReadWriteLock(true);
-
+ private ReadWriteLock lock_ = new ReentrantReadWriteLock(true);
+
+ /*
+ * For JAXB purposes.
+ */
public TokenMetadata()
{
}
-
- private TokenMetadata(Map<Token, EndPoint> tokenToEndPointMap, Map<EndPoint, Token> endPointToTokenMap)
+
+ protected TokenMetadata(Map<BigInteger, EndPoint> tokenToEndPointMap, Map<EndPoint, BigInteger> endPointToTokenMap)
{
tokenToEndPointMap_ = tokenToEndPointMap;
endPointToTokenMap_ = endPointToTokenMap;
@@ -54,18 +69,20 @@
public TokenMetadata cloneMe()
{
- return new TokenMetadata(cloneTokenEndPointMap(), cloneEndPointTokenMap());
+ Map<BigInteger, EndPoint> tokenToEndPointMap = cloneTokenEndPointMap();
+ Map<EndPoint, BigInteger> endPointToTokenMap = cloneEndPointTokenMap();
+ return new TokenMetadata( tokenToEndPointMap, endPointToTokenMap );
}
/**
* Update the two maps in an safe mode.
*/
- public void update(Token token, EndPoint endpoint)
+ public void update(BigInteger token, EndPoint endpoint)
{
lock_.writeLock().lock();
try
{
- Token oldToken = endPointToTokenMap_.get(endpoint);
+ BigInteger oldToken = endPointToTokenMap_.get(endpoint);
if ( oldToken != null )
tokenToEndPointMap_.remove(oldToken);
tokenToEndPointMap_.put(token, endpoint);
@@ -86,7 +103,7 @@
lock_.writeLock().lock();
try
{
- Token oldToken = endPointToTokenMap_.get(endpoint);
+ BigInteger oldToken = endPointToTokenMap_.get(endpoint);
if ( oldToken != null )
tokenToEndPointMap_.remove(oldToken);
endPointToTokenMap_.remove(endpoint);
@@ -97,7 +114,7 @@
}
}
- public Token getToken(EndPoint endpoint)
+ public BigInteger getToken(EndPoint endpoint)
{
lock_.readLock().lock();
try
@@ -126,12 +143,12 @@
/*
* Returns a safe clone of tokenToEndPointMap_.
*/
- public Map<Token, EndPoint> cloneTokenEndPointMap()
+ public Map<BigInteger, EndPoint> cloneTokenEndPointMap()
{
lock_.readLock().lock();
try
{
- return new HashMap<Token, EndPoint>( tokenToEndPointMap_ );
+ return new HashMap<BigInteger, EndPoint>( tokenToEndPointMap_ );
}
finally
{
@@ -142,12 +159,12 @@
/*
* Returns a safe clone of endPointTokenMap_.
*/
- public Map<EndPoint, Token> cloneEndPointTokenMap()
+ public Map<EndPoint, BigInteger> cloneEndPointTokenMap()
{
lock_.readLock().lock();
try
{
- return new HashMap<EndPoint, Token>( endPointToTokenMap_ );
+ return new HashMap<EndPoint, BigInteger>( endPointToTokenMap_ );
}
finally
{
@@ -171,3 +188,51 @@
return sb.toString();
}
}
+
+class TokenMetadataSerializer implements ICompactSerializer<TokenMetadata>
+{
+ public void serialize(TokenMetadata tkMetadata, DataOutputStream dos) throws IOException
+ {
+ Map<BigInteger, EndPoint> tokenToEndPointMap = tkMetadata.cloneTokenEndPointMap();
+ Set<BigInteger> tokens = tokenToEndPointMap.keySet();
+ /* write the size */
+ dos.writeInt(tokens.size());
+ for ( BigInteger token : tokens )
+ {
+ byte[] bytes = token.toByteArray();
+ /* Convert the BigInteger to byte[] and persist */
+ dos.writeInt(bytes.length);
+ dos.write(bytes);
+ /* Write the endpoint out */
+ CompactEndPointSerializationHelper.serialize(tokenToEndPointMap.get(token), dos);
+ }
+ }
+
+ public TokenMetadata deserialize(DataInputStream dis) throws IOException
+ {
+ TokenMetadata tkMetadata = null;
+ int size = dis.readInt();
+
+ if ( size > 0 )
+ {
+ Map<BigInteger, EndPoint> tokenToEndPointMap = new HashMap<BigInteger, EndPoint>();
+ Map<EndPoint, BigInteger> endPointToTokenMap = new HashMap<EndPoint, BigInteger>();
+
+ for ( int i = 0; i < size; ++i )
+ {
+ /* Read the byte[] and convert to BigInteger */
+ byte[] bytes = new byte[dis.readInt()];
+ dis.readFully(bytes);
+ BigInteger token = new BigInteger(bytes);
+ /* Read the endpoint out */
+ EndPoint endpoint = CompactEndPointSerializationHelper.deserialize(dis);
+ tokenToEndPointMap.put(token, endpoint);
+ endPointToTokenMap.put(endpoint, token);
+ }
+
+ tkMetadata = new TokenMetadata( tokenToEndPointMap, endPointToTokenMap );
+ }
+
+ return tkMetadata;
+ }
+}
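The serializer above persists each token as a length-prefixed two's-complement byte array followed by the endpoint. A minimal round-trip sketch of just the token encoding, the part that does not depend on Cassandra classes:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.math.BigInteger;

    // Sketch only, not part of this commit.
    public class TokenCodecSketch
    {
        public static void main(String[] args) throws IOException
        {
            BigInteger token = new BigInteger("123456789123456789");
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream dos = new DataOutputStream(bos);
            byte[] bytes = token.toByteArray();   // two's-complement encoding
            dos.writeInt(bytes.length);           // length prefix, as in serialize()
            dos.write(bytes);

            DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
            byte[] read = new byte[dis.readInt()];
            dis.readFully(read);                  // as in deserialize()
            System.out.println(new BigInteger(read).equals(token)); // true
        }
    }
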
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/net/Message.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/Message.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/Message.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/Message.java Fri Mar 27 06:45:19 2009
@@ -53,20 +53,27 @@
Header header_;
private Object[] body_ = new Object[0];
- protected Message(String id, EndPoint from, String messageType, String verb, Object... body)
+ /* Ctor for JAXB. DO NOT DELETE */
+ private Message()
{
- this(new Header(id, from, messageType, verb), body);
+ }
+
+ protected Message(String id, EndPoint from, String messageType, String verb, Object[] body)
+ {
+ header_ = new Header(id, from, messageType, verb);
+ body_ = body;
}
- protected Message(Header header, Object... body)
+ protected Message(Header header, Object[] body)
{
header_ = header;
body_ = body;
}
- public Message(EndPoint from, String messageType, String verb, Object... body)
+ public Message(EndPoint from, String messageType, String verb, Object[] body)
{
- this(new Header(from, messageType, verb), body);
+ header_ = new Header(from, messageType, verb);
+ body_ = body;
}
public byte[] getHeader(Object key)
@@ -157,7 +164,7 @@
header_.setMessageId(id);
}
- public Message getReply(EndPoint from, Object... args)
+ public Message getReply(EndPoint from, Object[] args)
{
Message response = new Message(getMessageId(),
from,
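Note that the Message constructors in this hunk change from Object... varargs to an explicit Object[] (possibly to sit alongside the new private no-arg JAXB constructor, though the commit does not say), so call sites must now wrap a single body element themselves. The TokenUpdater change later in this commit shows the resulting pattern; a fragment, with the surrounding variables taken from that code:

    // Fragment only; bos, port_ and the verb handler name come from the
    // TokenUpdater code later in this commit.
    Message msg = new Message(new EndPoint(FBUtilities.getHostName(), port_), "",
                              StorageService.tokenVerbHandler_,
                              new Object[]{ bos.toByteArray() });
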
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/test/DBTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/test/DBTest.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/test/DBTest.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/test/DBTest.java Fri Mar 27 06:45:19 2009
@@ -109,7 +109,7 @@
System.out.println(scanner.next().name());
}
}
-
+
public static void doTest()
{
String host = "insearch00";
@@ -182,6 +182,7 @@
//doWrites();
//doRead("543");
+ DatabaseDescriptor.init();
DBTest.doTest();
}
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java Fri Mar 27 06:45:19 2009
@@ -20,8 +20,12 @@
import com.facebook.thrift.transport.TTransport;
import com.facebook.thrift.transport.TSocket;
+import com.facebook.thrift.transport.THttpClient;
+import com.facebook.thrift.transport.TFramedTransport;
import com.facebook.thrift.protocol.TBinaryProtocol;
-
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.net.*;
+import org.apache.cassandra.service.*;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
@@ -29,17 +33,25 @@
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.StringReader;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.StringTokenizer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.ThreadFactoryImpl;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.IColumn;
@@ -58,7 +70,8 @@
import org.apache.cassandra.service.batch_mutation_t;
import org.apache.cassandra.service.column_t;
import org.apache.cassandra.service.superColumn_t;
-
+import org.apache.cassandra.utils.BasicUtilities;
+import org.apache.cassandra.utils.LogUtil;
import org.apache.log4j.Logger;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.Token;
@@ -894,7 +907,7 @@
Thread.sleep(1000/requestsPerSecond_, 1000%requestsPerSecond_);
errorCount_++;
} else {
- Map<String, ColumnFamily> cfMap = row.getColumnFamilyMap();
+ Map<String, ColumnFamily> cfMap = row.getColumnFamilies();
if (cfMap == null || cfMap.size() == 0) {
logger_
.debug("ERROR ColumnFamil map is missing.....: "
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/test/SSTableTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/test/SSTableTest.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/test/SSTableTest.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/test/SSTableTest.java Fri Mar 27 06:45:19 2009
@@ -20,13 +20,22 @@
import java.io.FileInputStream;
import java.io.FileOutputStream;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.Random;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.PrimaryKey;
import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.io.DataOutputBuffer;
import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.service.PartitionerType;
import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.utils.FBUtilities;
public class SSTableTest
@@ -44,14 +53,79 @@
ColumnFamily cf = new ColumnFamily("Test", "Standard");
bufOut.reset();
// random.nextBytes(bytes);
- cf.addColumn("C", "Avinash Lakshman is a good man".getBytes(), i);
- ColumnFamily.serializerWithIndexes().serialize(cf, bufOut);
+ cf.createColumn("C", "Avinash Lakshman is a good man".getBytes(), i);
+ ColumnFamily.serializer2().serialize(cf, bufOut);
ssTable.append(key, bufOut);
bf.fill(key);
}
ssTable.close(bf);
}
+ private static void hashSSTableWrite() throws Throwable
+ {
+ Map<String, ColumnFamily> columnFamilies = new HashMap<String, ColumnFamily>();
+ byte[] bytes = new byte[64*1024];
+ Random random = new Random();
+ for ( int i = 100; i < 1000; ++i )
+ {
+ String key = Integer.toString(i);
+ ColumnFamily cf = new ColumnFamily("Test", "Standard");
+ // random.nextBytes(bytes);
+ cf.createColumn("C", "Avinash Lakshman is a good man".getBytes(), i);
+ columnFamilies.put(key, cf);
+ }
+ flushForRandomPartitioner(columnFamilies);
+ }
+
+ private static void flushForRandomPartitioner(Map<String, ColumnFamily> columnFamilies) throws Throwable
+ {
+ SSTable ssTable = new SSTable("C:\\Engagements\\Cassandra", "Table-Test-1", PartitionerType.RANDOM);
+ /* List of primary keys in sorted order */
+ List<PrimaryKey> pKeys = PrimaryKey.create( columnFamilies.keySet() );
+ DataOutputBuffer buffer = new DataOutputBuffer();
+ /* Use this BloomFilter to decide if a key exists in a SSTable */
+ BloomFilter bf = new BloomFilter(pKeys.size(), 15);
+ for ( PrimaryKey pKey : pKeys )
+ {
+ buffer.reset();
+ ColumnFamily columnFamily = columnFamilies.get(pKey.key());
+ if ( columnFamily != null )
+ {
+ /* serialize the cf with column indexes */
+ ColumnFamily.serializer2().serialize( columnFamily, buffer );
+ /* Now write the key and value to disk */
+ ssTable.append(pKey.key(), pKey.hash(), buffer);
+ bf.fill(pKey.key());
+ }
+ }
+ ssTable.close(bf);
+ }
+
+ private static void readSSTable() throws Throwable
+ {
+ SSTable ssTable = new SSTable("C:\\Engagements\\Cassandra\\Table-Test-1-Data.db");
+ for ( int i = 100; i < 1000; ++i )
+ {
+ String key = Integer.toString(i);
+ DataInputBuffer bufIn = ssTable.next(key, "Test:C");
+ ColumnFamily cf = ColumnFamily.serializer().deserialize(bufIn);
+ if ( cf != null )
+ {
+ System.out.println("KEY:" + key);
+ System.out.println(cf.name());
+ Collection<IColumn> columns = cf.getAllColumns();
+ for ( IColumn column : columns )
+ {
+ System.out.println(column.name());
+ }
+ }
+ else
+ {
+ System.out.println("CF doesn't exist for key " + key);
+ }
+ }
+ }
+
public static void main(String[] args) throws Throwable
{
BloomFilter bf = new BloomFilter(1024*1024, 15);
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/tools/MembershipCleaner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/tools/MembershipCleaner.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/tools/MembershipCleaner.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/tools/MembershipCleaner.java Fri Mar 27 06:45:19 2009
@@ -18,6 +18,7 @@
package org.apache.cassandra.tools;
+import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
@@ -26,6 +27,7 @@
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Serializable;
+import java.math.BigInteger;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cassandra.io.ICompactSerializer;
@@ -34,6 +36,8 @@
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.io.*;
+import org.apache.cassandra.utils.*;
public class MembershipCleaner
{
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/tools/MembershipCleanerVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/tools/MembershipCleanerVerbHandler.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/tools/MembershipCleanerVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/tools/MembershipCleanerVerbHandler.java Fri Mar 27 06:45:19 2009
@@ -18,20 +18,25 @@
package org.apache.cassandra.tools;
+import java.util.*;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.log4j.Logger;
+import java.math.BigInteger;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.tools.TokenUpdater.TokenInfoMessage;
import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.io.*;
+import org.apache.cassandra.config.*;
/**
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/tools/TokenUpdateVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/tools/TokenUpdateVerbHandler.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/tools/TokenUpdateVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/tools/TokenUpdateVerbHandler.java Fri Mar 27 06:45:19 2009
@@ -18,24 +18,24 @@
package org.apache.cassandra.tools;
+import java.util.*;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.log4j.Logger;
+import java.math.BigInteger;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.tools.TokenUpdater.TokenInfoMessage;
import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.io.*;
+import org.apache.cassandra.config.*;
/**
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -54,8 +54,9 @@
DataInputBuffer bufIn = new DataInputBuffer();
bufIn.reset(body, body.length);
/* Deserialize to get the token for this endpoint. */
- Token token = Token.serializer().deserialize(bufIn);
-
+ TokenUpdater.TokenInfoMessage tiMessage = TokenUpdater.TokenInfoMessage.serializer().deserialize(bufIn);
+
+ BigInteger token = tiMessage.getToken();
logger_.info("Updating the token to [" + token + "]");
StorageService.instance().updateToken(token);
@@ -65,19 +66,19 @@
logger_.debug("Number of nodes in the header " + headers.size());
Set<String> nodes = headers.keySet();
- IPartitioner p = StorageService.getPartitioner();
for ( String node : nodes )
{
logger_.debug("Processing node " + node);
byte[] bytes = headers.remove(node);
/* Send a message to this node to update its token to the one retreived. */
EndPoint target = new EndPoint(node, DatabaseDescriptor.getStoragePort());
- token = p.getTokenFactory().fromByteArray(bytes);
+ token = new BigInteger(bytes);
- /* Reset the new Message */
+ /* Construct a new TokenInfoMessage for this node */
+ tiMessage = new TokenUpdater.TokenInfoMessage(target, token );
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
- Token.serializer().serialize(token, dos);
+ TokenInfoMessage.serializer().serialize(tiMessage, dos);
message.setMessageBody(new Object[]{bos.toByteArray()});
logger_.debug("Sending a token update message to " + target + " to update it to " + token);
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/tools/TokenUpdater.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/tools/TokenUpdater.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/tools/TokenUpdater.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/tools/TokenUpdater.java Fri Mar 27 06:45:19 2009
@@ -18,19 +18,26 @@
package org.apache.cassandra.tools;
+import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileInputStream;
+import java.io.IOException;
import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.math.BigInteger;
+import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.io.*;
+import org.apache.cassandra.utils.*;
public class TokenUpdater
{
@@ -46,17 +53,16 @@
}
String ipPort = args[0];
- IPartitioner p = StorageService.getPartitioner();
- Token token = p.getTokenFactory().fromString(args[1]);
+ String token = args[1];
String file = args[2];
String[] ipPortPair = ipPort.split(":");
EndPoint target = new EndPoint(ipPortPair[0], Integer.valueOf(ipPortPair[1]));
-
+ TokenInfoMessage tiMessage = new TokenInfoMessage( target, new BigInteger(token) );
+
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
- Token.serializer().serialize(token, dos);
-
+ TokenInfoMessage.serializer().serialize(tiMessage, dos);
/* Construct the token update message to be sent */
Message tokenUpdateMessage = new Message( new EndPoint(FBUtilities.getHostName(), port_), "", StorageService.tokenVerbHandler_, new Object[]{bos.toByteArray()} );
@@ -67,8 +73,8 @@
{
String[] nodeTokenPair = line.split(" ");
/* Add the node and the token pair into the header of this message. */
- Token nodeToken = p.getTokenFactory().fromString(nodeTokenPair[1]);
- tokenUpdateMessage.addHeader(nodeTokenPair[0], p.getTokenFactory().toByteArray(nodeToken));
+ BigInteger nodeToken = new BigInteger(nodeTokenPair[1]);
+ tokenUpdateMessage.addHeader(nodeTokenPair[0], nodeToken.toByteArray());
}
System.out.println("Sending a token update message to " + target);
@@ -76,5 +82,64 @@
Thread.sleep(TokenUpdater.waitTime_);
System.out.println("Done sending the update message");
}
+
+ public static class TokenInfoMessage implements Serializable
+ {
+ private static ICompactSerializer<TokenInfoMessage> serializer_;
+ private static AtomicInteger idGen_ = new AtomicInteger(0);
+
+ static
+ {
+ serializer_ = new TokenInfoMessageSerializer();
+ }
+
+ static ICompactSerializer<TokenInfoMessage> serializer()
+ {
+ return serializer_;
+ }
+ private EndPoint target_;
+ private BigInteger token_;
+
+ TokenInfoMessage(EndPoint target, BigInteger token)
+ {
+ target_ = target;
+ token_ = token;
+ }
+
+ EndPoint getTarget()
+ {
+ return target_;
+ }
+
+ BigInteger getToken()
+ {
+ return token_;
+ }
+ }
+
+ public static class TokenInfoMessageSerializer implements ICompactSerializer<TokenInfoMessage>
+ {
+ public void serialize(TokenInfoMessage tiMessage, DataOutputStream dos) throws IOException
+ {
+ byte[] node = EndPoint.toBytes( tiMessage.getTarget() );
+ dos.writeInt(node.length);
+ dos.write(node);
+
+ byte[] token = tiMessage.getToken().toByteArray();
+ dos.writeInt( token.length );
+ dos.write(token);
+ }
+
+ public TokenInfoMessage deserialize(DataInputStream dis) throws IOException
+ {
+ byte[] target = new byte[dis.readInt()];
+ dis.readFully(target);
+
+ byte[] token = new byte[dis.readInt()];
+ dis.readFully(token);
+
+ return new TokenInfoMessage(EndPoint.fromBytes(target), new BigInteger(token));
+ }
+ }
}
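
TokenInfoMessageSerializer above defines a simple length-prefixed wire
format: an int length followed by the EndPoint bytes, then an int length
followed by the token's two's-complement bytes from
BigInteger.toByteArray(). A standard-library-only round trip of that format,
with a "host:port" string standing in for the Cassandra-internal EndPoint:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.math.BigInteger;

    public class TokenWireFormatSketch
    {
        static byte[] serialize(String endpoint, BigInteger token) throws IOException
        {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream dos = new DataOutputStream(bos);
            byte[] node = endpoint.getBytes("UTF-8");
            dos.writeInt(node.length);          // int length prefix, then payload
            dos.write(node);
            byte[] t = token.toByteArray();     // two's-complement big-endian bytes
            dos.writeInt(t.length);
            dos.write(t);
            return bos.toByteArray();
        }

        static BigInteger deserializeToken(byte[] body) throws IOException
        {
            DataInputStream dis = new DataInputStream(new ByteArrayInputStream(body));
            byte[] target = new byte[dis.readInt()];
            dis.readFully(target);              // endpoint bytes, ignored here
            byte[] token = new byte[dis.readInt()];
            dis.readFully(token);
            return new BigInteger(token);       // inverse of toByteArray()
        }

        public static void main(String[] args) throws IOException
        {
            BigInteger original = new BigInteger("12345678901234567890");
            byte[] body = serialize("127.0.0.1:7000", original);
            System.out.println(deserializeToken(body).equals(original)); // true
        }
    }

Because each field is read back with exactly the length that was written,
the format needs no delimiters and handles tokens of any magnitude.
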
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/utils/BasicUtilities.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/utils/BasicUtilities.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/utils/BasicUtilities.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/utils/BasicUtilities.java Fri Mar 27 06:45:19 2009
@@ -18,6 +18,10 @@
package org.apache.cassandra.utils;
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.math.BigInteger;
import java.nio.ByteBuffer;
/**
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/utils/FBUtilities.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/utils/FBUtilities.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/utils/FBUtilities.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/utils/FBUtilities.java Fri Mar 27 06:45:19 2009
@@ -18,28 +18,19 @@
package org.apache.cassandra.utils;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.io.UnsupportedEncodingException;
-import java.math.BigInteger;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
+import java.util.*;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.zip.Deflater;
+import java.util.zip.DataFormatException;
+import java.util.zip.Inflater;
import java.security.MessageDigest;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.StringTokenizer;
-import java.util.zip.DataFormatException;
-import java.util.zip.Deflater;
-import java.util.zip.Inflater;
-
+import java.io.*;
+import java.net.*;
+import java.nio.channels.SocketChannel;
+import java.nio.ByteBuffer;
+import java.math.BigInteger;
/**
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
*/
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/utils/FastObjectHash.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/utils/FastObjectHash.java?rev=759033&r1=759032&r2=759033&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/utils/FastObjectHash.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/utils/FastObjectHash.java Fri Mar 27 06:45:19 2009
@@ -187,7 +187,7 @@
}
else
{ // already FULL or REMOVED, must probe
- // compute the double token
+ // compute the double hash
final int probe = 1 + (hash % (length - 2));
// if the slot we landed on is FULL (but not removed), probe
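
The corrected comment refers to double hashing: when the slot given by the
first hash is occupied, the table reprobes with a second step size,
1 + (hash % (length - 2)), which always lies in [1, length - 2] and so is
never zero. Assuming the table length is prime (as open-addressing tables of
this style typically keep it), the step is coprime to the length and the
probe sequence visits every slot exactly once. A small sketch of that
arithmetic; the decrement-and-wrap probe order is an assumption, modeled on
the conventional Trove-style loop:

    public class DoubleHashProbeSketch
    {
        public static void main(String[] args)
        {
            final int length = 11;                        // assumed prime table size
            final int hash = 37 & 0x7FFFFFFF;             // non-negative hash value
            final int probe = 1 + (hash % (length - 2));  // second hash, in [1, length - 2]
            int index = hash % length;                    // slot from the first hash

            for (int i = 0; i < length; i++)
            {
                System.out.print(index + " ");            // prints 4 2 0 9 7 5 3 1 10 8 6
                index -= probe;                           // step by the second hash...
                if (index < 0)
                    index += length;                      // ...wrapping around the table
            }
            System.out.println();                         // every slot hit exactly once
        }
    }
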