You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/06/22 23:17:38 UTC

svn commit: r787403 [1/2] - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/ src/java/org/apache/cassandra/loader/ src/java/org/apache/cassandra/net/http/ src/java/...

Author: jbellis
Date: Mon Jun 22 21:17:36 2009
New Revision: 787403

URL: http://svn.apache.org/viewvc?rev=787403&view=rev
Log:
Pass and write table parameter as needed throughout the codebase to add multitable support.
(Many places cheated and just assumed that tables.get(0) was the only table.)
Patch builds but does not yet pass tests.

Patch by goffinet; reviewed by jbellis for CASSANDRA-79

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnComparatorFactory.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIterator.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DBManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/loader/Loader.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/loader/PreLoad.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/http/HttpRequestHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RangeFilterTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=787403&r1=787402&r2=787403&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Mon Jun 22 21:17:36 2009
@@ -459,16 +459,16 @@
 
         for ( String table : tables )
         {
-            Table.TableMetadata tmetadata = Table.TableMetadata.instance();
+            Table.TableMetadata tmetadata = Table.TableMetadata.instance(table);
             if (tmetadata.isEmpty())
             {
-                tmetadata = Table.TableMetadata.instance();
+                tmetadata = Table.TableMetadata.instance(table);
                 /* Column families associated with this table */
                 Map<String, CFMetaData> columnFamilies = tableToCFMetaDataMap_.get(table);
 
                 for (String columnFamily : columnFamilies.keySet())
                 {
-                    tmetadata.add(columnFamily, idGenerator.getAndIncrement(), DatabaseDescriptor.getColumnType(columnFamily));
+                    tmetadata.add(columnFamily, idGenerator.getAndIncrement(), DatabaseDescriptor.getColumnType(table, columnFamily));
                 }
 
                 /*
@@ -580,9 +580,10 @@
     	return sb.toString();
     }
     
-    public static Map<String, CFMetaData> getTableMetaData(String table)
+    public static Map<String, CFMetaData> getTableMetaData(String tableName)
     {
-        return tableToCFMetaDataMap_.get(table);
+        assert tableName != null;
+        return tableToCFMetaDataMap_.get(tableName);
     }
 
     /*
@@ -590,19 +591,20 @@
      * meta data. If the table name or column family name is not valid
      * this function returns null.
      */
-    public static CFMetaData getCFMetaData(String table, String cfName)
+    public static CFMetaData getCFMetaData(String tableName, String cfName)
     {
-        Map<String, CFMetaData> cfInfo = tableToCFMetaDataMap_.get(table);
+        assert tableName != null;
+        Map<String, CFMetaData> cfInfo = tableToCFMetaDataMap_.get(tableName);
         if (cfInfo == null)
             return null;
         
         return cfInfo.get(cfName);
     }
     
-    public static String getColumnType(String cfName)
+    public static String getColumnType(String tableName, String cfName)
     {
-        String table = getTables().get(0);
-        CFMetaData cfMetaData = getCFMetaData(table, cfName);
+        assert tableName != null;
+        CFMetaData cfMetaData = getCFMetaData(tableName, cfName);
         
         if (cfMetaData == null)
             return null;
@@ -611,6 +613,7 @@
 
     public static int getFlushPeriod(String tableName, String columnFamilyName)
     {
+        assert tableName != null;
         CFMetaData cfMetaData = getCFMetaData(tableName, columnFamilyName);
         
         if (cfMetaData == null)
@@ -618,10 +621,10 @@
         return cfMetaData.flushPeriodInMinutes;
     }
 
-    public static boolean isNameSortingEnabled(String cfName)
+    public static boolean isNameSortingEnabled(String tableName, String cfName)
     {
-        String table = getTables().get(0);
-        CFMetaData cfMetaData = getCFMetaData(table, cfName);
+        assert tableName != null;
+        CFMetaData cfMetaData = getCFMetaData(tableName, cfName);
 
         if (cfMetaData == null)
             return false;
@@ -629,23 +632,29 @@
     	return "Name".equals(cfMetaData.indexProperty_);
     }
     
-    public static boolean isTimeSortingEnabled(String cfName)
+    public static boolean isTimeSortingEnabled(String tableName, String cfName)
     {
-        String table = getTables().get(0);
-        CFMetaData cfMetaData = getCFMetaData(table, cfName);
+        assert tableName != null;
+        CFMetaData cfMetaData = getCFMetaData(tableName, cfName);
 
         if (cfMetaData == null)
             return false;
 
         return "Time".equals(cfMetaData.indexProperty_);
     }
-    
 
     public static List<String> getTables()
     {
         return tables_;
     }
 
+    public static String getTable(String tableName)
+    {
+        assert tableName != null;
+        int index = getTables().indexOf(tableName);
+        return index >= 0 ? getTables().get(index) : null;
+    }
+
     public static void  setTables(String table)
     {
         tables_.add(table);
@@ -764,9 +773,10 @@
         return seeds_;
     }
 
-    public static String getColumnFamilyType(String cfName)
+    public static String getColumnFamilyType(String tableName, String cfName)
     {
-        String cfType = getColumnType(cfName);
+        assert tableName != null;
+        String cfType = getColumnType(tableName, cfName);
         if ( cfType == null )
             cfType = "Standard";
     	return cfType;
@@ -806,10 +816,10 @@
         return dataFileDirectory;
     }
     
-    public static TypeInfo getTypeInfo(String cfName)
+    public static TypeInfo getTypeInfo(String tableName, String cfName)
     {
-        String table = DatabaseDescriptor.getTables().get(0);
-        CFMetaData cfMetadata = DatabaseDescriptor.getCFMetaData(table, cfName);
+        assert tableName != null;
+        CFMetaData cfMetadata = DatabaseDescriptor.getCFMetaData(tableName, cfName);
         if ( cfMetadata.indexProperty_.equals("Name") )
         {
             return TypeInfo.STRING;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java?rev=787403&r1=787402&r2=787403&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java Mon Jun 22 21:17:36 2009
@@ -145,7 +145,7 @@
          * Use the SSTable to write the contents of the TreeMap
          * to disk.
         */
-        SSTable ssTable = new SSTable(directory, filename, StorageService.getPartitioner());
+        SSTable ssTable = new SSTable(directory, filename, null, StorageService.getPartitioner());
         List<String> keys = new ArrayList<String>( columnFamilies_.keySet() );
         Collections.sort(keys);        
         /* Use this BloomFilter to decide if a key exists in a SSTable */

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnComparatorFactory.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnComparatorFactory.java?rev=787403&r1=787402&r2=787403&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnComparatorFactory.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnComparatorFactory.java Mon Jun 22 21:17:36 2009
@@ -34,8 +34,8 @@
         TIMESTAMP
     }
 
-    private static Comparator<IColumn> nameComparator_ = new ColumnNameComparator();
-    private static Comparator<IColumn> timestampComparator_ = new ColumnTimestampComparator();
+    public static final Comparator<IColumn> nameComparator_ = new ColumnNameComparator();
+    public static final Comparator<IColumn> timestampComparator_ = new ColumnTimestampComparator();
 
     public static Comparator<IColumn> getComparator(ComparatorType comparatorType)
     {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java?rev=787403&r1=787402&r2=787403&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java Mon Jun 22 21:17:36 2009
@@ -51,6 +51,7 @@
     private static Map<String, String> columnTypes_ = new HashMap<String, String>();
     private static Map<String, String> indexTypes_ = new HashMap<String, String>();
     private String type_;
+    private String table_;
 
     static
     {
@@ -91,6 +92,23 @@
         return indexTypes_.get(columnIndexProperty);
     }
 
+    public static ColumnFamily create(String tableName, String cfName)
+    {
+        Comparator<IColumn> comparator;
+        String columnType = DatabaseDescriptor.getColumnFamilyType(tableName, cfName);
+        if ("Super".equals(columnType)
+            || DatabaseDescriptor.isNameSortingEnabled(tableName, cfName))
+        {
+            comparator = ColumnComparatorFactory.getComparator(ColumnComparatorFactory.ComparatorType.NAME);
+        }
+        /* if this columnfamily has simple columns, and no index on name sort by timestamp */
+        else
+        {
+            comparator = ColumnComparatorFactory.getComparator(ColumnComparatorFactory.ComparatorType.TIMESTAMP);
+        }
+        return new ColumnFamily(cfName, columnType, comparator);
+    }
+
     private transient AbstractColumnFactory columnFactory_;
 
     private String name_;
@@ -101,66 +119,24 @@
     private AtomicInteger size_ = new AtomicInteger(0);
     private EfficientBidiMap columns_;
 
-    private Comparator<IColumn> columnComparator_;
-
-	private Comparator<IColumn> getColumnComparator(String cfName, String columnType)
-	{
-		if(columnComparator_ == null)
-		{
-			/*
-			 * if this columnfamily has supercolumns or there is an index on the column name,
-			 * then sort by name
-			*/
-			if("Super".equals(columnType) || DatabaseDescriptor.isNameSortingEnabled(cfName))
-			{
-				columnComparator_ = ColumnComparatorFactory.getComparator(ColumnComparatorFactory.ComparatorType.NAME);
-			}
-			/* if this columnfamily has simple columns, and no index on name sort by timestamp */
-			else
-			{
-				columnComparator_ = ColumnComparatorFactory.getComparator(ColumnComparatorFactory.ComparatorType.TIMESTAMP);
-			}
-		}
-
-		return columnComparator_;
-	}
-
-    public ColumnFamily(String cfName, String columnType)
+    public ColumnFamily(String cfName, String columnType, Comparator<IColumn> comparator)
     {
         name_ = cfName;
         type_ = columnType;
-        createColumnFactoryAndColumnSerializer(columnType);
+        columnFactory_ = AbstractColumnFactory.getColumnFactory(columnType);
+        columnSerializer_ = columnFactory_.createColumnSerializer();
+        if(columns_ == null)
+            columns_ = new EfficientBidiMap(comparator);
     }
 
-    void createColumnFactoryAndColumnSerializer(String columnType)
+    public ColumnFamily(String cfName, String columnType, ColumnComparatorFactory.ComparatorType indexType)
     {
-        if ( columnFactory_ == null )
-        {
-            columnFactory_ = AbstractColumnFactory.getColumnFactory(columnType);
-            columnSerializer_ = columnFactory_.createColumnSerializer();
-            if(columns_ == null)
-                columns_ = new EfficientBidiMap(getColumnComparator(name_, columnType));
-        }
-    }
-
-    void createColumnFactoryAndColumnSerializer()
-    {
-    	String columnType = DatabaseDescriptor.getColumnFamilyType(name_);
-        if ( columnType == null )
-        {
-        	List<String> tables = DatabaseDescriptor.getTables();
-        	if ( tables.size() > 0 )
-        	{
-        		String table = tables.get(0);
-        		columnType = Table.open(table).getColumnFamilyType(name_);
-        	}
-        }
-        createColumnFactoryAndColumnSerializer(columnType);
+        this(cfName, columnType, ColumnComparatorFactory.getComparator(indexType));
     }
 
     ColumnFamily cloneMeShallow()
     {
-        ColumnFamily cf = new ColumnFamily(name_, type_);
+        ColumnFamily cf = new ColumnFamily(name_, type_, getComparator());
         cf.markedForDeleteAt = markedForDeleteAt;
         cf.localDeletionTime = localDeletionTime;
         return cf;
@@ -192,7 +168,6 @@
 
     public ICompactSerializer2<IColumn> getColumnSerializer()
     {
-        createColumnFactoryAndColumnSerializer();
     	return columnSerializer_;
     }
 
@@ -320,13 +295,21 @@
         return markedForDeleteAt > Long.MIN_VALUE;
     }
 
+    public String getTable() {
+        return table_;
+    }
+
+    public void setTable_(String table_) {
+        this.table_ = table_;
+    }
+
     /*
      * This function will calculate the difference between 2 column families.
      * The external input is assumed to be a superset of internal.
      */
     ColumnFamily diff(ColumnFamily cfComposite)
     {
-    	ColumnFamily cfDiff = new ColumnFamily(cfComposite.name(), cfComposite.type_);
+    	ColumnFamily cfDiff = new ColumnFamily(cfComposite.name(), cfComposite.type_, getComparator());
         if (cfComposite.getMarkedForDeleteAt() > getMarkedForDeleteAt())
         {
             cfDiff.delete(cfComposite.getLocalDeletionTime(), cfComposite.getMarkedForDeleteAt());
@@ -361,6 +344,18 @@
         	return null;
     }
 
+    private Comparator<IColumn> getComparator()
+    {
+        return columns_.getComparator();
+    }
+
+    private ColumnComparatorFactory.ComparatorType getComparatorType()
+    {
+        return getComparator() == ColumnComparatorFactory.nameComparator_
+               ? ColumnComparatorFactory.ComparatorType.NAME
+               : ColumnComparatorFactory.ComparatorType.TIMESTAMP;
+    }
+
     int size()
     {
         if ( size_.get() == 0 )
@@ -489,6 +484,8 @@
             Collection<IColumn> columns = columnFamily.getAllColumns();
 
             dos.writeUTF(columnFamily.name());
+            dos.writeUTF(columnFamily.type_);
+            dos.writeInt(columnFamily.getComparatorType().ordinal());
             dos.writeInt(columnFamily.localDeletionTime);
             dos.writeLong(columnFamily.markedForDeleteAt);
 
@@ -505,8 +502,9 @@
         */
         private ColumnFamily defreezeColumnFamily(DataInputStream dis) throws IOException
         {
-            String name = dis.readUTF();
-            ColumnFamily cf = new ColumnFamily(name, DatabaseDescriptor.getColumnFamilyType(name));
+            ColumnFamily cf = new ColumnFamily(dis.readUTF(),
+                                               dis.readUTF(),
+                                               ColumnComparatorFactory.ComparatorType.values()[dis.readInt()]);
             cf.delete(dis.readInt(), dis.readLong());
             return cf;
         }
@@ -515,11 +513,11 @@
         {
             ColumnFamily cf = defreezeColumnFamily(dis);
             int size = dis.readInt();
-            IColumn column = null;
-            for ( int i = 0; i < size; ++i )
+            IColumn column;
+            for (int i = 0; i < size; ++i)
             {
                 column = cf.getColumnSerializer().deserialize(dis);
-                if(column != null)
+                if (column != null)
                 {
                     cf.addColumn(column);
                 }
@@ -587,5 +585,6 @@
             throw new UnsupportedOperationException("This operation is not yet supported.");
         }
     }
+
 }
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=787403&r1=787402&r2=787403&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon Jun 22 21:17:36 2009
@@ -135,7 +135,7 @@
         Collections.sort(indices);
         int value = (indices.size() > 0) ? (indices.get(indices.size() - 1)) : 0;
 
-        ColumnFamilyStore cfs = new ColumnFamilyStore(table, columnFamily, "Super".equals(DatabaseDescriptor.getColumnType(columnFamily)), value);
+        ColumnFamilyStore cfs = new ColumnFamilyStore(table, columnFamily, "Super".equals(DatabaseDescriptor.getColumnType(table, columnFamily)), value);
 
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         try
@@ -188,7 +188,7 @@
         /* There are no files to compact just add to the list of SSTables */
         ssTables_.addAll(filenames);
         /* Load the index files and the Bloom Filters associated with them. */
-        SSTable.onStart(filenames);
+        SSTable.onStart(filenames, table_);
         MinorCompactionManager.instance().submit(ColumnFamilyStore.this);
         if (columnFamily_.equals(Table.hints_))
         {
@@ -577,7 +577,7 @@
 
     private ColumnFamily fetchColumnFamily(String key, String cf, IFilter filter, String ssTableFile) throws IOException
     {
-        SSTable ssTable = new SSTable(ssTableFile, StorageService.getPartitioner());
+        SSTable ssTable = new SSTable(ssTableFile, null, StorageService.getPartitioner());
         DataInputBuffer bufIn;
         bufIn = filter.next(key, cf, ssTable);
         if (bufIn.getLength() == 0)
@@ -1118,12 +1118,16 @@
                 {
                     if (ssTableRange == null)
                     {
+                        String [] temp = null;
+                        String tableName;
+                        temp = rangeFileLocation.split("-");
+                        tableName = temp[0];
                         if (target != null)
                         {
                             rangeFileLocation = rangeFileLocation + System.getProperty("file.separator") + "bootstrap";
                         }
                         FileUtils.createDirectory(rangeFileLocation);
-                        ssTableRange = new SSTable(rangeFileLocation, mergedFileName, StorageService.getPartitioner());
+                        ssTableRange = new SSTable(rangeFileLocation, mergedFileName, tableName, StorageService.getPartitioner());
                     }
                     try
                     {
@@ -1319,7 +1323,7 @@
 
                 if (ssTable == null)
                 {
-                    ssTable = new SSTable(compactionFileLocation, mergedFileName, StorageService.getPartitioner());
+                    ssTable = new SSTable(compactionFileLocation, mergedFileName, null, StorageService.getPartitioner());
                 }
                 ssTable.append(lastkey, bufOut);
 
@@ -1539,6 +1543,10 @@
     {
         return readStats_.mean();
     }
+    public String getTableName()
+    {
+        return table_;
+    }
 
     /**
      * get a list of columns starting from a given column, in a specified order
@@ -1604,7 +1612,7 @@
                 comparator = new ReverseComparator(comparator);
             Iterator collated = IteratorUtils.collatedIterator(comparator, iterators);
             if (!collated.hasNext())
-                return new ColumnFamily(cfName, DatabaseDescriptor.getColumnFamilyType(cfName));
+                return ColumnFamily.create(table_, cfName);
             ReducingIterator<IColumn> reduced = new ReducingIterator<IColumn>(collated)
             {
                 ColumnFamily curCF = returnCF.cloneMeShallow();

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java?rev=787403&r1=787402&r2=787403&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java Mon Jun 22 21:17:36 2009
@@ -57,7 +57,7 @@
         dos.write(bufOut.getData(), 0, bufOut.getLength());
 
         /* Do the indexing */
-        TypeInfo typeInfo = DatabaseDescriptor.getTypeInfo(columnFamily.name());        
+        TypeInfo typeInfo = DatabaseDescriptor.getTypeInfo(columnFamily.getTable(), columnFamily.name());
         doIndexing(typeInfo, columns, dos);        
 	}
     

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIterator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIterator.java?rev=787403&r1=787402&r2=787403&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIterator.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIterator.java Mon Jun 22 21:17:36 2009
@@ -58,7 +58,7 @@
     throws IOException
     {
         this.isAscending = isAscending;
-        SSTable ssTable = new SSTable(filename, StorageService.getPartitioner());
+        SSTable ssTable = new SSTable(filename, null, StorageService.getPartitioner());
         reader = ssTable.getColumnGroupReader(key, cfName, startColumn, isAscending);
         this.startColumn = startColumn;
         curColumnIndex = isAscending ? 0 : -1;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=787403&r1=787402&r2=787403&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java Mon Jun 22 21:17:36 2009
@@ -272,7 +272,7 @@
         for (File file : clogs)
         {
             // IFileReader reader = SequenceFile.bufferedReader(file.getAbsolutePath(), DatabaseDescriptor.getLogFileSizeThreshold());
-            IFileReader reader = SequenceFile.reader(file.getAbsolutePath());
+            IFileReader reader = SequenceFile.reader(table_, file.getAbsolutePath());
             try
             {
                 CommitLogHeader clHeader = readCommitLogHeader(reader);
@@ -299,7 +299,7 @@
                     /* read the commit log entry */
                     try
                     {                        
-                        Row row = Row.serializer().deserialize(bufIn);
+                        Row row = Row.serializer(table_).deserialize(bufIn);
                         Table table = Table.open(table_);
                         tablesRecovered.add(table);
                         Collection<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>(row.getColumnFamilies());
@@ -382,7 +382,7 @@
         {
             /* serialize the row */
             cfBuffer.reset();
-            Row.serializer().serialize(row, cfBuffer);
+            Row.serializer(table_).serialize(row, cfBuffer);
             currentPosition = logWriter_.getCurrentPosition();
             cLogCtx = new CommitLogContext(logFile_, currentPosition);
             /* Update the header */

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DBManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DBManager.java?rev=787403&r1=787402&r2=787403&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DBManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DBManager.java Mon Jun 22 21:17:36 2009
@@ -108,8 +108,8 @@
             int generation = 1;
 
             String key = FBUtilities.getHostAddress();
-            row = new Row(key);
-            ColumnFamily cf = new ColumnFamily(SystemTable.cfName_, "Standard");
+            row = new Row(SystemTable.name_, key);
+            ColumnFamily cf = ColumnFamily.create("system", SystemTable.cfName_); // TODO create system table
             cf.addColumn(new Column(SystemTable.token_, p.getTokenFactory().toByteArray(token)));
             cf.addColumn(new Column(SystemTable.generation_, BasicUtilities.intToByteArray(generation)) );
             row.addColumnFamily(cf);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=787403&r1=787402&r2=787403&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Mon Jun 22 21:17:36 2009
@@ -76,7 +76,7 @@
         return instance_;
     }
 
-    private static boolean sendMessage(String endpointAddress, String key) throws DigestMismatchException, TimeoutException, IOException, InvalidRequestException
+    private static boolean sendMessage(String endpointAddress, String tableName, String key) throws DigestMismatchException, TimeoutException, IOException, InvalidRequestException
     {
         EndPoint endPoint = new EndPoint(endpointAddress, DatabaseDescriptor.getStoragePort());
         if (!FailureDetector.instance().isAlive(endPoint))
@@ -84,14 +84,14 @@
             return false;
         }
 
-        Table table = Table.open(DatabaseDescriptor.getTables().get(0));
+        Table table = Table.open(tableName);
         Row row = table.get(key);
-        Row purgedRow = new Row(key);
+        Row purgedRow = new Row(tableName,key);
         for (ColumnFamily cf : row.getColumnFamilies())
         {
             purgedRow.addColumnFamily(ColumnFamilyStore.removeDeleted(cf));
         }
-        RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), purgedRow);
+        RowMutation rm = new RowMutation(tableName, purgedRow);
         Message message = rm.makeRowMutationMessage();
         QuorumResponseHandler<Boolean> quorumResponseHandler = new QuorumResponseHandler<Boolean>(1, new WriteResponseResolver());
         MessagingService.getMessagingInstance().sendRR(message, new EndPoint[]{ endPoint }, quorumResponseHandler);
@@ -99,14 +99,14 @@
         return quorumResponseHandler.get();
     }
 
-    private static void deleteEndPoint(String endpointAddress, String key, long timestamp) throws Exception
+    private static void deleteEndPoint(String endpointAddress, String tableName, String key, long timestamp) throws Exception
     {
-        RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), key_);
+        RowMutation rm = new RowMutation(tableName, key_);
         rm.delete(Table.hints_ + ":" + key + ":" + endpointAddress, timestamp);
         rm.apply();
     }
 
-    private static void deleteHintedData(String key) throws Exception
+    private static void deleteHintedData(String tableName, String key) throws Exception
     {
         // delete the row from Application CFs: find the largest timestamp in any of
         // the data columns, and delete the entire CF with that value for the tombstone.
@@ -116,8 +116,8 @@
         // This is sub-optimal but okay, since HH is just an effort to make a recovering
         // node more consistent than it would have been; we can rely on the other
         // consistency mechanisms to finish the job in this corner case.
-        RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), key_);
-        Table table = Table.open(DatabaseDescriptor.getTables().get(0));
+        RowMutation rm = new RowMutation(tableName, key_);
+        Table table = Table.open(tableName);
         Row row = table.get(key); // not necessary to do removeDeleted here
         Collection<ColumnFamily> cfs = row.getColumnFamilies();
         for (ColumnFamily cf : cfs)
@@ -155,48 +155,51 @@
         // 5. Now force a flush
         // 6. Do major compaction to clean up all deletes etc.
         // 7. I guess we r done
-        Table table = Table.open(DatabaseDescriptor.getTables().get(0));
-        try
+        for ( String tableName:DatabaseDescriptor.getTables() )
         {
-            ColumnFamily hintColumnFamily = ColumnFamilyStore.removeDeleted(table.get(key_, Table.hints_), Integer.MAX_VALUE);
-            if (hintColumnFamily == null)
+            Table table = Table.open(tableName);
+            try
             {
-                columnFamilyStore.forceFlush();
-                return;
-            }
-            Collection<IColumn> keys = hintColumnFamily.getAllColumns();
-            if (keys == null)
-            {
-                return;
-            }
+                ColumnFamily hintColumnFamily = ColumnFamilyStore.removeDeleted(table.get(key_, Table.hints_), Integer.MAX_VALUE);
+                if (hintColumnFamily == null)
+                {
+                    columnFamilyStore.forceFlush();
+                    return;
+                }
+                Collection<IColumn> keys = hintColumnFamily.getAllColumns();
+                if (keys == null)
+                {
+                    return;
+                }
 
-            for (IColumn keyColumn : keys)
-            {
-                Collection<IColumn> endpoints = keyColumn.getSubColumns();
-                int deleted = 0;
-                for (IColumn endpoint : endpoints)
+                for (IColumn keyColumn : keys)
                 {
-                    if (sendMessage(endpoint.name(), keyColumn.name()))
+                    Collection<IColumn> endpoints = keyColumn.getSubColumns();
+                    int deleted = 0;
+                    for (IColumn endpoint : endpoints)
+                    {
+                        if (sendMessage(endpoint.name(), tableName, keyColumn.name()))
+                        {
+                            deleteEndPoint(endpoint.name(), tableName, keyColumn.name(), keyColumn.timestamp());
+                            deleted++;
+                        }
+                    }
+                    if (deleted == endpoints.size())
                     {
-                        deleteEndPoint(endpoint.name(), keyColumn.name(), keyColumn.timestamp());
-                        deleted++;
+                        deleteHintedData(tableName, keyColumn.name());
                     }
                 }
-                if (deleted == endpoints.size())
-                {
-                    deleteHintedData(keyColumn.name());
-                }
+                columnFamilyStore.forceFlush();
+                columnFamilyStore.forceCompaction(null, null, 0, null);
+            }
+            catch (Exception ex)
+            {
+                logger_.error(ex.getMessage());
+            }
+            finally
+            {
+                logger_.debug("Finished hinted handoff of " + columnFamilyStore.columnFamily_);
             }
-            columnFamilyStore.forceFlush();
-            columnFamilyStore.forceCompaction(null, null, 0, null);
-        }
-        catch (Exception ex)
-        {
-            logger_.error(ex.getMessage());
-        }
-        finally
-        {
-            logger_.debug("Finished hinted handoff of " + columnFamilyStore.columnFamily_);
         }
     }
 
@@ -207,43 +210,46 @@
         // 1. Scan through all the keys that we need to handoff
         // 2. For each key read the list of recepients if teh endpoint matches send
         // 3. Delete that recepient from the key if write was successful
-        Table table = Table.open(DatabaseDescriptor.getTables().get(0));
-        try
+        for ( String tableName:DatabaseDescriptor.getTables() )
         {
-            ColumnFamily hintedColumnFamily = table.get(key_, Table.hints_);
-            if (hintedColumnFamily == null)
-            {
-                return;
-            }
-            Collection<IColumn> keys = hintedColumnFamily.getAllColumns();
-            if (keys == null)
+            Table table = Table.open(tableName);
+            try
             {
-                return;
-            }
+                ColumnFamily hintedColumnFamily = table.get(key_, Table.hints_);
+                if (hintedColumnFamily == null)
+                {
+                    return;
+                }
+                Collection<IColumn> keys = hintedColumnFamily.getAllColumns();
+                if (keys == null)
+                {
+                    return;
+                }
 
-            for (IColumn keyColumn : keys)
-            {
-                Collection<IColumn> endpoints = keyColumn.getSubColumns();
-                for (IColumn endpoint : endpoints)
+                for (IColumn keyColumn : keys)
                 {
-                    if (endpoint.name().equals(endPoint.getHost()) && sendMessage(endpoint.name(), keyColumn.name()))
+                    Collection<IColumn> endpoints = keyColumn.getSubColumns();
+                    for (IColumn endpoint : endpoints)
                     {
-                        deleteEndPoint(endpoint.name(), keyColumn.name(), keyColumn.timestamp());
-                        if (endpoints.size() == 1)
+                        if (endpoint.name().equals(endPoint.getHost()) && sendMessage(endpoint.name(), null, keyColumn.name()))
                         {
-                            deleteHintedData(keyColumn.name());
+                            deleteEndPoint(endpoint.name(), tableName, keyColumn.name(), keyColumn.timestamp());
+                            if (endpoints.size() == 1)
+                            {
+                                deleteHintedData(tableName, keyColumn.name());
+                            }
                         }
                     }
                 }
             }
-        }
-        catch (Exception ex)
-        {
-            logger_.error(ex.getMessage());
-        }
-        finally
-        {
-            logger_.debug("Finished hinted handoff for endpoint " + endPoint.getHost());
+            catch (Exception ex)
+            {
+                logger_.error(ex.getMessage());
+            }
+            finally
+            {
+                logger_.debug("Finished hinted handoff for endpoint " + endPoint.getHost());
+            }            
         }
     }
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=787403&r1=787402&r2=787403&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Mon Jun 22 21:17:36 2009
@@ -253,7 +253,7 @@
 
         String directory = DatabaseDescriptor.getDataFileLocation();
         String filename = cfStore.getTempFileName();
-        SSTable ssTable = new SSTable(directory, filename, StorageService.getPartitioner());
+        SSTable ssTable = new SSTable(directory, filename, table_, StorageService.getPartitioner());
 
         // sort keys in the order they would be in when decorated
         final IPartitioner partitioner = StorageService.getPartitioner();
@@ -333,14 +333,14 @@
         if (cf != null)
             columnFamily = cf.cloneMeShallow();
         else
-            columnFamily = new ColumnFamily(cfName, DatabaseDescriptor.getColumnFamilyType(cfName));
+            columnFamily = ColumnFamily.create(table_, cfName);
 
         final IColumn columns[] = (cf == null ? columnFamily : cf).getAllColumns().toArray(new IColumn[columnFamily.getAllColumns().size()]);
         // TODO if we are dealing with supercolumns, we need to clone them while we have the read lock since they can be modified later
         if (!isAscending)
             ArrayUtils.reverse(columns);
         IColumn startIColumn;
-        if (DatabaseDescriptor.getColumnFamilyType(cfName).equals("Standard"))
+        if (DatabaseDescriptor.getColumnFamilyType(table_, cfName).equals("Standard"))
             startIColumn = new Column(startColumn);
         else
             startIColumn = new SuperColumn(startColumn);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java?rev=787403&r1=787402&r2=787403&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java Mon Jun 22 21:17:36 2009
@@ -114,7 +114,7 @@
         
         if( !rm.isDigestQuery() && rm.row() != null )
         {            
-            Row.serializer().serialize(rm.row(), dos);
+            Row.serializer(rm.table()).serialize(rm.row(), dos);
         }				
 	}
 	
@@ -129,7 +129,7 @@
         Row row = null;
         if ( !isDigest )
         {
-            row = Row.serializer().deserialize(dis);
+            row = Row.serializer(table).deserialize(dis);
         }
 		
 		ReadResponse rmsg = null;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java?rev=787403&r1=787402&r2=787403&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java Mon Jun 22 21:17:36 2009
@@ -36,12 +36,21 @@
 
 public class Row
 {
-    private static RowSerializer serializer_ = new RowSerializer();
     private static Logger logger_ = Logger.getLogger(Row.class);
+    private String table_;
 
-    static RowSerializer serializer()
+    public Row(String table_, String key) {
+        this.table_ = table_;
+        this.key_ = key;
+    }
+
+    public String getTable() {
+        return table_;
+    }
+
+    static RowSerializer serializer(String tableName)
     {
-        return serializer_;
+        return new RowSerializer(tableName);
     }
 
     private String key_;
@@ -130,7 +139,7 @@
      */
     public Row diff(Row rowComposite)
     {
-        Row rowDiff = new Row(key_);
+        Row rowDiff = new Row(table_, key_);
 
         for (ColumnFamily cfComposite : rowComposite.getColumnFamilies())
         {
@@ -152,7 +161,7 @@
 
     public Row cloneMe()
     {
-        Row row = new Row(key_);
+        Row row = new Row(table_, key_);
         row.columnFamilies_ = new HashMap<String, ColumnFamily>(columnFamilies_);
         return row;
     }
@@ -188,6 +197,12 @@
 
 class RowSerializer implements ICompactSerializer<Row>
 {
+    private String table_;
+
+    public RowSerializer(String tableName)
+    {
+        this.table_ = tableName;
+    }
     public void serialize(Row row, DataOutputStream dos) throws IOException
     {
         dos.writeUTF(row.key());
@@ -207,7 +222,7 @@
     public Row deserialize(DataInputStream dis) throws IOException
     {
         String key = dis.readUTF();
-        Row row = new Row(key);
+        Row row = new Row(table_, key);
         int size = dis.readInt();
 
         if (size > 0)

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=787403&r1=787402&r2=787403&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Mon Jun 22 21:17:36 2009
@@ -162,7 +162,7 @@
         {
             if ( columnFamily == null )
             {
-            	columnFamily = new ColumnFamily(values[0], ColumnFamily.getColumnType("Standard"));
+            	columnFamily = ColumnFamily.create(table_, values[0]);
             }
         	columnFamily.addColumn(values[1], value, timestamp);
         }
@@ -170,7 +170,7 @@
         {
             if ( columnFamily == null )
             {
-            	columnFamily = new ColumnFamily(values[0], ColumnFamily.getColumnType("Super"));
+            	columnFamily = ColumnFamily.create(table_, values[0]);
             }
         	columnFamily.addColumn(values[1]+ ":" + values[2], value, timestamp);
         }
@@ -193,7 +193,7 @@
 
         ColumnFamily columnFamily = modifications_.get(cfName);
         if (columnFamily == null)
-            columnFamily = new ColumnFamily(cfName, DatabaseDescriptor.getColumnType(cfName));
+            columnFamily = ColumnFamily.create(table_, cfName);
         if (values.length == 2)
         {
             if (columnFamily.isSuper())
@@ -229,7 +229,7 @@
     */
     public void apply() throws IOException
     {
-        Row row = new Row(key_);
+        Row row = new Row(table_, key_);
         apply(row);
     }
 
@@ -329,7 +329,8 @@
     public String toString()
     {
         return "RowMutation(" +
-               "key='" + key_ + '\'' +
+               "table='" + table_ + '\'' +
+               ", key='" + key_ + '\'' +
                ", modifications=[" + StringUtils.join(modifications_.values(), ", ") + "]" +
                ')';
     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java?rev=787403&r1=787402&r2=787403&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java Mon Jun 22 21:17:36 2009
@@ -82,7 +82,7 @@
         table_ = table;
         String systemTable = getFileName();
         writer_ = SequenceFile.writer(systemTable);
-        reader_ = SequenceFile.reader(systemTable);
+        reader_ = SequenceFile.reader(systemTable, table);
     }
 
     private String getFileName()
@@ -106,13 +106,13 @@
              * This buffer contains key and value so we need to strip
              * certain parts
            */
-            // read the key
+            // read the key            
             bufIn.readUTF();
             // read the data length and then deserialize
             bufIn.readInt();
             try
             {
-                systemRow_ = Row.serializer().deserialize(bufIn);
+                systemRow_ = Row.serializer(table_).deserialize(bufIn);
             }
             catch ( IOException e )
             {
@@ -133,7 +133,7 @@
         String file = getFileName();
         long currentPos = writer_.getCurrentPosition();
         DataOutputBuffer bufOut = new DataOutputBuffer();
-        Row.serializer().serialize(row, bufOut);
+        Row.serializer(row.getTable()).serialize(row, bufOut);
         try
         {
             writer_.append(row.key(), bufOut);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=787403&r1=787402&r2=787403&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Mon Jun 22 21:17:36 2009
@@ -67,6 +67,7 @@
     {
         /* Name of the column family */
         public final static String cfName_ = "TableMetadata";
+        private String table_;
         private static ICompactSerializer<TableMetadata> serializer_;
         static
         {
@@ -77,19 +78,28 @@
         /* Use the following writer/reader to write/read to Metadata table */
         private static IFileWriter writer_;
         private static IFileReader reader_;
-        
-        public static Table.TableMetadata instance() throws IOException
+        private static HashMap<String,TableMetadata> tableMetadataMap_ = new HashMap<String,TableMetadata>();
+
+        private static TableMetadata getTableMetadata(String table)
         {
-            if ( tableMetadata_ == null )
+          return tableMetadataMap_.get(table);
+        }
+
+        public static Table.TableMetadata instance(String table) throws IOException
+        {
+            TableMetadata metadata = getTableMetadata(table);
+            if ( metadata == null )
             {
-                String file = getFileName();
+                String file = getFileName(table);
                 writer_ = SequenceFile.writer(file);        
-                reader_ = SequenceFile.reader(file);
-                Table.TableMetadata.load();
-                if ( tableMetadata_ == null )
-                    tableMetadata_ = new Table.TableMetadata();
+                reader_ = SequenceFile.reader(file, table);
+                Table.TableMetadata.load(table);
+
+                metadata = new Table.TableMetadata();
+                metadata.table_ = table;
+                tableMetadataMap_.put(table,metadata);
             }
-            return tableMetadata_;
+            return metadata;
         }
 
         static ICompactSerializer<TableMetadata> serializer()
@@ -97,9 +107,9 @@
             return serializer_;
         }
         
-        private static void load() throws IOException
+        private static void load(String table) throws IOException
         {            
-            String file = Table.TableMetadata.getFileName();
+            String file = Table.TableMetadata.getFileName(table);
             File f = new File(file);
             if ( f.exists() )
             {
@@ -108,7 +118,7 @@
                 
                 if ( reader_ == null )
                 {
-                    reader_ = SequenceFile.reader(file);
+                    reader_ = SequenceFile.reader(file, table);
                 }
                 
                 while ( !reader_.isEOF() )
@@ -132,9 +142,8 @@
         private Map<String, Integer> cfIdMap_ = new HashMap<String, Integer>();
         private Map<Integer, String> idCfMap_ = new HashMap<Integer, String>();        
         
-        private static String getFileName()
+        private static String getFileName(String table)
         {
-            String table = DatabaseDescriptor.getTables().get(0);
             return DatabaseDescriptor.getMetadataDirectory() + System.getProperty("file.separator") + table + "-Metadata.db";
         }
 
@@ -187,12 +196,11 @@
         
         public void apply() throws IOException
         {
-            String table = DatabaseDescriptor.getTables().get(0);
             DataOutputBuffer bufOut = new DataOutputBuffer();
             Table.TableMetadata.serializer_.serialize(this, bufOut);
             try
             {
-                writer_.append(table, bufOut);
+                writer_.append(table_, bufOut);
             }
             catch ( IOException ex )
             {
@@ -262,13 +270,17 @@
             {
                 File file = new File( streamContext.getTargetFile() );
                 String fileName = file.getName();
+                String [] temp = null;
+                String tableName;
+                temp = fileName.split("-");
+                tableName = temp[0];
                 /*
                  * If the file is a Data File we need to load the indicies associated
                  * with this file. We also need to cache the file name in the SSTables
                  * list of the associated Column Family. Also merge the CBF into the
                  * sampler.
                 */                
-                new SSTable(streamContext.getTargetFile(), StorageService.getPartitioner());
+                new SSTable(streamContext.getTargetFile(), tableName, StorageService.getPartitioner());
                 logger_.debug("Merging the counting bloom filter in the sampler ...");                
                 String[] peices = FBUtilities.strip(fileName, "-");
                 Table.open(peices[0]).getColumnFamilyStore(peices[1]).addToList(streamContext.getTargetFile());                
@@ -348,17 +360,19 @@
             for ( StreamContextManager.StreamContext streamContext : streamContexts )
             {
                 String[] peices = FBUtilities.strip(streamContext.getTargetFile(), "-");
-                distinctEntries.add(peices[1] + "-" + peices[2]);
+                distinctEntries.add(peices[0] + "-" + peices[1] + "-" + peices[2]);
             }
             
             /* Generate unique file names per entry */
-            Table table = Table.open( DatabaseDescriptor.getTables().get(0) );
-            Map<String, ColumnFamilyStore> columnFamilyStores = table.getColumnFamilyStores();
-            
             for ( String distinctEntry : distinctEntries )
             {
+                String tableName;
                 String[] peices = FBUtilities.strip(distinctEntry, "-");
-                ColumnFamilyStore cfStore = columnFamilyStores.get(peices[0]);
+                tableName = peices[0];
+                Table table = Table.open( tableName );
+                Map<String, ColumnFamilyStore> columnFamilyStores = table.getColumnFamilyStores();
+
+                ColumnFamilyStore cfStore = columnFamilyStores.get(peices[1]);
                 logger_.debug("Generating file name for " + distinctEntry + " ...");
                 fileNames.put(distinctEntry, cfStore.getNextFileName());
             }
@@ -566,7 +580,7 @@
         dbAnalyticsSource_ = new DBAnalyticsSource();
         try
         {
-            tableMetadata_ = Table.TableMetadata.instance();
+            tableMetadata_ = Table.TableMetadata.instance(table);
             Set<String> columnFamilies = tableMetadata_.getColumnFamilies();
             for ( String columnFamily : columnFamilies )
             {
@@ -614,7 +628,7 @@
     */
     public Row get(String key) throws IOException
     {        
-        Row row = new Row(key);
+        Row row = new Row(table_, key);
         Set<String> columnFamilies = tableMetadata_.getColumnFamilies();
         long start = System.currentTimeMillis();
         for ( String columnFamily : columnFamilies )
@@ -654,7 +668,7 @@
     */
     public Row getRow(String key, String cf) throws IOException
     {
-        Row row = new Row(key);
+        Row row = new Row(table_, key);
         ColumnFamily columnFamily = get(key, cf);
         if ( columnFamily != null )
         	row.addColumnFamily(columnFamily);
@@ -666,7 +680,7 @@
     */
     public Row getRow(String key, String cf, int start, int count) throws IOException
     {
-        Row row = new Row(key);
+        Row row = new Row(table_, key);
         String[] values = RowMutation.getColumnAndColumnFamily(cf);
         ColumnFamilyStore cfStore = columnFamilyStores_.get(values[0]);
         long start1 = System.currentTimeMillis();
@@ -693,7 +707,7 @@
     
     public Row getRow(String key, String cf, String startColumn, String endColumn, int count) throws IOException
     {
-        Row row = new Row(key);
+        Row row = new Row(table_, key);
         String[] values = RowMutation.getColumnAndColumnFamily(cf);
         ColumnFamilyStore cfStore = columnFamilyStores_.get(values[0]);
         long start1 = System.currentTimeMillis();
@@ -712,7 +726,7 @@
     
     public Row getRow(String key, String cf, long sinceTimeStamp) throws IOException
     {
-        Row row = new Row(key);
+        Row row = new Row(table_, key);
         String[] values = RowMutation.getColumnAndColumnFamily(cf);
         ColumnFamilyStore cfStore = columnFamilyStores_.get(values[0]);
         long start1 = System.currentTimeMillis();
@@ -735,7 +749,7 @@
     */
     public Row getRow(String key, String cf, List<String> columns) throws IOException
     {
-    	Row row = new Row(key);
+    	Row row = new Row(table_, key);
         String[] values = RowMutation.getColumnAndColumnFamily(cf);
         ColumnFamilyStore cfStore = columnFamilyStores_.get(values[0]);
 
@@ -948,7 +962,7 @@
             // sstables
             for (String filename : cfs.getSSTableFilenames())
             {
-                FileStruct fs = new FileStruct(SequenceFile.reader(filename), StorageService.getPartitioner());
+                FileStruct fs = new FileStruct(SequenceFile.reader(filename, table_), StorageService.getPartitioner());
                 fs.seekTo(startWith);
                 iterators.add(fs);
             }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java?rev=787403&r1=787402&r2=787403&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java Mon Jun 22 21:17:36 2009
@@ -131,12 +131,12 @@
     
     /**
      * Deserialize the index into a structure and return the number of bytes read.
-     * @param in Input from which the serialized form of the index is read
-     * @param columnIndexList the structure which is filled in with the deserialized index
-     * @return number of bytes read from the input
+     * @param tableName
+     *@param in Input from which the serialized form of the index is read
+     * @param columnIndexList the structure which is filled in with the deserialized index   @return number of bytes read from the input
      * @throws IOException
      */
-	static int deserializeIndex(String cfName, DataInput in, List<ColumnIndexInfo> columnIndexList) throws IOException
+	static int deserializeIndex(String tableName, String cfName, DataInput in, List<ColumnIndexInfo> columnIndexList) throws IOException
 	{
 		/* read only the column index list */
 		int columnIndexSize = in.readInt();
@@ -152,8 +152,8 @@
         DataInputBuffer indexIn = new DataInputBuffer();
         indexIn.reset(indexOut.getData(), indexOut.getLength());
         
-        TypeInfo typeInfo = DatabaseDescriptor.getTypeInfo(cfName);
-        if ( DatabaseDescriptor.getColumnFamilyType(cfName).equals("Super") || DatabaseDescriptor.isNameSortingEnabled(cfName) )
+        TypeInfo typeInfo = DatabaseDescriptor.getTypeInfo(tableName, cfName);
+        if ( DatabaseDescriptor.getColumnFamilyType(tableName, cfName).equals("Super") || DatabaseDescriptor.isNameSortingEnabled(tableName, cfName) )
         {
             typeInfo = TypeInfo.STRING;
         }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java?rev=787403&r1=787402&r2=787403&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java Mon Jun 22 21:17:36 2009
@@ -150,13 +150,13 @@
      * associated with these files. Also caches the file handles 
      * associated with these files.
     */
-    public static void onStart(List<String> filenames) throws IOException
+    public static void onStart(List<String> filenames, String tableName) throws IOException
     {
         for (String filename : filenames)
         {
             try
             {
-                new SSTable(filename, StorageService.getPartitioner());
+                new SSTable(filename, tableName, StorageService.getPartitioner());
             }
             catch (IOException ex)
             {
@@ -205,15 +205,17 @@
     private String lastWrittenKey_;
     private IPartitioner partitioner_;
 
+    private String table_;
     /**
      * This ctor basically gets passed in the full path name
      * of the data file associated with this SSTable. Use this
      * ctor to read the data in this file.
      */
-    public SSTable(String dataFileName, IPartitioner partitioner) throws IOException
+    public SSTable(String dataFileName, String tableName, IPartitioner partitioner) throws IOException
     {
         dataFile_ = dataFileName;
         partitioner_ = partitioner;
+        table_ = tableName;
         /*
          * this is to prevent multiple threads from
          * loading the same index files multiple times
@@ -235,11 +237,12 @@
      * This ctor is used for writing data into the SSTable. Use this
      * version for non DB writes to the SSTable.
      */
-    public SSTable(String directory, String filename, IPartitioner partitioner) throws IOException
+    public SSTable(String directory, String filename, String tableName, IPartitioner partitioner) throws IOException
     {
         dataFile_ = directory + System.getProperty("file.separator") + filename + "-Data.db";
         partitioner_ = partitioner;
         dataWriter_ = SequenceFile.bufferedWriter(dataFile_, 4 * 1024 * 1024);
+        table_ = tableName;
         indexRAF_ = new BufferedRandomAccessFile(indexFilename(), "rw", 1024 * 1024);
     }
 
@@ -474,7 +477,7 @@
         IFileReader dataReader = null;
         try
         {
-            dataReader = SequenceFile.reader(dataFile_);
+            dataReader = SequenceFile.reader(dataFile_, table_);
             String decoratedKey = partitioner_.decorateKey(clientKey);
             long position = getPosition(decoratedKey, dataReader, partitioner_);
 
@@ -557,7 +560,7 @@
     public ColumnGroupReader getColumnGroupReader(String key, String cfName, String startColumn, boolean isAscending) throws IOException
     {
         ColumnGroupReader reader = null;
-        IFileReader dataReader = SequenceFile.reader(dataFile_);
+        IFileReader dataReader = SequenceFile.reader(table_, dataFile_);
 
         try
         {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java?rev=787403&r1=787402&r2=787403&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java Mon Jun 22 21:17:36 2009
@@ -506,7 +506,7 @@
         }
     }
 
-    
+
     /**
      *  This is a reader that finds the block for a starting column and returns
      *  blocks before/after it for each next call. This function assumes that
@@ -585,7 +585,7 @@
                 /* read the index */
                 List<IndexHelper.ColumnIndexInfo> colIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
                 if (hasColumnIndexes)
-                    totalBytesRead += IndexHelper.deserializeIndex(cfName_, file_, colIndexList);
+                    totalBytesRead += IndexHelper.deserializeIndex(null, cfName_, file_, colIndexList);
 
                 /* need to do two things here.
                  * 1. move the file pointer to the beginning of the list of stored columns
@@ -651,10 +651,12 @@
         private static final short utfPrefix_ = 2;
         protected RandomAccessFile file_;
         protected String filename_;
+        private String table_;
 
-        AbstractReader(String filename)
+        AbstractReader(String filename, String tableName)
         {
             filename_ = filename;
+            table_ = tableName;
         }
 
         public String getFileName()
@@ -701,10 +703,10 @@
             /* if we do then deserialize the index */
             if (hasColumnIndexes)
             {
-                if (DatabaseDescriptor.isNameSortingEnabled(cfName) || DatabaseDescriptor.getColumnFamilyType(cfName).equals("Super"))
+                if (DatabaseDescriptor.isNameSortingEnabled(table_, cfName) || DatabaseDescriptor.getColumnFamilyType(table_, cfName).equals("Super"))
                 {
                     /* read the index */
-                    totalBytesRead += IndexHelper.deserializeIndex(cfName, file_, columnIndexList);
+                    totalBytesRead += IndexHelper.deserializeIndex(table_, cfName, file_, columnIndexList);
                 }
                 else
                 {
@@ -729,10 +731,10 @@
             /* if we do then deserialize the index */
             if (hasColumnIndexes)
             {
-                if (DatabaseDescriptor.isTimeSortingEnabled(cfName))
+                if (DatabaseDescriptor.isTimeSortingEnabled(null, cfName))
                 {
                     /* read the index */
-                    totalBytesRead += IndexHelper.deserializeIndex(cfName, file_, columnIndexList);
+                    totalBytesRead += IndexHelper.deserializeIndex(table_, cfName, file_, columnIndexList);
                 }
                 else
                 {
@@ -829,7 +831,7 @@
             /* read the column family name */
             String cfName = file_.readUTF();
             dataSize -= (utfPrefix_ + cfName.length());
-            
+
             /* read local deletion time */
             int localDeletionTime = file_.readInt();
             dataSize -=4;
@@ -997,10 +999,10 @@
 
             /*
              * If we have read the bloom filter in the data
-             * file we know we are at the end of the file 
+             * file we know we are at the end of the file
              * and no further key processing is required. So
              * we return -1 indicating we are at the end of
-             * the file. 
+             * the file.
             */
             if (key.equals(SequenceFile.marker_))
                 bytesRead = -1L;
@@ -1010,9 +1012,9 @@
 
     public static class Reader extends AbstractReader
     {
-        Reader(String filename) throws IOException
+        Reader(String filename, String tableName) throws IOException
         {
-            super(filename);
+            super(filename, tableName);
             init(filename);
         }
 
@@ -1075,7 +1077,7 @@
 
         BufferReader(String filename, int size) throws IOException
         {
-            super(filename);
+            super(filename, null);
             size_ = size;
         }
 
@@ -1104,9 +1106,9 @@
         return new FastConcurrentWriter(filename, size);
     }
 
-    public static IFileReader reader(String filename) throws IOException
+    public static IFileReader reader(String filename, String tableName) throws IOException
     {
-        return new Reader(filename);
+        return new Reader(filename, tableName);
     }
 
     public static IFileReader bufferedReader(String filename, int size) throws IOException

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/loader/Loader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/loader/Loader.java?rev=787403&r1=787402&r2=787403&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/loader/Loader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/loader/Loader.java Mon Jun 22 21:17:36 2009
@@ -294,46 +294,51 @@
 	
     void preLoad(File rootDirectory) throws Throwable
     {
-        String table = DatabaseDescriptor.getTables().get(0);
-        String cfName = Table.recycleBin_ + ":" + "Keys";
-        /* populate just the keys. */
-        preParse(rootDirectory, table, cfName);
-        /* dump the memtables */
-        Table.open(table).flush(false);
-        /* force a compaction of the files. */
-        Table.open(table).forceCompaction(null,null,null);
-        
-        /*
-         * This is a hack to let everyone finish. Just sleep for
-         * a couple of minutes. 
-        */
-        logger_.info("Taking a nap after forcing a compaction ...");
-        Thread.sleep(Loader.siesta_);
-        
-        /* Figure out the keys in the index file to relocate the node */
-        List<String> ssTables = Table.open(table).getAllSSTablesOnDisk();
-        /* Load the indexes into memory */
-        for ( String df : ssTables )
+        List<String> tables = DatabaseDescriptor.getTables();
+        for ( String table:tables )
         {
-        	new SSTable(df, StorageService.getPartitioner());
+            String cfName = Table.recycleBin_ + ":" + "Keys";
+            /* populate just the keys. */
+            preParse(rootDirectory, table, cfName);
+            /* dump the memtables */
+            Table.open(table).flush(false);
+            /* force a compaction of the files. */
+            Table.open(table).forceCompaction(null,null,null);
+
+            /*
+             * This is a hack to let everyone finish. Just sleep for
+             * a couple of minutes.
+            */
+            logger_.info("Taking a nap after forcing a compaction ...");
+            Thread.sleep(Loader.siesta_);
+
+            /* Figure out the keys in the index file to relocate the node */
+            List<String> ssTables = Table.open(table).getAllSSTablesOnDisk();
+            /* Load the indexes into memory */
+            for ( String df : ssTables )
+            {
+                new SSTable(df, table, StorageService.getPartitioner());
+            }
+            /* We should have only one file since we just compacted. */
+            List<String> indexedKeys = SSTable.getIndexedKeys();
+            storageService_.relocate(indexedKeys.toArray( new String[0]) );
+
+            /*
+             * This is a hack to let everyone relocate and learn about
+             * each other. Just sleep for a couple of minutes.
+            */
+            logger_.info("Taking a nap after relocating ...");
+            Thread.sleep(Loader.siesta_);
+
+            /*
+             * Do the cleanup necessary. Delete all commit logs and
+             * the SSTables and reset the load state in the StorageService.
+            */
+
+            // TODO Hmm need to double check here
+            SSTable.delete(ssTables.get(0));
+            logger_.info("Finished all the requisite clean up ...");
         }
-        /* We should have only one file since we just compacted. */        
-        List<String> indexedKeys = SSTable.getIndexedKeys();        
-        storageService_.relocate(indexedKeys.toArray( new String[0]) );
-        
-        /*
-         * This is a hack to let everyone relocate and learn about
-         * each other. Just sleep for a couple of minutes. 
-        */
-        logger_.info("Taking a nap after relocating ...");
-        Thread.sleep(Loader.siesta_);  
-        
-        /* 
-         * Do the cleanup necessary. Delete all commit logs and
-         * the SSTables and reset the load state in the StorageService. 
-        */
-        SSTable.delete(ssTables.get(0));
-        logger_.info("Finished all the requisite clean up ...");
     }
     
 	void load(String xmlFile) throws Throwable

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/loader/PreLoad.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/loader/PreLoad.java?rev=787403&r1=787402&r2=787403&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/loader/PreLoad.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/loader/PreLoad.java Mon Jun 22 21:17:36 2009
@@ -77,45 +77,48 @@
     
     void run(String userFile) throws Throwable
     {
-        String table = DatabaseDescriptor.getTables().get(0);
-        String cfName = Table.recycleBin_ + ":" + "Keys";
-        /* populate just the keys. */
-        preParse(userFile, table, cfName);
-        /* dump the memtables */
-        Table.open(table).flush(false);
-        /* force a compaction of the files. */
-        Table.open(table).forceCompaction(null, null,null);
-        
-        /*
-         * This is a hack to let everyone finish. Just sleep for
-         * a couple of minutes. 
-        */
-        logger_.info("Taking a nap after forcing a compaction ...");
-        Thread.sleep(PreLoad.siesta_);
-        
-        /* Figure out the keys in the index file to relocate the node */
-        List<String> ssTables = Table.open(table).getAllSSTablesOnDisk();
-        /* Load the indexes into memory */
-        for ( String df : ssTables )
+        List<String> tables = DatabaseDescriptor.getTables();
+        for ( String table:tables )
         {
-        	new SSTable(df, StorageService.getPartitioner());
+            String cfName = Table.recycleBin_ + ":" + "Keys";
+            /* populate just the keys. */
+            preParse(userFile, table, cfName);
+            /* dump the memtables */
+            Table.open(table).flush(false);
+            /* force a compaction of the files. */
+            Table.open(table).forceCompaction(null, null,null);
+
+            /*
+             * This is a hack to let everyone finish. Just sleep for
+             * a couple of minutes.
+            */
+            logger_.info("Taking a nap after forcing a compaction ...");
+            Thread.sleep(PreLoad.siesta_);
+
+            /* Figure out the keys in the index file to relocate the node */
+            List<String> ssTables = Table.open(table).getAllSSTablesOnDisk();
+            /* Load the indexes into memory */
+            for ( String df : ssTables )
+            {
+                new SSTable(df, table, StorageService.getPartitioner());
+            }
+            /* We should have only one file since we just compacted. */
+            List<String> indexedKeys = SSTable.getIndexedKeys();
+            storageService_.relocate(indexedKeys.toArray( new String[0]) );
+
+            /*
+             * This is a hack to let everyone relocate and learn about
+             * each other. Just sleep for a couple of minutes.
+            */
+            logger_.info("Taking a nap after relocating ...");
+            Thread.sleep(PreLoad.siesta_);
+
+            /*
+             * Do the cleanup necessary. Delete all commit logs and
+             * the SSTables and reset the load state in the StorageService.
+            */
+            SSTable.delete(ssTables.get(0));
         }
-        /* We should have only one file since we just compacted. */        
-        List<String> indexedKeys = SSTable.getIndexedKeys();        
-        storageService_.relocate(indexedKeys.toArray( new String[0]) );
-        
-        /*
-         * This is a hack to let everyone relocate and learn about
-         * each other. Just sleep for a couple of minutes. 
-        */
-        logger_.info("Taking a nap after relocating ...");
-        Thread.sleep(PreLoad.siesta_);  
-        
-        /* 
-         * Do the cleanup necessary. Delete all commit logs and
-         * the SSTables and reset the load state in the StorageService. 
-        */
-        SSTable.delete(ssTables.get(0));
         logger_.info("Finished all the requisite clean up ...");
     }
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/http/HttpRequestHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/http/HttpRequestHandler.java?rev=787403&r1=787402&r2=787403&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/http/HttpRequestHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/http/HttpRequestHandler.java Mon Jun 22 21:17:36 2009
@@ -250,18 +250,23 @@
 
     private void displayDBStatistics(HTMLFormatter formatter, java.text.DecimalFormat df)
     {
-        String tableStats = Table.open( DatabaseDescriptor.getTables().get(0) ).tableStats("\n<br>\n", df);
 
-        if ( tableStats.length() == 0 )
-            return;
+        List<String> tables = DatabaseDescriptor.getTables();
+        for ( String table:tables )
+        {
+            String tableStats = Table.open( table ).tableStats("\n<br>\n", df);
+
+            if ( tableStats.length() == 0 )
+                return;
 
-        formatter.appendLine("DB statistics:");
-        formatter.appendLine("<br>");
-        formatter.appendLine("<br>");
-
-        formatter.appendLine(tableStats);
-        formatter.appendLine("<br>");
-        formatter.appendLine("<br>");
+            formatter.appendLine("DB statistics: " + table);
+            formatter.appendLine("<br>");
+            formatter.appendLine("<br>");
+
+            formatter.appendLine(tableStats);
+            formatter.appendLine("<br>");
+            formatter.appendLine("<br>");
+        }
     }
 
     private String handlePageDisplay(String queryFormData, String insertFormData, String scriptFormData)

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java?rev=787403&r1=787402&r2=787403&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java Mon Jun 22 21:17:36 2009
@@ -205,7 +205,7 @@
     {
         logger.debug("get_slice_from");
         String[] values = RowMutation.getColumnAndColumnFamily(columnParent);
-        if (values.length != 2 || DatabaseDescriptor.getColumnFamilyType(values[0]) != "Standard")
+        if (values.length != 2 || DatabaseDescriptor.getColumnFamilyType(tablename, values[0]) != "Standard")
             throw new InvalidRequestException("get_slice_from requires a standard CF name and a starting column name");
         if (count <= 0)
             throw new InvalidRequestException("get_slice_from requires positive count");
@@ -228,7 +228,7 @@
         {
             throw new InvalidRequestException("get_column requires non-empty columnfamily");
         }
-        if (DatabaseDescriptor.getColumnFamilyType(values[0]).equals("Standard"))
+        if (DatabaseDescriptor.getColumnFamilyType(null, values[0]).equals("Standard"))
         {
             if (values.length != 2)
             {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=787403&r1=787402&r2=787403&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java Mon Jun 22 21:17:36 2009
@@ -121,7 +121,7 @@
         }
         
         /* Now calculate the resolved row */
-		retRow = new Row(key);		
+		retRow = new Row(table, key);
 		for (int i = 0 ; i < rowList.size(); i++)
 		{
 			retRow.repair(rowList.get(i));			

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=787403&r1=787402&r2=787403&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Mon Jun 22 21:17:36 2009
@@ -596,7 +596,7 @@
         endpoints.remove(StorageService.getLocalStorageEndPoint());
         // TODO: throw a thrift exception if we do not have N nodes
 
-        Table table = Table.open(DatabaseDescriptor.getTables().get(0));
+        Table table = Table.open(command.table);
         Row row = command.getRow(table);
 
         /*

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java?rev=787403&r1=787402&r2=787403&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java Mon Jun 22 21:17:36 2009
@@ -41,7 +41,7 @@
         random.nextBytes(bytes);
         ColumnFamily cf;
 
-        cf = new ColumnFamily("Standard1", "Standard");
+        cf = ColumnFamily.create("Table1", "Standard1");
         cf.addColumn("C", bytes, 1);
         DataOutputBuffer bufOut = new DataOutputBuffer();
         ColumnFamily.serializer().serialize(cf, bufOut);
@@ -66,7 +66,7 @@
         }
 
         // write
-        cf = new ColumnFamily("Standard1", "Standard");
+        cf = ColumnFamily.create("Table1", "Standard1");
         DataOutputBuffer bufOut = new DataOutputBuffer();
         for (String cName : map.navigableKeySet())
         {
@@ -89,7 +89,7 @@
     @Test
     public void testGetColumnCount()
     {
-        ColumnFamily cf = new ColumnFamily("Standard1", "Standard");
+        ColumnFamily cf = ColumnFamily.create("Table1", "Standard1");
         byte val[] = "sample value".getBytes();
 
         cf.addColumn("col1", val, 1);
@@ -103,7 +103,7 @@
     @Test
     public void testTimestamp()
     {
-        ColumnFamily cf = new ColumnFamily("Standard1", "Standard");
+        ColumnFamily cf = ColumnFamily.create("Table1", "Standard1");
         byte val1[] = "sample 1".getBytes();
         byte val2[] = "sample 2".getBytes();
         byte val3[] = "sample 3".getBytes();
@@ -118,9 +118,9 @@
     @Test
     public void testMergeAndAdd()
     {
-        ColumnFamily cf_new = new ColumnFamily("Standard1", "Standard");
-        ColumnFamily cf_old = new ColumnFamily("Standard1", "Standard");
-        ColumnFamily cf_result = new ColumnFamily("Standard1", "Standard");
+        ColumnFamily cf_new = ColumnFamily.create("Table1", "Standard1");
+        ColumnFamily cf_old = ColumnFamily.create("Table1", "Standard1");
+        ColumnFamily cf_result = ColumnFamily.create("Table1", "Standard1");
         byte val[] = "sample value".getBytes();
         byte val2[] = "x value ".getBytes();
 
@@ -141,7 +141,7 @@
     @Test
     public void testEmptyDigest()
     {
-        ColumnFamily cf = new ColumnFamily("Standard1", "Standard");
+        ColumnFamily cf = ColumnFamily.create("Table1", "Standard1");
         assert cf.digest().length == 0;
     }
 }

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RangeFilterTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RangeFilterTest.java?rev=787403&r1=787402&r2=787403&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RangeFilterTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RangeFilterTest.java Mon Jun 22 21:17:36 2009
@@ -29,7 +29,7 @@
     @Test
     public void testRangeFilterOnColumns() throws IOException
     {
-        ColumnFamily cf = new ColumnFamily("Standard1", "Standard");
+        ColumnFamily cf = ColumnFamily.create("Table1", "Standard1");
         byte[] val = "test value".getBytes();
         cf.addColumn(new Column("a", val, System.currentTimeMillis()));
         cf.addColumn(new Column("b", val, System.currentTimeMillis()));
@@ -47,7 +47,7 @@
     @Test
     public void testRangeFilterOnColumnsWithCount() throws IOException
     {
-        ColumnFamily cf = new ColumnFamily("Standard1", "Standard");
+        ColumnFamily cf = ColumnFamily.create("Table1", "Standard1");
         byte[] val = "test value".getBytes();
         cf.addColumn(new Column("a", val, System.currentTimeMillis()));
         cf.addColumn(new Column("b", val, System.currentTimeMillis()));
@@ -65,7 +65,7 @@
     @Test
     public void testRangeFilterOnSuperColumns() throws IOException
     {
-        ColumnFamily cf = new ColumnFamily("Super1", "Super");
+        ColumnFamily cf = ColumnFamily.create("Table1", "Super1");
         byte[] val = "test value".getBytes();
         SuperColumn sc = null;
         sc = new SuperColumn("a");

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java?rev=787403&r1=787402&r2=787403&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java Mon Jun 22 21:17:36 2009
@@ -29,10 +29,10 @@
     @Test
     public void testDiffColumnFamily()
     {
-        ColumnFamily cf1 = new ColumnFamily("Standard1", "Standard");
+        ColumnFamily cf1 = ColumnFamily.create("Table1", "Standard1");
         cf1.addColumn("one", "onev".getBytes(), 0);
 
-        ColumnFamily cf2 = new ColumnFamily("Standard1", "Standard");
+        ColumnFamily cf2 = ColumnFamily.create("Table1", "Standard1");
         cf2.delete(0, 0);
 
         ColumnFamily cfDiff = cf1.diff(cf2);
@@ -58,15 +58,15 @@
     public void testRepair()
     {
         Row row1 = new Row();
-        ColumnFamily cf1 = new ColumnFamily("Standard1", "Standard");
+        ColumnFamily cf1 = ColumnFamily.create("Table1", "Standard1");
         cf1.addColumn("one", "A".getBytes(), 0);
         row1.addColumnFamily(cf1);
 
         Row row2 = new Row();
-        ColumnFamily cf2 = new ColumnFamily("Standard1", "Standard");
+        ColumnFamily cf2 = ColumnFamily.create("Table1", "Standard1");
         cf2.addColumn("one", "B".getBytes(), 1);
         cf2.addColumn("two", "C".getBytes(), 1);
-        ColumnFamily cf3 = new ColumnFamily("Standard2", "Standard");
+        ColumnFamily cf3 = ColumnFamily.create("Table2", "Standard2");
         cf3.addColumn("three", "D".getBytes(), 1);
         row2.addColumnFamily(cf2);
         row2.addColumnFamily(cf3);