You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/09/17 23:09:58 UTC

svn commit: r816380 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: db/Column.java db/ColumnFamilySerializer.java db/ColumnFamilyStore.java db/SuperColumn.java io/IteratingRow.java io/SSTableReader.java io/SSTableScanner.java

Author: jbellis
Date: Thu Sep 17 21:09:58 2009
New Revision: 816380

URL: http://svn.apache.org/viewvc?rev=816380&view=rev
Log:
copy FileStruct to SSTableScanner and remove cruft.  Migrate getKeyRange to new scanner class.
patch by jbellis; reviewed by goffinet for CASSANDRA-436

Added:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableScanner.java
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IteratingRow.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java?rev=816380&r1=816379&r2=816380&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java Thu Sep 17 21:09:58 2009
@@ -36,7 +36,7 @@
 {
     private static ColumnSerializer serializer_ = new ColumnSerializer();
 
-    static ColumnSerializer serializer()
+    public static ColumnSerializer serializer()
     {
         return serializer_;
     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java?rev=816380&r1=816379&r2=816380&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java Thu Sep 17 21:09:58 2009
@@ -27,6 +27,7 @@
 import java.util.Collection;
 
 import org.apache.cassandra.io.ICompactSerializer2;
+import org.apache.cassandra.io.SSTableReader;
 import org.apache.cassandra.db.marshal.AbstractType;
 
 public class ColumnFamilySerializer implements ICompactSerializer2<ColumnFamily>
@@ -81,14 +82,18 @@
     public ColumnFamily deserialize(DataInput dis) throws IOException
     {
         ColumnFamily cf = deserializeFromSSTableNoColumns(dis.readUTF(), dis.readUTF(), readComparator(dis), readComparator(dis), dis);
+        deserializeColumns(dis, cf);
+        return cf;
+    }
+
+    private void deserializeColumns(DataInput dis, ColumnFamily cf) throws IOException
+    {
         int size = dis.readInt();
-        IColumn column;
         for (int i = 0; i < size; ++i)
         {
-            column = cf.getColumnSerializer().deserialize(dis);
+            IColumn column = cf.getColumnSerializer().deserialize(dis);
             cf.addColumn(column);
         }
-        return cf;
     }
 
     private AbstractType readComparator(DataInput dis) throws IOException
@@ -124,4 +129,12 @@
         cf.delete(input.readInt(), input.readLong());
         return cf;
     }
+
+    public ColumnFamily deserializeFromSSTable(SSTableReader sstable, DataInput file) throws IOException
+    {
+        ColumnFamily cf = sstable.makeColumnFamily();
+        deserializeFromSSTableNoColumns(cf, file);
+        deserializeColumns(file, cf);
+        return cf;
+    }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=816380&r1=816379&r2=816380&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu Sep 17 21:09:58 2009
@@ -20,6 +20,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.io.Closeable;
 import java.lang.management.ManagementFactory;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
@@ -1340,9 +1341,23 @@
         // sstables
         for (SSTableReader sstable : ssTables_)
         {
-            FileStruct fs = sstable.getFileStruct();
+            final SSTableScanner fs = sstable.getScanner();
             fs.seekTo(startWith);
-            iterators.add(fs);
+            iterators.add(new Iterator<String>()
+            {
+                public boolean hasNext()
+                {
+                    return fs.hasNext();
+                }
+                public String next()
+                {
+                    return fs.next().getKey();
+                }
+                public void remove()
+                {
+                    throw new UnsupportedOperationException();
+                }
+            });
         }
 
         Iterator<String> collated = IteratorUtils.collatedIterator(comparator, iterators);
@@ -1392,9 +1407,9 @@
         {
             for (Iterator iter : iterators)
             {
-                if (iter instanceof FileStruct)
+                if (iter instanceof Closeable)
                 {
-                    ((FileStruct)iter).close();
+                    ((Closeable)iter).close();
                 }
             }
         }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java?rev=816380&r1=816379&r2=816380&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java Thu Sep 17 21:09:58 2009
@@ -38,7 +38,7 @@
 {
 	private static Logger logger_ = Logger.getLogger(SuperColumn.class);
 
-    static SuperColumnSerializer serializer(AbstractType comparator)
+    public static SuperColumnSerializer serializer(AbstractType comparator)
     {
         return new SuperColumnSerializer(comparator);
     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IteratingRow.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IteratingRow.java?rev=816380&r1=816379&r2=816380&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IteratingRow.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IteratingRow.java Thu Sep 17 21:09:58 2009
@@ -22,31 +22,39 @@
 
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.io.DataOutputStream;
+import java.io.DataOutput;
 
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.service.StorageService;
 import com.google.common.collect.AbstractIterator;
 
-public class IteratingRow extends AbstractIterator<IColumn>
+public class IteratingRow extends AbstractIterator<IColumn> implements Comparable<IteratingRow>
 {
     private final String key;
     private final long finishedAt;
     private final ColumnFamily emptyColumnFamily;
     private final BufferedRandomAccessFile file;
+    private SSTableReader sstable;
+    private long dataStart;
 
     public IteratingRow(BufferedRandomAccessFile file, SSTableReader sstable) throws IOException
     {
         this.file = file;
+        this.sstable = sstable;
 
         key = file.readUTF();
-        long dataSize = file.readInt();
-        long dataStart = file.getFilePointer();
+        int dataSize = file.readInt();
+        dataStart = file.getFilePointer();
         finishedAt = dataStart + dataSize;
+        // legacy stuff to support FileStruct:
         IndexHelper.skipBloomFilterAndIndex(file);
         emptyColumnFamily = ColumnFamily.serializer().deserializeFromSSTableNoColumns(sstable.makeColumnFamily(), file);
-        file.readInt(); // column count. breaking serializer encapsulation is less fugly than adding a wrapper class to allow deserializeEmpty to return both values
+        file.readInt();
     }
 
     public String getKey()
@@ -59,11 +67,33 @@
         return emptyColumnFamily;
     }
 
+    public void echoData(DataOutput out) throws IOException
+    {
+        file.seek(dataStart);
+        while (file.getFilePointer() < finishedAt)
+        {
+            out.write(file.read());
+        }
+    }
+
+    // TODO r/m this and make compaction merge columns iteratively for CASSSANDRA-16
+    public ColumnFamily getColumnFamily() throws IOException
+    {
+        file.seek(dataStart);
+        IndexHelper.skipBloomFilterAndIndex(file);
+        return ColumnFamily.serializer().deserializeFromSSTable(sstable, file);
+    }
+
     public void skipRemaining() throws IOException
     {
         file.seek(finishedAt);
     }
 
+    public long getEndPosition()
+    {
+        return finishedAt;
+    }
+
     protected IColumn computeNext()
     {
         try
@@ -74,11 +104,16 @@
                 return endOfData();
             }
 
-            return emptyColumnFamily.getColumnSerializer().deserialize(file);
+            return sstable.getColumnSerializer().deserialize(file);
         }
         catch (IOException e)
         {
             throw new RuntimeException(e);
         }
     }
+
+    public int compareTo(IteratingRow o)
+    {
+        return StorageService.getPartitioner().getDecoratedKeyComparator().compare(key, o.key);
+    }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java?rev=816380&r1=816379&r2=816380&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java Thu Sep 17 21:09:58 2009
@@ -32,8 +32,7 @@
 import org.apache.cassandra.utils.BloomFilter;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
@@ -339,6 +338,11 @@
         return new FileStruct(this);
     }
 
+    public SSTableScanner getScanner() throws IOException
+    {
+        return new SSTableScanner(this);
+    }
+
     public String getTableName()
     {
         return parseTableName(path);
@@ -353,6 +357,13 @@
     {
         return ColumnFamily.create(getTableName(), getColumnFamilyName());
     }
+
+    public ICompactSerializer2<IColumn> getColumnSerializer()
+    {
+        return DatabaseDescriptor.getColumnFamilyType(getTableName(), getColumnFamilyName()).equals("Standard")
+               ? Column.serializer()
+               : SuperColumn.serializer(getColumnComparator());
+    }
 }
 
 class FileSSTableMap

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableScanner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableScanner.java?rev=816380&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableScanner.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableScanner.java Thu Sep 17 21:09:58 2009
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.IOException;
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.Arrays;
+
+import org.apache.log4j.Logger;
+import com.google.common.collect.AbstractIterator;
+
+
+public class SSTableScanner implements Iterator<IteratingRow>, Closeable
+{
+    private static Logger logger = Logger.getLogger(SSTableScanner.class);
+
+    private IteratingRow row;
+    private boolean exhausted = false;
+    private BufferedRandomAccessFile file;
+    private SSTableReader sstable;
+    private Iterator<IteratingRow> iterator;
+
+    SSTableScanner(SSTableReader sstable) throws IOException
+    {
+        // TODO this is used for both compactions and key ranges.  the buffer sizes we want
+        // to use for these ops are very different.  here we are leaning towards the key-range
+        // use case since that is more common.  What we really want is to split those
+        // two uses of this class up.
+        this.file = new BufferedRandomAccessFile(sstable.getFilename(), "r", 256 * 1024);
+        this.sstable = sstable;
+    }
+
+    public void close() throws IOException
+    {
+        file.close();
+    }
+
+    public void seekTo(String seekKey)
+    {
+        try
+        {
+            long position = sstable.getNearestPosition(seekKey);
+            if (position < 0)
+            {
+                exhausted = true;
+                return;
+            }
+            file.seek(position);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException("corrupt sstable", e);
+        }
+    }
+
+    public boolean hasNext()
+    {
+        if (iterator == null)
+            iterator = exhausted ? Arrays.asList(new IteratingRow[0]).iterator() : new KeyScanningIterator();
+        return iterator.hasNext();
+    }
+
+    public IteratingRow next()
+    {
+        if (iterator == null)
+            iterator = exhausted ? Arrays.asList(new IteratingRow[0]).iterator() : new KeyScanningIterator();
+        return iterator.next();
+    }
+
+    public void remove()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    private class KeyScanningIterator implements Iterator<IteratingRow>
+    {
+        public boolean hasNext()
+        {
+            try
+            {
+                return (row == null && !file.isEOF()) || row.getEndPosition() < file.length();
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+        public IteratingRow next()
+        {
+            try
+            {
+                if (row != null)
+                    row.skipRemaining();
+                assert !file.isEOF();
+                return row = new IteratingRow(file, sstable);
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+        public void remove()
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
+}