You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/04/01 19:15:07 UTC

svn commit: r1087815 - in /cassandra/trunk/src/java/org/apache/cassandra/tools: SSTableExport.java SSTableImport.java

Author: jbellis
Date: Fri Apr  1 17:15:06 2011
New Revision: 1087815

URL: http://svn.apache.org/viewvc?rev=1087815&view=rev
Log:
really revert SSTableExport move

Added:
    cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java
    cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java

Added: cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java?rev=1087815&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java Fri Apr  1 17:15:06 2011
@@ -0,0 +1,420 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.util.BufferedRandomAccessFile;
+import org.apache.cassandra.service.StorageService;
+
+import org.apache.commons.cli.*;
+
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Pair;
+
+import static org.apache.cassandra.utils.ByteBufferUtil.bytesToHex;
+import static org.apache.cassandra.utils.ByteBufferUtil.hexToBytes;
+
+/**
+ * Export SSTables to JSON format.
+ */
+public class SSTableExport
+{
+    // size of the columns page
+    private static final int PAGE_SIZE = 1000;
+
+    private static final String KEY_OPTION = "k";
+    private static final String EXCLUDEKEY_OPTION = "x";
+    private static final String ENUMERATEKEYS_OPTION = "e";
+    private static Options options;
+    private static CommandLine cmd;
+    
+    static
+    {
+        options = new Options();
+
+        Option optKey = new Option(KEY_OPTION, true, "Row key");
+        // Number of times -k <key> can be passed on the command line.
+        optKey.setArgs(500);
+        options.addOption(optKey);
+
+        Option excludeKey = new Option(EXCLUDEKEY_OPTION, true, "Excluded row key");
+        // Number of times -x <key> can be passed on the command line.
+        excludeKey.setArgs(500);
+        options.addOption(excludeKey);
+
+        Option optEnumerate = new Option(ENUMERATEKEYS_OPTION, false, "enumerate keys only");
+        options.addOption(optEnumerate);
+    }
+
+    /**
+     * Wraps given string into quotes
+     * @param val string to quote
+     * @return quoted string
+     */
+    private static String quote(String val)
+    {
+        return String.format("\"%s\"", val);
+    }
+
+    /**
+     * JSON Hash Key serializer
+     * @param val value to set as a key
+     * @return JSON Hash key
+     */
+    private static String asKey(String val)
+    {
+        return String.format("%s: ", quote(val));
+    }
+
+    /**
+     * Serialize columns using given column iterator
+     * @param columns column iterator
+     * @param out output stream
+     * @param comparator columns comparator
+     * @param cfMetaData Column Family metadata (to get validator)
+     * @return pair of (number of columns serialized, last column serialized)
+     */
+    private static void serializeColumns(Iterator<IColumn> columns, PrintStream out, AbstractType comparator, CFMetaData cfMetaData)
+    {
+        while (columns.hasNext())
+        {
+            IColumn column = columns.next();
+            serializeColumn(column, out, comparator, cfMetaData);
+
+            if (columns.hasNext())
+                out.print(", ");
+        }
+    }
+
+    /**
+     * Serialize a given column to the JSON format
+     * @param column column presentation
+     * @param out output stream
+     * @param comparator columns comparator
+     * @param cfMetaData Column Family metadata (to get validator)
+     */
+    private static void serializeColumn(IColumn column, PrintStream out, AbstractType comparator, CFMetaData cfMetaData)
+    {
+        ByteBuffer name = ByteBufferUtil.clone(column.name());
+        ByteBuffer value = ByteBufferUtil.clone(column.value());
+        AbstractType validator = cfMetaData.getValueValidator(name);
+
+        out.print("[");
+        out.print(quote(comparator.getString(name)));
+        out.print(", ");
+        out.print(quote(validator.getString(value)));
+        out.print(", ");
+        out.print(column.timestamp());
+
+        if (column instanceof DeletedColumn)
+        {
+            out.print(", ");
+            out.print("\"d\"");
+        }
+        else if (column instanceof ExpiringColumn)
+        {
+            out.print(", ");
+            out.print("\"e\"");
+            out.print(", ");
+            out.print(((ExpiringColumn) column).getTimeToLive());
+            out.print(", ");
+            out.print(column.getLocalDeletionTime());
+        }
+        else if (column instanceof CounterColumn)
+        {
+            out.print(", ");
+            out.print("\"c\"");
+            out.print(", ");
+            out.print(((CounterColumn) column).timestampOfLastDelete());
+        }
+
+        out.print("]");
+    }
+
+    /**
+     * Get portion of the columns and serialize in loop while not more columns left in the row
+     * @param row SSTableIdentityIterator row representation with Column Family
+     * @param key Decorated Key for the required row
+     * @param out output stream
+     */
+    private static void serializeRow(SSTableIdentityIterator row, DecoratedKey key, PrintStream out)
+    {
+        ColumnFamily columnFamily = row.getColumnFamily();
+        boolean isSuperCF = columnFamily.isSuper();
+        CFMetaData cfMetaData = columnFamily.metadata();
+        AbstractType comparator = columnFamily.getComparator();
+
+        out.print(asKey(bytesToHex(key.key)));
+        out.print(isSuperCF ? "{" : "[");
+
+        if (isSuperCF)
+        {
+            while (row.hasNext())
+            {
+                IColumn column = row.next();
+
+                out.print(asKey(comparator.getString(column.name())));
+                out.print("{");
+                out.print(asKey("deletedAt"));
+                out.print(column.getMarkedForDeleteAt());
+                out.print(", ");
+                out.print(asKey("subColumns"));
+                out.print("[");
+                serializeColumns(column.getSubColumns().iterator(), out, columnFamily.getSubComparator(), cfMetaData);
+                out.print("]");
+                out.print("}");
+
+                if (row.hasNext())
+                    out.print(", ");
+            }
+        }
+        else
+        {
+            serializeColumns(row, out, comparator, cfMetaData);
+        }
+
+        out.print(isSuperCF ? "}" : "]");
+    }
+
+    /**
+     * Enumerate row keys from an SSTableReader and write the result to a PrintStream.
+     * 
+     * @param ssTableFile the file to export the rows from
+     * @param outs PrintStream to write the output to
+     * @throws IOException on failure to read/write input/output
+     */
+    public static void enumeratekeys(String ssTableFile, PrintStream outs)
+    throws IOException
+    {
+        Descriptor desc = Descriptor.fromFilename(ssTableFile);
+        KeyIterator iter = new KeyIterator(desc);
+        DecoratedKey lastKey = null;
+        while (iter.hasNext())
+        {
+            DecoratedKey key = iter.next();
+
+            // validate order of the keys in the sstable
+            if (lastKey != null && lastKey.compareTo(key) > 0 )
+                throw new IOException("Key out of order! " + lastKey + " > " + key);
+            lastKey = key;
+
+            outs.println(bytesToHex(key.key));
+        }
+        iter.close();
+        outs.flush();
+    }
+
+    /**
+     * Export specific rows from an SSTable and write the resulting JSON to a PrintStream.
+     * 
+     * @param ssTableFile the SSTableScanner to export the rows from
+     * @param outs PrintStream to write the output to
+     * @param toExport the keys corresponding to the rows to export
+     * @param excludes keys to exclude from export
+     * @throws IOException on failure to read/write input/output
+     */
+    public static void export(String ssTableFile, PrintStream outs, Collection<String> toExport, String[] excludes) throws IOException
+    {
+        SSTableReader reader = SSTableReader.open(Descriptor.fromFilename(ssTableFile));
+        SSTableScanner scanner = reader.getDirectScanner(BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE);
+
+        IPartitioner<?> partitioner = StorageService.getPartitioner();
+
+        if (excludes != null)
+            toExport.removeAll(Arrays.asList(excludes));
+
+        outs.println("{");
+
+        int i = 0;
+
+        // last key to compare order
+        DecoratedKey lastKey = null;
+
+        for (String key : toExport)
+        {
+            DecoratedKey decoratedKey = partitioner.decorateKey(hexToBytes(key));
+
+            if (lastKey != null && lastKey.compareTo(decoratedKey) > 0)
+                throw new IOException("Key out of order! " + lastKey + " > " + decoratedKey);
+
+            lastKey = decoratedKey;
+
+            scanner.seekTo(decoratedKey);
+
+            if (!scanner.hasNext())
+                continue;
+
+            SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
+            if (!row.getKey().equals(decoratedKey))
+                continue;
+
+            serializeRow(row, decoratedKey, outs);
+
+            if (i != 0)
+                outs.println(",");
+
+            i++;
+        }
+
+        outs.println("\n}");
+        outs.flush();
+
+        scanner.close();
+    }
+
+    // This is necessary to accommodate the test suite since you cannot open a Reader more
+    // than once from within the same process.
+    static void export(SSTableReader reader, PrintStream outs, String[] excludes) throws IOException
+    {
+        Set<String> excludeSet = new HashSet<String>();
+
+        if (excludes != null)
+            excludeSet = new HashSet<String>(Arrays.asList(excludes));
+
+
+        SSTableIdentityIterator row;
+        SSTableScanner scanner = reader.getDirectScanner(BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE);
+
+        outs.println("{");
+
+        int i = 0;
+
+        // collecting keys to export
+        while (scanner.hasNext())
+        {
+            row = (SSTableIdentityIterator) scanner.next();
+
+            String currentKey = bytesToHex(row.getKey().key);
+
+            if (excludeSet.contains(currentKey))
+                continue;
+            else if (i != 0)
+                outs.println(",");
+
+            serializeRow(row, row.getKey(), outs);
+
+            i++;
+        }
+
+        outs.println("\n}");
+        outs.flush();
+
+        scanner.close();
+    }
+    
+    /**
+     * Export an SSTable and write the resulting JSON to a PrintStream.
+     * 
+     * @param ssTableFile the SSTable to export
+     * @param outs PrintStream to write the output to
+     * @param excludes keys to exclude from export
+     *
+     * @throws IOException on failure to read/write input/output
+     */
+    public static void export(String ssTableFile, PrintStream outs, String[] excludes) throws IOException
+    {
+        export(SSTableReader.open(Descriptor.fromFilename(ssTableFile)), outs, excludes);
+    }
+
+    /**
+     * Export an SSTable and write the resulting JSON to standard out.
+     * 
+     * @param ssTableFile SSTable to export
+     * @param excludes keys to exclude from export
+     *
+     * @throws IOException on failure to read/write SSTable/standard out
+     */
+    public static void export(String ssTableFile, String[] excludes) throws IOException
+    {
+        export(ssTableFile, System.out, excludes);
+    }
+
+    /**
+     * Given arguments specifying an SSTable, and optionally an output file,
+     * export the contents of the SSTable to JSON.
+     *  
+     * @param args command lines arguments
+     *
+     * @throws IOException on failure to open/read/write files or output streams
+     * @throws ConfigurationException on configuration failure (wrong params given)
+     */
+    public static void main(String[] args) throws IOException, ConfigurationException
+    {
+        String usage = String.format("Usage: %s <sstable> [-k key [-k key [...]] -x key [-x key [...]]]%n", SSTableExport.class.getName());
+        
+        CommandLineParser parser = new PosixParser();
+        try
+        {
+            cmd = parser.parse(options, args);
+        }
+        catch (ParseException e1)
+        {
+            System.err.println(e1.getMessage());
+            System.err.println(usage);
+            System.exit(1);
+        }
+
+
+        if (cmd.getArgs().length != 1)
+        {
+            System.err.println("You must supply exactly one sstable");
+            System.err.println(usage);
+            System.exit(1);
+        }
+        
+
+        String[] keys = cmd.getOptionValues(KEY_OPTION);
+        String[] excludes = cmd.getOptionValues(EXCLUDEKEY_OPTION);
+        String ssTableFileName = new File(cmd.getArgs()[0]).getAbsolutePath();
+
+        DatabaseDescriptor.loadSchemas();
+        if (DatabaseDescriptor.getNonSystemTables().size() < 1)
+        {
+            String msg = "no non-system tables are defined";
+            System.err.println(msg);
+            throw new ConfigurationException(msg);
+        }
+
+        if (cmd.hasOption(ENUMERATEKEYS_OPTION))
+        {
+            enumeratekeys(ssTableFileName, System.out);
+        }
+        else
+        {
+            if ((keys != null) && (keys.length > 0))
+                export(ssTableFileName, System.out, Arrays.asList(keys), excludes);
+            else
+                export(ssTableFileName, excludes);
+        }
+
+        System.exit(0);
+    }
+}

Added: cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java?rev=1087815&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java Fri Apr  1 17:15:06 2011
@@ -0,0 +1,528 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.MarshalException;
+import org.apache.commons.cli.*;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.codehaus.jackson.type.TypeReference;
+
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.map.MappingJsonFactory;
+
+import org.codehaus.jackson.JsonParser;
+
+import static org.apache.cassandra.utils.ByteBufferUtil.hexToBytes;
+
+/**
+ * Create SSTables from JSON input
+ */
+public class SSTableImport
+{
+    private static final String KEYSPACE_OPTION = "K";
+    private static final String COLUMN_FAMILY_OPTION = "c";
+    private static final String KEY_COUNT_OPTION = "n";
+    private static final String IS_SORTED_OPTION = "s";
+
+    private static Options options;
+    private static CommandLine cmd;
+
+    private static Integer keyCountToImport = null;
+    private static boolean isSorted = false;
+
+    private static JsonFactory factory = new MappingJsonFactory();
+
+    static
+    {
+        options = new Options();
+
+        Option optKeyspace = new Option(KEYSPACE_OPTION, true, "Keyspace name.");
+        optKeyspace.setRequired(true);
+        options.addOption(optKeyspace);
+
+        Option optColfamily = new Option(COLUMN_FAMILY_OPTION, true, "Column Family name.");
+        optColfamily.setRequired(true);
+        options.addOption(optColfamily);
+
+        options.addOption(new Option(KEY_COUNT_OPTION, true, "Number of keys to import (Optional)."));
+        options.addOption(new Option(IS_SORTED_OPTION, false, "Assume JSON file as already sorted (e.g. created by sstable2json tool) (Optional)."));
+    }
+
+    private static class JsonColumn<T>
+    {
+        private ByteBuffer name;
+        private ByteBuffer value;
+        private long timestamp;
+
+        private String kind;
+        // Expiring columns
+        private int ttl;
+        private int localExpirationTime;
+
+        // Counter columns
+        private long timestampOfLastDelete;
+
+        public JsonColumn(T json, CFMetaData meta, boolean isSubColumn)
+        {
+            AbstractType comparator = (isSubColumn) ? meta.subcolumnComparator : meta.comparator;
+
+            if (json instanceof List)
+            {
+                List fields = (List<?>) json;
+
+                assert fields.size() >= 3 : "Column definition should have at least 3";
+
+                name  = stringAsType((String) fields.get(0), comparator);
+                value = stringAsType((String) fields.get(1), meta.getValueValidator(name.duplicate()));
+                timestamp = (Long) fields.get(2);
+                kind = "";
+
+                if (fields.size() > 3)
+                {
+                    if (fields.get(3) instanceof Boolean)
+                    {
+                        // old format, reading this for backward compatibility sake
+                        if (fields.size() == 6)
+                        {
+                            kind = "e";
+                            ttl = (Integer) fields.get(4);
+                            localExpirationTime = (int) (long) ((Long) fields.get(5));
+                        }
+                        else
+                        {
+                            kind = ((Boolean) fields.get(3)) ? "d" : "";
+                        }
+                    }
+                    else
+                    {
+                        kind = (String) fields.get(3);
+                        if (isExpiring())
+                        {
+                            ttl = (Integer) fields.get(4);
+                            localExpirationTime = (int) (long) ((Long) fields.get(5));
+                        }
+                        else if (isCounter())
+                        {
+                            timestampOfLastDelete = (long) ((Integer) fields.get(4));
+                        }
+                    }
+                }
+            }
+        }
+
+        public boolean isDeleted()
+        {
+            return kind.equals("d");
+        }
+
+        public boolean isExpiring()
+        {
+            return kind.equals("e");
+        }
+
+        public boolean isCounter()
+        {
+            return kind.equals("c");
+        }
+
+        public ByteBuffer getName()
+        {
+            return name.duplicate();
+        }
+
+        public ByteBuffer getValue()
+        {
+            return value.duplicate();
+        }
+    }
+
+    private static void addToStandardCF(List<?> row, ColumnFamily cfamily)
+    {
+        addColumnsToCF(row, null, cfamily);
+    }
+
+    /**
+     * Add columns to a column family.
+     * 
+     * @param row the columns associated with a row
+     * @param superName name of the super column if any
+     * @param cfamily the column family to add columns to
+     */
+    private static void addColumnsToCF(List<?> row, ByteBuffer superName, ColumnFamily cfamily)
+    {
+        CFMetaData cfm = cfamily.metadata();
+        assert cfm != null;
+
+        for (Object c : row)
+        {
+            JsonColumn col = new JsonColumn<List>((List) c, cfm, (superName != null));
+            QueryPath path = new QueryPath(cfm.cfName, superName, col.getName());
+
+            if (col.isExpiring())
+            {
+                cfamily.addColumn(null, new ExpiringColumn(col.getName(), col.getValue(), col.timestamp, col.ttl, col.localExpirationTime));
+            }
+            else if (col.isCounter())
+            {
+                cfamily.addColumn(null, new CounterColumn(col.getName(), col.getValue(), col.timestamp, col.timestampOfLastDelete));
+            }
+            else if (col.isDeleted())
+            {
+                cfamily.addTombstone(path, col.getValue(), col.timestamp);
+            }
+            else
+            {
+                cfamily.addColumn(path, col.getValue(), col.timestamp);
+            }
+        }
+    }
+    
+    /**
+     * Add super columns to a column family.
+     * 
+     * @param row the super columns associated with a row
+     * @param cfamily the column family to add columns to
+     */
+    private static void addToSuperCF(Map<?, ?> row, ColumnFamily cfamily)
+    {
+        CFMetaData metaData = cfamily.metadata();
+        assert metaData != null;
+
+        AbstractType comparator = metaData.comparator;
+
+        // Super columns
+        for (Map.Entry<?, ?> entry : row.entrySet())
+        {
+            Map<?, ?> data = (Map<?, ?>) entry.getValue();
+
+            addColumnsToCF((List<?>) data.get("subColumns"), stringAsType((String) entry.getKey(), comparator), cfamily);
+
+            // *WARNING* markForDeleteAt has been DEPRECATED at Cassandra side
+            //BigInteger deletedAt = (BigInteger) data.get("deletedAt");
+            //SuperColumn superColumn = (SuperColumn) cfamily.getColumn(superName);
+            //superColumn.markForDeleteAt((int) (System.currentTimeMillis()/1000), deletedAt);
+        }
+    }
+
+    /**
+     * Convert a JSON formatted file to an SSTable.
+     * 
+     * @param jsonFile the file containing JSON formatted data
+     * @param keyspace keyspace the data belongs to
+     * @param cf column family the data belongs to
+     * @param ssTablePath file to write the SSTable to
+     *
+     * @throws IOException for errors reading/writing input/output
+     */
+    public static void importJson(String jsonFile, String keyspace, String cf, String ssTablePath) throws IOException
+    {
+        ColumnFamily columnFamily = ColumnFamily.create(keyspace, cf);
+        IPartitioner<?> partitioner = DatabaseDescriptor.getPartitioner();
+
+        int importedKeys = (isSorted) ? importSorted(jsonFile, columnFamily, ssTablePath, partitioner)
+                                      : importUnsorted(getParser(jsonFile), columnFamily, ssTablePath, partitioner);
+
+        if (importedKeys != -1)
+            System.out.printf("%d keys imported successfully.%n", importedKeys);
+    }
+
+    private static int importUnsorted(JsonParser parser, ColumnFamily columnFamily, String ssTablePath, IPartitioner<?> partitioner) throws IOException
+    {
+        int importedKeys = 0;
+        long start = System.currentTimeMillis();
+        Map<?, ?> data = parser.readValueAs(new TypeReference<Map<?, ?>>() {});
+
+        keyCountToImport = (keyCountToImport == null) ? data.size() : keyCountToImport;
+        SSTableWriter writer = new SSTableWriter(ssTablePath, keyCountToImport);
+
+        System.out.printf("Importing %s keys...%n", keyCountToImport);
+
+        // sort by dk representation, but hold onto the hex version
+        SortedMap<DecoratedKey,String> decoratedKeys = new TreeMap<DecoratedKey,String>();
+
+        for (Object keyObject : data.keySet())
+        {
+            String key = (String) keyObject;
+            decoratedKeys.put(partitioner.decorateKey(hexToBytes(key)), key);
+        }
+
+        for (Map.Entry<DecoratedKey, String> rowKey : decoratedKeys.entrySet())
+        {
+            if (columnFamily.getColumnFamilyType() == ColumnFamilyType.Super)
+            {
+                addToSuperCF((Map<?, ?>) data.get(rowKey.getValue()), columnFamily);
+            }
+            else
+            {
+                addToStandardCF((List<?>) data.get(rowKey.getValue()), columnFamily);
+            }
+
+            writer.append(rowKey.getKey(), columnFamily);
+            columnFamily.clear();
+
+            importedKeys++;
+
+            long current = System.currentTimeMillis();
+
+            if (current - start >= 5000) // 5 secs.
+            {
+                System.out.printf("Currently imported %d keys.%n", importedKeys);
+                start = current;
+            }
+
+            if (keyCountToImport == importedKeys)
+                break;
+        }
+
+        writer.closeAndOpenReader();
+
+        return importedKeys;
+    }
+
+    public static int importSorted(String jsonFile, ColumnFamily columnFamily, String ssTablePath, IPartitioner<?> partitioner) throws IOException
+    {
+        int importedKeys = 0; // already imported keys count
+        long start = System.currentTimeMillis();
+
+        JsonParser parser = getParser(jsonFile);
+
+        if (keyCountToImport == null)
+        {
+            keyCountToImport = 0;
+            System.out.println("Counting keys to import, please wait... (NOTE: to skip this use -n <num_keys>)");
+
+            parser.nextToken(); // START_OBJECT
+            while (parser.nextToken() != null)
+            {
+                parser.nextToken();
+                parser.skipChildren();
+                if (parser.getCurrentName() == null) continue;
+
+                keyCountToImport++;
+            }
+        }
+
+        System.out.printf("Importing %s keys...%n", keyCountToImport);
+
+        parser = getParser(jsonFile); // renewing parser
+        SSTableWriter writer = new SSTableWriter(ssTablePath, keyCountToImport);
+
+        int lineNumber = 1;
+        DecoratedKey prevStoredKey = null;
+
+        while (parser.nextToken() != null)
+        {
+            String key = parser.getCurrentName();
+
+            if (key != null)
+            {
+                String tokenName = parser.nextToken().name();
+
+                if (tokenName.equals("START_ARRAY"))
+                {
+                    if (columnFamily.getColumnFamilyType() == ColumnFamilyType.Super)
+                    {
+                        throw new RuntimeException("Can't write Standard columns to the Super Column Family.");
+                    }
+
+                    List<?> columns = parser.readValueAs(new TypeReference<List<?>>() {});
+                    addToStandardCF(columns, columnFamily);
+                }
+                else if (tokenName.equals("START_OBJECT"))
+                {
+                    if (columnFamily.getColumnFamilyType() == ColumnFamilyType.Standard)
+                    {
+                        throw new RuntimeException("Can't write Super columns to the Standard Column Family.");
+                    }
+
+                    Map<?, ?> columns = parser.readValueAs(new TypeReference<Map<?, ?>>() {});
+                    addToSuperCF(columns, columnFamily);
+                }
+                else
+                {
+                    throw new UnsupportedOperationException("Only Array or Hash allowed as row content.");
+                }
+
+                DecoratedKey currentKey = partitioner.decorateKey(hexToBytes(key));
+
+                if (prevStoredKey != null && prevStoredKey.compareTo(currentKey) != -1)
+                {
+                    System.err.printf("Line %d: Key %s is greater than previous, collection is not sorted properly. Aborting import. You might need to delete SSTables manually.%n", lineNumber, key);
+                    return -1;
+                }
+
+                // saving decorated key
+                writer.append(currentKey, columnFamily);
+                columnFamily.clear();
+
+                prevStoredKey = currentKey;
+                importedKeys++;
+                lineNumber++;
+
+                long current = System.currentTimeMillis();
+
+                if (current - start >= 5000) // 5 secs.
+                {
+                    System.out.printf("Currently imported %d keys.%n", importedKeys);
+                    start = current;
+                }
+
+                if (keyCountToImport == importedKeys)
+                    break;
+            }
+        }
+
+        writer.closeAndOpenReader();
+
+        return importedKeys;
+    }
+
+    /**
+     * Get JsonParser object for file
+     * @param fileName name of the file
+     * @return json parser instance for given file
+     * @throws IOException if any I/O error.
+     */
+    private static JsonParser getParser(String fileName) throws IOException
+    {
+        return factory.createJsonParser(new File(fileName)).configure(JsonParser.Feature.INTERN_FIELD_NAMES, false);
+    }
+
+    /**
+     * Converts JSON to an SSTable file. JSON input can either be a file specified
+     * using an optional command line argument, or supplied on standard in.
+     * 
+     * @param args command line arguments
+     * @throws IOException on failure to open/read/write files or output streams
+     * @throws ParseException on failure to parse JSON input
+     * @throws ConfigurationException on configuration error.
+     */
+    public static void main(String[] args) throws IOException, ParseException, ConfigurationException
+    {
+        CommandLineParser parser = new PosixParser();
+
+        try
+        {
+            cmd = parser.parse(options, args);
+        }
+        catch (org.apache.commons.cli.ParseException e)
+        {
+            System.err.println(e.getMessage());
+            printProgramUsage();
+            System.exit(1);
+        }
+
+        if (cmd.getArgs().length != 2)
+        {
+            printProgramUsage();
+            System.exit(1);
+        }
+
+        String json     = cmd.getArgs()[0];
+        String ssTable  = cmd.getArgs()[1];
+        String keyspace = cmd.getOptionValue(KEYSPACE_OPTION);
+        String cfamily  = cmd.getOptionValue(COLUMN_FAMILY_OPTION);
+
+        if (cmd.hasOption(KEY_COUNT_OPTION))
+        {
+            keyCountToImport = Integer.valueOf(cmd.getOptionValue(KEY_COUNT_OPTION));
+        }
+
+        if (cmd.hasOption(IS_SORTED_OPTION))
+        {
+            isSorted = true;
+        }
+
+        DatabaseDescriptor.loadSchemas();
+        if (DatabaseDescriptor.getNonSystemTables().size() < 1)
+        {
+            String msg = "no non-system tables are defined";
+            System.err.println(msg);
+            throw new ConfigurationException(msg);
+        }
+
+        try
+        {
+            importJson(json, keyspace, cfamily, ssTable);
+        }
+        catch (Exception e)
+        {
+            e.printStackTrace();
+            System.err.println("ERROR: " + e.getMessage());
+            System.exit(-1);
+        }
+
+        System.exit(0);
+    }
+
+    private static void printProgramUsage()
+    {
+        System.out.printf("Usage: %s -s -K <keyspace> -c <column_family> -n <num_keys> <json> <sstable>%n%n",
+                            SSTableImport.class.getName());
+
+        System.out.println("Options:");
+        for (Object o :  options.getOptions())
+        {
+            Option opt = (Option) o;
+            System.out.println("  -" +opt.getOpt() + " - " + opt.getDescription());
+        }
+    }
+
+    /**
+     * Used by test framework to set key count
+     * @param keyCount numbers of keys to import
+     */
+    public static void setKeyCountToImport(Integer keyCount)
+    {
+        keyCountToImport = keyCount;
+    }
+
+    /**
+     * Convert a string to bytes (ByteBuffer) according to type
+     * @param content string to convert
+     * @param type type to use for conversion
+     * @return byte buffer representation of the given string
+     */
+    private static ByteBuffer stringAsType(String content, AbstractType type)
+    {
+        try
+        {
+            return (type == BytesType.instance) ? hexToBytes(content) : type.fromString(content);
+        }
+        catch (MarshalException e)
+        {
+            throw new RuntimeException(e.getMessage());
+        }
+    }
+
+}