Posted to commits@cassandra.apache.org by ee...@apache.org on 2011/02/10 21:27:36 UTC
svn commit: r1069557 -
/cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java
Author: eevans
Date: Thu Feb 10 20:27:35 2011
New Revision: 1069557
URL: http://svn.apache.org/viewvc?rev=1069557&view=rev
Log:
paging of large rows in sstable2json
Patch by Pavel Yaskevich; reviewed by eevans for CASSANDRA-2041
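In short: SSTableExport used to deserialize a whole row before printing it (note
the OutOfMemoryError handling removed below); it now reads a row's columns in
pages of PAGE_SIZE (1000), re-issuing a slice query that starts from the last
column name seen and stopping once a page comes back short. Below is a minimal,
self-contained sketch of that cursor-style paging loop; the PagingSketch/fetchPage
names and the NavigableMap stand-in are illustrative only (not Cassandra API), and
unlike the patch the sketch advances an exclusive cursor so no column is visited twice.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.NavigableMap;
    import java.util.TreeMap;

    public class PagingSketch
    {
        private static final int PAGE_SIZE = 1000;

        // Up to PAGE_SIZE column names strictly greater than afterColumn.
        private static List<String> fetchPage(NavigableMap<String, String> row, String afterColumn)
        {
            List<String> page = new ArrayList<String>();
            for (String name : row.tailMap(afterColumn, false).keySet())
            {
                page.add(name);
                if (page.size() == PAGE_SIZE)
                    break;
            }
            return page;
        }

        public static void main(String[] args)
        {
            // Stand-in for one wide row: 2500 columns sorted by name.
            NavigableMap<String, String> row = new TreeMap<String, String>();
            for (int i = 0; i < 2500; i++)
                row.put(String.format("col-%05d", i), "value");

            String cursor = ""; // "blank" start, like ByteBufferUtil.EMPTY_BYTE_BUFFER below
            int seen = 0;
            while (true)
            {
                List<String> page = fetchPage(row, cursor);
                seen += page.size();
                if (page.size() < PAGE_SIZE)
                    break; // a short page means the row is exhausted
                cursor = page.get(page.size() - 1); // next slice starts from the last column seen
            }
            System.out.println(seen + " columns visited"); // 2500
        }
    }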
Modified:
cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java?rev=1069557&r1=1069556&r2=1069557&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java Thu Feb 10 20:27:35 2011
@@ -25,6 +25,12 @@ import java.nio.ByteBuffer;
import java.util.*;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.columniterator.IColumnIterator;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.io.util.BufferedRandomAccessFile;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.commons.cli.*;
import org.apache.cassandra.config.ConfigurationException;
@@ -45,7 +51,8 @@ import static org.apache.cassandra.utils
*/
public class SSTableExport
{
- private static int INPUT_FILE_BUFFER_SIZE = 8 * 1024 * 1024;
+    // number of columns to fetch and serialize per page
+ private static final int PAGE_SIZE = 1000;
private static final String KEY_OPTION = "k";
private static final String EXCLUDEKEY_OPTION = "x";
@@ -70,101 +77,183 @@ public class SSTableExport
Option optEnumerate = new Option(ENUMERATEKEYS_OPTION, false, "enumerate keys only");
options.addOption(optEnumerate);
}
-
+
+ /**
+     * Wraps the given string in quotes
+ * @param val string to quote
+ * @return quoted string
+ */
private static String quote(String val)
{
return String.format("\"%s\"", val);
}
-
+
+ /**
+     * Serialize a value as a JSON hash key
+     * @param val value to use as the key
+     * @return quoted value followed by ": "
+ */
private static String asKey(String val)
{
return String.format("%s: ", quote(val));
}
-
- private static void serializeColumns(PrintStream outs, Collection<IColumn> columns, AbstractType comparator, CFMetaData cfMetaData)
+
+ /**
+     * Serialize columns using the given column iterator
+ * @param columns column iterator
+ * @param out output stream
+ * @param comparator columns comparator
+ * @param cfMetaData Column Family metadata (to get validator)
+ */
+ private static void serializeColumns(Iterator<IColumn> columns, PrintStream out, AbstractType comparator, CFMetaData cfMetaData)
{
- outs.print("[");
+ while (columns.hasNext())
+ {
+ serializeColumn(columns.next(), out, comparator, cfMetaData);
- Iterator<IColumn> iter = columns.iterator();
+ if (columns.hasNext())
+ out.print(", ");
+ }
+ }
- while (iter.hasNext())
- {
- outs.print("[");
- IColumn column = iter.next();
+ /**
+     * Serialize a collection of columns
+     * @param columns collection of columns to serialize
+ * @param out output stream
+ * @param comparator columns comparator
+ * @param cfMetaData Column Family metadata (to get validator)
+ */
+ private static void serializeColumns(Collection<IColumn> columns, PrintStream out, AbstractType comparator, CFMetaData cfMetaData)
+ {
+ serializeColumns(columns.iterator(), out, comparator, cfMetaData);
+ }
- ByteBuffer name = column.name();
- AbstractType validator = cfMetaData.getValueValidator(name);
+ /**
+     * Serialize a single column to the JSON format
+     * @param column the column to serialize
+ * @param out output stream
+ * @param comparator columns comparator
+ * @param cfMetaData Column Family metadata (to get validator)
+ */
+ private static void serializeColumn(IColumn column, PrintStream out, AbstractType comparator, CFMetaData cfMetaData)
+ {
+ ByteBuffer name = ByteBufferUtil.clone(column.name());
+ ByteBuffer value = ByteBufferUtil.clone(column.value());
+ AbstractType validator = cfMetaData.getValueValidator(name);
+
+ out.print("[");
+ out.print(quote(comparator.getString(name)));
+ out.print(", ");
+ out.print(quote(validator.getString(value)));
+ out.print(", ");
+ out.print(column.timestamp());
+ out.print(", ");
+ out.print(column.isMarkedForDelete());
+
+ if (column instanceof ExpiringColumn)
+ {
+ out.print(", ");
+ out.print(((ExpiringColumn) column).getTimeToLive());
+ out.print(", ");
+ out.print(column.getLocalDeletionTime());
+ }
+
+ out.print("]");
+ }
+
+ /**
+     * Serialize the row's columns one page at a time, looping until no columns remain
+     * @param reader SSTableReader for the given SSTable
+     * @param row row as an SSTableIdentityIterator
+     * @param key decorated key of the row
+ * @param out output stream
+ */
+ private static void serializeRow(SSTableReader reader, SSTableIdentityIterator row, DecoratedKey key, PrintStream out)
+ {
+ ColumnFamily columnFamily = row.getColumnFamily();
+ boolean isSuperCF = columnFamily.isSuper();
+        ByteBuffer startColumn = ByteBufferUtil.EMPTY_BYTE_BUFFER; // slice start: empty ("blank") for the first page
+
+ out.print(asKey(bytesToHex(key.key)));
+
+ out.print(isSuperCF ? "{" : "[");
- outs.print(quote(comparator.getString(name)));
- outs.print(", ");
- outs.print(quote(validator.getString(column.value())));
- outs.print(", ");
- outs.print(column.timestamp());
- outs.print(", ");
- outs.print(column.isMarkedForDelete());
+ while (true)
+ {
+ QueryFilter filter = QueryFilter.getSliceFilter(key,
+ new QueryPath(columnFamily.metadata().tableName),
+ startColumn,
+ ByteBufferUtil.EMPTY_BYTE_BUFFER,
+ false,
+ PAGE_SIZE);
+
+ IColumnIterator columns = filter.getSSTableColumnIterator(reader);
- if (column instanceof ExpiringColumn)
+ int columnCount = 0;
+ while (columns.hasNext())
{
- outs.print(", ");
- outs.print(((ExpiringColumn) column).getTimeToLive());
- outs.print(", ");
- outs.print(column.getLocalDeletionTime());
+                // advance the start column to the last column seen on this page
+ startColumn = columns.next().name();
+ columnCount++;
}
- outs.print("]");
-
- if (iter.hasNext())
+ try
+ {
+ columns = filter.getSSTableColumnIterator(reader); // iterator reset
+ serializeRow(columns, isSuperCF, out);
+ }
+ catch (IOException e)
{
- outs.print(", ");
+ System.err.println("WARNING: Corrupt row " + key + " (skipping).");
}
+
+ if (columnCount < PAGE_SIZE)
+ break;
}
-
- outs.print("]");
+
+ out.print(isSuperCF ? "}" : "]");
}
-
- private static void serializeRow(PrintStream outs, SSTableIdentityIterator row) throws IOException
+
+ /**
+     * Serialize a row using an already-constructed column iterator
+ *
+ * @param columns columns of the row
+     * @param isSuper true if the enclosing Column Family is Super
+ * @param out output stream
+ *
+ * @throws IOException on any I/O error.
+ */
+ private static void serializeRow(IColumnIterator columns, boolean isSuper, PrintStream out) throws IOException
{
- ColumnFamily columnFamily = row.getColumnFamilyWithColumns();
+ ColumnFamily columnFamily = columns.getColumnFamily();
CFMetaData cfMetaData = columnFamily.metadata();
AbstractType comparator = columnFamily.getComparator();
- // key is represented as String according to current Partitioner
- outs.print(" " + asKey(bytesToHex(row.getKey().key)));
-
- if (columnFamily.isSuper())
+ if (isSuper)
{
- outs.print("{ ");
-
- Iterator<IColumn> iter = columnFamily.getSortedColumns().iterator();
- while (iter.hasNext())
+ while (columns.hasNext())
{
- IColumn column = iter.next();
+ IColumn column = columns.next();
- // header of the row
- outs.print(asKey(comparator.getString(column.name())));
- outs.print("{");
- outs.print(asKey("deletedAt"));
- outs.print(column.getMarkedForDeleteAt());
- outs.print(", ");
- outs.print(asKey("subColumns"));
-
- // columns
- serializeColumns(outs, column.getSubColumns(), columnFamily.getSubComparator(), cfMetaData);
-
- outs.print("}");
-
- if (iter.hasNext())
- {
- outs.print(", ");
- }
+ out.print(asKey(comparator.getString(column.name())));
+ out.print("{");
+ out.print(asKey("deletedAt"));
+ out.print(column.getMarkedForDeleteAt());
+ out.print(", ");
+ out.print(asKey("subColumns"));
+ out.print("[");
+ serializeColumns(column.getSubColumns(), out, columnFamily.getSubComparator(), cfMetaData);
+ out.print("]");
+ out.print("}");
+
+ if (columns.hasNext())
+ out.print(", ");
}
-
- outs.print("}");
}
else
{
- serializeColumns(outs, columnFamily.getSortedColumns(), comparator, cfMetaData);
+ serializeColumns(columns, out, comparator, cfMetaData);
}
}
@@ -201,118 +290,96 @@ public class SSTableExport
*
* @param ssTableFile the SSTable to export the rows from
* @param outs PrintStream to write the output to
- * @param keys the keys corresponding to the rows to export
+ * @param toExport the keys corresponding to the rows to export
* @param excludes the keys to exclude from export
*
* @throws IOException on failure to read/write input/output
*/
- public static void export(String ssTableFile, PrintStream outs, String[] keys, String[] excludes) throws IOException
+ public static void export(String ssTableFile, PrintStream outs, Collection<String> toExport, String[] excludes) throws IOException
{
SSTableReader reader = SSTableReader.open(Descriptor.fromFilename(ssTableFile));
- SSTableScanner scanner = reader.getDirectScanner(INPUT_FILE_BUFFER_SIZE);
- IPartitioner<?> partitioner = DatabaseDescriptor.getPartitioner();
- Set<String> excludeSet = new HashSet<String>();
- int i = 0;
+ SSTableScanner scanner = reader.getDirectScanner(BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE);
+
+ IPartitioner<?> partitioner = StorageService.getPartitioner();
+
+ for (String toExclude : excludes)
+ {
+ toExport.remove(toExclude); // excluding key from export
+ }
- if (excludes != null)
- excludeSet = new HashSet<String>(Arrays.asList(excludes));
-
outs.println("{");
- // last key to compare order
+ int i = 0;
+
+ // last key to compare order
DecoratedKey lastKey = null;
-
- for (String key : keys)
+
+ for (String key : toExport)
{
- if (excludeSet.contains(key))
- continue;
+ DecoratedKey decoratedKey = partitioner.decorateKey(hexToBytes(key));
- DecoratedKey<?> dk = partitioner.decorateKey(hexToBytes(key));
+ if (lastKey != null && lastKey.compareTo(decoratedKey) > 0)
+ throw new IOException("Key out of order! " + lastKey + " > " + decoratedKey);
- // validate order of the keys in the sstable
- if (lastKey != null && lastKey.compareTo(dk) > 0 )
- throw new IOException("Key out of order! " + lastKey + " > " + dk);
+ lastKey = decoratedKey;
- lastKey = dk;
+ scanner.seekTo(decoratedKey);
- scanner.seekTo(dk);
-
- i++;
+ if (!scanner.hasNext())
+ continue;
- if (scanner.hasNext())
- {
- SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
+ serializeRow(reader, (SSTableIdentityIterator) scanner.next(), decoratedKey, outs);
- try
- {
- serializeRow(outs, row);
- }
- catch (IOException ioexc)
- {
- System.err.println("WARNING: Corrupt row " + key + " (skipping).");
- continue;
- }
- catch (OutOfMemoryError oom)
- {
- System.err.println("ERROR: Out of memory deserializing row " + key);
- continue;
- }
+ if (i != 0)
+ outs.println(",");
- if (i != 1)
- outs.println(",");
- }
+ i++;
}
-
+
outs.println("\n}");
outs.flush();
+
+ scanner.close();
}
// This is necessary to accommodate the test suite since you cannot open a Reader more
// than once from within the same process.
static void export(SSTableReader reader, PrintStream outs, String[] excludes) throws IOException
{
- SSTableScanner scanner = reader.getDirectScanner(INPUT_FILE_BUFFER_SIZE);
Set<String> excludeSet = new HashSet<String>();
if (excludes != null)
excludeSet = new HashSet<String>(Arrays.asList(excludes));
- outs.println("{");
SSTableIdentityIterator row;
+ SSTableScanner scanner = reader.getDirectScanner(BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE);
+
+ outs.println("{");
+
+ int i = 0;
- boolean elementWritten = false;
+        // serialize each row, skipping excluded keys
while (scanner.hasNext())
{
row = (SSTableIdentityIterator) scanner.next();
- if (excludeSet.contains(bytesToHex(row.getKey().key)))
+ String currentKey = bytesToHex(row.getKey().key);
+
+ if (excludeSet.contains(currentKey))
continue;
- else if (elementWritten)
+ else if (i != 0)
outs.println(",");
- try
- {
- serializeRow(outs, row);
+ serializeRow(reader, row, row.getKey(), outs);
- // used to decide should we put ',' after previous row or not
- if (!elementWritten)
- elementWritten = true;
- }
- catch (IOException ioexcep)
- {
- System.err.println("WARNING: Corrupt row " + bytesToHex(row.getKey().key) + " (skipping).");
- elementWritten = false;
- }
- catch (OutOfMemoryError oom)
- {
- System.err.println("ERROR: Out of memory deserializing row " + bytesToHex(row.getKey().key));
- elementWritten = false;
- }
+ i++;
}
-
- outs.printf("%n}%n");
+
+ outs.println("\n}");
outs.flush();
+
+ scanner.close();
}
/**
@@ -326,8 +393,7 @@ public class SSTableExport
*/
public static void export(String ssTableFile, PrintStream outs, String[] excludes) throws IOException
{
- SSTableReader reader = SSTableReader.open(Descriptor.fromFilename(ssTableFile));
- export(reader, outs, excludes);
+ export(SSTableReader.open(Descriptor.fromFilename(ssTableFile)), outs, excludes);
}
/**
@@ -395,7 +461,7 @@ public class SSTableExport
else
{
if ((keys != null) && (keys.length > 0))
- export(ssTableFileName, System.out, keys, excludes);
+ export(ssTableFileName, System.out, Arrays.asList(keys), excludes);
else
export(ssTableFileName, excludes);
}
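One consequence of the new export() signature worth noting: it takes a
Collection<String> of keys and removes the excluded keys from it in place, so
callers should hand it a mutable collection (the fixed-size list returned by
Arrays.asList() rejects removal of a present element). A hypothetical direct
caller, with a made-up path and hex keys, doing roughly what
"sstable2json <Data.db> -k 6b657931 -k 6b657932 -x 6b657932" does:

    import java.util.ArrayList;
    import java.util.Arrays;

    import org.apache.cassandra.tools.SSTableExport;

    public class ExportCaller
    {
        public static void main(String[] args) throws Exception
        {
            // Wrap in ArrayList because export() removes excluded keys in place.
            SSTableExport.export("/var/lib/cassandra/data/Keyspace1/Standard1-e-1-Data.db",
                                 System.out,
                                 new ArrayList<String>(Arrays.asList("6b657931", "6b657932")),
                                 new String[]{ "6b657932" });
        }
    }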