You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ee...@apache.org on 2011/02/09 22:19:07 UTC
svn commit: r1069116 -
/cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java
Author: eevans
Date: Wed Feb 9 21:19:06 2011
New Revision: 1069116
URL: http://svn.apache.org/viewvc?rev=1069116&view=rev
Log:
merge from 0.7 using mine-conflict (r1062896 was clobbered)
Modified:
cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java?rev=1069116&r1=1069115&r2=1069116&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java Wed Feb 9 21:19:06 2011
@@ -29,11 +29,6 @@ import org.apache.commons.cli.*;
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.ExpiringColumn;
-import org.apache.cassandra.db.IColumn;
-import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.sstable.*;
@@ -45,7 +40,8 @@ import static org.apache.cassandra.utils
*/
public class SSTableExport
{
- private static int INPUT_FILE_BUFFER_SIZE = 8 * 1024 * 1024;
+ // size of the columns page
+ private static final int PAGE_SIZE = 1000;
private static final String KEY_OPTION = "k";
private static final String EXCLUDEKEY_OPTION = "x";
@@ -70,12 +66,22 @@ public class SSTableExport
Option optEnumerate = new Option(ENUMERATEKEYS_OPTION, false, "enumerate keys only");
options.addOption(optEnumerate);
}
-
+
+ /**
+ * Wraps the given string in double quotes
+ * @param val string to quote
+ * @return quoted string
+ */
private static String quote(String val)
{
return String.format("\"%s\"", val);
}
-
+
+ /**
+ * JSON Hash Key serializer
+ * @param val value to set as a key
+ * @return JSON Hash key
+ */
private static String asKey(String val)
{
return String.format("%s: ", quote(val));
@@ -83,7 +89,9 @@ public class SSTableExport
private static void serializeColumns(PrintStream outs, Collection<IColumn> columns, AbstractType comparator, CFMetaData cfMetaData)
{
- outs.print("[");
+ while (columns.hasNext())
+ {
+ serializeColumn(columns.next(), out);
Iterator<IColumn> iter = columns.iterator();
@@ -118,11 +126,18 @@ public class SSTableExport
outs.print(", ");
}
}
-
- outs.print("]");
+
+ out.print("]");
}
-
- private static void serializeRow(PrintStream outs, SSTableIdentityIterator row) throws IOException
+
+ /**
+ * Fetches the columns one page at a time and serializes them in a loop until no more columns remain in the row
+ * @param reader SSTableReader for given SSTable
+ * @param row SSTableIdentityIterator row representation with Column Family
+ * @param key Decorated Key for the required row
+ * @param out output stream
+ */
+ private static void serializeRow(SSTableReader reader, SSTableIdentityIterator row, DecoratedKey key, PrintStream out)
{
ColumnFamily columnFamily = row.getColumnFamilyWithColumns();
CFMetaData cfMetaData = columnFamily.metadata();
@@ -134,7 +149,12 @@ public class SSTableExport
if (columnFamily.isSuper())
{
- outs.print("{ ");
+ QueryFilter filter = QueryFilter.getSliceFilter(key,
+ new QueryPath(columnFamily.metadata().tableName),
+ startColumn,
+ ByteBufferUtil.EMPTY_BYTE_BUFFER,
+ false,
+ PAGE_SIZE);
Iterator<IColumn> iter = columnFamily.getSortedColumns().iterator();
while (iter.hasNext())
@@ -159,8 +179,55 @@ public class SSTableExport
outs.print(", ");
}
}
-
- outs.print("}");
+
+ try
+ {
+ columns = filter.getSSTableColumnIterator(reader); // iterator reset
+ serializeRow(columns, isSuperCF, out);
+ }
+ catch (IOException e)
+ {
+ System.err.println("WARNING: Corrupt row " + key + " (skipping).");
+ }
+
+ if (columnCount < PAGE_SIZE)
+ break;
+ }
+
+ out.print(isSuperCF ? "}" : "]");
+ }
+
+ /**
+ * Serialize a row with already given column iterator
+ *
+ * @param columns columns of the row
+ * @param isSuper true if wrapping Column Family is Super
+ * @param out output stream
+ *
+ * @throws IOException on any I/O error.
+ */
+ private static void serializeRow(IColumnIterator columns, boolean isSuper, PrintStream out) throws IOException
+ {
+ if (isSuper)
+ {
+ while (columns.hasNext())
+ {
+ IColumn column = columns.next();
+
+ out.print(asKey(bytesToHex(column.name())));
+ out.print("{");
+ out.print(asKey("deletedAt"));
+ out.print(column.getMarkedForDeleteAt());
+ out.print(", ");
+ out.print(asKey("subColumns"));
+ out.print("[");
+ serializeColumns(column.getSubColumns(), out);
+ out.print("]");
+ out.print("}");
+
+ if (columns.hasNext())
+ out.print(", ");
+ }
}
else
{
@@ -199,9 +266,10 @@ public class SSTableExport
/**
* Export specific rows from an SSTable and write the resulting JSON to a PrintStream.
*
- * @param ssTableFile the SSTable to export the rows from
+ * @param ssTableFile the SSTable file to export the rows from
* @param outs PrintStream to write the output to
- * @param keys the keys corresponding to the rows to export
+ * @param toExport the keys corresponding to the rows to export
+ * @param excludes keys to exclude from export
* @param excludes the keys to exclude from export
*
* @throws IOException on failure to read/write input/output
@@ -209,20 +277,23 @@ public class SSTableExport
public static void export(String ssTableFile, PrintStream outs, String[] keys, String[] excludes) throws IOException
{
SSTableReader reader = SSTableReader.open(Descriptor.fromFilename(ssTableFile));
- SSTableScanner scanner = reader.getDirectScanner(INPUT_FILE_BUFFER_SIZE);
- IPartitioner<?> partitioner = DatabaseDescriptor.getPartitioner();
- Set<String> excludeSet = new HashSet<String>();
- int i = 0;
+ SSTableScanner scanner = reader.getDirectScanner(BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE);
+
+ IPartitioner<?> partitioner = StorageService.getPartitioner();
+
+ for (String toExclude : excludes)
+ {
+ toExport.remove(toExclude); // excluding key from export
+ }
- if (excludes != null)
- excludeSet = new HashSet<String>(Arrays.asList(excludes));
-
outs.println("{");
- // last key to compare order
+ int i = 0;
+
+ // last key to compare order
DecoratedKey lastKey = null;
-
- for (String key : keys)
+
+ for (String key : toExport)
{
if (excludeSet.contains(key))
continue;
@@ -235,84 +306,65 @@ public class SSTableExport
lastKey = dk;
- scanner.seekTo(dk);
-
- i++;
+ lastKey = decoratedKey;
- if (scanner.hasNext())
- {
- SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
+ scanner.seekTo(decoratedKey);
- try
- {
- serializeRow(outs, row);
- }
- catch (IOException ioexc)
- {
- System.err.println("WARNING: Corrupt row " + key + " (skipping).");
- continue;
- }
- catch (OutOfMemoryError oom)
- {
- System.err.println("ERROR: Out of memory deserializing row " + key);
- continue;
- }
+ if (!scanner.hasNext())
+ continue;
- if (i != 1)
- outs.println(",");
- }
+ serializeRow(reader, (SSTableIdentityIterator) scanner.next(), decoratedKey, outs);
+
+ if (i != 0)
+ outs.println(",");
+
+ i++;
}
-
+
outs.println("\n}");
outs.flush();
+
+ scanner.close();
}
// This is necessary to accommodate the test suite since you cannot open a Reader more
// than once from within the same process.
static void export(SSTableReader reader, PrintStream outs, String[] excludes) throws IOException
{
- SSTableScanner scanner = reader.getDirectScanner(INPUT_FILE_BUFFER_SIZE);
Set<String> excludeSet = new HashSet<String>();
if (excludes != null)
excludeSet = new HashSet<String>(Arrays.asList(excludes));
- outs.println("{");
SSTableIdentityIterator row;
+ SSTableScanner scanner = reader.getDirectScanner(BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE);
+
+ outs.println("{");
+
+ int i = 0;
- boolean elementWritten = false;
+ // collecting keys to export
while (scanner.hasNext())
{
row = (SSTableIdentityIterator) scanner.next();
- if (excludeSet.contains(bytesToHex(row.getKey().key)))
+ String currentKey = bytesToHex(row.getKey().key);
+
+ if (excludeSet.contains(currentKey))
continue;
- else if (elementWritten)
+ else if (i != 0)
outs.println(",");
- try
- {
- serializeRow(outs, row);
+ serializeRow(reader, row, row.getKey(), outs);
- // used to decide should we put ',' after previous row or not
- if (!elementWritten)
- elementWritten = true;
- }
- catch (IOException ioexcep)
- {
- System.err.println("WARNING: Corrupt row " + bytesToHex(row.getKey().key) + " (skipping).");
- elementWritten = false;
- }
- catch (OutOfMemoryError oom)
- {
- System.err.println("ERROR: Out of memory deserializing row " + bytesToHex(row.getKey().key));
- elementWritten = false;
- }
+ i++;
}
-
- outs.printf("%n}%n");
+
+ outs.println("\n}");
outs.flush();
+
+ scanner.close();
}
/**
@@ -326,8 +378,7 @@ public class SSTableExport
*/
public static void export(String ssTableFile, PrintStream outs, String[] excludes) throws IOException
{
- SSTableReader reader = SSTableReader.open(Descriptor.fromFilename(ssTableFile));
- export(reader, outs, excludes);
+ export(SSTableReader.open(Descriptor.fromFilename(ssTableFile)), outs, excludes);
}
/**
@@ -348,6 +399,7 @@ public class SSTableExport
* export the contents of the SSTable to JSON.
*
* @param args command lines arguments
+ *
* @throws IOException on failure to open/read/write files or output streams
* @throws ConfigurationException if configuration is invalid
*/
@@ -395,7 +447,7 @@ public class SSTableExport
else
{
if ((keys != null) && (keys.length > 0))
- export(ssTableFileName, System.out, keys, excludes);
+ export(ssTableFileName, System.out, Arrays.asList(keys), excludes);
else
export(ssTableFileName, excludes);
}