You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/04/01 19:15:07 UTC
svn commit: r1087815 - in
/cassandra/trunk/src/java/org/apache/cassandra/tools: SSTableExport.java
SSTableImport.java
Author: jbellis
Date: Fri Apr 1 17:15:06 2011
New Revision: 1087815
URL: http://svn.apache.org/viewvc?rev=1087815&view=rev
Log:
really revert SSTableExport move
Added:
cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java
cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java
Added: cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java?rev=1087815&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java Fri Apr 1 17:15:06 2011
@@ -0,0 +1,420 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.util.BufferedRandomAccessFile;
+import org.apache.cassandra.service.StorageService;
+
+import org.apache.commons.cli.*;
+
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Pair;
+
+import static org.apache.cassandra.utils.ByteBufferUtil.bytesToHex;
+import static org.apache.cassandra.utils.ByteBufferUtil.hexToBytes;
+
+/**
+ * Export SSTables to JSON format.
+ */
+public class SSTableExport
+{
+ // size of the columns page
+ private static final int PAGE_SIZE = 1000;
+
+ // short names of the supported command line options
+ private static final String KEY_OPTION = "k";
+ private static final String EXCLUDEKEY_OPTION = "x";
+ private static final String ENUMERATEKEYS_OPTION = "e";
+
+ private static Options options;
+ private static CommandLine cmd;
+
+ // Builds the commons-cli option set: -k, -x and -e, registered in that order.
+ static
+ {
+     // -k <key> can be passed up to 500 times on the command line.
+     Option includedKey = new Option(KEY_OPTION, true, "Row key");
+     includedKey.setArgs(500);
+
+     // -x <key> can be passed up to 500 times on the command line.
+     Option excludedKey = new Option(EXCLUDEKEY_OPTION, true, "Excluded row key");
+     excludedKey.setArgs(500);
+
+     // -e is a plain flag without an argument.
+     Option enumerateOnly = new Option(ENUMERATEKEYS_OPTION, false, "enumerate keys only");
+
+     options = new Options();
+     options.addOption(includedKey);
+     options.addOption(excludedKey);
+     options.addOption(enumerateOnly);
+ }
+
+ /**
+  * Surrounds the given string with double quotes. The text is inserted
+  * as-is; no escaping is applied to it.
+  * @param val string to quote
+  * @return the quoted string
+  */
+ private static String quote(String val)
+ {
+     return "\"" + val + "\"";
+ }
+
+ /**
+  * Renders a JSON hash key: the quoted value followed by a colon and a space.
+  * @param val value to use as the key
+  * @return the key, ready to be followed by its JSON value
+  */
+ private static String asKey(String val)
+ {
+     return quote(val) + ": ";
+ }
+
+ /**
+  * Writes every column supplied by the iterator, separating consecutive
+  * columns with ", ". Nothing is written for an empty iterator.
+  * @param columns column iterator
+  * @param out output stream
+  * @param comparator columns comparator
+  * @param cfMetaData Column Family metadata (to get validator)
+  */
+ private static void serializeColumns(Iterator<IColumn> columns, PrintStream out, AbstractType comparator, CFMetaData cfMetaData)
+ {
+     if (!columns.hasNext())
+         return;
+
+     // the first column carries no separator, every later one is preceded by it
+     serializeColumn(columns.next(), out, comparator, cfMetaData);
+
+     while (columns.hasNext())
+     {
+         out.print(", ");
+         serializeColumn(columns.next(), out, comparator, cfMetaData);
+     }
+ }
+
+ /**
+  * Writes one column as a JSON array: [name, value, timestamp], followed by
+  * a kind marker and extra fields for special columns:
+  * "d" for deleted columns, "e", ttl, local deletion time for expiring
+  * columns, and "c", timestamp of last delete for counter columns.
+  * @param column column to write
+  * @param out output stream
+  * @param comparator columns comparator, used to render the name
+  * @param cfMetaData Column Family metadata (to get the value validator)
+  */
+ private static void serializeColumn(IColumn column, PrintStream out, AbstractType comparator, CFMetaData cfMetaData)
+ {
+     // rendering works on clones of the column's name and value buffers
+     ByteBuffer columnName = ByteBufferUtil.clone(column.name());
+     ByteBuffer columnValue = ByteBufferUtil.clone(column.value());
+     AbstractType valueValidator = cfMetaData.getValueValidator(columnName);
+
+     out.print("[");
+     out.print(quote(comparator.getString(columnName)));
+     out.print(", ");
+     out.print(quote(valueValidator.getString(columnValue)));
+     out.print(", ");
+     out.print(column.timestamp());
+
+     if (column instanceof DeletedColumn)
+     {
+         out.print(", \"d\"");
+     }
+     else if (column instanceof ExpiringColumn)
+     {
+         ExpiringColumn expiring = (ExpiringColumn) column;
+         out.print(", \"e\", ");
+         out.print(expiring.getTimeToLive());
+         out.print(", ");
+         out.print(column.getLocalDeletionTime());
+     }
+     else if (column instanceof CounterColumn)
+     {
+         CounterColumn counter = (CounterColumn) column;
+         out.print(", \"c\", ");
+         out.print(counter.timestampOfLastDelete());
+     }
+
+     out.print("]");
+ }
+
+ /**
+  * Writes one row as "hexkey": followed by its content. A standard row is
+  * an array of columns; a super row is a hash that maps each super column
+  * name to {"deletedAt": ..., "subColumns": [...]}.
+  * @param row SSTableIdentityIterator row representation with Column Family
+  * @param key Decorated Key for the required row
+  * @param out output stream
+  */
+ private static void serializeRow(SSTableIdentityIterator row, DecoratedKey key, PrintStream out)
+ {
+     ColumnFamily cf = row.getColumnFamily();
+     boolean superFamily = cf.isSuper();
+     CFMetaData metadata = cf.metadata();
+     AbstractType nameComparator = cf.getComparator();
+
+     out.print(asKey(bytesToHex(key.key)));
+
+     if (!superFamily)
+     {
+         out.print("[");
+         serializeColumns(row, out, nameComparator, metadata);
+         out.print("]");
+         return;
+     }
+
+     out.print("{");
+
+     while (row.hasNext())
+     {
+         IColumn superColumn = row.next();
+
+         out.print(asKey(nameComparator.getString(superColumn.name())));
+         out.print("{");
+         out.print(asKey("deletedAt"));
+         out.print(superColumn.getMarkedForDeleteAt());
+         out.print(", ");
+         out.print(asKey("subColumns"));
+         out.print("[");
+         serializeColumns(superColumn.getSubColumns().iterator(), out, cf.getSubComparator(), metadata);
+         out.print("]");
+         out.print("}");
+
+         // separator only between super columns, never after the last one
+         if (row.hasNext())
+             out.print(", ");
+     }
+
+     out.print("}");
+ }
+
+ /**
+  * Enumerate row keys from an SSTable and write them, hex encoded and one
+  * per line, to a PrintStream. While iterating, the order of the keys is
+  * validated.
+  *
+  * @param ssTableFile the file to export the rows from
+  * @param outs PrintStream to write the output to
+  * @throws IOException on failure to read/write input/output, or when a key
+  *         is found to sort before its predecessor
+  */
+ public static void enumeratekeys(String ssTableFile, PrintStream outs)
+ throws IOException
+ {
+     Descriptor desc = Descriptor.fromFilename(ssTableFile);
+     KeyIterator iter = new KeyIterator(desc);
+
+     // close the iterator even when the order check below throws
+     try
+     {
+         DecoratedKey lastKey = null;
+         while (iter.hasNext())
+         {
+             DecoratedKey key = iter.next();
+
+             // validate order of the keys in the sstable
+             if (lastKey != null && lastKey.compareTo(key) > 0)
+                 throw new IOException("Key out of order! " + lastKey + " > " + key);
+             lastKey = key;
+
+             outs.println(bytesToHex(key.key));
+         }
+     }
+     finally
+     {
+         iter.close();
+     }
+
+     outs.flush();
+ }
+
+ /**
+  * Export specific rows from an SSTable and write the resulting JSON to a PrintStream.
+  * Requested keys that are excluded, or for which the sstable holds no row, are skipped.
+  * The caller's collection of keys is not modified.
+  *
+  * @param ssTableFile path of the SSTable to export the rows from
+  * @param outs PrintStream to write the output to
+  * @param toExport hex encoded keys of the rows to export; they must be
+  *                 supplied in the order of their decorated keys
+  * @param excludes hex encoded keys to exclude from export, may be null
+  * @throws IOException on failure to read/write input/output, or when the
+  *         requested keys are out of order
+  */
+ public static void export(String ssTableFile, PrintStream outs, Collection<String> toExport, String[] excludes) throws IOException
+ {
+     SSTableReader reader = SSTableReader.open(Descriptor.fromFilename(ssTableFile));
+     SSTableScanner scanner = reader.getDirectScanner(BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE);
+
+     // close the scanner even when a key turns out to be out of order
+     try
+     {
+         IPartitioner<?> partitioner = StorageService.getPartitioner();
+
+         // Filter through a private set instead of calling toExport.removeAll():
+         // the collection may be a fixed-size view (main passes Arrays.asList),
+         // which rejects removal, and it belongs to the caller anyway.
+         Set<String> excluded = (excludes == null)
+                              ? Collections.<String>emptySet()
+                              : new HashSet<String>(Arrays.asList(excludes));
+
+         outs.println("{");
+
+         int i = 0;
+
+         // last key to compare order
+         DecoratedKey lastKey = null;
+
+         for (String key : toExport)
+         {
+             if (excluded.contains(key))
+                 continue;
+
+             DecoratedKey decoratedKey = partitioner.decorateKey(hexToBytes(key));
+
+             if (lastKey != null && lastKey.compareTo(decoratedKey) > 0)
+                 throw new IOException("Key out of order! " + lastKey + " > " + decoratedKey);
+
+             lastKey = decoratedKey;
+
+             scanner.seekTo(decoratedKey);
+
+             if (!scanner.hasNext())
+                 continue;
+
+             SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
+             if (!row.getKey().equals(decoratedKey))
+                 continue;
+
+             // the separator goes between rows, so it has to precede every row but the first
+             if (i != 0)
+                 outs.println(",");
+
+             serializeRow(row, decoratedKey, outs);
+
+             i++;
+         }
+
+         outs.println("\n}");
+         outs.flush();
+     }
+     finally
+     {
+         scanner.close();
+     }
+ }
+
+ /**
+  * Export all rows of an already opened SSTable, except the excluded ones,
+  * and write the resulting JSON to a PrintStream.
+  *
+  * This is necessary to accommodate the test suite since you cannot open a Reader more
+  * than once from within the same process.
+  *
+  * @param reader the opened SSTable to export
+  * @param outs PrintStream to write the output to
+  * @param excludes hex encoded keys to exclude from export, may be null
+  * @throws IOException on failure to read/write input/output
+  */
+ static void export(SSTableReader reader, PrintStream outs, String[] excludes) throws IOException
+ {
+     Set<String> excludeSet = (excludes == null)
+                            ? new HashSet<String>()
+                            : new HashSet<String>(Arrays.asList(excludes));
+
+     SSTableScanner scanner = reader.getDirectScanner(BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE);
+
+     // close the scanner even when reading or serializing a row fails
+     try
+     {
+         outs.println("{");
+
+         int i = 0;
+
+         while (scanner.hasNext())
+         {
+             SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
+
+             String currentKey = bytesToHex(row.getKey().key);
+
+             if (excludeSet.contains(currentKey))
+                 continue;
+
+             // separator between rows: precedes every exported row but the first
+             if (i != 0)
+                 outs.println(",");
+
+             serializeRow(row, row.getKey(), outs);
+
+             i++;
+         }
+
+         outs.println("\n}");
+         outs.flush();
+     }
+     finally
+     {
+         scanner.close();
+     }
+ }
+
+ /**
+  * Open the SSTable stored at the given path, export its rows and write
+  * the resulting JSON to a PrintStream.
+  *
+  * @param ssTableFile the SSTable to export
+  * @param outs PrintStream to write the output to
+  * @param excludes keys to exclude from export
+  *
+  * @throws IOException on failure to read/write input/output
+  */
+ public static void export(String ssTableFile, PrintStream outs, String[] excludes) throws IOException
+ {
+     Descriptor descriptor = Descriptor.fromFilename(ssTableFile);
+     SSTableReader reader = SSTableReader.open(descriptor);
+     export(reader, outs, excludes);
+ }
+
+ /**
+ * Export an SSTable and write the resulting JSON to standard out.
+ * Delegates to the (String, PrintStream, String[]) overload with System.out.
+ *
+ * @param ssTableFile SSTable to export
+ * @param excludes keys to exclude from export
+ *
+ * @throws IOException on failure to read/write SSTable/standard out
+ */
+ public static void export(String ssTableFile, String[] excludes) throws IOException
+ {
+ export(ssTableFile, System.out, excludes);
+ }
+
+ /**
+  * Command line entry point. Takes exactly one SSTable path plus optional
+  * -k (keys to export), -x (keys to exclude) and -e (enumerate keys only)
+  * options, and writes the result to standard out. The JVM is terminated
+  * through System.exit: status 1 on bad arguments, status 0 on success.
+  *
+  * @param args command lines arguments
+  *
+  * @throws IOException on failure to open/read/write files or output streams
+  * @throws ConfigurationException when no non-system tables are defined
+  */
+ public static void main(String[] args) throws IOException, ConfigurationException
+ {
+     final String usage = String.format("Usage: %s <sstable> [-k key [-k key [...]] -x key [-x key [...]]]%n", SSTableExport.class.getName());
+
+     try
+     {
+         CommandLineParser parser = new PosixParser();
+         cmd = parser.parse(options, args);
+     }
+     catch (ParseException parseFailure)
+     {
+         System.err.println(parseFailure.getMessage());
+         System.err.println(usage);
+         System.exit(1);
+     }
+
+     String[] positional = cmd.getArgs();
+     if (positional.length != 1)
+     {
+         System.err.println("You must supply exactly one sstable");
+         System.err.println(usage);
+         System.exit(1);
+     }
+
+     String[] keys = cmd.getOptionValues(KEY_OPTION);
+     String[] excludes = cmd.getOptionValues(EXCLUDEKEY_OPTION);
+     String ssTableFileName = new File(positional[0]).getAbsolutePath();
+
+     // the schema has to be loaded before the sstable can be read
+     DatabaseDescriptor.loadSchemas();
+     if (DatabaseDescriptor.getNonSystemTables().size() < 1)
+     {
+         String msg = "no non-system tables are defined";
+         System.err.println(msg);
+         throw new ConfigurationException(msg);
+     }
+
+     boolean keysRequested = (keys != null) && (keys.length > 0);
+
+     if (cmd.hasOption(ENUMERATEKEYS_OPTION))
+         enumeratekeys(ssTableFileName, System.out);
+     else if (keysRequested)
+         export(ssTableFileName, System.out, Arrays.asList(keys), excludes);
+     else
+         export(ssTableFileName, excludes);
+
+     System.exit(0);
+ }
+}
Added: cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java?rev=1087815&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java Fri Apr 1 17:15:06 2011
@@ -0,0 +1,528 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.MarshalException;
+import org.apache.commons.cli.*;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.codehaus.jackson.type.TypeReference;
+
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.map.MappingJsonFactory;
+
+import org.codehaus.jackson.JsonParser;
+
+import static org.apache.cassandra.utils.ByteBufferUtil.hexToBytes;
+
+/**
+ * Create SSTables from JSON input
+ */
+public class SSTableImport
+{
+ // short names of the supported command line options
+ private static final String KEYSPACE_OPTION = "K";
+ private static final String COLUMN_FAMILY_OPTION = "c";
+ private static final String KEY_COUNT_OPTION = "n";
+ private static final String IS_SORTED_OPTION = "s";
+
+ private static Options options;
+ private static CommandLine cmd;
+
+ // number of keys to import; null until set by -n, the test hook or the import itself
+ private static Integer keyCountToImport = null;
+ // set by -s: the JSON input is taken to be sorted already
+ private static boolean isSorted = false;
+
+ private static JsonFactory factory = new MappingJsonFactory();
+
+ // Builds the commons-cli option set: -K, -c, -n and -s, registered in that order.
+ static
+ {
+     // keyspace and column family are mandatory
+     Option keyspaceOpt = new Option(KEYSPACE_OPTION, true, "Keyspace name.");
+     keyspaceOpt.setRequired(true);
+
+     Option columnFamilyOpt = new Option(COLUMN_FAMILY_OPTION, true, "Column Family name.");
+     columnFamilyOpt.setRequired(true);
+
+     Option keyCountOpt = new Option(KEY_COUNT_OPTION, true, "Number of keys to import (Optional).");
+     Option sortedOpt = new Option(IS_SORTED_OPTION, false, "Assume JSON file as already sorted (e.g. created by sstable2json tool) (Optional).");
+
+     options = new Options();
+     options.addOption(keyspaceOpt);
+     options.addOption(columnFamilyOpt);
+     options.addOption(keyCountOpt);
+     options.addOption(sortedOpt);
+ }
+
+ /**
+  * One column as read from the JSON input. A column is a JSON array
+  * [name, value, timestamp], optionally followed by a kind marker and extra fields:
+  * "d" (deleted), "e", ttl, local expiration time (expiring) or
+  * "c", timestamp of last delete (counter). The older format, with a boolean
+  * in the fourth position, is still understood.
+  *
+  * Numeric fields are converted through Number, because the boxed type
+  * produced by the JSON parser depends on the magnitude of the value.
+  */
+ private static class JsonColumn<T>
+ {
+     private ByteBuffer name;
+     private ByteBuffer value;
+     private long timestamp;
+
+     // "" for a regular column, otherwise "d", "e" or "c"
+     private String kind;
+     // Expiring columns
+     private int ttl;
+     private int localExpirationTime;
+
+     // Counter columns
+     private long timestampOfLastDelete;
+
+     /**
+      * @param json the parsed column; only List input is interpreted
+      * @param meta Column Family metadata (comparators and value validator)
+      * @param isSubColumn whether the column lives inside a super column, which
+      *                    selects the sub-column comparator for its name
+      */
+     public JsonColumn(T json, CFMetaData meta, boolean isSubColumn)
+     {
+         AbstractType comparator = (isSubColumn) ? meta.subcolumnComparator : meta.comparator;
+
+         if (json instanceof List)
+         {
+             List<?> fields = (List<?>) json;
+
+             assert fields.size() >= 3 : "Column definition should have at least 3";
+
+             name = stringAsType((String) fields.get(0), comparator);
+             value = stringAsType((String) fields.get(1), meta.getValueValidator(name.duplicate()));
+             timestamp = asLong(fields.get(2));
+             kind = "";
+
+             if (fields.size() > 3)
+             {
+                 if (fields.get(3) instanceof Boolean)
+                 {
+                     // old format, reading this for backward compatibility sake
+                     if (fields.size() == 6)
+                     {
+                         kind = "e";
+                         ttl = asInt(fields.get(4));
+                         localExpirationTime = asInt(fields.get(5));
+                     }
+                     else
+                     {
+                         kind = ((Boolean) fields.get(3)) ? "d" : "";
+                     }
+                 }
+                 else
+                 {
+                     kind = (String) fields.get(3);
+                     if (isExpiring())
+                     {
+                         ttl = asInt(fields.get(4));
+                         localExpirationTime = asInt(fields.get(5));
+                     }
+                     else if (isCounter())
+                     {
+                         timestampOfLastDelete = asLong(fields.get(4));
+                     }
+                 }
+             }
+         }
+     }
+
+     // Reads a JSON number as long, whichever boxed numeric type the parser chose.
+     private static long asLong(Object number)
+     {
+         return ((Number) number).longValue();
+     }
+
+     // Reads a JSON number as int, whichever boxed numeric type the parser chose.
+     private static int asInt(Object number)
+     {
+         return ((Number) number).intValue();
+     }
+
+     public boolean isDeleted()
+     {
+         return kind.equals("d");
+     }
+
+     public boolean isExpiring()
+     {
+         return kind.equals("e");
+     }
+
+     public boolean isCounter()
+     {
+         return kind.equals("c");
+     }
+
+     /** @return a duplicate of the name buffer, so callers cannot move its position */
+     public ByteBuffer getName()
+     {
+         return name.duplicate();
+     }
+
+     /** @return a duplicate of the value buffer, so callers cannot move its position */
+     public ByteBuffer getValue()
+     {
+         return value.duplicate();
+     }
+ }
+
+ /**
+ * Add the columns of a standard (non-super) row to a column family.
+ * Delegates to addColumnsToCF without a super column name.
+ *
+ * @param row the columns associated with a row
+ * @param cfamily the column family to add columns to
+ */
+ private static void addToStandardCF(List<?> row, ColumnFamily cfamily)
+ {
+ addColumnsToCF(row, null, cfamily);
+ }
+
+ /**
+  * Add columns to a column family. Each element of the row is parsed as a
+  * JsonColumn and added according to its kind (expiring, counter, deleted
+  * or regular).
+  *
+  * @param row the columns associated with a row
+  * @param superName name of the super column if any, null otherwise
+  * @param cfamily the column family to add columns to
+  */
+ private static void addColumnsToCF(List<?> row, ByteBuffer superName, ColumnFamily cfamily)
+ {
+     CFMetaData cfm = cfamily.metadata();
+     assert cfm != null;
+
+     // a non-null super column name means the row holds sub-columns
+     boolean subColumns = (superName != null);
+
+     for (Object rawColumn : row)
+     {
+         JsonColumn<List> col = new JsonColumn<List>((List) rawColumn, cfm, subColumns);
+         QueryPath path = new QueryPath(cfm.cfName, superName, col.getName());
+
+         // NOTE(review): expiring and counter columns are added with a null first
+         // argument instead of the path that carries superName; confirm that this
+         // is intended when importing into a super column family.
+         if (col.isExpiring())
+         {
+             ExpiringColumn expiring = new ExpiringColumn(col.getName(), col.getValue(), col.timestamp, col.ttl, col.localExpirationTime);
+             cfamily.addColumn(null, expiring);
+             continue;
+         }
+
+         if (col.isCounter())
+         {
+             CounterColumn counter = new CounterColumn(col.getName(), col.getValue(), col.timestamp, col.timestampOfLastDelete);
+             cfamily.addColumn(null, counter);
+             continue;
+         }
+
+         if (col.isDeleted())
+             cfamily.addTombstone(path, col.getValue(), col.timestamp);
+         else
+             cfamily.addColumn(path, col.getValue(), col.timestamp);
+     }
+ }
+
+ /**
+  * Add super columns to a column family. Each entry of the row maps a super
+  * column name to a hash, of which only the "subColumns" list is imported.
+  *
+  * @param row the super columns associated with a row
+  * @param cfamily the column family to add columns to
+  */
+ private static void addToSuperCF(Map<?, ?> row, ColumnFamily cfamily)
+ {
+     CFMetaData metaData = cfamily.metadata();
+     assert metaData != null;
+
+     AbstractType superComparator = metaData.comparator;
+
+     for (Map.Entry<?, ?> superColumn : row.entrySet())
+     {
+         Map<?, ?> content = (Map<?, ?>) superColumn.getValue();
+         List<?> subColumns = (List<?>) content.get("subColumns");
+         ByteBuffer superName = stringAsType((String) superColumn.getKey(), superComparator);
+
+         addColumnsToCF(subColumns, superName, cfamily);
+
+         // *WARNING* markForDeleteAt has been DEPRECATED at Cassandra side,
+         // so the "deletedAt" entry of the super column is not applied here.
+     }
+ }
+
+ /**
+  * Convert a JSON formatted file to an SSTable. Depending on the sorted
+  * flag the input is either streamed (importSorted) or read as a whole and
+  * sorted first (importUnsorted). The number of imported keys is reported
+  * on standard out, unless the import returned -1.
+  *
+  * @param jsonFile the file containing JSON formatted data
+  * @param keyspace keyspace the data belongs to
+  * @param cf column family the data belongs to
+  * @param ssTablePath file to write the SSTable to
+  *
+  * @throws IOException for errors reading/writing input/output
+  */
+ public static void importJson(String jsonFile, String keyspace, String cf, String ssTablePath) throws IOException
+ {
+     ColumnFamily columnFamily = ColumnFamily.create(keyspace, cf);
+     IPartitioner<?> partitioner = DatabaseDescriptor.getPartitioner();
+
+     int importedKeys;
+     if (isSorted)
+         importedKeys = importSorted(jsonFile, columnFamily, ssTablePath, partitioner);
+     else
+         importedKeys = importUnsorted(getParser(jsonFile), columnFamily, ssTablePath, partitioner);
+
+     // -1 signals an aborted import
+     if (importedKeys == -1)
+         return;
+
+     System.out.printf("%d keys imported successfully.%n", importedKeys);
+ }
+
+ /**
+  * Import JSON whose rows are in arbitrary order: the whole document is read
+  * into memory, the row keys are sorted by their decorated form and the rows
+  * are appended to the SSTable in that order. Progress is reported on
+  * standard out at most every 5 seconds.
+  *
+  * @param parser parser positioned at the start of the JSON document
+  * @param columnFamily column family used (and cleared) as a per-row buffer
+  * @param ssTablePath file to write the SSTable to
+  * @param partitioner partitioner used to decorate the row keys
+  * @return number of keys imported
+  * @throws IOException for errors reading/writing input/output
+  */
+ private static int importUnsorted(JsonParser parser, ColumnFamily columnFamily, String ssTablePath, IPartitioner<?> partitioner) throws IOException
+ {
+     int importedKeys = 0;
+     long lastReport = System.currentTimeMillis();
+     Map<?, ?> data = parser.readValueAs(new TypeReference<Map<?, ?>>() {});
+
+     // without an explicit key count every key of the document is imported
+     if (keyCountToImport == null)
+         keyCountToImport = data.size();
+
+     SSTableWriter writer = new SSTableWriter(ssTablePath, keyCountToImport);
+
+     System.out.printf("Importing %s keys...%n", keyCountToImport);
+
+     // sort by dk representation, but hold onto the hex version
+     SortedMap<DecoratedKey,String> decoratedKeys = new TreeMap<DecoratedKey,String>();
+
+     for (Object keyObject : data.keySet())
+     {
+         String hexKey = (String) keyObject;
+         decoratedKeys.put(partitioner.decorateKey(hexToBytes(hexKey)), hexKey);
+     }
+
+     for (Map.Entry<DecoratedKey, String> rowKey : decoratedKeys.entrySet())
+     {
+         boolean superFamily = columnFamily.getColumnFamilyType() == ColumnFamilyType.Super;
+
+         if (superFamily)
+             addToSuperCF((Map<?, ?>) data.get(rowKey.getValue()), columnFamily);
+         else
+             addToStandardCF((List<?>) data.get(rowKey.getValue()), columnFamily);
+
+         writer.append(rowKey.getKey(), columnFamily);
+         columnFamily.clear();
+
+         importedKeys++;
+
+         long now = System.currentTimeMillis();
+
+         if (now - lastReport >= 5000) // 5 secs.
+         {
+             System.out.printf("Currently imported %d keys.%n", importedKeys);
+             lastReport = now;
+         }
+
+         if (keyCountToImport == importedKeys)
+             break;
+     }
+
+     writer.closeAndOpenReader();
+
+     return importedKeys;
+ }
+
+ /**
+  * Import JSON whose rows are already sorted by decorated key (e.g. produced
+  * by the export tool). The file is streamed row by row instead of being
+  * loaded as a whole. Unless the key count is already known, a first pass
+  * over the file counts the keys. Progress is reported on standard out at
+  * most every 5 seconds.
+  *
+  * @param jsonFile the file containing JSON formatted data
+  * @param columnFamily column family used (and cleared) as a per-row buffer
+  * @param ssTablePath file to write the SSTable to
+  * @param partitioner partitioner used to decorate the row keys
+  * @return number of keys imported, or -1 when the import was aborted because
+  *         a key did not sort strictly after its predecessor
+  * @throws IOException for errors reading/writing input/output
+  */
+ public static int importSorted(String jsonFile, ColumnFamily columnFamily, String ssTablePath, IPartitioner<?> partitioner) throws IOException
+ {
+     int importedKeys = 0; // already imported keys count
+     long start = System.currentTimeMillis();
+
+     JsonParser parser = getParser(jsonFile);
+
+     try
+     {
+         if (keyCountToImport == null)
+         {
+             keyCountToImport = 0;
+             System.out.println("Counting keys to import, please wait... (NOTE: to skip this use -n <num_keys>)");
+
+             parser.nextToken(); // START_OBJECT
+             while (parser.nextToken() != null)
+             {
+                 parser.nextToken();
+                 parser.skipChildren();
+                 if (parser.getCurrentName() == null) continue;
+
+                 keyCountToImport++;
+             }
+         }
+     }
+     finally
+     {
+         // the counting parser is not reused; release it before reopening the file
+         parser.close();
+     }
+
+     System.out.printf("Importing %s keys...%n", keyCountToImport);
+
+     parser = getParser(jsonFile); // renewing parser
+
+     try
+     {
+         SSTableWriter writer = new SSTableWriter(ssTablePath, keyCountToImport);
+
+         int lineNumber = 1;
+         DecoratedKey prevStoredKey = null;
+
+         while (parser.nextToken() != null)
+         {
+             String key = parser.getCurrentName();
+
+             if (key != null)
+             {
+                 String tokenName = parser.nextToken().name();
+
+                 if (tokenName.equals("START_ARRAY"))
+                 {
+                     if (columnFamily.getColumnFamilyType() == ColumnFamilyType.Super)
+                     {
+                         throw new RuntimeException("Can't write Standard columns to the Super Column Family.");
+                     }
+
+                     List<?> columns = parser.readValueAs(new TypeReference<List<?>>() {});
+                     addToStandardCF(columns, columnFamily);
+                 }
+                 else if (tokenName.equals("START_OBJECT"))
+                 {
+                     if (columnFamily.getColumnFamilyType() == ColumnFamilyType.Standard)
+                     {
+                         throw new RuntimeException("Can't write Super columns to the Standard Column Family.");
+                     }
+
+                     Map<?, ?> columns = parser.readValueAs(new TypeReference<Map<?, ?>>() {});
+                     addToSuperCF(columns, columnFamily);
+                 }
+                 else
+                 {
+                     throw new UnsupportedOperationException("Only Array or Hash allowed as row content.");
+                 }
+
+                 DecoratedKey currentKey = partitioner.decorateKey(hexToBytes(key));
+
+                 // compareTo only promises the sign of its result, so test for ">= 0"
+                 // rather than for a result other than exactly -1
+                 if (prevStoredKey != null && prevStoredKey.compareTo(currentKey) >= 0)
+                 {
+                     System.err.printf("Line %d: Key %s is not greater than previous, collection is not sorted properly. Aborting import. You might need to delete SSTables manually.%n", lineNumber, key);
+                     return -1;
+                 }
+
+                 // saving decorated key
+                 writer.append(currentKey, columnFamily);
+                 columnFamily.clear();
+
+                 prevStoredKey = currentKey;
+                 importedKeys++;
+                 lineNumber++;
+
+                 long current = System.currentTimeMillis();
+
+                 if (current - start >= 5000) // 5 secs.
+                 {
+                     System.out.printf("Currently imported %d keys.%n", importedKeys);
+                     start = current;
+                 }
+
+                 if (keyCountToImport == importedKeys)
+                     break;
+             }
+         }
+
+         writer.closeAndOpenReader();
+
+         return importedKeys;
+     }
+     finally
+     {
+         parser.close();
+     }
+ }
+
+ /**
+  * Create a JsonParser for the given file, with interning of field names
+  * switched off.
+  * @param fileName name of the file
+  * @return json parser instance for given file
+  * @throws IOException if any I/O error.
+  */
+ private static JsonParser getParser(String fileName) throws IOException
+ {
+     File jsonFile = new File(fileName);
+     JsonParser parser = factory.createJsonParser(jsonFile);
+     return parser.configure(JsonParser.Feature.INTERN_FIELD_NAMES, false);
+ }
+
+ /**
+  * Command line entry point: converts a JSON file to an SSTable file. Exactly
+  * two positional arguments are expected, the JSON file and the SSTable to
+  * write, next to the mandatory -K and -c options and the optional -n and -s.
+  * The JVM is terminated through System.exit: status 1 on bad arguments,
+  * -1 when the import fails and 0 on success.
+  *
+  * @param args command line arguments
+  * @throws IOException on failure to open/read/write files or output streams
+  * @throws ParseException declared for callers; command line parse failures are handled here
+  * @throws ConfigurationException when no non-system tables are defined
+  */
+ public static void main(String[] args) throws IOException, ParseException, ConfigurationException
+ {
+     try
+     {
+         CommandLineParser parser = new PosixParser();
+         cmd = parser.parse(options, args);
+     }
+     catch (org.apache.commons.cli.ParseException parseFailure)
+     {
+         System.err.println(parseFailure.getMessage());
+         printProgramUsage();
+         System.exit(1);
+     }
+
+     String[] positional = cmd.getArgs();
+     if (positional.length != 2)
+     {
+         printProgramUsage();
+         System.exit(1);
+     }
+
+     String json = positional[0];
+     String ssTable = positional[1];
+     String keyspace = cmd.getOptionValue(KEYSPACE_OPTION);
+     String cfamily = cmd.getOptionValue(COLUMN_FAMILY_OPTION);
+
+     if (cmd.hasOption(KEY_COUNT_OPTION))
+         keyCountToImport = Integer.valueOf(cmd.getOptionValue(KEY_COUNT_OPTION));
+
+     if (cmd.hasOption(IS_SORTED_OPTION))
+         isSorted = true;
+
+     // the schema has to be loaded before the column family can be created
+     DatabaseDescriptor.loadSchemas();
+     if (DatabaseDescriptor.getNonSystemTables().size() < 1)
+     {
+         String msg = "no non-system tables are defined";
+         System.err.println(msg);
+         throw new ConfigurationException(msg);
+     }
+
+     try
+     {
+         importJson(json, keyspace, cfamily, ssTable);
+     }
+     catch (Exception failure)
+     {
+         failure.printStackTrace();
+         System.err.println("ERROR: " + failure.getMessage());
+         System.exit(-1);
+     }
+
+     System.exit(0);
+ }
+
+ /**
+  * Prints the usage line, followed by the short name and description of
+  * every registered option, to standard out.
+  */
+ private static void printProgramUsage()
+ {
+     String toolName = SSTableImport.class.getName();
+     System.out.printf("Usage: %s -s -K <keyspace> -c <column_family> -n <num_keys> <json> <sstable>%n%n", toolName);
+
+     System.out.println("Options:");
+     for (Object registered : options.getOptions())
+     {
+         Option opt = (Option) registered;
+         System.out.printf(" -%s - %s%n", opt.getOpt(), opt.getDescription());
+     }
+ }
+
+ /**
+ * Used by test framework to set key count.
+ * @param keyCount number of keys to import; with null the import routines
+ * determine the count from the input themselves
+ */
+ public static void setKeyCountToImport(Integer keyCount)
+ {
+ keyCountToImport = keyCount;
+ }
+
+ /**
+  * Convert a string to bytes (ByteBuffer) according to type. Content of
+  * BytesType is taken to be hex encoded; every other type parses the string
+  * itself.
+  * @param content string to convert
+  * @param type type to use for conversion
+  * @return byte buffer representation of the given string
+  * @throws RuntimeException wrapping the MarshalException when the content
+  *         cannot be converted
+  */
+ private static ByteBuffer stringAsType(String content, AbstractType type)
+ {
+     try
+     {
+         return (type == BytesType.instance) ? hexToBytes(content) : type.fromString(content);
+     }
+     catch (MarshalException e)
+     {
+         // keep the original exception as cause so that its stack trace is not lost
+         throw new RuntimeException(e.getMessage(), e);
+     }
+ }
+
+}