You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ee...@apache.org on 2011/02/08 21:36:35 UTC
svn commit: r1068562 -
/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/SSTableExport.java
Author: eevans
Date: Tue Feb 8 20:36:35 2011
New Revision: 1068562
URL: http://svn.apache.org/viewvc?rev=1068562&view=rev
Log:
paging of large rows in sstable2json
Patch by Pavel Yaskevich; reviewed by eevans for CASSANDRA-2041
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/SSTableExport.java
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/SSTableExport.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/SSTableExport.java?rev=1068562&r1=1068561&r2=1068562&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/SSTableExport.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/SSTableExport.java Tue Feb 8 20:36:35 2011
@@ -24,15 +24,17 @@ import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.util.*;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.columniterator.IColumnIterator;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.io.util.BufferedRandomAccessFile;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.commons.cli.*;
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.ExpiringColumn;
-import org.apache.cassandra.db.IColumn;
-import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.sstable.*;
@@ -44,7 +46,8 @@ import static org.apache.cassandra.utils
*/
public class SSTableExport
{
- private static int INPUT_FILE_BUFFER_SIZE = 8 * 1024 * 1024;
+ // size of the columns page
+ private static final int PAGE_SIZE = 1000;
private static final String KEY_OPTION = "k";
private static final String EXCLUDEKEY_OPTION = "x";
@@ -69,79 +72,168 @@ public class SSTableExport
Option optEnumerate = new Option(ENUMERATEKEYS_OPTION, false, "enumerate keys only");
options.addOption(optEnumerate);
}
-
+
+ /**
+ * Wraps the given string in double quotes
+ * @param val string to quote
+ * @return quoted string
+ */
private static String quote(String val)
{
return String.format("\"%s\"", val);
}
-
+
+ /**
+ * JSON Hash Key serializer
+ * @param val value to set as a key
+ * @return JSON Hash key
+ */
private static String asKey(String val)
{
return String.format("%s: ", quote(val));
}
-
- private static void serializeColumns(PrintStream outs, Collection<IColumn> cols, AbstractType comp)
+
+ /**
+ * Serialize columns using given column iterator
+ * @param columns column iterator
+ * @param out output stream
+ */
+ private static void serializeColumns(Iterator<IColumn> columns, PrintStream out)
{
- outs.print("[");
+ while (columns.hasNext())
+ {
+ serializeColumn(columns.next(), out);
- Iterator<IColumn> iter = cols.iterator();
- while (iter.hasNext())
+ if (columns.hasNext())
+ out.print(", ");
+ }
+ }
+
+ /**
+ * Serialize a collection of the columns
+ * @param columns collection of the columns to serialize
+ * @param out output stream
+ */
+ private static void serializeColumns(Collection<IColumn> columns, PrintStream out)
+ {
+ serializeColumns(columns.iterator(), out);
+ }
+
+ /**
+ * Serialize a given column to the JSON format
+ * @param column the column to serialize
+ * @param out output stream
+ */
+ private static void serializeColumn(IColumn column, PrintStream out)
+ {
+ out.print("[");
+ out.print(quote(bytesToHex(column.name())));
+ out.print(", ");
+ out.print(quote(bytesToHex(column.value())));
+ out.print(", ");
+ out.print(column.timestamp());
+ out.print(", ");
+ out.print(column.isMarkedForDelete());
+
+ if (column instanceof ExpiringColumn)
+ {
+ out.print(", ");
+ out.print(((ExpiringColumn) column).getTimeToLive());
+ out.print(", ");
+ out.print(column.getLocalDeletionTime());
+ }
+
+ out.print("]");
+ }
+
+ /**
+ * Fetch the columns of the row one page at a time and serialize each page, looping until no more columns are left in the row
+ * @param reader SSTableReader for given SSTable
+ * @param row SSTableIdentityIterator row representation with Column Family
+ * @param key Decorated Key for the required row
+ * @param out output stream
+ */
+ private static void serializeRow(SSTableReader reader, SSTableIdentityIterator row, DecoratedKey key, PrintStream out)
+ {
+ ColumnFamily columnFamily = row.getColumnFamily();
+ boolean isSuperCF = columnFamily.isSuper();
+ ByteBuffer startColumn = ByteBufferUtil.EMPTY_BYTE_BUFFER; // initial column name, "blank" for first
+
+ out.print(asKey(bytesToHex(key.key)));
+
+ out.print(isSuperCF ? "{" : "[");
+
+ while (true)
{
- outs.print("[");
- IColumn column = iter.next();
- outs.print(quote(bytesToHex(column.name())));
- outs.print(", ");
- outs.print(quote(bytesToHex(column.value())));
- outs.print(", ");
- outs.print(column.timestamp());
- outs.print(", ");
- outs.print(column.isMarkedForDelete());
- if (column instanceof ExpiringColumn)
+ QueryFilter filter = QueryFilter.getSliceFilter(key,
+ new QueryPath(columnFamily.metadata().tableName),
+ startColumn,
+ ByteBufferUtil.EMPTY_BYTE_BUFFER,
+ false,
+ PAGE_SIZE);
+
+ IColumnIterator columns = filter.getSSTableColumnIterator(reader);
+
+ int columnCount = 0;
+ while (columns.hasNext())
{
- outs.print(", ");
- outs.print(((ExpiringColumn) column).getTimeToLive());
- outs.print(", ");
- outs.print(column.getLocalDeletionTime());
+ // setting new start column to the last of the current columns
+ startColumn = columns.next().name();
+ columnCount++;
}
- outs.print("]");
- if (iter.hasNext())
- outs.print(", ");
+
+ try
+ {
+ columns = filter.getSSTableColumnIterator(reader); // iterator reset
+ serializeRow(columns, isSuperCF, out);
+ }
+ catch (IOException e)
+ {
+ System.err.println("WARNING: Corrupt row " + key + " (skipping).");
+ }
+
+ if (columnCount < PAGE_SIZE)
+ break;
}
-
- outs.print("]");
+
+ out.print(isSuperCF ? "}" : "]");
}
-
- private static void serializeRow(PrintStream outs, SSTableIdentityIterator row) throws IOException
- {
- ColumnFamily cf = row.getColumnFamilyWithColumns();
- AbstractType comparator = cf.getComparator();
- outs.print(asKey(bytesToHex(row.getKey().key)));
- if (cf.isSuper())
+ /**
+ * Serialize a row with already given column iterator
+ *
+ * @param columns columns of the row
+ * @param isSuper true if wrapping Column Family is Super
+ * @param out output stream
+ *
+ * @throws IOException on any I/O error.
+ */
+ private static void serializeRow(IColumnIterator columns, boolean isSuper, PrintStream out) throws IOException
+ {
+ if (isSuper)
{
- outs.print("{ ");
-
- Iterator<IColumn> iter = cf.getSortedColumns().iterator();
- while (iter.hasNext())
+ while (columns.hasNext())
{
- IColumn column = iter.next();
- outs.print(asKey(bytesToHex(column.name())));
- outs.print("{");
- outs.print(asKey("deletedAt"));
- outs.print(column.getMarkedForDeleteAt());
- outs.print(", ");
- outs.print(asKey("subColumns"));
- serializeColumns(outs, column.getSubColumns(), comparator);
- outs.print("}");
- if (iter.hasNext())
- outs.print(", ");
+ IColumn column = columns.next();
+
+ out.print(asKey(bytesToHex(column.name())));
+ out.print("{");
+ out.print(asKey("deletedAt"));
+ out.print(column.getMarkedForDeleteAt());
+ out.print(", ");
+ out.print(asKey("subColumns"));
+ out.print("[");
+ serializeColumns(column.getSubColumns(), out);
+ out.print("]");
+ out.print("}");
+
+ if (columns.hasNext())
+ out.print(", ");
}
-
- outs.print("}");
}
else
{
- serializeColumns(outs, cf.getSortedColumns(), comparator);
+ serializeColumns(columns, out);
}
}
@@ -176,117 +268,97 @@ public class SSTableExport
/**
* Export specific rows from an SSTable and write the resulting JSON to a PrintStream.
*
- * @param ssTableFile the SSTable to export the rows from
+ * @param ssTableFile the filename of the SSTable to export the rows from
* @param outs PrintStream to write the output to
- * @param keys the keys corresponding to the rows to export
+ * @param toExport the keys corresponding to the rows to export
+ * @param excludes keys to exclude from export
* @throws IOException on failure to read/write input/output
*/
- public static void export(String ssTableFile, PrintStream outs, String[] keys, String[] excludes)
- throws IOException
+ public static void export(String ssTableFile, PrintStream outs, Collection<String> toExport, String[] excludes) throws IOException
{
SSTableReader reader = SSTableReader.open(Descriptor.fromFilename(ssTableFile));
- SSTableScanner scanner = reader.getDirectScanner(INPUT_FILE_BUFFER_SIZE);
- IPartitioner<?> partitioner = DatabaseDescriptor.getPartitioner();
- Set<String> excludeSet = new HashSet<String>();
- int i = 0;
+ SSTableScanner scanner = reader.getDirectScanner(BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE);
+
+ IPartitioner<?> partitioner = StorageService.getPartitioner();
+
+ for (String toExclude : excludes)
+ {
+ toExport.remove(toExclude); // excluding key from export
+ }
- if (excludes != null)
- excludeSet = new HashSet<String>(Arrays.asList(excludes));
-
outs.println("{");
- // last key to compare order
+ int i = 0;
+
+ // last key to compare order
DecoratedKey lastKey = null;
-
- for (String key : keys)
+
+ for (String key : toExport)
{
- if (excludeSet.contains(key))
- continue;
- DecoratedKey<?> dk = partitioner.decorateKey(hexToBytes(key));
+ DecoratedKey decoratedKey = partitioner.decorateKey(hexToBytes(key));
- // validate order of the keys in the sstable
- if (lastKey != null && lastKey.compareTo(dk) > 0 )
- throw new IOException("Key out of order! " + lastKey + " > " + dk);
- lastKey = dk;
+ if (lastKey != null && lastKey.compareTo(decoratedKey) > 0)
+ throw new IOException("Key out of order! " + lastKey + " > " + decoratedKey);
- scanner.seekTo(dk);
-
- i++;
+ lastKey = decoratedKey;
- if (scanner.hasNext())
- {
- SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
+ scanner.seekTo(decoratedKey);
- try
- {
- serializeRow(outs, row);
- }
- catch (IOException ioexc)
- {
- System.err.println("WARNING: Corrupt row " + key + " (skipping).");
- continue;
- }
- catch (OutOfMemoryError oom)
- {
- System.err.println("ERROR: Out of memory deserializing row " + key);
- continue;
- }
+ if (!scanner.hasNext())
+ continue;
- if (i != 1)
- outs.println(",");
- }
+ serializeRow(reader, (SSTableIdentityIterator) scanner.next(), decoratedKey, outs);
+
+ if (i != 0)
+ outs.println(",");
+
+ i++;
}
-
+
outs.println("\n}");
outs.flush();
+
+ scanner.close();
}
// This is necessary to accommodate the test suite since you cannot open a Reader more
// than once from within the same process.
static void export(SSTableReader reader, PrintStream outs, String[] excludes) throws IOException
{
- SSTableScanner scanner = reader.getDirectScanner(INPUT_FILE_BUFFER_SIZE);
Set<String> excludeSet = new HashSet<String>();
if (excludes != null)
excludeSet = new HashSet<String>(Arrays.asList(excludes));
- outs.println("{");
SSTableIdentityIterator row;
+ SSTableScanner scanner = reader.getDirectScanner(BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE);
- boolean elementWritten = false;
+ outs.println("{");
+
+ int i = 0;
+
+ // serialize each row in the SSTable, skipping the excluded keys
while (scanner.hasNext())
{
row = (SSTableIdentityIterator) scanner.next();
- if (excludeSet.contains(bytesToHex(row.getKey().key)))
+ String currentKey = bytesToHex(row.getKey().key);
+
+ if (excludeSet.contains(currentKey))
continue;
- else if (elementWritten)
+ else if (i != 0)
outs.println(",");
- try
- {
- serializeRow(outs, row);
+ serializeRow(reader, row, row.getKey(), outs);
- // used to decide should we put ',' after previous row or not
- if (!elementWritten)
- elementWritten = true;
- }
- catch (IOException ioexcep)
- {
- System.err.println("WARNING: Corrupt row " + bytesToHex(row.getKey().key) + " (skipping).");
- elementWritten = false;
- }
- catch (OutOfMemoryError oom)
- {
- System.err.println("ERROR: Out of memory deserializing row " + bytesToHex(row.getKey().key));
- elementWritten = false;
- }
+ i++;
}
-
- outs.printf("%n}%n");
+
+ outs.println("\n}");
outs.flush();
+
+ scanner.close();
}
/**
@@ -294,18 +366,21 @@ public class SSTableExport
*
* @param ssTableFile the SSTable to export
* @param outs PrintStream to write the output to
+ * @param excludes keys to exclude from export
+ *
* @throws IOException on failure to read/write input/output
*/
public static void export(String ssTableFile, PrintStream outs, String[] excludes) throws IOException
{
- SSTableReader reader = SSTableReader.open(Descriptor.fromFilename(ssTableFile));
- export(reader, outs, excludes);
+ export(SSTableReader.open(Descriptor.fromFilename(ssTableFile)), outs, excludes);
}
/**
* Export an SSTable and write the resulting JSON to standard out.
*
* @param ssTableFile SSTable to export
+ * @param excludes keys to exclude from export
+ *
* @throws IOException on failure to read/write SSTable/standard out
*/
public static void export(String ssTableFile, String[] excludes) throws IOException
@@ -318,7 +393,9 @@ public class SSTableExport
* export the contents of the SSTable to JSON.
*
* @param args command lines arguments
+ *
* @throws IOException on failure to open/read/write files or output streams
+ * @throws ConfigurationException on configuration failure (wrong params given)
*/
public static void main(String[] args) throws IOException, ConfigurationException
{
@@ -364,7 +441,7 @@ public class SSTableExport
else
{
if ((keys != null) && (keys.length > 0))
- export(ssTableFileName, System.out, keys, excludes);
+ export(ssTableFileName, System.out, Arrays.asList(keys), excludes);
else
export(ssTableFileName, excludes);
}