You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2009/03/27 06:39:42 UTC
svn commit: r759026 [3/3] -
/incubator/cassandra/trunk/src/org/apache/cassandra/db/
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/SuperColumn.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/SuperColumn.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/SuperColumn.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/SuperColumn.java Fri Mar 27 05:39:40 2009
@@ -18,19 +18,21 @@
package org.apache.cassandra.db;
+import java.io.DataInput;
import java.io.DataInputStream;
+import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Serializable;
-import java.util.Collection;
-import java.util.Set;
+import java.io.UnsupportedEncodingException;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.log4j.Logger;
-
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.HashingSchemes;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
/**
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -39,17 +41,22 @@
public final class SuperColumn implements IColumn, Serializable
{
private static Logger logger_ = Logger.getLogger(SuperColumn.class);
- private static SuperColumnSerializer serializer_ = new SuperColumnSerializer();
+ private static ICompactSerializer2<IColumn> serializer_;
private final static String seperator_ = ":";
- static SuperColumnSerializer serializer()
+ static
+ {
+ serializer_ = new SuperColumnSerializer();
+ }
+
+ static ICompactSerializer2<IColumn> serializer()
{
return serializer_;
}
private String name_;
private EfficientBidiMap columns_ = new EfficientBidiMap(ColumnComparatorFactory.getComparator(ColumnComparatorFactory.ComparatorType.TIMESTAMP));
- private long markedForDeleteAt = Long.MIN_VALUE;
+ private AtomicBoolean isMarkedForDelete_ = new AtomicBoolean(false);
private AtomicInteger size_ = new AtomicInteger(0);
SuperColumn()
@@ -63,7 +70,7 @@
public boolean isMarkedForDelete()
{
- return markedForDeleteAt > Long.MIN_VALUE;
+ return isMarkedForDelete_.get();
}
public String name()
@@ -76,11 +83,12 @@
return columns_.getSortedColumns();
}
- public IColumn getSubColumn(String columnName)
+ public IColumn getSubColumn( String columnName )
{
- IColumn column = columns_.get(columnName);
- assert column instanceof Column;
- return column;
+ IColumn column = columns_.get(columnName);
+ if ( column instanceof SuperColumn )
+ throw new UnsupportedOperationException("A super column cannot hold other super columns.");
+ return column;
}
public int compareTo(IColumn superColumn)
@@ -140,7 +148,7 @@
return size;
}
- public void remove(String columnName)
+ protected void remove(String columnName)
{
columns_.remove(columnName);
}
@@ -168,7 +176,8 @@
public byte[] value(String key)
{
IColumn column = columns_.get(key);
- assert column instanceof Column;
+ if ( column instanceof SuperColumn )
+ throw new UnsupportedOperationException("A super column cannot hold other super columns.");
if ( column != null )
return column.value();
throw new IllegalArgumentException("Value was requested for a column that does not exist.");
@@ -202,18 +211,19 @@
 * Go through each sub column if it exists then ask it to resolve itself
* if the column does not exist then create it.
*/
- public void putColumn(IColumn column)
+ public boolean putColumn(IColumn column)
{
if ( !(column instanceof SuperColumn))
throw new UnsupportedOperationException("Only Super column objects should be put here");
if( !name_.equals(column.name()))
throw new IllegalArgumentException("The name should match the name of the current column or super column");
+ Collection<IColumn> columns = column.getSubColumns();
- for (IColumn subColumn : column.getSubColumns())
+ for ( IColumn subColumn : columns )
{
- addColumn(subColumn.name(), subColumn);
+ addColumn(subColumn.name(), subColumn);
}
- markedForDeleteAt = Math.max(markedForDeleteAt, column.getMarkedForDeleteAt());
+ return false;
}
public int getObjectCount()
@@ -221,8 +231,10 @@
return 1 + columns_.size();
}
- public long getMarkedForDeleteAt() {
- return markedForDeleteAt;
+ public void delete()
+ {
+ columns_.clear();
+ isMarkedForDelete_.set(true);
}
int getColumnCount()
@@ -230,6 +242,21 @@
return columns_.size();
}
+ public void repair(IColumn column)
+ {
+ Collection<IColumn> columns = column.getSubColumns();
+
+ for ( IColumn subColumn : columns )
+ {
+ IColumn columnInternal = columns_.get(subColumn.name());
+ if( columnInternal == null )
+ columns_.put(subColumn.name(), subColumn);
+ else
+ columnInternal.repair(subColumn);
+ }
+ }
+
+
public IColumn diff(IColumn column)
{
IColumn columnDiff = new SuperColumn(column.name());
@@ -260,7 +287,7 @@
public byte[] digest()
{
Set<IColumn> columns = columns_.getSortedColumns();
- byte[] xorHash = ArrayUtils.EMPTY_BYTE_ARRAY;
+ byte[] xorHash = new byte[0];
if(name_ == null)
return xorHash;
xorHash = name_.getBytes();
@@ -275,23 +302,23 @@
public String toString()
{
StringBuilder sb = new StringBuilder();
- sb.append("SuperColumn(");
sb.append(name_);
-
- if (isMarkedForDelete()) {
- sb.append(" -delete at " + getMarkedForDeleteAt() + "-");
+ sb.append(":");
+ sb.append(isMarkedForDelete());
+ sb.append(":");
+
+ Collection<IColumn> columns = getSubColumns();
+ sb.append(columns.size());
+ sb.append(":");
+ sb.append(size());
+ sb.append(":");
+ for ( IColumn subColumn : columns )
+ {
+ sb.append(subColumn.toString());
}
-
- sb.append(" [");
- sb.append(StringUtils.join(getSubColumns(), ", "));
- sb.append("])");
-
+ sb.append(":");
return sb.toString();
}
-
- public void markForDeleteAt(long timestamp) {
- this.markedForDeleteAt = timestamp;
- }
}
class SuperColumnSerializer implements ICompactSerializer2<IColumn>
@@ -300,7 +327,7 @@
{
SuperColumn superColumn = (SuperColumn)column;
dos.writeUTF(superColumn.name());
- dos.writeLong(superColumn.getMarkedForDeleteAt());
+ dos.writeBoolean(superColumn.isMarkedForDelete());
Collection<IColumn> columns = column.getSubColumns();
int size = columns.size();
@@ -327,15 +354,18 @@
private SuperColumn defreezeSuperColumn(DataInputStream dis) throws IOException
{
String name = dis.readUTF();
+ boolean delete = dis.readBoolean();
SuperColumn superColumn = new SuperColumn(name);
- superColumn.markForDeleteAt(dis.readLong());
+ if ( delete )
+ superColumn.delete();
return superColumn;
}
public IColumn deserialize(DataInputStream dis) throws IOException
{
SuperColumn superColumn = defreezeSuperColumn(dis);
- fillSuperColumn(superColumn, dis);
+ if ( !superColumn.isMarkedForDelete() )
+ fillSuperColumn(superColumn, dis);
return superColumn;
}
@@ -348,11 +378,12 @@
int size = dis.readInt();
dis.skip(size);
}
-
+
private void fillSuperColumn(IColumn superColumn, DataInputStream dis) throws IOException
{
- assert dis.available() != 0;
-
+ if ( dis.available() == 0 )
+ return;
+
/* read the number of columns */
int size = dis.readInt();
/* read the size of all columns */
@@ -368,12 +399,13 @@
{
if ( dis.available() == 0 )
return null;
-
+
IColumn superColumn = defreezeSuperColumn(dis);
superColumn = filter.filter(superColumn, dis);
if(superColumn != null)
{
- fillSuperColumn(superColumn, dis);
+ if ( !superColumn.isMarkedForDelete() )
+ fillSuperColumn(superColumn, dis);
return superColumn;
}
else
@@ -395,29 +427,32 @@
{
if ( dis.available() == 0 )
return null;
-
+
String[] names = RowMutation.getColumnAndColumnFamily(name);
if ( names.length == 1 )
{
IColumn superColumn = defreezeSuperColumn(dis);
if(name.equals(superColumn.name()))
{
- /* read the number of columns stored */
- int size = dis.readInt();
- /* read the size of all columns */
- dis.readInt();
- IColumn column = null;
- for ( int i = 0; i < size; ++i )
+ if ( !superColumn.isMarkedForDelete() )
{
- column = Column.serializer().deserialize(dis, filter);
- if(column != null)
+ /* read the number of columns stored */
+ int size = dis.readInt();
+ /* read the size of all columns */
+ dis.readInt();
+ IColumn column = null;
+ for ( int i = 0; i < size; ++i )
{
- superColumn.addColumn(column.name(), column);
- column = null;
- if(filter.isDone())
- {
- break;
- }
+ column = Column.serializer().deserialize(dis, filter);
+ if(column != null)
+ {
+ superColumn.addColumn(column.name(), column);
+ column = null;
+ if(filter.isDone())
+ {
+ break;
+ }
+ }
}
}
return superColumn;
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/SystemTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/SystemTable.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/SystemTable.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/SystemTable.java Fri Mar 27 05:39:40 2009
@@ -18,13 +18,25 @@
package org.apache.cassandra.db;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Hashtable;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
-import org.apache.log4j.Logger;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Range;
import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.io.DataOutputBuffer;
import org.apache.cassandra.io.IFileReader;
@@ -32,8 +44,9 @@
import org.apache.cassandra.io.SequenceFile;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.LogUtil;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.dht.IPartitioner;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.io.*;
+import org.apache.cassandra.utils.*;
/**
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -47,7 +60,7 @@
/* Name of the SystemTable */
public static final String name_ = "System";
/* Name of the only column family in the Table */
- public static final String cfName_ = "LocationInfo";
+ static final String cfName_ = "LocationInfo";
/* Name of columns in this table */
static final String generation_ = "Generation";
static final String token_ = "Token";
@@ -150,20 +163,19 @@
* This method is used to update the SystemTable with the
* new token.
*/
- public void updateToken(Token token) throws IOException
+ public void updateToken(BigInteger token) throws IOException
{
- IPartitioner p = StorageService.getPartitioner();
if ( systemRow_ != null )
{
- Map<String, ColumnFamily> columnFamilies = systemRow_.getColumnFamilyMap();
+ Map<String, ColumnFamily> columnFamilies = systemRow_.getColumnFamilies();
/* Retrieve the "LocationInfo" column family */
ColumnFamily columnFamily = columnFamilies.get(SystemTable.cfName_);
long oldTokenColumnTimestamp = columnFamily.getColumn(SystemTable.token_).timestamp();
/* create the "Token" whose value is the new token. */
- IColumn tokenColumn = new Column(SystemTable.token_, p.getTokenFactory().toByteArray(token), oldTokenColumnTimestamp + 1);
+ IColumn tokenColumn = new Column(SystemTable.token_, token.toByteArray(), oldTokenColumnTimestamp + 1);
/* replace the old "Token" column with this new one. */
- logger_.debug("Replacing old token " + p.getTokenFactory().fromByteArray(columnFamily.getColumn(SystemTable.token_).value()) + " with " + token);
- columnFamily.addColumn(tokenColumn);
+ logger_.debug("Replacing old token " + new BigInteger( columnFamily.getColumn(SystemTable.token_).value() ).toString() + " with token " + token.toString());
+ columnFamily.addColumn(SystemTable.token_, tokenColumn);
reset(systemRow_);
}
}
@@ -183,7 +195,17 @@
{
LogUtil.init();
StorageService.instance().start();
- SystemTable.openSystemTable(SystemTable.cfName_).updateToken(StorageService.token("503545744:0"));
+ SystemTable.openSystemTable(SystemTable.cfName_).updateToken( StorageService.hash("503545744:0") );
System.out.println("Done");
+
+ /*
+ BigInteger hash = StorageService.hash("304700067:0");
+ List<Range> ranges = new ArrayList<Range>();
+ ranges.add( new Range(new BigInteger("1218069462158869448693347920504606362273788442553"), new BigInteger("1092770595533781724218060956188429069")) );
+ if ( Range.isKeyInRanges(ranges, "304700067:0") )
+ {
+ System.out.println("Done");
+ }
+ */
}
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java Fri Mar 27 05:39:40 2009
@@ -19,14 +19,18 @@
package org.apache.cassandra.db;
import java.util.*;
+import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.File;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.cassandra.analytics.DBAnalyticsSource;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.continuations.Suspendable;
import org.apache.cassandra.dht.BootstrapInitiateMessage;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.io.DataInputBuffer;
@@ -48,6 +52,9 @@
import org.apache.cassandra.utils.FileUtils;
import org.apache.cassandra.utils.LogUtil;
import org.apache.log4j.Logger;
+import org.apache.cassandra.io.*;
+import org.apache.cassandra.utils.*;
+import org.apache.cassandra.service.*;
/**
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -60,7 +67,7 @@
* is basically the column family name and the ID associated with
* this column family. We use this ID in the Commit Log header to
* determine when a log file that has been rolled can be deleted.
- */
+ */
public static class TableMetadata
{
/* Name of the column family */
@@ -153,7 +160,7 @@
cfTypeMap_.put(cf, type);
}
- public boolean isEmpty()
+ boolean isEmpty()
{
return cfIdMap_.isEmpty();
}
@@ -193,7 +200,7 @@
return cfIdMap_.containsKey(cfName);
}
- public void apply() throws IOException
+ void apply() throws IOException
{
String table = DatabaseDescriptor.getTables().get(0);
DataOutputBuffer bufOut = new DataOutputBuffer();
@@ -454,7 +461,7 @@
return columnFamilyStores_;
}
- public ColumnFamilyStore getColumnFamilyStore(String cfName)
+ ColumnFamilyStore getColumnFamilyStore(String cfName)
{
return columnFamilyStores_.get(cfName);
}
@@ -488,7 +495,7 @@
for ( String cfName : cfNames )
{
ColumnFamilyStore cfStore = columnFamilyStores_.get(cfName);
- sb.append(cfStore.cfStats(newLineSeparator));
+ sb.append(cfStore.cfStats(newLineSeparator, df));
}
int newLength = sb.toString().length();
@@ -592,7 +599,7 @@
{
ColumnFamilyStore cfStore = columnFamilyStores_.get( columnFamily );
if ( cfStore != null )
- MinorCompactionManager.instance().submitMajor(cfStore, 0);
+ MinorCompactionManager.instance().submitMajor(cfStore, null, 0);
}
}
@@ -684,6 +691,26 @@
dbAnalyticsSource_.updateReadStatistics(timeTaken);
return row;
}
+
+ public Row getRowFromMemory(String key)
+ {
+ Row row = new Row(key);
+ Set<String> columnFamilies = tableMetadata_.getColumnFamilies();
+ long start = System.currentTimeMillis();
+ for ( String columnFamily : columnFamilies )
+ {
+ ColumnFamilyStore cfStore = columnFamilyStores_.get(columnFamily);
+ if ( cfStore != null )
+ {
+ ColumnFamily cf = cfStore.getColumnFamilyFromMemory(key, columnFamily, new IdentityFilter());
+ if ( cf != null )
+ row.addColumnFamily(cf);
+ }
+ }
+ long timeTaken = System.currentTimeMillis() - start;
+ dbAnalyticsSource_.updateReadStatistics(timeTaken);
+ return row;
+ }
/**
@@ -788,18 +815,22 @@
* Once this happens the data associated with the individual column families
* is also written to the column family store's memtable.
*/
- void apply(Row row) throws IOException
- {
+ public void apply(Row row) throws IOException
+ {
+ String key = row.key();
/* Add row to the commit log. */
long start = System.currentTimeMillis();
+
CommitLog.CommitLogContext cLogCtx = CommitLog.open(table_).add(row);
-
- for (ColumnFamily columnFamily : row.getColumnFamilies())
+ Map<String, ColumnFamily> columnFamilies = row.getColumnFamilies();
+ Set<String> cNames = columnFamilies.keySet();
+ for ( String cName : cNames )
{
+ ColumnFamily columnFamily = columnFamilies.get(cName);
ColumnFamilyStore cfStore = columnFamilyStores_.get(columnFamily.name());
- cfStore.apply(row.key(), columnFamily, cLogCtx);
+ cfStore.apply( key, columnFamily, cLogCtx);
}
-
+ row.clear();
long timeTaken = System.currentTimeMillis() - start;
dbAnalyticsSource_.updateWriteStatistics(timeTaken);
}
@@ -807,7 +838,7 @@
void applyNow(Row row) throws IOException
{
String key = row.key();
- Map<String, ColumnFamily> columnFamilies = row.getColumnFamilyMap();
+ Map<String, ColumnFamily> columnFamilies = row.getColumnFamilies();
Set<String> cNames = columnFamilies.keySet();
for ( String cName : cNames )
@@ -823,11 +854,24 @@
Set<String> cfNames = columnFamilyStores_.keySet();
for ( String cfName : cfNames )
{
- if (fRecovery) {
- columnFamilyStores_.get(cfName).flushMemtableOnRecovery();
- } else {
- columnFamilyStores_.get(cfName).forceFlush();
- }
+ columnFamilyStores_.get(cfName).forceFlush(fRecovery);
+ }
+ }
+
+ void delete(Row row) throws IOException
+ {
+ String key = row.key();
+ Map<String, ColumnFamily> columnFamilies = row.getColumnFamilies();
+
+ /* Add row to commit log */
+ CommitLog.open(table_).add(row);
+ Set<String> cNames = columnFamilies.keySet();
+
+ for ( String cName : cNames )
+ {
+ ColumnFamily columnFamily = columnFamilies.get(cName);
+ ColumnFamilyStore cfStore = columnFamilyStores_.get(columnFamily.name());
+ cfStore.delete( key, columnFamily );
}
}
@@ -837,7 +881,7 @@
/* Add row to the commit log. */
long start = System.currentTimeMillis();
- Map<String, ColumnFamily> columnFamilies = row.getColumnFamilyMap();
+ Map<String, ColumnFamily> columnFamilies = row.getColumnFamilies();
Set<String> cNames = columnFamilies.keySet();
for ( String cName : cNames )
{
@@ -858,7 +902,7 @@
}
else if(column.timestamp() == 3)
{
- cfStore.forceFlush();
+ cfStore.forceFlush(false);
}
else if(column.timestamp() == 4)
{
@@ -876,16 +920,12 @@
dbAnalyticsSource_.updateWriteStatistics(timeTaken);
}
- public Set<String> getApplicationColumnFamilies()
+ public static void main(String[] args) throws Throwable
{
- Set<String> set = new HashSet<String>();
- for (String cfName : getColumnFamilies())
- {
- if (DatabaseDescriptor.isApplicationColumnFamily(cfName))
- {
- set.add(cfName);
- }
- }
- return set;
+ StorageService service = StorageService.instance();
+ service.start();
+ Table table = Table.open("Mailbox");
+ Row row = table.get("35300190:1");
+ System.out.println( row.key() );
}
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/TimeFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/TimeFilter.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/TimeFilter.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/TimeFilter.java Fri Mar 27 05:39:40 2009
@@ -47,10 +47,7 @@
public ColumnFamily filter(String cf, ColumnFamily columnFamily)
{
- if (columnFamily == null)
- return columnFamily;
-
- String[] values = RowMutation.getColumnAndColumnFamily(cf);
+ String[] values = RowMutation.getColumnAndColumnFamily(cf);
String cfName = columnFamily.name();
ColumnFamily filteredCf = new ColumnFamily(cfName);
if( values.length == 1 && !DatabaseDescriptor.getColumnType(cfName).equals("Super"))
@@ -61,7 +58,7 @@
{
if ( column.timestamp() >= timeLimit_ )
{
- filteredCf.addColumn(column);
+ filteredCf.addColumn(column.name(), column);
++i;
}
else
@@ -85,8 +82,8 @@
for(IColumn column : columns)
{
SuperColumn superColumn = (SuperColumn)column;
- SuperColumn filteredSuperColumn = new SuperColumn(superColumn.name());
- filteredCf.addColumn(filteredSuperColumn);
+ SuperColumn filteredSuperColumn = new SuperColumn(superColumn.name());
+ filteredCf.addColumn(filteredSuperColumn.name(), filteredSuperColumn);
Collection<IColumn> subColumns = superColumn.getSubColumns();
int i = 0;
for(IColumn subColumn : subColumns)
@@ -146,6 +143,6 @@
public DataInputBuffer next(String key, String cf, SSTable ssTable) throws IOException
{
- return ssTable.next( key, cf, null, new IndexHelper.TimeRange( timeLimit_, System.currentTimeMillis() ) );
+ return ssTable.next( key, cf, new IndexHelper.TimeRange( timeLimit_, System.currentTimeMillis() ) );
}
}