You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2011/07/19 12:44:49 UTC

svn commit: r1148267 - in /cassandra/branches/cassandra-0.8: ./ src/java/org/apache/cassandra/io/sstable/

Author: slebresne
Date: Tue Jul 19 10:44:48 2011
New Revision: 1148267

URL: http://svn.apache.org/viewvc?rev=1148267&view=rev
Log:
Add simplified interfaces to write sstables (for bulk loading)
patch by slebresne; reviewed by jbellis for CASSANDRA-2911

Added:
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
Modified:
    cassandra/branches/cassandra-0.8/CHANGES.txt

Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1148267&r1=1148266&r2=1148267&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Tue Jul 19 10:44:48 2011
@@ -30,6 +30,8 @@
  * avoid including inferred types in CF update (CASSANDRA-2809)
  * fix JMX bulkload call (CASSANDRA-2908)
  * fix updating KS with durable_writes=false (CASSANDRA-2907)
+ * add simplified facade to SSTableWriter for bulk loading use
+   (CASSANDRA-2911)
 
 
 0.8.1

Added: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java?rev=1148267&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java (added)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java Tue Jul 19 10:44:48 2011
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.context.CounterContext;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.NodeId;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * Base class for the simplified SSTable writers intended for bulk loading.
+ * <p>
+ * Usage: call {@link #newRow(ByteBuffer)} to start a row, then (for super column families only)
+ * {@link #newSuperColumn(ByteBuffer)}, then one of the {@code add*Column} methods for each column.
+ * A row is only handed to {@link #writeRow(DecoratedKey, ColumnFamily)} when the next row is
+ * started, so implementations of {@link #close()} must take care of the last pending row.
+ * <p>
+ * This class performs no synchronization.
+ */
+public abstract class AbstractSSTableSimpleWriter
+{
+    protected final File directory;
+    protected final CFMetaData metadata;
+    // Key and content of the row currently being built (null until the first newRow() call).
+    protected DecoratedKey currentKey;
+    protected ColumnFamily columnFamily;
+    // Super column that columns are currently added to; reset by every newRow() call.
+    protected SuperColumn currentSuperColumn;
+    // Node id used when building the context of every counter column added by this writer.
+    protected final NodeId nodeid = NodeId.generate();
+
+    /**
+     * @param directory the directory where the sstables are written
+     * @param metadata the metadata of the column family the sstables belong to
+     */
+    public AbstractSSTableSimpleWriter(File directory, CFMetaData metadata)
+    {
+        this.metadata = metadata;
+        this.directory = directory;
+    }
+
+    /**
+     * Creates a writer for a new sstable in {@code directory}, using the first generation
+     * number greater than every generation already present there for this column family.
+     */
+    protected SSTableWriter getWriter() throws IOException
+    {
+        return new SSTableWriter(
+            makeFilename(directory, metadata.ksName, metadata.cfName),
+            0, // We don't care about the bloom filter
+            metadata,
+            StorageService.getPartitioner(),
+            ReplayPosition.NONE);
+    }
+
+    // find available generation and pick up filename from that
+    private static String makeFilename(File directory, final String keyspace, final String columnFamily)
+    {
+        final Set<Descriptor> existing = new HashSet<Descriptor>();
+        // The filter is only used to visit every file name: it records the descriptors of this
+        // column family's components and always returns false, since the listing itself is unused.
+        directory.list(new FilenameFilter()
+        {
+            public boolean accept(File dir, String name)
+            {
+                Pair<Descriptor, Component> p = SSTable.tryComponentFromFilename(dir, name);
+                Descriptor desc = p == null ? null : p.left;
+                if (desc == null)
+                    return false;
+
+                if (desc.cfname.equals(columnFamily))
+                    existing.add(desc);
+
+                return false;
+            }
+        });
+        int maxGen = 0;
+        for (Descriptor desc : existing)
+            maxGen = Math.max(maxGen, desc.generation);
+        return new Descriptor(directory, keyspace, columnFamily, maxGen + 1, false).filenameFor(Component.DATA);
+    }
+
+    /**
+     * Start a new row whose key is {@code key}.
+     * The previous row, if any and non-empty, is handed to {@link #writeRow(DecoratedKey, ColumnFamily)}.
+     * @param key the row key
+     * @throws IOException if writing the previous row fails
+     */
+    public void newRow(ByteBuffer key) throws IOException
+    {
+        if (currentKey != null && !columnFamily.isEmpty())
+            writeRow(currentKey, columnFamily);
+
+        currentKey = StorageService.getPartitioner().decorateKey(key);
+        columnFamily = ColumnFamily.create(metadata);
+        // The super column started for the previous row belongs to that row. Forget it so that
+        // adding a column before calling newSuperColumn() fails, instead of silently modifying
+        // (or losing the column into) the previous row.
+        currentSuperColumn = null;
+    }
+
+    // Fails with a clear message rather than a NullPointerException when no row exists yet.
+    private void ensureRowStarted()
+    {
+        if (columnFamily == null)
+            throw new IllegalStateException("No row has been started; newRow() must be called first.");
+    }
+
+    /**
+     * Start a new super column with name {@code name} in the current row.
+     * @param name the name for the super column
+     * @throws IllegalStateException if no row has been started or the column family is not a super one
+     */
+    public void newSuperColumn(ByteBuffer name)
+    {
+        ensureRowStarted();
+        if (!columnFamily.isSuper())
+            throw new IllegalStateException("Cannot add a super column to a standard column family");
+
+        currentSuperColumn = new SuperColumn(name, metadata.subcolumnComparator);
+        columnFamily.addColumn(currentSuperColumn);
+    }
+
+    // Adds the column to the current super column (super CF) or directly to the current row.
+    private void addColumn(IColumn column)
+    {
+        ensureRowStarted();
+        if (columnFamily.isSuper() && currentSuperColumn == null)
+            throw new IllegalStateException("Trying to add a column to a super column family, but no super column has been started.");
+
+        IColumnContainer container = columnFamily.isSuper() ? currentSuperColumn : columnFamily;
+        container.addColumn(column);
+    }
+
+    /**
+     * Insert a new "regular" column to the current row (and super column if applicable).
+     * @param name the column name
+     * @param value the column value
+     * @param timestamp the column timestamp
+     */
+    public void addColumn(ByteBuffer name, ByteBuffer value, long timestamp)
+    {
+        addColumn(new Column(name, value, timestamp));
+    }
+
+    /**
+     * Insert a new expiring column to the current row (and super column if applicable).
+     * @param name the column name
+     * @param value the column value
+     * @param timestamp the column timestamp
+     * @param ttl the column time to live in seconds
+     * @param expirationTimestampMS the local expiration timestamp in milliseconds. This is the server time timestamp used for actually
+     * expiring the column, and as a consequence should be synchronized with the cassandra servers time. If {@code timestamp} represents
+     * the insertion time in microseconds (which is not required), this should be {@code (timestamp / 1000) + (ttl * 1000L)}.
+     * It is truncated to whole seconds when stored.
+     */
+    public void addExpiringColumn(ByteBuffer name, ByteBuffer value, long timestamp, int ttl, long expirationTimestampMS)
+    {
+        addColumn(new ExpiringColumn(name, value, timestamp, ttl, (int)(expirationTimestampMS / 1000)));
+    }
+
+    /**
+     * Insert a new counter column to the current row (and super column if applicable).
+     * The counter context is created for this writer's node id with a clock of 1, and the
+     * column timestamp is {@link System#currentTimeMillis()}.
+     * @param name the column name
+     * @param value the value of the counter
+     */
+    public void addCounterColumn(ByteBuffer name, long value)
+    {
+        addColumn(new CounterColumn(name, CounterContext.instance().create(nodeid, 1L, value, false), System.currentTimeMillis()));
+    }
+
+    /**
+     * Close this writer.
+     * This method should be called, otherwise the produced sstables are not
+     * guaranteed to be complete (and won't be in practice).
+     */
+    public abstract void close() throws IOException;
+
+    /**
+     * Hands a completed row over to the implementation. {@link #newRow(ByteBuffer)} calls this
+     * for every non-empty row it finishes; the last row must be handled by {@link #close()}.
+     */
+    protected abstract void writeRow(DecoratedKey key, ColumnFamily columnFamily) throws IOException;
+}

Added: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java?rev=1148267&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java (added)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java Tue Jul 19 10:44:48 2011
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.service.StorageService;
+
+/**
+ * A SSTable writer that doesn't assume rows are in sorted order.
+ * This writer buffers rows in memory and then writes them all in sorted order.
+ * To avoid loading the entire data set in memory, the amount of data buffered
+ * is configurable. Each time the threshold is exceeded, one SSTable is
+ * created (and the buffer is reset).
+ *
+ * @see AbstractSSTableSimpleWriter
+ */
+public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
+{
+    // Buffered rows; the TreeMap iterates them in DecoratedKey order when flushing.
+    private final Map<DecoratedKey, ColumnFamily> keys = new TreeMap<DecoratedKey, ColumnFamily>();
+    private final long bufferSize;
+    // Estimated size, in bytes, of the rows currently buffered in 'keys'.
+    private long currentSize;
+
+    /**
+     * Create a new buffering writer.
+     * @param directory the directory where to write the sstables
+     * @param keyspace the keyspace name
+     * @param columnFamily the column family name
+     * @param comparator the column family comparator
+     * @param subComparator the column family subComparator or null if not a Super column family.
+     * @param bufferSizeInMB the data size in MB above which a sstable is written and the buffer reset. This corresponds roughly to the written
+     * data size (i.e. the size of the created sstable). The actual size used in memory will be higher (by how much depends on the size of the
+     * columns you add). For 1GB of heap, a 128 bufferSizeInMB is probably a reasonable choice. If you experience OOM, this value should be lowered.
+     */
+    public SSTableSimpleUnsortedWriter(File directory,
+                                       String keyspace,
+                                       String columnFamily,
+                                       AbstractType comparator,
+                                       AbstractType subComparator,
+                                       int bufferSizeInMB) throws IOException
+    {
+        this(directory,
+             new CFMetaData(keyspace, columnFamily, subComparator == null ? ColumnFamilyType.Standard : ColumnFamilyType.Super, comparator, subComparator),
+             bufferSizeInMB);
+    }
+
+    /**
+     * Create a new buffering writer for an existing column family definition.
+     * @param directory the directory where to write the sstables
+     * @param metadata the column family metadata
+     * @param bufferSizeInMB the data size in MB above which a sstable is written and the buffer reset
+     */
+    public SSTableSimpleUnsortedWriter(File directory, CFMetaData metadata, int bufferSizeInMB) throws IOException
+    {
+        super(directory, metadata);
+        // Long arithmetic: the int product bufferSizeInMB * 1024 * 1024 overflows from 2048 MB on.
+        this.bufferSize = bufferSizeInMB * 1024L * 1024L;
+    }
+
+    protected void writeRow(DecoratedKey key, ColumnFamily columnFamily) throws IOException
+    {
+        ColumnFamily previous = keys.put(key, columnFamily);
+        // Estimated contribution of this row to the buffer (serialized size inflated by 1.2).
+        currentSize += key.key.remaining() + columnFamily.serializedSize() * 1.2;
+
+        // Note that if the row was existing already, our size estimation will be slightly off
+        // since we'll be counting the key multiple times.
+        if (previous != null)
+            columnFamily.addAll(previous);
+
+        if (currentSize > bufferSize)
+            sync();
+    }
+
+    /**
+     * Buffers the last pending row, if any, and writes every buffered row to a final sstable.
+     */
+    public void close() throws IOException
+    {
+        // Rows are only handed to writeRow() when the next row is started, so the last row is
+        // still pending here; without this it would never be written.
+        if (currentKey != null && !columnFamily.isEmpty())
+            writeRow(currentKey, columnFamily);
+        // Prevents a second close() from buffering (and writing) that row again.
+        currentKey = null;
+        sync();
+    }
+
+    // Writes all buffered rows, in map order, to one new sstable, then empties the buffer.
+    private void sync() throws IOException
+    {
+        if (keys.isEmpty())
+            return;
+
+        SSTableWriter writer = getWriter();
+        for (Map.Entry<DecoratedKey, ColumnFamily> entry : keys.entrySet())
+        {
+            writer.append(entry.getKey(), entry.getValue());
+        }
+        writer.closeAndOpenReader();
+        currentSize = 0;
+        keys.clear();
+    }
+}

Added: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java?rev=1148267&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java (added)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java Tue Jul 19 10:44:48 2011
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+
+/**
+ * A SSTable writer that assumes rows are in (partitioner) sorted order.
+ * Unlike SSTableSimpleUnsortedWriter, this writer does not buffer
+ * anything in memory; however, it assumes that rows are added in sorted order
+ * (an exception will be thrown otherwise), which for the RandomPartitioner
+ * means that rows should be added by increasing md5 of the row key. This is
+ * rarely possible, so SSTableSimpleUnsortedWriter should be preferred most of
+ * the time.
+ *
+ * @see AbstractSSTableSimpleWriter
+ */
+public class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
+{
+    // Single sstable writer, opened at construction time, that every row is appended to.
+    private final SSTableWriter writer;
+
+    /**
+     * Create a new writer.
+     * @param directory the directory where to write the sstable
+     * @param keyspace the keyspace name
+     * @param columnFamily the column family name
+     * @param comparator the column family comparator
+     * @param subComparator the column family subComparator or null if not a Super column family.
+     */
+    public SSTableSimpleWriter(File directory,
+                               String keyspace,
+                               String columnFamily,
+                               AbstractType comparator,
+                               AbstractType subComparator) throws IOException
+    {
+        this(directory,
+             new CFMetaData(keyspace, columnFamily, subComparator == null ? ColumnFamilyType.Standard : ColumnFamilyType.Super, comparator, subComparator));
+    }
+
+    /**
+     * Create a new writer for an existing column family definition.
+     * @param directory the directory where to write the sstable
+     * @param metadata the column family metadata
+     */
+    public SSTableSimpleWriter(File directory, CFMetaData metadata) throws IOException
+    {
+        super(directory, metadata);
+        writer = getWriter();
+    }
+
+    /**
+     * Appends the last pending row, if non-empty, and closes the underlying sstable writer.
+     */
+    public void close() throws IOException
+    {
+        // Rows are only handed to writeRow() when the next row starts, so the last one is still
+        // pending; like newRow(), skip it when it holds no column.
+        if (currentKey != null && !columnFamily.isEmpty())
+            writeRow(currentKey, columnFamily);
+        currentKey = null;
+        writer.closeAndOpenReader();
+    }
+
+    protected void writeRow(DecoratedKey key, ColumnFamily columnFamily) throws IOException
+    {
+        writer.append(key, columnFamily);
+    }
+}