You are viewing a plain text version of this content; the hyperlink to its canonical version was lost in the plain-text conversion.
Posted to commits@cassandra.apache.org by ee...@apache.org on 2011/02/08 21:36:35 UTC

svn commit: r1068562 - /cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/SSTableExport.java

Author: eevans
Date: Tue Feb  8 20:36:35 2011
New Revision: 1068562

URL: http://svn.apache.org/viewvc?rev=1068562&view=rev
Log:
paging of large rows in sstable2json

Patch by Pavel Yaskevich; reviewed by eevans for CASSANDRA-2041

Modified:
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/SSTableExport.java

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/SSTableExport.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/SSTableExport.java?rev=1068562&r1=1068561&r2=1068562&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/SSTableExport.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/SSTableExport.java Tue Feb  8 20:36:35 2011
@@ -24,15 +24,17 @@ import java.io.PrintStream;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.columniterator.IColumnIterator;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.io.util.BufferedRandomAccessFile;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.commons.cli.*;
 
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.ExpiringColumn;
-import org.apache.cassandra.db.IColumn;
-import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.sstable.*;
 
@@ -44,7 +46,8 @@ import static org.apache.cassandra.utils
  */
 public class SSTableExport
 {
-    private static int INPUT_FILE_BUFFER_SIZE = 8 * 1024 * 1024;
+    // size of the columns page
+    private static final int PAGE_SIZE = 1000;
 
     private static final String KEY_OPTION = "k";
     private static final String EXCLUDEKEY_OPTION = "x";
@@ -69,79 +72,168 @@ public class SSTableExport
         Option optEnumerate = new Option(ENUMERATEKEYS_OPTION, false, "enumerate keys only");
         options.addOption(optEnumerate);
     }
-    
+
+    /**
+     * Wraps given string into quotes
+     * @param val string to quote
+     * @return quoted string
+     */
     private static String quote(String val)
     {
         return String.format("\"%s\"", val);
     }
-    
+
+    /**
+     * JSON Hash Key serializer
+     * @param val value to set as a key
+     * @return JSON Hash key
+     */
     private static String asKey(String val)
     {
         return String.format("%s: ", quote(val));
     }
-    
-    private static void serializeColumns(PrintStream outs, Collection<IColumn> cols, AbstractType comp)
+
+    /**
+     * Serialize columns using given column iterator
+     * @param columns column iterator
+     * @param out output stream
+     */
+    private static void serializeColumns(Iterator<IColumn> columns, PrintStream out)
     {
-        outs.print("[");
+        while (columns.hasNext())
+        {
+            serializeColumn(columns.next(), out);
 
-        Iterator<IColumn> iter = cols.iterator();
-        while (iter.hasNext())
+            if (columns.hasNext())
+                out.print(", ");
+        }
+    }
+
+    /**
+     * Serialize a collection of the columns
+     * @param columns collection of the columns to serialize
+     * @param out output stream
+     */
+    private static void serializeColumns(Collection<IColumn> columns, PrintStream out)
+    {
+        serializeColumns(columns.iterator(), out);
+    }
+
+    /**
+     * Serialize a given column to the JSON format
+     * @param column column presentation
+     * @param out output stream
+     */
+    private static void serializeColumn(IColumn column, PrintStream out)
+    {
+        out.print("[");
+        out.print(quote(bytesToHex(column.name())));
+        out.print(", ");
+        out.print(quote(bytesToHex(column.value())));
+        out.print(", ");
+        out.print(column.timestamp());
+        out.print(", ");
+        out.print(column.isMarkedForDelete());
+
+        if (column instanceof ExpiringColumn)
+        {
+            out.print(", ");
+            out.print(((ExpiringColumn) column).getTimeToLive());
+            out.print(", ");
+            out.print(column.getLocalDeletionTime());
+        }
+
+        out.print("]");
+    }
+
+    /**
+     * Get portion of the columns and serialize in loop while not more columns left in the row
+     * @param reader SSTableReader for given SSTable
+     * @param row SSTableIdentityIterator row representation with Column Family
+     * @param key Decorated Key for the required row
+     * @param out output stream
+     */
+    private static void serializeRow(SSTableReader reader, SSTableIdentityIterator row, DecoratedKey key, PrintStream out)
+    {
+        ColumnFamily columnFamily = row.getColumnFamily();
+        boolean isSuperCF = columnFamily.isSuper();
+        ByteBuffer startColumn = ByteBufferUtil.EMPTY_BYTE_BUFFER; // initial column name, "blank" for first
+
+        out.print(asKey(bytesToHex(key.key)));
+
+        out.print(isSuperCF ? "{" : "[");
+
+        while (true)
         {
-            outs.print("[");
-            IColumn column = iter.next();
-            outs.print(quote(bytesToHex(column.name())));
-            outs.print(", ");
-            outs.print(quote(bytesToHex(column.value())));
-            outs.print(", ");
-            outs.print(column.timestamp());
-            outs.print(", ");
-            outs.print(column.isMarkedForDelete());
-            if (column instanceof ExpiringColumn)
+            QueryFilter filter = QueryFilter.getSliceFilter(key,
+                                                            new QueryPath(columnFamily.metadata().tableName),
+                                                            startColumn,
+                                                            ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                                                            false,
+                                                            PAGE_SIZE);
+
+            IColumnIterator columns = filter.getSSTableColumnIterator(reader);
+
+            int columnCount = 0;
+            while (columns.hasNext())
             {
-                outs.print(", ");
-                outs.print(((ExpiringColumn) column).getTimeToLive());
-                outs.print(", ");
-                outs.print(column.getLocalDeletionTime());
+                // setting new start column to the last of the current columns
+                startColumn = columns.next().name();
+                columnCount++;
             }
-            outs.print("]");
-            if (iter.hasNext())
-                outs.print(", ");
+
+            try
+            {
+                columns = filter.getSSTableColumnIterator(reader); // iterator reset
+                serializeRow(columns, isSuperCF, out);
+            }
+            catch (IOException e)
+            {
+                System.err.println("WARNING: Corrupt row " + key + " (skipping).");
+            }
+
+            if (columnCount < PAGE_SIZE)
+                break;
         }
-        
-        outs.print("]");
+
+        out.print(isSuperCF ? "}" : "]");
     }
-    
-    private static void serializeRow(PrintStream outs, SSTableIdentityIterator row) throws IOException
-    {
-        ColumnFamily cf = row.getColumnFamilyWithColumns();
-        AbstractType comparator = cf.getComparator();
-        outs.print(asKey(bytesToHex(row.getKey().key)));
 
-        if (cf.isSuper())
+    /**
+     * Serialize a row with already given column iterator
+     *
+     * @param columns columns of the row
+     * @param isSuper true if wrapping Column Family is Super
+     * @param out output stream
+     *
+     * @throws IOException on any I/O error.
+     */
+    private static void serializeRow(IColumnIterator columns, boolean isSuper, PrintStream out) throws IOException
+    {
+        if (isSuper)
         {
-            outs.print("{ ");
-
-            Iterator<IColumn> iter = cf.getSortedColumns().iterator();
-            while (iter.hasNext())
+            while (columns.hasNext())
             {
-                IColumn column = iter.next();
-                outs.print(asKey(bytesToHex(column.name())));
-                outs.print("{");
-                outs.print(asKey("deletedAt"));
-                outs.print(column.getMarkedForDeleteAt());
-                outs.print(", ");
-                outs.print(asKey("subColumns"));
-                serializeColumns(outs, column.getSubColumns(), comparator);
-                outs.print("}");
-                if (iter.hasNext())
-                    outs.print(", ");
+                IColumn column = columns.next();
+
+                out.print(asKey(bytesToHex(column.name())));
+                out.print("{");
+                out.print(asKey("deletedAt"));
+                out.print(column.getMarkedForDeleteAt());
+                out.print(", ");
+                out.print(asKey("subColumns"));
+                out.print("[");
+                serializeColumns(column.getSubColumns(), out);
+                out.print("]");
+                out.print("}");
+
+                if (columns.hasNext())
+                    out.print(", ");
             }
-            
-            outs.print("}");
         }
         else
         {
-            serializeColumns(outs, cf.getSortedColumns(), comparator);
+            serializeColumns(columns, out);
         }
     }
 
@@ -176,117 +268,97 @@ public class SSTableExport
     /**
      * Export specific rows from an SSTable and write the resulting JSON to a PrintStream.
      * 
-     * @param ssTableFile the SSTable to export the rows from
+     * @param ssTableFile the SSTableScanner to export the rows from
      * @param outs PrintStream to write the output to
-     * @param keys the keys corresponding to the rows to export
+     * @param toExport the keys corresponding to the rows to export
+     * @param excludes keys to exclude from export
      * @throws IOException on failure to read/write input/output
      */
-    public static void export(String ssTableFile, PrintStream outs, String[] keys, String[] excludes)
-    throws IOException
+    public static void export(String ssTableFile, PrintStream outs, Collection<String> toExport, String[] excludes) throws IOException
     {
         SSTableReader reader = SSTableReader.open(Descriptor.fromFilename(ssTableFile));
-        SSTableScanner scanner = reader.getDirectScanner(INPUT_FILE_BUFFER_SIZE);
-        IPartitioner<?> partitioner = DatabaseDescriptor.getPartitioner();    
-        Set<String> excludeSet = new HashSet<String>();
-        int i = 0;
+        SSTableScanner scanner = reader.getDirectScanner(BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE);
+
+        IPartitioner<?> partitioner = StorageService.getPartitioner();
+
+        for (String toExclude : excludes)
+        {
+            toExport.remove(toExclude); // excluding key from export
+        }
 
-        if (excludes != null)
-            excludeSet = new HashSet<String>(Arrays.asList(excludes));
-        
         outs.println("{");
 
-        // last key to compare order 
+        int i = 0;
+
+        // last key to compare order
         DecoratedKey lastKey = null;
-        
-        for (String key : keys)
+
+        for (String key : toExport)
         {
-            if (excludeSet.contains(key))
-                continue;
-            DecoratedKey<?> dk = partitioner.decorateKey(hexToBytes(key));
+            DecoratedKey decoratedKey = partitioner.decorateKey(hexToBytes(key));
 
-            // validate order of the keys in the sstable
-            if (lastKey != null && lastKey.compareTo(dk) > 0 )
-                throw new IOException("Key out of order! " + lastKey + " > " + dk);
-            lastKey = dk;
+            if (lastKey != null && lastKey.compareTo(decoratedKey) > 0)
+                throw new IOException("Key out of order! " + lastKey + " > " + decoratedKey);
 
-            scanner.seekTo(dk);
-            
-            i++;
+            lastKey = decoratedKey;
 
-            if (scanner.hasNext())
-            {
-                SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
+            scanner.seekTo(decoratedKey);
 
-                try
-                {
-                    serializeRow(outs, row);
-                }
-                catch (IOException ioexc)
-                {
-                    System.err.println("WARNING: Corrupt row " + key + " (skipping).");
-                    continue;
-                }
-                catch (OutOfMemoryError oom)
-                {
-                    System.err.println("ERROR: Out of memory deserializing row " + key);
-                    continue;
-                }
+            if (!scanner.hasNext())
+                continue;
 
-                if (i != 1)
-                    outs.println(",");
-            }
+            serializeRow(reader, (SSTableIdentityIterator) scanner.next(), decoratedKey, outs);
+
+            if (i != 0)
+                outs.println(",");
+
+            i++;
         }
-        
+
         outs.println("\n}");
         outs.flush();
+
+        scanner.close();
     }
 
     // This is necessary to accommodate the test suite since you cannot open a Reader more
     // than once from within the same process.
     static void export(SSTableReader reader, PrintStream outs, String[] excludes) throws IOException
     {
-        SSTableScanner scanner = reader.getDirectScanner(INPUT_FILE_BUFFER_SIZE);
         Set<String> excludeSet = new HashSet<String>();
 
         if (excludes != null)
             excludeSet = new HashSet<String>(Arrays.asList(excludes));
 
-        outs.println("{");
 
         SSTableIdentityIterator row;
+        SSTableScanner scanner = reader.getDirectScanner(BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE);
 
-        boolean elementWritten = false;
+        outs.println("{");
+
+        int i = 0;
+
+        // collecting keys to export
         while (scanner.hasNext())
         {
             row = (SSTableIdentityIterator) scanner.next();
 
-            if (excludeSet.contains(bytesToHex(row.getKey().key)))
+            String currentKey = bytesToHex(row.getKey().key);
+
+            if (excludeSet.contains(currentKey))
                 continue;
-            else if (elementWritten)
+            else if (i != 0)
                 outs.println(",");
 
-            try
-            {
-                serializeRow(outs, row);
+            serializeRow(reader, row, row.getKey(), outs);
 
-                // used to decide should we put ',' after previous row or not
-                if (!elementWritten)
-                    elementWritten = true;
-            }
-            catch (IOException ioexcep)
-            {
-                System.err.println("WARNING: Corrupt row " + bytesToHex(row.getKey().key) + " (skipping).");
-                elementWritten = false;
-            }
-            catch (OutOfMemoryError oom)
-            {
-                System.err.println("ERROR: Out of memory deserializing row " + bytesToHex(row.getKey().key));
-                elementWritten = false;
-            }
+            i++;
         }
-        
-        outs.printf("%n}%n");
+
+        outs.println("\n}");
         outs.flush();
+
+        scanner.close();
     }
     
     /**
@@ -294,18 +366,21 @@ public class SSTableExport
      * 
      * @param ssTableFile the SSTable to export
      * @param outs PrintStream to write the output to
+     * @param excludes keys to exclude from export
+     *
      * @throws IOException on failure to read/write input/output
      */
     public static void export(String ssTableFile, PrintStream outs, String[] excludes) throws IOException
     {
-        SSTableReader reader = SSTableReader.open(Descriptor.fromFilename(ssTableFile));
-        export(reader, outs, excludes);
+        export(SSTableReader.open(Descriptor.fromFilename(ssTableFile)), outs, excludes);
     }
 
     /**
      * Export an SSTable and write the resulting JSON to standard out.
      * 
      * @param ssTableFile SSTable to export
+     * @param excludes keys to exclude from export
+     *
      * @throws IOException on failure to read/write SSTable/standard out
      */
     public static void export(String ssTableFile, String[] excludes) throws IOException
@@ -318,7 +393,9 @@ public class SSTableExport
      * export the contents of the SSTable to JSON.
      *  
      * @param args command lines arguments
+     *
      * @throws IOException on failure to open/read/write files or output streams
+     * @throws ConfigurationException on configuration failure (wrong params given)
      */
     public static void main(String[] args) throws IOException, ConfigurationException
     {
@@ -364,7 +441,7 @@ public class SSTableExport
         else
         {
             if ((keys != null) && (keys.length > 0))
-                export(ssTableFileName, System.out, keys, excludes);
+                export(ssTableFileName, System.out, Arrays.asList(keys), excludes);
             else
                 export(ssTableFileName, excludes);
         }