You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ee...@apache.org on 2011/02/10 21:27:36 UTC

svn commit: r1069557 - /cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java

Author: eevans
Date: Thu Feb 10 20:27:35 2011
New Revision: 1069557

URL: http://svn.apache.org/viewvc?rev=1069557&view=rev
Log:
paging of large rows in sstable2json

Patch by Pavel Yaskevich; reviewed by eevans for CASSANDRA-2041

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java?rev=1069557&r1=1069556&r2=1069557&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java Thu Feb 10 20:27:35 2011
@@ -25,6 +25,12 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.columniterator.IColumnIterator;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.io.util.BufferedRandomAccessFile;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.commons.cli.*;
 
 import org.apache.cassandra.config.ConfigurationException;
@@ -45,7 +51,8 @@ import static org.apache.cassandra.utils
  */
 public class SSTableExport
 {
-    private static int INPUT_FILE_BUFFER_SIZE = 8 * 1024 * 1024;
+    // size of the columns page
+    private static final int PAGE_SIZE = 1000;
 
     private static final String KEY_OPTION = "k";
     private static final String EXCLUDEKEY_OPTION = "x";
@@ -70,101 +77,183 @@ public class SSTableExport
         Option optEnumerate = new Option(ENUMERATEKEYS_OPTION, false, "enumerate keys only");
         options.addOption(optEnumerate);
     }
-    
+
+    /**
+     * Wraps given string into quotes
+     * @param val string to quote
+     * @return quoted string
+     */
     private static String quote(String val)
     {
         return String.format("\"%s\"", val);
     }
-    
+
+    /**
+     * JSON Hash Key serializer
+     * @param val value to set as a key
+     * @return JSON Hash key
+     */
     private static String asKey(String val)
     {
         return String.format("%s: ", quote(val));
     }
-    
-    private static void serializeColumns(PrintStream outs, Collection<IColumn> columns, AbstractType comparator, CFMetaData cfMetaData)
+
+    /**
+     * Serialize columns using given column iterator
+     * @param columns column iterator
+     * @param out output stream
+     * @param comparator columns comparator
+     * @param cfMetaData Column Family metadata (to get validator)
+     */
+    private static void serializeColumns(Iterator<IColumn> columns, PrintStream out, AbstractType comparator, CFMetaData cfMetaData)
     {
-        outs.print("[");
+        while (columns.hasNext())
+        {
+            serializeColumn(columns.next(), out, comparator, cfMetaData);
 
-        Iterator<IColumn> iter = columns.iterator();
+            if (columns.hasNext())
+                out.print(", ");
+        }
+    }
 
-        while (iter.hasNext())
-        {
-            outs.print("[");
-            IColumn column = iter.next();
+    /**
+     * Serialize a collection of the columns
+     * @param columns collection of the columns to serialize
+     * @param out output stream
+     * @param comparator columns comparator
+     * @param cfMetaData Column Family metadata (to get validator)
+     */
+    private static void serializeColumns(Collection<IColumn> columns, PrintStream out, AbstractType comparator, CFMetaData cfMetaData)
+    {
+        serializeColumns(columns.iterator(), out, comparator, cfMetaData);
+    }
 
-            ByteBuffer name = column.name();
-            AbstractType validator = cfMetaData.getValueValidator(name);
+    /**
+     * Serialize a given column to the JSON format
+     * @param column column representation
+     * @param out output stream
+     * @param comparator columns comparator
+     * @param cfMetaData Column Family metadata (to get validator)
+     */
+    private static void serializeColumn(IColumn column, PrintStream out, AbstractType comparator, CFMetaData cfMetaData)
+    {
+        ByteBuffer name = ByteBufferUtil.clone(column.name());
+        ByteBuffer value = ByteBufferUtil.clone(column.value());
+        AbstractType validator = cfMetaData.getValueValidator(name);
+
+        out.print("[");
+        out.print(quote(comparator.getString(name)));
+        out.print(", ");
+        out.print(quote(validator.getString(value)));
+        out.print(", ");
+        out.print(column.timestamp());
+        out.print(", ");
+        out.print(column.isMarkedForDelete());
+
+        if (column instanceof ExpiringColumn)
+        {
+            out.print(", ");
+            out.print(((ExpiringColumn) column).getTimeToLive());
+            out.print(", ");
+            out.print(column.getLocalDeletionTime());
+        }
+
+        out.print("]");
+    }
+
+    /**
+     * Get a page of the row's columns and serialize it, looping until no more columns are left in the row
+     * @param reader SSTableReader for given SSTable
+     * @param row SSTableIdentityIterator row representation with Column Family
+     * @param key Decorated Key for the required row
+     * @param out output stream
+     */
+    private static void serializeRow(SSTableReader reader, SSTableIdentityIterator row, DecoratedKey key, PrintStream out)
+    {
+        ColumnFamily columnFamily = row.getColumnFamily();
+        boolean isSuperCF = columnFamily.isSuper();
+        ByteBuffer startColumn = ByteBufferUtil.EMPTY_BYTE_BUFFER; // initial column name, "blank" for first
+
+        out.print(asKey(bytesToHex(key.key)));
+
+        out.print(isSuperCF ? "{" : "[");
 
-            outs.print(quote(comparator.getString(name)));
-            outs.print(", ");
-            outs.print(quote(validator.getString(column.value())));
-            outs.print(", ");
-            outs.print(column.timestamp());
-            outs.print(", ");
-            outs.print(column.isMarkedForDelete());
+        while (true)
+        {
+            QueryFilter filter = QueryFilter.getSliceFilter(key,
+                                                            new QueryPath(columnFamily.metadata().tableName),
+                                                            startColumn,
+                                                            ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                                                            false,
+                                                            PAGE_SIZE);
+
+            IColumnIterator columns = filter.getSSTableColumnIterator(reader);
 
-            if (column instanceof ExpiringColumn)
+            int columnCount = 0;
+            while (columns.hasNext())
             {
-                outs.print(", ");
-                outs.print(((ExpiringColumn) column).getTimeToLive());
-                outs.print(", ");
-                outs.print(column.getLocalDeletionTime());
+                // setting new start column to the last of the current columns
+                startColumn = columns.next().name();
+                columnCount++;
             }
 
-            outs.print("]");
-
-            if (iter.hasNext())
+            try
+            {
+                columns = filter.getSSTableColumnIterator(reader); // iterator reset
+                serializeRow(columns, isSuperCF, out);
+            }
+            catch (IOException e)
             {
-                outs.print(", ");
+                System.err.println("WARNING: Corrupt row " + key + " (skipping).");
             }
+
+            if (columnCount < PAGE_SIZE)
+                break;
         }
-        
-        outs.print("]");
+
+        out.print(isSuperCF ? "}" : "]");
     }
-    
-    private static void serializeRow(PrintStream outs, SSTableIdentityIterator row) throws IOException
+
+    /**
+     * Serialize a row with already given column iterator
+     *
+     * @param columns columns of the row
+     * @param isSuper true if wrapping Column Family is Super
+     * @param out output stream
+     *
+     * @throws IOException on any I/O error.
+     */
+    private static void serializeRow(IColumnIterator columns, boolean isSuper, PrintStream out) throws IOException
     {
-        ColumnFamily columnFamily = row.getColumnFamilyWithColumns();
+        ColumnFamily columnFamily = columns.getColumnFamily();
         CFMetaData cfMetaData = columnFamily.metadata();
 
         AbstractType comparator = columnFamily.getComparator();
 
-        // key is represented as String according to current Partitioner
-        outs.print("  " + asKey(bytesToHex(row.getKey().key)));
-
-        if (columnFamily.isSuper())
+        if (isSuper)
         {
-            outs.print("{ ");
-
-            Iterator<IColumn> iter = columnFamily.getSortedColumns().iterator();
-            while (iter.hasNext())
+            while (columns.hasNext())
             {
-                IColumn column = iter.next();
+                IColumn column = columns.next();
 
-                // header of the row
-                outs.print(asKey(comparator.getString(column.name())));
-                outs.print("{");
-                outs.print(asKey("deletedAt"));
-                outs.print(column.getMarkedForDeleteAt());
-                outs.print(", ");
-                outs.print(asKey("subColumns"));
-
-                // columns
-                serializeColumns(outs, column.getSubColumns(), columnFamily.getSubComparator(), cfMetaData);
-
-                outs.print("}");
-
-                if (iter.hasNext())
-                {
-                    outs.print(", ");
-                }
+                out.print(asKey(comparator.getString(column.name())));
+                out.print("{");
+                out.print(asKey("deletedAt"));
+                out.print(column.getMarkedForDeleteAt());
+                out.print(", ");
+                out.print(asKey("subColumns"));
+                out.print("[");
+                serializeColumns(column.getSubColumns(), out, columnFamily.getSubComparator(), cfMetaData);
+                out.print("]");
+                out.print("}");
+
+                if (columns.hasNext())
+                    out.print(", ");
             }
-            
-            outs.print("}");
         }
         else
         {
-            serializeColumns(outs, columnFamily.getSortedColumns(), comparator, cfMetaData);
+            serializeColumns(columns, out, comparator, cfMetaData);
         }
     }
 
@@ -201,118 +290,96 @@ public class SSTableExport
      * 
      * @param ssTableFile the SSTable to export the rows from
      * @param outs PrintStream to write the output to
-     * @param keys the keys corresponding to the rows to export
+     * @param toExport the keys corresponding to the rows to export
      * @param excludes the keys to exclude from export
      *
      * @throws IOException on failure to read/write input/output
      */
-    public static void export(String ssTableFile, PrintStream outs, String[] keys, String[] excludes) throws IOException
+    public static void export(String ssTableFile, PrintStream outs, Collection<String> toExport, String[] excludes) throws IOException
     {
         SSTableReader reader = SSTableReader.open(Descriptor.fromFilename(ssTableFile));
-        SSTableScanner scanner = reader.getDirectScanner(INPUT_FILE_BUFFER_SIZE);
-        IPartitioner<?> partitioner = DatabaseDescriptor.getPartitioner();    
-        Set<String> excludeSet = new HashSet<String>();
-        int i = 0;
+        SSTableScanner scanner = reader.getDirectScanner(BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE);
+
+        IPartitioner<?> partitioner = StorageService.getPartitioner();
+
+        for (String toExclude : excludes)
+        {
+            toExport.remove(toExclude); // excluding key from export
+        }
 
-        if (excludes != null)
-            excludeSet = new HashSet<String>(Arrays.asList(excludes));
-        
         outs.println("{");
 
-        // last key to compare order 
+        int i = 0;
+
+        // last key to compare order
         DecoratedKey lastKey = null;
-        
-        for (String key : keys)
+
+        for (String key : toExport)
         {
-            if (excludeSet.contains(key))
-                continue;
+            DecoratedKey decoratedKey = partitioner.decorateKey(hexToBytes(key));
 
-            DecoratedKey<?> dk = partitioner.decorateKey(hexToBytes(key));
+            if (lastKey != null && lastKey.compareTo(decoratedKey) > 0)
+                throw new IOException("Key out of order! " + lastKey + " > " + decoratedKey);
 
-            // validate order of the keys in the sstable
-            if (lastKey != null && lastKey.compareTo(dk) > 0 )
-                throw new IOException("Key out of order! " + lastKey + " > " + dk);
+            lastKey = decoratedKey;
 
-            lastKey = dk;
+            scanner.seekTo(decoratedKey);
 
-            scanner.seekTo(dk);
-            
-            i++;
+            if (!scanner.hasNext())
+                continue;
 
-            if (scanner.hasNext())
-            {
-                SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
+            serializeRow(reader, (SSTableIdentityIterator) scanner.next(), decoratedKey, outs);
 
-                try
-                {
-                    serializeRow(outs, row);
-                }
-                catch (IOException ioexc)
-                {
-                    System.err.println("WARNING: Corrupt row " + key + " (skipping).");
-                    continue;
-                }
-                catch (OutOfMemoryError oom)
-                {
-                    System.err.println("ERROR: Out of memory deserializing row " + key);
-                    continue;
-                }
+            if (i != 0)
+                outs.println(",");
 
-                if (i != 1)
-                    outs.println(",");
-            }
+            i++;
         }
-        
+
         outs.println("\n}");
         outs.flush();
+
+        scanner.close();
     }
 
     // This is necessary to accommodate the test suite since you cannot open a Reader more
     // than once from within the same process.
     static void export(SSTableReader reader, PrintStream outs, String[] excludes) throws IOException
     {
-        SSTableScanner scanner = reader.getDirectScanner(INPUT_FILE_BUFFER_SIZE);
         Set<String> excludeSet = new HashSet<String>();
 
         if (excludes != null)
             excludeSet = new HashSet<String>(Arrays.asList(excludes));
 
-        outs.println("{");
 
         SSTableIdentityIterator row;
+        SSTableScanner scanner = reader.getDirectScanner(BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE);
+
+        outs.println("{");
+
+        int i = 0;
 
-        boolean elementWritten = false;
+        // collecting keys to export
         while (scanner.hasNext())
         {
             row = (SSTableIdentityIterator) scanner.next();
 
-            if (excludeSet.contains(bytesToHex(row.getKey().key)))
+            String currentKey = bytesToHex(row.getKey().key);
+
+            if (excludeSet.contains(currentKey))
                 continue;
-            else if (elementWritten)
+            else if (i != 0)
                 outs.println(",");
 
-            try
-            {
-                serializeRow(outs, row);
+            serializeRow(reader, row, row.getKey(), outs);
 
-                // used to decide should we put ',' after previous row or not
-                if (!elementWritten)
-                    elementWritten = true;
-            }
-            catch (IOException ioexcep)
-            {
-                System.err.println("WARNING: Corrupt row " + bytesToHex(row.getKey().key) + " (skipping).");
-                elementWritten = false;
-            }
-            catch (OutOfMemoryError oom)
-            {
-                System.err.println("ERROR: Out of memory deserializing row " + bytesToHex(row.getKey().key));
-                elementWritten = false;
-            }
+            i++;
         }
-        
-        outs.printf("%n}%n");
+
+        outs.println("\n}");
         outs.flush();
+
+        scanner.close();
     }
     
     /**
@@ -326,8 +393,7 @@ public class SSTableExport
      */
     public static void export(String ssTableFile, PrintStream outs, String[] excludes) throws IOException
     {
-        SSTableReader reader = SSTableReader.open(Descriptor.fromFilename(ssTableFile));
-        export(reader, outs, excludes);
+        export(SSTableReader.open(Descriptor.fromFilename(ssTableFile)), outs, excludes);
     }
 
     /**
@@ -395,7 +461,7 @@ public class SSTableExport
         else
         {
             if ((keys != null) && (keys.length > 0))
-                export(ssTableFileName, System.out, keys, excludes);
+                export(ssTableFileName, System.out, Arrays.asList(keys), excludes);
             else
                 export(ssTableFileName, excludes);
         }