You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/10/22 05:23:31 UTC
svn commit: r1026200 [6/11] - in /cassandra/trunk: ./ interface/
interface/thrift/gen-java/org/apache/cassandra/thrift/
src/java/org/apache/cassandra/auth/ src/java/org/apache/cassandra/avro/
src/java/org/apache/cassandra/cli/ src/java/org/apache/cassa...
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1026200&r1=1026199&r2=1026200&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Fri Oct 22 03:23:26 2010
@@ -18,18 +18,16 @@
package org.apache.cassandra.db;
+import static com.google.common.base.Charsets.UTF_8;
+
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
-import org.apache.commons.lang.ArrayUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static com.google.common.base.Charsets.UTF_8;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.filter.QueryFilter;
@@ -43,8 +41,12 @@ import org.apache.cassandra.service.IWri
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.WriteResponseHandler;
import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.WrappedRunnable;
+import org.apache.commons.lang.ArrayUtils;
import org.cliffc.high_scale_lib.NonBlockingHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -87,7 +89,7 @@ public class HintedHandOffManager
private final ExecutorService executor_ = new JMXEnabledThreadPoolExecutor("HintedHandoff", DatabaseDescriptor.getCompactionThreadPriority());
- private static boolean sendMessage(InetAddress endpoint, String tableName, String cfName, byte[] key) throws IOException
+ private static boolean sendMessage(InetAddress endpoint, String tableName, String cfName, ByteBuffer key) throws IOException
{
if (!Gossiper.instance.isKnownEndpoint(endpoint))
{
@@ -102,10 +104,10 @@ public class HintedHandOffManager
Table table = Table.open(tableName);
DecoratedKey dkey = StorageService.getPartitioner().decorateKey(key);
ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
- byte[] startColumn = ArrayUtils.EMPTY_BYTE_ARRAY;
+ ByteBuffer startColumn = FBUtilities.EMPTY_BYTE_BUFFER;
while (true)
{
- QueryFilter filter = QueryFilter.getSliceFilter(dkey, new QueryPath(cfs.getColumnFamilyName()), startColumn, ArrayUtils.EMPTY_BYTE_ARRAY, false, PAGE_SIZE);
+ QueryFilter filter = QueryFilter.getSliceFilter(dkey, new QueryPath(cfs.getColumnFamilyName()), startColumn, FBUtilities.EMPTY_BYTE_BUFFER, false, PAGE_SIZE);
ColumnFamily cf = cfs.getColumnFamily(filter);
if (pagingFinished(cf, startColumn))
break;
@@ -133,7 +135,7 @@ public class HintedHandOffManager
return true;
}
- private static void deleteHintKey(byte[] endpointAddress, byte[] key, byte[] tableCF, long timestamp) throws IOException
+ private static void deleteHintKey(ByteBuffer endpointAddress, ByteBuffer key, ByteBuffer tableCF, long timestamp) throws IOException
{
RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, endpointAddress);
rm.delete(new QueryPath(HINTS_CF, key, tableCF), timestamp);
@@ -143,7 +145,7 @@ public class HintedHandOffManager
public static void deleteHintsForEndPoint(InetAddress endpoint)
{
ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF);
- RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, endpoint.getAddress());
+ RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, ByteBuffer.wrap(endpoint.getAddress()));
rm.delete(new QueryPath(HINTS_CF), System.currentTimeMillis());
try {
logger_.info("Deleting any stored hints for " + endpoint);
@@ -157,28 +159,28 @@ public class HintedHandOffManager
}
}
- private static boolean pagingFinished(ColumnFamily hintColumnFamily, byte[] startColumn)
+ private static boolean pagingFinished(ColumnFamily hintColumnFamily, ByteBuffer startColumn)
{
// done if no hints found or the start column (same as last column processed in previous iteration) is the only one
return hintColumnFamily == null
|| (hintColumnFamily.getSortedColumns().size() == 1 && hintColumnFamily.getColumn(startColumn) != null);
}
- public static byte[] makeCombinedName(String tableName, String columnFamily)
+ public static ByteBuffer makeCombinedName(String tableName, String columnFamily)
{
byte[] withsep = ArrayUtils.addAll(tableName.getBytes(UTF_8), SEPARATOR.getBytes());
- return ArrayUtils.addAll(withsep, columnFamily.getBytes(UTF_8));
+ return ByteBuffer.wrap(ArrayUtils.addAll(withsep, columnFamily.getBytes(UTF_8)));
}
- private static String[] getTableAndCFNames(byte[] joined)
+ private static String[] getTableAndCFNames(ByteBuffer joined)
{
int index;
- index = ArrayUtils.lastIndexOf(joined, SEPARATOR.getBytes()[0]);
+ index = ArrayUtils.lastIndexOf(joined.array(), SEPARATOR.getBytes()[0],joined.position()+joined.arrayOffset());
if (index < 1)
throw new RuntimeException("Corrupted hint name " + joined.toString());
String[] parts = new String[2];
- parts[0] = new String(ArrayUtils.subarray(joined, 0, index));
- parts[1] = new String(ArrayUtils.subarray(joined, index+1, joined.length));
+ parts[0] = new String(ArrayUtils.subarray(joined.array(), joined.position()+joined.arrayOffset(), index));
+ parts[1] = new String(ArrayUtils.subarray(joined.array(), index+1, joined.limit()));
return parts;
}
@@ -193,14 +195,14 @@ public class HintedHandOffManager
// 3. Delete the subcolumn if the write was successful
// 4. Force a flush
// 5. Do major compaction to clean up all deletes etc.
- DecoratedKey epkey = StorageService.getPartitioner().decorateKey(endpoint.getHostAddress().getBytes(UTF_8));
+ DecoratedKey epkey = StorageService.getPartitioner().decorateKey(ByteBuffer.wrap(endpoint.getHostAddress().getBytes(UTF_8)));
int rowsReplayed = 0;
ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF);
- byte[] startColumn = ArrayUtils.EMPTY_BYTE_ARRAY;
+ ByteBuffer startColumn = FBUtilities.EMPTY_BYTE_BUFFER;
delivery:
while (true)
{
- QueryFilter filter = QueryFilter.getSliceFilter(epkey, new QueryPath(HINTS_CF), startColumn, ArrayUtils.EMPTY_BYTE_ARRAY, false, PAGE_SIZE);
+ QueryFilter filter = QueryFilter.getSliceFilter(epkey, new QueryPath(HINTS_CF), startColumn, FBUtilities.EMPTY_BYTE_BUFFER, false, PAGE_SIZE);
ColumnFamily hintColumnFamily = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), Integer.MAX_VALUE);
if (pagingFinished(hintColumnFamily, startColumn))
break;
@@ -214,7 +216,7 @@ public class HintedHandOffManager
String[] parts = getTableAndCFNames(tableCF.name());
if (sendMessage(endpoint, parts[0], parts[1], keyColumn.name()))
{
- deleteHintKey(endpoint.getHostAddress().getBytes(UTF_8), keyColumn.name(), tableCF.name(), tableCF.timestamp());
+ deleteHintKey(ByteBuffer.wrap(endpoint.getHostAddress().getBytes(UTF_8)), keyColumn.name(), tableCF.name(), tableCF.timestamp());
rowsReplayed++;
}
else
@@ -248,26 +250,26 @@ public class HintedHandOffManager
/** called when a keyspace is dropped or renamed. newTable==null in the case of a drop. */
public static void renameHints(String oldTable, String newTable) throws IOException
{
- DecoratedKey oldTableKey = StorageService.getPartitioner().decorateKey(oldTable.getBytes(UTF_8));
+ DecoratedKey oldTableKey = StorageService.getPartitioner().decorateKey(ByteBuffer.wrap(oldTable.getBytes(UTF_8)));
// we're basically going to fetch, drop and add the scf for the old and new table. we need to do it piecemeal
// though since there could be GB of data.
ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF);
- byte[] startCol = ArrayUtils.EMPTY_BYTE_ARRAY;
+ ByteBuffer startCol = FBUtilities.EMPTY_BYTE_BUFFER;
long now = System.currentTimeMillis();
while (true)
{
- QueryFilter filter = QueryFilter.getSliceFilter(oldTableKey, new QueryPath(HINTS_CF), startCol, ArrayUtils.EMPTY_BYTE_ARRAY, false, PAGE_SIZE);
+ QueryFilter filter = QueryFilter.getSliceFilter(oldTableKey, new QueryPath(HINTS_CF), startCol, FBUtilities.EMPTY_BYTE_BUFFER, false, PAGE_SIZE);
ColumnFamily cf = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), Integer.MAX_VALUE);
if (pagingFinished(cf, startCol))
break;
if (newTable != null)
{
- RowMutation insert = new RowMutation(Table.SYSTEM_TABLE, newTable.getBytes(UTF_8));
+ RowMutation insert = new RowMutation(Table.SYSTEM_TABLE, ByteBuffer.wrap(newTable.getBytes(UTF_8)));
insert.add(cf);
insert.apply();
}
RowMutation drop = new RowMutation(Table.SYSTEM_TABLE, oldTableKey.key);
- for (byte[] key : cf.getColumnNames())
+ for (ByteBuffer key : cf.getColumnNames())
{
drop.delete(new QueryPath(HINTS_CF, key), now);
startCol = key;
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java?rev=1026200&r1=1026199&r2=1026200&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java Fri Oct 22 03:23:26 2010
@@ -18,8 +18,9 @@
package org.apache.cassandra.db;
-import java.util.Collection;
+import java.nio.ByteBuffer;
import java.security.MessageDigest;
+import java.util.Collection;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.utils.FBUtilities;
@@ -31,13 +32,13 @@ public interface IColumn
public boolean isMarkedForDelete();
public long getMarkedForDeleteAt();
public long mostRecentLiveChangeAt();
- public byte[] name();
+ public ByteBuffer name();
public int size();
public int serializedSize();
public long timestamp();
- public byte[] value();
+ public ByteBuffer value();
public Collection<IColumn> getSubColumns();
- public IColumn getSubColumn(byte[] columnName);
+ public IColumn getSubColumn(ByteBuffer columnName);
public void addColumn(IColumn column);
public IColumn diff(IColumn column);
public IColumn reconcile(IColumn column);
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=1026200&r1=1026199&r2=1026200&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Fri Oct 22 03:23:26 2010
@@ -19,27 +19,31 @@
package org.apache.cassandra.db;
import java.io.IOException;
-import java.util.*;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.PeekingIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.columniterator.IColumnIterator;
import org.apache.cassandra.db.columniterator.SimpleAbstractColumnIterator;
-import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.filter.AbstractColumnIterator;
+import org.apache.cassandra.db.filter.NamesQueryFilter;
+import org.apache.cassandra.db.filter.SliceQueryFilter;
import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.utils.WrappedRunnable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
public class Memtable implements Comparable<Memtable>, IFlushable
{
@@ -211,7 +215,7 @@ public class Memtable implements Compara
Comparator<IColumn> comparator = filter.getColumnComparator(typeComparator);
final PeekingIterator<IColumn> filteredIter = Iterators.peekingIterator(filteredColumns.iterator());
- if (!filter.reversed || filter.start.length != 0)
+ if (!filter.reversed || filter.start.remaining() != 0)
{
while (filteredIter.hasNext() && comparator.compare(filteredIter.peek(), startColumn) < 0)
{
@@ -238,7 +242,7 @@ public class Memtable implements Compara
public IColumn next()
{
- return filteredIter.next();
+ return filteredIter.next();
}
};
}
@@ -250,8 +254,8 @@ public class Memtable implements Compara
return new SimpleAbstractColumnIterator()
{
- private Iterator<byte[]> iter = filter.columns.iterator();
- private byte[] current;
+ private Iterator<ByteBuffer> iter = filter.columns.iterator();
+ private ByteBuffer current;
public ColumnFamily getColumnFamily()
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java?rev=1026200&r1=1026199&r2=1026200&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java Fri Oct 22 03:23:26 2010
@@ -36,12 +36,16 @@
package org.apache.cassandra.db;
-import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.concurrent.StageManager;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.ColumnParent;
@@ -51,12 +55,6 @@ import org.apache.thrift.TDeserializer;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Arrays;
-
public class RangeSliceCommand
{
private static final RangeSliceCommandSerializer serializer = new RangeSliceCommandSerializer();
@@ -64,7 +62,7 @@ public class RangeSliceCommand
public final String keyspace;
public final String column_family;
- public final byte[] super_column;
+ public final ByteBuffer super_column;
public final SlicePredicate predicate;
@@ -73,10 +71,10 @@ public class RangeSliceCommand
public RangeSliceCommand(String keyspace, ColumnParent column_parent, SlicePredicate predicate, AbstractBounds range, int max_keys)
{
- this(keyspace, column_parent.getColumn_family(), column_parent.getSuper_column(), predicate, range, max_keys);
+ this(keyspace, column_parent.getColumn_family(), column_parent.super_column, predicate, range, max_keys);
}
- public RangeSliceCommand(String keyspace, String column_family, byte[] super_column, SlicePredicate predicate, AbstractBounds range, int max_keys)
+ public RangeSliceCommand(String keyspace, String column_family, ByteBuffer super_column, SlicePredicate predicate, AbstractBounds range, int max_keys)
{
this.keyspace = keyspace;
this.column_family = column_family;
@@ -122,9 +120,9 @@ class RangeSliceCommandSerializer implem
{
dos.writeUTF(sliceCommand.keyspace);
dos.writeUTF(sliceCommand.column_family);
- dos.writeInt(sliceCommand.super_column == null ? 0 : sliceCommand.super_column.length);
+ dos.writeInt(sliceCommand.super_column == null ? 0 : sliceCommand.super_column.remaining());
if (sliceCommand.super_column != null)
- dos.write(sliceCommand.super_column);
+ dos.write(sliceCommand.super_column.array(),sliceCommand.super_column.position()+sliceCommand.super_column.arrayOffset(),sliceCommand.super_column.remaining());
TSerializer ser = new TSerializer(new TBinaryProtocol.Factory());
FBUtilities.serialize(ser, sliceCommand.predicate, dos);
@@ -138,9 +136,9 @@ class RangeSliceCommandSerializer implem
String column_family = dis.readUTF();
int scLength = dis.readInt();
- byte[] super_column = null;
+ ByteBuffer super_column = null;
if (scLength > 0)
- super_column = readBuf(scLength, dis);
+ super_column = ByteBuffer.wrap(readBuf(scLength, dis));
TDeserializer dser = new TDeserializer(new TBinaryProtocol.Factory());
SlicePredicate pred = new SlicePredicate();
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java?rev=1026200&r1=1026199&r2=1026200&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java Fri Oct 22 03:23:26 2010
@@ -22,17 +22,16 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
-import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.concurrent.StageManager;
public abstract class ReadCommand
@@ -58,11 +57,11 @@ public abstract class ReadCommand
public final QueryPath queryPath;
public final String table;
- public final byte[] key;
+ public final ByteBuffer key;
private boolean isDigestQuery = false;
protected final byte commandType;
- protected ReadCommand(String table, byte[] key, QueryPath queryPath, byte cmdType)
+ protected ReadCommand(String table, ByteBuffer key, QueryPath queryPath, byte cmdType)
{
this.table = table;
this.key = key;
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java?rev=1026200&r1=1026199&r2=1026200&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java Fri Oct 22 03:23:26 2010
@@ -18,17 +18,13 @@
package org.apache.cassandra.db;
-import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.nio.ByteBuffer;
-import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.FBUtilities;
-
import org.apache.commons.lang.ArrayUtils;
@@ -52,10 +48,10 @@ private static ICompactSerializer<ReadRe
}
private Row row_;
- private byte[] digest_ = ArrayUtils.EMPTY_BYTE_ARRAY;
+ private ByteBuffer digest_ = FBUtilities.EMPTY_BYTE_BUFFER;
private boolean isDigestQuery_ = false;
- public ReadResponse(byte[] digest )
+ public ReadResponse(ByteBuffer digest )
{
assert digest != null;
digest_= digest;
@@ -71,7 +67,7 @@ private static ICompactSerializer<ReadRe
return row_;
}
- public byte[] digest()
+ public ByteBuffer digest()
{
return digest_;
}
@@ -91,8 +87,8 @@ class ReadResponseSerializer implements
{
public void serialize(ReadResponse rm, DataOutputStream dos) throws IOException
{
- dos.writeInt(rm.digest().length);
- dos.write(rm.digest());
+ dos.writeInt(rm.digest().remaining());
+ dos.write(rm.digest().array(),rm.digest().position()+rm.digest().arrayOffset(),rm.digest().remaining());
dos.writeBoolean(rm.isDigestQuery());
if( !rm.isDigestQuery() && rm.row() != null )
@@ -114,7 +110,7 @@ class ReadResponseSerializer implements
row = Row.serializer().deserialize(dis);
}
- ReadResponse rmsg = isDigest ? new ReadResponse(digest) : new ReadResponse(row);
+ ReadResponse rmsg = isDigest ? new ReadResponse(ByteBuffer.wrap(digest)) : new ReadResponse(row);
rmsg.setIsDigestQuery(isDigest);
return rmsg;
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=1026200&r1=1026199&r2=1026200&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java Fri Oct 22 03:23:26 2010
@@ -26,6 +26,8 @@ import java.util.List;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.util.DataOutputBuffer;
import java.net.InetAddress;
+import java.nio.ByteBuffer;
+
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=1026200&r1=1026199&r2=1026200&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Fri Oct 22 03:23:26 2010
@@ -22,23 +22,27 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.util.*;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.ExecutionException;
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.lang.StringUtils;
-
-import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.net.Message;
-import org.apache.cassandra.service.*;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.Deletion;
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.db.filter.QueryPath;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.StringUtils;
public class RowMutation
{
@@ -56,11 +60,11 @@ public class RowMutation
}
private String table_;
- private byte[] key_;
+ private ByteBuffer key_;
// map of column family id to mutations for that column family.
protected Map<Integer, ColumnFamily> modifications_ = new HashMap<Integer, ColumnFamily>();
- public RowMutation(String table, byte[] key)
+ public RowMutation(String table, ByteBuffer key)
{
table_ = table;
key_ = key;
@@ -73,7 +77,7 @@ public class RowMutation
add(row.cf);
}
- protected RowMutation(String table, byte[] key, Map<Integer, ColumnFamily> modifications)
+ protected RowMutation(String table, ByteBuffer key, Map<Integer, ColumnFamily> modifications)
{
table_ = table;
key_ = key;
@@ -85,7 +89,7 @@ public class RowMutation
return table_;
}
- public byte[] key()
+ public ByteBuffer key()
{
return key_;
}
@@ -99,9 +103,9 @@ public class RowMutation
{
for (ColumnFamily cf : rm.getColumnFamilies())
{
- byte[] combined = HintedHandOffManager.makeCombinedName(rm.getTable(), cf.metadata().cfName);
+ ByteBuffer combined = HintedHandOffManager.makeCombinedName(rm.getTable(), cf.metadata().cfName);
QueryPath path = new QueryPath(HintedHandOffManager.HINTS_CF, rm.key(), combined);
- add(path, ArrayUtils.EMPTY_BYTE_ARRAY, System.currentTimeMillis(), cf.metadata().gcGraceSeconds);
+ add(path, FBUtilities.EMPTY_BYTE_BUFFER, System.currentTimeMillis(), cf.metadata().gcGraceSeconds);
}
}
@@ -138,7 +142,7 @@ public class RowMutation
* param @ timestamp - timestamp associated with this data.
* param @ timeToLive - ttl for the column, 0 for standard (non expiring) columns
*/
- public void add(QueryPath path, byte[] value, long timestamp, int timeToLive)
+ public void add(QueryPath path, ByteBuffer value, long timestamp, int timeToLive)
{
Integer id = CFMetaData.getId(table_, path.columnFamilyName);
ColumnFamily columnFamily = modifications_.get(id);
@@ -150,7 +154,7 @@ public class RowMutation
columnFamily.addColumn(path, value, timestamp, timeToLive);
}
- public void add(QueryPath path, byte[] value, long timestamp)
+ public void add(QueryPath path, ByteBuffer value, long timestamp)
{
add(path, value, timestamp, 0);
}
@@ -215,7 +219,7 @@ public class RowMutation
return new Message(FBUtilities.getLocalAddress(), verb, bos.toByteArray());
}
- public static RowMutation getRowMutationFromMutations(String keyspace, byte[] key, Map<String, List<Mutation>> cfmap)
+ public static RowMutation getRowMutationFromMutations(String keyspace, ByteBuffer key, Map<String, List<Mutation>> cfmap)
{
RowMutation rm = new RowMutation(keyspace, key);
for (Map.Entry<String, List<Mutation>> entry : cfmap.entrySet())
@@ -236,7 +240,7 @@ public class RowMutation
return rm;
}
- public static RowMutation getRowMutation(String table, byte[] key, Map<String, List<ColumnOrSuperColumn>> cfmap)
+ public static RowMutation getRowMutation(String table, ByteBuffer key, Map<String, List<ColumnOrSuperColumn>> cfmap)
{
RowMutation rm = new RowMutation(table, key);
for (Map.Entry<String, List<ColumnOrSuperColumn>> entry : cfmap.entrySet())
@@ -314,7 +318,7 @@ public class RowMutation
{
if (del.predicate != null && del.predicate.column_names != null)
{
- for(byte[] c : del.predicate.column_names)
+ for(ByteBuffer c : del.predicate.column_names)
{
if (del.super_column == null && DatabaseDescriptor.getColumnFamilyType(rm.table_, cfName) == ColumnFamilyType.Super)
rm.delete(new QueryPath(cfName, c), del.timestamp);
@@ -370,7 +374,7 @@ class RowMutationSerializer implements I
public RowMutation deserialize(DataInputStream dis) throws IOException
{
String table = dis.readUTF();
- byte[] key = FBUtilities.readShortByteArray(dis);
+ ByteBuffer key = FBUtilities.readShortByteArray(dis);
Map<Integer, ColumnFamily> modifications = defreezeTheMaps(dis);
return new RowMutation(table, key, modifications);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=1026200&r1=1026199&r2=1026200&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Fri Oct 22 03:23:26 2010
@@ -58,9 +58,9 @@ public class RowMutationVerbHandler impl
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(hintedBytes));
while (dis.available() > 0)
{
- byte[] addressBytes = FBUtilities.readShortByteArray(dis);
+ ByteBuffer addressBytes = FBUtilities.readShortByteArray(dis);
if (logger_.isDebugEnabled())
- logger_.debug("Adding hint for " + InetAddress.getByName(new String(addressBytes)));
+ logger_.debug("Adding hint for " + InetAddress.getByName(new String(addressBytes.array(),addressBytes.position()+addressBytes.arrayOffset(),addressBytes.remaining())));
RowMutation hintedMutation = new RowMutation(Table.SYSTEM_TABLE, addressBytes);
hintedMutation.addHints(rm);
hintedMutation.apply();
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java?rev=1026200&r1=1026199&r2=1026200&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java Fri Oct 22 03:23:26 2010
@@ -20,7 +20,12 @@ package org.apache.cassandra.db;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.util.*;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
@@ -30,17 +35,17 @@ import org.apache.cassandra.utils.FBUtil
public class SliceByNamesReadCommand extends ReadCommand
{
- public final SortedSet<byte[]> columnNames;
+ public final SortedSet<ByteBuffer> columnNames;
- public SliceByNamesReadCommand(String table, byte[] key, ColumnParent column_parent, Collection<byte[]> columnNames)
+ public SliceByNamesReadCommand(String table, ByteBuffer key, ColumnParent column_parent, Collection<ByteBuffer> columnNames)
{
this(table, key, new QueryPath(column_parent), columnNames);
}
- public SliceByNamesReadCommand(String table, byte[] key, QueryPath path, Collection<byte[]> columnNames)
+ public SliceByNamesReadCommand(String table, ByteBuffer key, QueryPath path, Collection<ByteBuffer> columnNames)
{
super(table, key, path, CMD_TYPE_GET_SLICE_BY_NAMES);
- this.columnNames = new TreeSet<byte[]>(getComparator());
+ this.columnNames = new TreeSet<ByteBuffer>(getComparator());
this.columnNames.addAll(columnNames);
}
@@ -85,7 +90,7 @@ class SliceByNamesReadCommandSerializer
dos.writeInt(realRM.columnNames.size());
if (realRM.columnNames.size() > 0)
{
- for (byte[] cName : realRM.columnNames)
+ for (ByteBuffer cName : realRM.columnNames)
{
FBUtilities.writeShortByteArray(cName, dos);
}
@@ -97,11 +102,11 @@ class SliceByNamesReadCommandSerializer
{
boolean isDigest = dis.readBoolean();
String table = dis.readUTF();
- byte[] key = FBUtilities.readShortByteArray(dis);
+ ByteBuffer key = FBUtilities.readShortByteArray(dis);
QueryPath columnParent = QueryPath.deserialize(dis);
int size = dis.readInt();
- List<byte[]> columns = new ArrayList<byte[]>();
+ List<ByteBuffer> columns = new ArrayList<ByteBuffer>();
for (int i = 0; i < size; ++i)
{
columns.add(FBUtilities.readShortByteArray(dis));
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SliceFromReadCommand.java?rev=1026200&r1=1026199&r2=1026200&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SliceFromReadCommand.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SliceFromReadCommand.java Fri Oct 22 03:23:26 2010
@@ -20,7 +20,7 @@ package org.apache.cassandra.db;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.util.List;
+import java.nio.ByteBuffer;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
@@ -30,16 +30,16 @@ import org.apache.cassandra.utils.FBUtil
public class SliceFromReadCommand extends ReadCommand
{
- public final byte[] start, finish;
+ public final ByteBuffer start, finish;
public final boolean reversed;
public final int count;
- public SliceFromReadCommand(String table, byte[] key, ColumnParent column_parent, byte[] start, byte[] finish, boolean reversed, int count)
+ public SliceFromReadCommand(String table, ByteBuffer key, ColumnParent column_parent, ByteBuffer start, ByteBuffer finish, boolean reversed, int count)
{
this(table, key, new QueryPath(column_parent), start, finish, reversed, count);
}
- public SliceFromReadCommand(String table, byte[] key, QueryPath path, byte[] start, byte[] finish, boolean reversed, int count)
+ public SliceFromReadCommand(String table, ByteBuffer key, QueryPath path, ByteBuffer start, ByteBuffer finish, boolean reversed, int count)
{
super(table, key, path, CMD_TYPE_GET_SLICE);
this.start = start;
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java?rev=1026200&r1=1026199&r2=1026200&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java Fri Oct 22 03:23:26 2010
@@ -18,22 +18,22 @@
package org.apache.cassandra.db;
-import java.io.*;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;
-import java.security.MessageDigest;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.io.ICompactSerializer2;
import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.utils.FBUtilities;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class SuperColumn implements IColumn, IColumnContainer
@@ -45,20 +45,20 @@ public class SuperColumn implements ICol
return new SuperColumnSerializer(comparator);
}
- private byte[] name_;
- private ConcurrentSkipListMap<byte[], IColumn> columns_;
+ private ByteBuffer name_;
+ private ConcurrentSkipListMap<ByteBuffer, IColumn> columns_;
private AtomicInteger localDeletionTime = new AtomicInteger(Integer.MIN_VALUE);
private AtomicLong markedForDeleteAt = new AtomicLong(Long.MIN_VALUE);
- public SuperColumn(byte[] name, AbstractType comparator)
+ public SuperColumn(ByteBuffer name, AbstractType comparator)
{
- this(name, new ConcurrentSkipListMap<byte[], IColumn>(comparator));
+ this(name, new ConcurrentSkipListMap<ByteBuffer, IColumn>(comparator));
}
- private SuperColumn(byte[] name, ConcurrentSkipListMap<byte[], IColumn> columns)
+ private SuperColumn(ByteBuffer name, ConcurrentSkipListMap<ByteBuffer, IColumn> columns)
{
assert name != null;
- assert name.length <= IColumn.MAX_NAME_LENGTH;
+ assert name.remaining() <= IColumn.MAX_NAME_LENGTH;
name_ = name;
columns_ = columns;
}
@@ -77,7 +77,7 @@ public class SuperColumn implements ICol
public IColumn cloneMe()
{
- SuperColumn sc = new SuperColumn(name_, new ConcurrentSkipListMap<byte[], IColumn>(columns_));
+ SuperColumn sc = new SuperColumn(name_, new ConcurrentSkipListMap<ByteBuffer, IColumn>(columns_));
sc.markForDeleteAt(localDeletionTime.get(), markedForDeleteAt.get());
return sc;
}
@@ -87,7 +87,7 @@ public class SuperColumn implements ICol
return markedForDeleteAt.get() > Long.MIN_VALUE;
}
- public byte[] name()
+ public ByteBuffer name()
{
return name_;
}
@@ -97,7 +97,7 @@ public class SuperColumn implements ICol
return columns_.values();
}
- public IColumn getSubColumn(byte[] columnName)
+ public IColumn getSubColumn(ByteBuffer columnName)
{
IColumn column = columns_.get(columnName);
assert column == null || column instanceof Column;
@@ -127,10 +127,10 @@ public class SuperColumn implements ICol
* We need to keep the way we are calculating the column size in sync with the
* way we are calculating the size for the column family serializer.
*/
- return DBConstants.shortSize_ + name_.length + DBConstants.intSize_ + DBConstants.longSize_ + DBConstants.intSize_ + size();
+ return DBConstants.shortSize_ + name_.remaining() + DBConstants.intSize_ + DBConstants.longSize_ + DBConstants.intSize_ + size();
}
- public void remove(byte[] columnName)
+ public void remove(ByteBuffer columnName)
{
columns_.remove(columnName);
}
@@ -153,7 +153,7 @@ public class SuperColumn implements ICol
return max;
}
- public byte[] value()
+ public ByteBuffer value()
{
throw new UnsupportedOperationException("This operation is not supported for Super Columns.");
}
@@ -162,7 +162,7 @@ public class SuperColumn implements ICol
{
assert column instanceof Column : "A super column can only contain simple columns";
- byte[] name = column.name();
+ ByteBuffer name = column.name();
IColumn oldColumn = columns_.putIfAbsent(name, column);
if (oldColumn != null)
{
@@ -236,7 +236,7 @@ public class SuperColumn implements ICol
public void updateDigest(MessageDigest digest)
{
assert name_ != null;
- digest.update(name_);
+ digest.update(name_.duplicate()); // hashes position..limit via a duplicate so name_ itself is not advanced; unlike array(), works for read-only/direct buffers
DataOutputBuffer buffer = new DataOutputBuffer();
try
{
@@ -326,7 +326,7 @@ class SuperColumnSerializer implements I
public IColumn deserialize(DataInput dis) throws IOException
{
- byte[] name = FBUtilities.readShortByteArray(dis);
+ ByteBuffer name = FBUtilities.readShortByteArray(dis);
SuperColumn superColumn = new SuperColumn(name, comparator);
int localDeleteTime = dis.readInt();
if (localDeleteTime != Integer.MIN_VALUE && localDeleteTime <= 0)
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java?rev=1026200&r1=1026199&r2=1026200&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java Fri Oct 22 03:23:26 2010
@@ -18,21 +18,20 @@
package org.apache.cassandra.db;
+import static com.google.common.base.Charsets.UTF_8;
+
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOError;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
-import org.apache.commons.lang.ArrayUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.filter.QueryFilter;
@@ -42,25 +41,26 @@ import org.apache.cassandra.dht.IPartiti
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
-
-import static com.google.common.base.Charsets.UTF_8;
+import org.apache.commons.lang.ArrayUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class SystemTable
{
private static Logger logger = LoggerFactory.getLogger(SystemTable.class);
public static final String STATUS_CF = "LocationInfo"; // keep the old CF string for backwards-compatibility
public static final String INDEX_CF = "IndexInfo";
- private static final byte[] LOCATION_KEY = "L".getBytes(UTF_8);
- private static final byte[] RING_KEY = "Ring".getBytes(UTF_8);
- private static final byte[] BOOTSTRAP_KEY = "Bootstrap".getBytes(UTF_8);
- private static final byte[] COOKIE_KEY = "Cookies".getBytes(UTF_8);
- private static final byte[] BOOTSTRAP = "B".getBytes(UTF_8);
- private static final byte[] TOKEN = "Token".getBytes(UTF_8);
- private static final byte[] GENERATION = "Generation".getBytes(UTF_8);
- private static final byte[] CLUSTERNAME = "ClusterName".getBytes(UTF_8);
- private static final byte[] PARTITIONER = "Partioner".getBytes(UTF_8);
+ private static final ByteBuffer LOCATION_KEY = ByteBuffer.wrap("L".getBytes(UTF_8));
+ private static final ByteBuffer RING_KEY = ByteBuffer.wrap("Ring".getBytes(UTF_8));
+ private static final ByteBuffer BOOTSTRAP_KEY = ByteBuffer.wrap("Bootstrap".getBytes(UTF_8));
+ private static final ByteBuffer COOKIE_KEY = ByteBuffer.wrap("Cookies".getBytes(UTF_8));
+ private static final ByteBuffer BOOTSTRAP = ByteBuffer.wrap("B".getBytes(UTF_8));
+ private static final ByteBuffer TOKEN = ByteBuffer.wrap("Token".getBytes(UTF_8));
+ private static final ByteBuffer GENERATION = ByteBuffer.wrap("Generation".getBytes(UTF_8));
+ private static final ByteBuffer CLUSTERNAME = ByteBuffer.wrap("ClusterName".getBytes(UTF_8));
+ private static final ByteBuffer PARTITIONER = ByteBuffer.wrap("Partioner".getBytes(UTF_8));
- private static DecoratedKey decorate(byte[] key)
+ private static DecoratedKey decorate(ByteBuffer key)
{
return StorageService.getPartitioner().decorateKey(key);
}
@@ -69,7 +69,7 @@ public class SystemTable
public static void purgeIncompatibleHints() throws IOException
{
// 0.6->0.7
- final byte[] hintsPurged6to7 = "Hints purged as part of upgrading from 0.6.x to 0.7".getBytes();
+ final ByteBuffer hintsPurged6to7 = ByteBuffer.wrap("Hints purged as part of upgrading from 0.6.x to 0.7".getBytes());
Table table = Table.open(Table.SYSTEM_TABLE);
QueryFilter dotSeven = QueryFilter.getNamesFilter(decorate(COOKIE_KEY), new QueryPath(STATUS_CF), hintsPurged6to7);
ColumnFamily cf = table.getColumnFamilyStore(STATUS_CF).getColumnFamily(dotSeven);
@@ -79,7 +79,7 @@ public class SystemTable
logger.info("Upgrading to 0.7. Purging hints if there are any. Old hints will be snapshotted.");
new Truncation(Table.SYSTEM_TABLE, HintedHandOffManager.HINTS_CF).apply();
RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, COOKIE_KEY);
- rm.add(new QueryPath(STATUS_CF, null, hintsPurged6to7), "oh yes, it they were purged.".getBytes(), System.currentTimeMillis());
+ rm.add(new QueryPath(STATUS_CF, null, hintsPurged6to7), ByteBuffer.wrap("oh yes, it they were purged.".getBytes()), System.currentTimeMillis());
rm.apply();
}
}
@@ -91,7 +91,7 @@ public class SystemTable
{
IPartitioner p = StorageService.getPartitioner();
ColumnFamily cf = ColumnFamily.create(Table.SYSTEM_TABLE, STATUS_CF);
- cf.addColumn(new Column(p.getTokenFactory().toByteArray(token), ep.getAddress(), System.currentTimeMillis()));
+ cf.addColumn(new Column(p.getTokenFactory().toByteArray(token), ByteBuffer.wrap(ep.getAddress()), System.currentTimeMillis()));
RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, RING_KEY);
rm.add(cf);
try
@@ -177,7 +177,10 @@ public class SystemTable
{
try
{
- tokenMap.put(p.getTokenFactory().fromByteArray(column.name()), InetAddress.getByAddress(column.value()));
+ byte[] addr = new byte[column.value().remaining()];
+ System.arraycopy(column.value().array(), column.value().position()+column.value().arrayOffset(), addr, 0, column.value().remaining());
+
+ tokenMap.put(p.getTokenFactory().fromByteArray(column.name()), InetAddress.getByAddress(addr));
}
catch (UnknownHostException e)
{
@@ -210,7 +213,7 @@ public class SystemTable
throw ex;
}
- SortedSet<byte[]> cols = new TreeSet<byte[]>(BytesType.instance);
+ SortedSet<ByteBuffer> cols = new TreeSet<ByteBuffer>(BytesType.instance);
cols.add(PARTITIONER);
cols.add(CLUSTERNAME);
QueryFilter filter = QueryFilter.getNamesFilter(decorate(LOCATION_KEY), new QueryPath(STATUS_CF), cols);
@@ -235,8 +238,8 @@ public class SystemTable
// no system files. this is a new node.
RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, LOCATION_KEY);
cf = ColumnFamily.create(Table.SYSTEM_TABLE, SystemTable.STATUS_CF);
- cf.addColumn(new Column(PARTITIONER, DatabaseDescriptor.getPartitioner().getClass().getName().getBytes(UTF_8), FBUtilities.timestampMicros()));
- cf.addColumn(new Column(CLUSTERNAME, DatabaseDescriptor.getClusterName().getBytes(), FBUtilities.timestampMicros()));
+ cf.addColumn(new Column(PARTITIONER, ByteBuffer.wrap(DatabaseDescriptor.getPartitioner().getClass().getName().getBytes(UTF_8)), FBUtilities.timestampMicros()));
+ cf.addColumn(new Column(CLUSTERNAME, ByteBuffer.wrap(DatabaseDescriptor.getClusterName().getBytes()), FBUtilities.timestampMicros()));
rm.add(cf);
rm.apply();
@@ -248,10 +251,13 @@ public class SystemTable
IColumn clusterCol = cf.getColumn(CLUSTERNAME);
assert partitionerCol != null;
assert clusterCol != null;
- if (!DatabaseDescriptor.getPartitioner().getClass().getName().equals(new String(partitionerCol.value(), UTF_8)))
+ if (!DatabaseDescriptor.getPartitioner().getClass().getName().equals(
+ new String(partitionerCol.value().array(),
+ partitionerCol.value().position()+partitionerCol.value().arrayOffset(),
+ partitionerCol.value().remaining(), UTF_8)))
throw new ConfigurationException("Detected partitioner mismatch! Did you change the partitioner?");
- if (!DatabaseDescriptor.getClusterName().equals(new String(clusterCol.value())))
- throw new ConfigurationException("Saved cluster name " + new String(clusterCol.value()) + " != configured name " + DatabaseDescriptor.getClusterName());
+ if (!DatabaseDescriptor.getClusterName().equals(new String(clusterCol.value().array(),clusterCol.value().position()+clusterCol.value().arrayOffset(),clusterCol.value().remaining())))
+ throw new ConfigurationException("Saved cluster name " + new String(clusterCol.value().array(),clusterCol.value().position()+clusterCol.value().arrayOffset(),clusterCol.value().remaining()) + " != configured name " + DatabaseDescriptor.getClusterName());
}
public static Token getSavedToken()
@@ -299,13 +305,16 @@ public class SystemTable
new QueryPath(STATUS_CF),
BOOTSTRAP);
ColumnFamily cf = table.getColumnFamilyStore(STATUS_CF).getColumnFamily(filter);
- return cf != null && cf.getColumn(BOOTSTRAP).value()[0] == 1;
+ IColumn c = cf == null ? null : cf.getColumn(BOOTSTRAP); // cf is null when no bootstrap row has been stored yet; guard before dereferencing
+ return c != null && c.value().array()[c.value().position()+c.value().arrayOffset()] == 1; // first readable byte is the 0/1 flag written by setBootstrapped
}
public static void setBootstrapped(boolean isBootstrapped)
{
ColumnFamily cf = ColumnFamily.create(Table.SYSTEM_TABLE, STATUS_CF);
- cf.addColumn(new Column(BOOTSTRAP, new byte[] { (byte) (isBootstrapped ? 1 : 0) }, System.currentTimeMillis()));
+ cf.addColumn(new Column(BOOTSTRAP,
+ ByteBuffer.wrap(new byte[] { (byte) (isBootstrapped ? 1 : 0) }),
+ System.currentTimeMillis()));
RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, BOOTSTRAP_KEY);
rm.add(cf);
try
@@ -321,17 +330,17 @@ public class SystemTable
public static boolean isIndexBuilt(String table, String indexName)
{
ColumnFamilyStore cfs = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(INDEX_CF);
- QueryFilter filter = QueryFilter.getNamesFilter(decorate(table.getBytes(UTF_8)),
+ QueryFilter filter = QueryFilter.getNamesFilter(decorate(ByteBuffer.wrap(table.getBytes(UTF_8))),
new QueryPath(INDEX_CF),
- indexName.getBytes(UTF_8));
+ ByteBuffer.wrap(indexName.getBytes(UTF_8)));
return cfs.getColumnFamily(filter) != null;
}
public static void setIndexBuilt(String table, String indexName)
{
ColumnFamily cf = ColumnFamily.create(Table.SYSTEM_TABLE, INDEX_CF);
- cf.addColumn(new Column(indexName.getBytes(UTF_8), ArrayUtils.EMPTY_BYTE_ARRAY, System.currentTimeMillis()));
- RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, table.getBytes(UTF_8));
+ cf.addColumn(new Column(ByteBuffer.wrap(indexName.getBytes(UTF_8)), FBUtilities.EMPTY_BYTE_BUFFER, System.currentTimeMillis()));
+ RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, ByteBuffer.wrap(table.getBytes(UTF_8)));
rm.add(cf);
try
{
@@ -347,8 +356,8 @@ public class SystemTable
public static void setIndexRemoved(String table, String indexName)
{
- RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, table.getBytes(UTF_8));
- rm.delete(new QueryPath(INDEX_CF, null, indexName.getBytes(UTF_8)), System.currentTimeMillis());
+ RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, ByteBuffer.wrap(table.getBytes(UTF_8)));
+ rm.delete(new QueryPath(INDEX_CF, null, ByteBuffer.wrap(indexName.getBytes(UTF_8))), System.currentTimeMillis());
try
{
rm.apply();
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1026200&r1=1026199&r2=1026200&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Fri Oct 22 03:23:26 2010
@@ -21,19 +21,22 @@ package org.apache.cassandra.db;
import java.io.File;
import java.io.IOError;
import java.io.IOException;
-import java.util.*;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import org.apache.commons.lang.ArrayUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.commitlog.CommitLog;
@@ -47,7 +50,13 @@ import org.apache.cassandra.io.sstable.S
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.commons.lang.ArrayUtils;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
public class Table
{
@@ -361,13 +370,13 @@ public class Table
continue;
}
- SortedSet<byte[]> mutatedIndexedColumns = null;
- for (byte[] column : cfs.getIndexedColumns())
+ SortedSet<ByteBuffer> mutatedIndexedColumns = null;
+ for (ByteBuffer column : cfs.getIndexedColumns())
{
if (cf.getColumnNames().contains(column) || cf.isMarkedForDelete())
{
if (mutatedIndexedColumns == null)
- mutatedIndexedColumns = new TreeSet<byte[]>(FBUtilities.byteArrayComparator);
+ mutatedIndexedColumns = new TreeSet<ByteBuffer>();
mutatedIndexedColumns.add(column);
}
}
@@ -416,7 +425,7 @@ public class Table
return memtablesToFlush;
}
- private static void ignoreObsoleteMutations(ColumnFamily cf, SortedSet<byte[]> mutatedIndexedColumns, ColumnFamily oldIndexedColumns)
+ private static void ignoreObsoleteMutations(ColumnFamily cf, SortedSet<ByteBuffer> mutatedIndexedColumns, ColumnFamily oldIndexedColumns)
{
if (oldIndexedColumns == null)
return;
@@ -440,7 +449,7 @@ public class Table
}
}
- private static ColumnFamily readCurrentIndexedColumns(DecoratedKey key, ColumnFamilyStore cfs, SortedSet<byte[]> mutatedIndexedColumns)
+ private static ColumnFamily readCurrentIndexedColumns(DecoratedKey key, ColumnFamilyStore cfs, SortedSet<ByteBuffer> mutatedIndexedColumns)
{
QueryFilter filter = QueryFilter.getNamesFilter(key, new QueryPath(cfs.getColumnFamilyName()), mutatedIndexedColumns);
return cfs.getColumnFamily(filter);
@@ -450,16 +459,16 @@ public class Table
* removes obsolete index entries and creates new ones for the given row key and mutated columns.
* @return list of full (index CF) memtables
*/
- private static List<Memtable> applyIndexUpdates(byte[] key,
+ private static List<Memtable> applyIndexUpdates(ByteBuffer key,
ColumnFamily cf,
ColumnFamilyStore cfs,
- SortedSet<byte[]> mutatedIndexedColumns,
+ SortedSet<ByteBuffer> mutatedIndexedColumns,
ColumnFamily oldIndexedColumns)
{
List<Memtable> fullMemtables = Collections.emptyList();
// add new index entries
- for (byte[] columnName : mutatedIndexedColumns)
+ for (ByteBuffer columnName : mutatedIndexedColumns)
{
IColumn column = cf.getColumn(columnName);
if (column == null || column.isMarkedForDelete())
@@ -470,11 +479,11 @@ public class Table
if (column instanceof ExpiringColumn)
{
ExpiringColumn ec = (ExpiringColumn)column;
- cfi.addColumn(new ExpiringColumn(key, ArrayUtils.EMPTY_BYTE_ARRAY, ec.timestamp, ec.getTimeToLive(), ec.getLocalDeletionTime()));
+ cfi.addColumn(new ExpiringColumn(key, FBUtilities.EMPTY_BYTE_BUFFER, ec.timestamp, ec.getTimeToLive(), ec.getLocalDeletionTime()));
}
else
{
- cfi.addColumn(new Column(key, ArrayUtils.EMPTY_BYTE_ARRAY, column.timestamp()));
+ cfi.addColumn(new Column(key, FBUtilities.EMPTY_BYTE_BUFFER, column.timestamp()));
}
Memtable fullMemtable = cfs.getIndexedColumnFamilyStore(columnName).apply(valueKey, cfi);
if (fullMemtable != null)
@@ -485,9 +494,9 @@ public class Table
if (oldIndexedColumns != null)
{
int localDeletionTime = (int) (System.currentTimeMillis() / 1000);
- for (Map.Entry<byte[], IColumn> entry : oldIndexedColumns.getColumnsMap().entrySet())
+ for (Map.Entry<ByteBuffer, IColumn> entry : oldIndexedColumns.getColumnsMap().entrySet())
{
- byte[] columnName = entry.getKey();
+ ByteBuffer columnName = entry.getKey();
IColumn column = entry.getValue();
if (column.isMarkedForDelete())
continue;
@@ -503,7 +512,7 @@ public class Table
return fullMemtables;
}
- public IndexBuilder createIndexBuilder(ColumnFamilyStore cfs, SortedSet<byte[]> columns, ReducingKeyIterator iter)
+ public IndexBuilder createIndexBuilder(ColumnFamilyStore cfs, SortedSet<ByteBuffer> columns, ReducingKeyIterator iter)
{
return new IndexBuilder(cfs, columns, iter);
}
@@ -511,10 +520,10 @@ public class Table
public class IndexBuilder implements ICompactionInfo
{
private final ColumnFamilyStore cfs;
- private final SortedSet<byte[]> columns;
+ private final SortedSet<ByteBuffer> columns;
private final ReducingKeyIterator iter;
- public IndexBuilder(ColumnFamilyStore cfs, SortedSet<byte[]> columns, ReducingKeyIterator iter)
+ public IndexBuilder(ColumnFamilyStore cfs, SortedSet<ByteBuffer> columns, ReducingKeyIterator iter)
{
this.cfs = cfs;
this.columns = columns;
@@ -574,9 +583,9 @@ public class Table
}
}
- private Object indexLockFor(byte[] key)
+ private Object indexLockFor(ByteBuffer key)
{
- return indexLocks[Math.abs(Arrays.hashCode(key) % indexLocks.length)];
+ return indexLocks[Math.abs(key.hashCode() % indexLocks.length)];
}
public List<Future<?>> flush() throws IOException
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java?rev=1026200&r1=1026199&r2=1026200&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java Fri Oct 22 03:23:26 2010
@@ -22,6 +22,7 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.net.Message;
@@ -51,10 +52,10 @@ public class WriteResponse
}
private final String table_;
- private final byte[] key_;
+ private final ByteBuffer key_;
private final boolean status_;
- public WriteResponse(String table, byte[] key, boolean bVal) {
+ public WriteResponse(String table, ByteBuffer key, boolean bVal) {
table_ = table;
key_ = key;
status_ = bVal;
@@ -65,7 +66,7 @@ public class WriteResponse
return table_;
}
- public byte[] key()
+ public ByteBuffer key()
{
return key_;
}
@@ -87,7 +88,7 @@ public class WriteResponse
public WriteResponse deserialize(DataInputStream dis) throws IOException
{
String table = dis.readUTF();
- byte[] key = FBUtilities.readShortByteArray(dis);
+ ByteBuffer key = FBUtilities.readShortByteArray(dis);
boolean status = dis.readBoolean();
return new WriteResponse(table, key, status);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IdentityQueryFilter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IdentityQueryFilter.java?rev=1026200&r1=1026199&r2=1026200&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IdentityQueryFilter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IdentityQueryFilter.java Fri Oct 22 03:23:26 2010
@@ -21,10 +21,9 @@ package org.apache.cassandra.db.columnit
*/
-import org.apache.commons.lang.ArrayUtils;
-
import org.apache.cassandra.db.SuperColumn;
import org.apache.cassandra.db.filter.SliceQueryFilter;
+import org.apache.cassandra.utils.FBUtilities;
public class IdentityQueryFilter extends SliceQueryFilter
{
@@ -33,7 +32,7 @@ public class IdentityQueryFilter extends
*/
public IdentityQueryFilter()
{
- super(ArrayUtils.EMPTY_BYTE_ARRAY, ArrayUtils.EMPTY_BYTE_ARRAY, false, Integer.MAX_VALUE);
+ super(FBUtilities.EMPTY_BYTE_BUFFER, FBUtilities.EMPTY_BYTE_BUFFER, false, Integer.MAX_VALUE);
}
public SuperColumn filterSuperColumn(SuperColumn superColumn, int gcBefore)
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java?rev=1026200&r1=1026199&r2=1026200&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java Fri Oct 22 03:23:26 2010
@@ -23,22 +23,22 @@ package org.apache.cassandra.db.columnit
import java.io.IOError;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
-import com.google.common.collect.AbstractIterator;
-
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.io.sstable.IndexHelper;
-import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.io.util.FileMark;
+import com.google.common.collect.AbstractIterator;
+
/**
* This is a reader that finds the block for a starting column and returns
* blocks before/after it for each next call. This function assumes that
@@ -50,15 +50,15 @@ class IndexedSliceReader extends Abstrac
private final List<IndexHelper.IndexInfo> indexes;
private final FileDataInput file;
- private final byte[] startColumn;
- private final byte[] finishColumn;
+ private final ByteBuffer startColumn;
+ private final ByteBuffer finishColumn;
private final boolean reversed;
private BlockFetcher fetcher;
private Deque<IColumn> blockColumns = new ArrayDeque<IColumn>();
private AbstractType comparator;
- public IndexedSliceReader(CFMetaData metadata, FileDataInput input, byte[] startColumn, byte[] finishColumn, boolean reversed)
+ public IndexedSliceReader(CFMetaData metadata, FileDataInput input, ByteBuffer startColumn, ByteBuffer finishColumn, boolean reversed)
{
this.file = input;
this.startColumn = startColumn;
@@ -91,15 +91,15 @@ class IndexedSliceReader extends Abstrac
private boolean isColumnNeeded(IColumn column)
{
- if (startColumn.length == 0 && finishColumn.length == 0)
+ if (startColumn.remaining() == 0 && finishColumn.remaining() == 0)
return true;
- else if (startColumn.length == 0 && !reversed)
+ else if (startColumn.remaining() == 0 && !reversed)
return comparator.compare(column.name(), finishColumn) <= 0;
- else if (startColumn.length == 0 && reversed)
+ else if (startColumn.remaining() == 0 && reversed)
return comparator.compare(column.name(), finishColumn) >= 0;
- else if (finishColumn.length == 0 && !reversed)
+ else if (finishColumn.remaining() == 0 && !reversed)
return comparator.compare(column.name(), startColumn) >= 0;
- else if (finishColumn.length == 0 && reversed)
+ else if (finishColumn.remaining() == 0 && reversed)
return comparator.compare(column.name(), startColumn) <= 0;
else if (!reversed)
return comparator.compare(column.name(), startColumn) >= 0 && comparator.compare(column.name(), finishColumn) <= 0;
@@ -160,14 +160,14 @@ class IndexedSliceReader extends Abstrac
/* see if this read is really necessary. */
if (reversed)
{
- if ((finishColumn.length > 0 && comparator.compare(finishColumn, curColPosition.lastName) > 0) ||
- (startColumn.length > 0 && comparator.compare(startColumn, curColPosition.firstName) < 0))
+ if ((finishColumn.remaining() > 0 && comparator.compare(finishColumn, curColPosition.lastName) > 0) ||
+ (startColumn.remaining() > 0 && comparator.compare(startColumn, curColPosition.firstName) < 0))
return false;
}
else
{
- if ((startColumn.length > 0 && comparator.compare(startColumn, curColPosition.lastName) > 0) ||
- (finishColumn.length > 0 && comparator.compare(finishColumn, curColPosition.firstName) < 0))
+ if ((startColumn.remaining() > 0 && comparator.compare(startColumn, curColPosition.lastName) > 0) ||
+ (finishColumn.remaining() > 0 && comparator.compare(finishColumn, curColPosition.firstName) < 0))
return false;
}
@@ -184,9 +184,9 @@ class IndexedSliceReader extends Abstrac
blockColumns.addLast(column);
/* see if we can stop seeking. */
- if (!reversed && finishColumn.length > 0)
+ if (!reversed && finishColumn.remaining() > 0)
outOfBounds = comparator.compare(column.name(), finishColumn) >= 0;
- else if (reversed && startColumn.length > 0)
+ else if (reversed && startColumn.remaining() > 0)
outOfBounds = comparator.compare(column.name(), startColumn) >= 0;
}
@@ -213,9 +213,9 @@ class IndexedSliceReader extends Abstrac
/* see if we can stop seeking. */
boolean outOfBounds = false;
- if (!reversed && finishColumn.length > 0)
+ if (!reversed && finishColumn.remaining() > 0)
outOfBounds = comparator.compare(column.name(), finishColumn) >= 0;
- else if (reversed && startColumn.length > 0)
+ else if (reversed && startColumn.remaining() > 0)
outOfBounds = comparator.compare(column.name(), startColumn) >= 0;
if (outOfBounds)
break;
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java?rev=1026200&r1=1026199&r2=1026200&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java Fri Oct 22 03:23:26 2010
@@ -22,10 +22,12 @@ package org.apache.cassandra.db.columnit
import java.io.IOError;
import java.io.IOException;
-import java.util.*;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -39,6 +41,8 @@ import org.apache.cassandra.io.util.File
import org.apache.cassandra.io.util.FileMark;
import org.apache.cassandra.utils.BloomFilter;
import org.apache.cassandra.utils.FBUtilities;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class SSTableNamesIterator extends SimpleAbstractColumnIterator implements IColumnIterator
{
@@ -46,10 +50,10 @@ public class SSTableNamesIterator extend
private ColumnFamily cf;
private Iterator<IColumn> iter;
- public final SortedSet<byte[]> columns;
+ public final SortedSet<ByteBuffer> columns;
public final DecoratedKey key;
- public SSTableNamesIterator(SSTableReader sstable, DecoratedKey key, SortedSet<byte[]> columns)
+ public SSTableNamesIterator(SSTableReader sstable, DecoratedKey key, SortedSet<ByteBuffer> columns)
{
assert columns != null;
this.columns = columns;
@@ -85,7 +89,7 @@ public class SSTableNamesIterator extend
}
}
- public SSTableNamesIterator(CFMetaData metadata, FileDataInput file, DecoratedKey key, SortedSet<byte[]> columns)
+ public SSTableNamesIterator(CFMetaData metadata, FileDataInput file, DecoratedKey key, SortedSet<ByteBuffer> columns)
{
assert columns != null;
this.columns = columns;
@@ -114,8 +118,8 @@ public class SSTableNamesIterator extend
// we can't stop before initializing the cf above, in case there's a relevant tombstone
cf = ColumnFamily.serializer().deserializeFromSSTableNoColumns(ColumnFamily.create(metadata), file);
- List<byte[]> filteredColumnNames = new ArrayList<byte[]>(columns.size());
- for (byte[] name : columns)
+ List<ByteBuffer> filteredColumnNames = new ArrayList<ByteBuffer>(columns.size());
+ for (ByteBuffer name : columns)
{
if (bf.isPresent(name))
{
@@ -134,7 +138,7 @@ public class SSTableNamesIterator extend
iter = cf.getSortedColumns().iterator();
}
- private void readSimpleColumns(FileDataInput file, SortedSet<byte[]> columnNames, List<byte[]> filteredColumnNames) throws IOException
+ private void readSimpleColumns(FileDataInput file, SortedSet<ByteBuffer> columnNames, List<ByteBuffer> filteredColumnNames) throws IOException
{
int columns = file.readInt();
int n = 0;
@@ -150,7 +154,7 @@ public class SSTableNamesIterator extend
}
}
- private void readIndexedColumns(CFMetaData metadata, FileDataInput file, SortedSet<byte[]> columnNames, List<byte[]> filteredColumnNames, List<IndexHelper.IndexInfo> indexList)
+ private void readIndexedColumns(CFMetaData metadata, FileDataInput file, SortedSet<ByteBuffer> columnNames, List<ByteBuffer> filteredColumnNames, List<IndexHelper.IndexInfo> indexList)
throws IOException
{
file.readInt(); // column count
@@ -158,7 +162,7 @@ public class SSTableNamesIterator extend
/* get the various column ranges we have to read */
AbstractType comparator = metadata.comparator;
SortedSet<IndexHelper.IndexInfo> ranges = new TreeSet<IndexHelper.IndexInfo>(IndexHelper.getComparator(comparator));
- for (byte[] name : filteredColumnNames)
+ for (ByteBuffer name : filteredColumnNames)
{
int index = IndexHelper.indexFor(name, indexList, comparator, false);
if (index == indexList.size())
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java?rev=1026200&r1=1026199&r2=1026200&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java Fri Oct 22 03:23:26 2010
@@ -23,15 +23,15 @@ package org.apache.cassandra.db.columnit
import java.io.IOError;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.IColumn;
-import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.util.FileDataInput;
-
import org.apache.cassandra.utils.FBUtilities;
/**
@@ -43,7 +43,7 @@ public class SSTableSliceIterator implem
private IColumnIterator reader;
private DecoratedKey key;
- public SSTableSliceIterator(SSTableReader sstable, DecoratedKey key, byte[] startColumn, byte[] finishColumn, boolean reversed)
+ public SSTableSliceIterator(SSTableReader sstable, DecoratedKey key, ByteBuffer startColumn, ByteBuffer finishColumn, boolean reversed)
{
this.key = key;
fileToClose = sstable.getFileDataInput(this.key, DatabaseDescriptor.getSlicedReadBufferSizeInKB() * 1024);
@@ -79,16 +79,16 @@ public class SSTableSliceIterator implem
* @param finishColumn The end of the slice
* @param reversed Results are returned in reverse order iff reversed is true.
*/
- public SSTableSliceIterator(CFMetaData metadata, FileDataInput file, DecoratedKey key, byte[] startColumn, byte[] finishColumn, boolean reversed)
+ public SSTableSliceIterator(CFMetaData metadata, FileDataInput file, DecoratedKey key, ByteBuffer startColumn, ByteBuffer finishColumn, boolean reversed)
{
this.key = key;
fileToClose = null;
reader = createReader(metadata, file, startColumn, finishColumn, reversed);
}
- private static IColumnIterator createReader(CFMetaData metadata, FileDataInput file, byte[] startColumn, byte[] finishColumn, boolean reversed)
+ private static IColumnIterator createReader(CFMetaData metadata, FileDataInput file, ByteBuffer startColumn, ByteBuffer finishColumn, boolean reversed)
{
- return startColumn.length == 0 && !reversed
+ return startColumn.remaining() == 0 && !reversed
? new SimpleSliceReader(metadata, file, finishColumn)
: new IndexedSliceReader(metadata, file, startColumn, finishColumn, reversed);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java?rev=1026200&r1=1026199&r2=1026200&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java Fri Oct 22 03:23:26 2010
@@ -23,6 +23,7 @@ package org.apache.cassandra.db.columnit
import java.io.IOError;
import java.io.IOException;
+import java.nio.ByteBuffer;
import com.google.common.collect.AbstractIterator;
@@ -39,14 +40,14 @@ import org.apache.cassandra.io.util.File
class SimpleSliceReader extends AbstractIterator<IColumn> implements IColumnIterator
{
private final FileDataInput file;
- private final byte[] finishColumn;
+ private final ByteBuffer finishColumn;
private final AbstractType comparator;
private final ColumnFamily emptyColumnFamily;
private final int columns;
private int i;
private FileMark mark;
- public SimpleSliceReader(CFMetaData metadata, FileDataInput input, byte[] finishColumn)
+ public SimpleSliceReader(CFMetaData metadata, FileDataInput input, ByteBuffer finishColumn)
{
this.file = input;
this.finishColumn = finishColumn;
@@ -81,7 +82,7 @@ class SimpleSliceReader extends Abstract
{
throw new RuntimeException("error reading " + i + " of " + columns, e);
}
- if (finishColumn.length > 0 && comparator.compare(column.name(), finishColumn) > 0)
+ if (finishColumn.remaining() > 0 && comparator.compare(column.name(), finishColumn) > 0)
return endOfData();
mark = file.mark();
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java?rev=1026200&r1=1026199&r2=1026200&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java Fri Oct 22 03:23:26 2010
@@ -21,27 +21,35 @@ package org.apache.cassandra.db.filter;
*/
-import java.util.*;
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.SortedSet;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.IColumnContainer;
+import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.SuperColumn;
import org.apache.cassandra.db.columniterator.IColumnIterator;
import org.apache.cassandra.db.columniterator.SSTableNamesIterator;
+import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.utils.FBUtilities;
public class NamesQueryFilter implements IFilter
{
- public final SortedSet<byte[]> columns;
+ public final SortedSet<ByteBuffer> columns;
- public NamesQueryFilter(SortedSet<byte[]> columns)
+ public NamesQueryFilter(SortedSet<ByteBuffer> columns)
{
this.columns = columns;
}
- public NamesQueryFilter(byte[] column)
+ public NamesQueryFilter(ByteBuffer column)
{
this(FBUtilities.getSingleColumnSet(column));
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java?rev=1026200&r1=1026199&r2=1026200&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java Fri Oct 22 03:23:26 2010
@@ -21,17 +21,28 @@ package org.apache.cassandra.db.filter;
*/
-import java.util.*;
-
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.IColumnContainer;
+import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.SuperColumn;
import org.apache.cassandra.db.columniterator.IColumnIterator;
import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.ReducingIterator;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.marshal.AbstractType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -84,7 +95,7 @@ public class QueryFilter
}
// here so it can be used by SQF and NQF. non-package callers should call IFilter.getColumnComparator
- static Comparator<IColumn> getColumnComparator(final Comparator<byte[]> comparator)
+ static Comparator<IColumn> getColumnComparator(final Comparator<ByteBuffer> comparator)
{
return new Comparator<IColumn>()
{
@@ -105,7 +116,7 @@ public class QueryFilter
protected boolean isEqual(IColumn o1, IColumn o2)
{
- return Arrays.equals(o1.name(), o2.name());
+ return ByteBufferUtil.equals(o1.name(), o2.name());
}
public void reduce(IColumn current)
@@ -128,7 +139,8 @@ public class QueryFilter
c = filter.filterSuperColumn((SuperColumn)c, gcBefore);
((SuperColumn)c).markForDeleteAt(c.getLocalDeletionTime(), deletedAt); // reset sc tombstone time to what it should be
}
- curCF.clear();
+ curCF.clear();
+
return c;
}
};
@@ -159,7 +171,7 @@ public class QueryFilter
* @param reversed true to start with the largest column (as determined by configured sort order) instead of smallest
* @param limit maximum number of non-deleted columns to return
*/
- public static QueryFilter getSliceFilter(DecoratedKey key, QueryPath path, byte[] start, byte[] finish, boolean reversed, int limit)
+ public static QueryFilter getSliceFilter(DecoratedKey key, QueryPath path, ByteBuffer start, ByteBuffer finish, boolean reversed, int limit)
{
return new QueryFilter(key, path, new SliceQueryFilter(start, finish, reversed, limit));
}
@@ -179,7 +191,7 @@ public class QueryFilter
* @param path path to the level to slice at (CF or SuperColumn)
* @param columns the column names to restrict the results to
*/
- public static QueryFilter getNamesFilter(DecoratedKey key, QueryPath path, SortedSet<byte[]> columns)
+ public static QueryFilter getNamesFilter(DecoratedKey key, QueryPath path, SortedSet<ByteBuffer> columns)
{
return new QueryFilter(key, path, new NamesQueryFilter(columns));
}
@@ -188,7 +200,7 @@ public class QueryFilter
{
if (predicate.column_names != null)
{
- final SortedSet<byte[]> columnNameSet = new TreeSet<byte[]>(comparator);
+ final SortedSet<ByteBuffer> columnNameSet = new TreeSet<ByteBuffer>(comparator);
columnNameSet.addAll(predicate.column_names);
return new NamesQueryFilter(columnNameSet);
}
@@ -200,7 +212,7 @@ public class QueryFilter
/**
* convenience method for creating a name filter matching a single column
*/
- public static QueryFilter getNamesFilter(DecoratedKey key, QueryPath path, byte[] column)
+ public static QueryFilter getNamesFilter(DecoratedKey key, QueryPath path, ByteBuffer column)
{
return new QueryFilter(key, path, new NamesQueryFilter(column));
}