You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2011/07/19 12:44:49 UTC
svn commit: r1148267 - in /cassandra/branches/cassandra-0.8: ./
src/java/org/apache/cassandra/io/sstable/
Author: slebresne
Date: Tue Jul 19 10:44:48 2011
New Revision: 1148267
URL: http://svn.apache.org/viewvc?rev=1148267&view=rev
Log:
Add simplified interfaces to write sstables (for bulk loading)
patch by slebresne; reviewed by jbellis for CASSANDRA-2911
Added:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
Modified:
cassandra/branches/cassandra-0.8/CHANGES.txt
Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1148267&r1=1148266&r2=1148267&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Tue Jul 19 10:44:48 2011
@@ -30,6 +30,8 @@
* avoid including inferred types in CF update (CASSANDRA-2809)
* fix JMX bulkload call (CASSANDRA-2908)
* fix updating KS with durable_writes=false (CASSANDRA-2907)
+ * add simplified facade to SSTableWriter for bulk loading use
+ (CASSANDRA-2911)
0.8.1
Added: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java?rev=1148267&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java (added)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java Tue Jul 19 10:44:48 2011
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.context.CounterContext;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.NodeId;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * Base class for the simplified sstable writers.
+ * <p>
+ * Columns are accumulated into the row started by the last call to
+ * {@link #newRow}. Each time a new row is started, the previous one (if it is
+ * not empty) is handed to {@link #writeRow}. The last row is therefore only
+ * written if the subclass takes care of it in {@link #close}.
+ */
+public abstract class AbstractSSTableSimpleWriter
+{
+    protected final File directory;
+    protected final CFMetaData metadata;
+    // Key and content of the row currently being built; both are null until newRow() is called
+    protected DecoratedKey currentKey;
+    protected ColumnFamily columnFamily;
+    // Super column currently receiving columns, or null if none has been started in the current row
+    protected SuperColumn currentSuperColumn;
+    // Node id used when creating the context of counter columns
+    protected final NodeId nodeid = NodeId.generate();
+
+    /**
+     * @param directory the directory in which sstables are created
+     * @param metadata the metadata of the column family the sstables are written for
+     */
+    public AbstractSSTableSimpleWriter(File directory, CFMetaData metadata)
+    {
+        this.metadata = metadata;
+        this.directory = directory;
+    }
+
+    /**
+     * Create a writer for a new sstable of this column family in {@code directory},
+     * using the first generation not used yet in that directory.
+     */
+    protected SSTableWriter getWriter() throws IOException
+    {
+        return new SSTableWriter(
+            makeFilename(directory, metadata.ksName, metadata.cfName),
+            0, // We don't care about the bloom filter
+            metadata,
+            StorageService.getPartitioner(),
+            ReplayPosition.NONE);
+    }
+
+    // Find the highest generation already used in the directory for this column family
+    // and build the data file name for the next one.
+    private static String makeFilename(File directory, final String keyspace, final String columnFamily)
+    {
+        int maxGen = 0;
+        // list() returns null when the directory cannot be listed; generation 1 is used in that case
+        String[] names = directory.list();
+        if (names != null)
+        {
+            for (String name : names)
+            {
+                Pair<Descriptor, Component> p = SSTable.tryComponentFromFilename(directory, name);
+                Descriptor desc = p == null ? null : p.left;
+                if (desc != null && desc.cfname.equals(columnFamily))
+                    maxGen = Math.max(maxGen, desc.generation);
+            }
+        }
+        return new Descriptor(directory, keyspace, columnFamily, maxGen + 1, false).filenameFor(Component.DATA);
+    }
+
+    // Fail with a clear message (rather than a NullPointerException) if no row was started.
+    private void checkRowStarted()
+    {
+        if (columnFamily == null)
+            throw new IllegalStateException("No row has been started; newRow() must be called first");
+    }
+
+    /**
+     * Start a new row whose key is {@code key}.
+     * The previous row, if any and if not empty, is passed to {@link #writeRow} first.
+     * @param key the row key
+     * @throws IOException if writing the previous row fails
+     */
+    public void newRow(ByteBuffer key) throws IOException
+    {
+        if (currentKey != null && !columnFamily.isEmpty())
+            writeRow(currentKey, columnFamily);
+
+        currentKey = StorageService.getPartitioner().decorateKey(key);
+        columnFamily = ColumnFamily.create(metadata);
+        // The previous super column belongs to the previous row: forget it so that columns
+        // cannot be silently added to a row that has already been handed to writeRow()
+        currentSuperColumn = null;
+    }
+
+    /**
+     * Start a new super column with name {@code name}.
+     * @param name the name for the super column
+     * @throws IllegalStateException if no row has been started or if the column family is not a super one
+     */
+    public void newSuperColumn(ByteBuffer name)
+    {
+        checkRowStarted();
+        if (!columnFamily.isSuper())
+            throw new IllegalStateException("Cannot add a super column to a standard column family");
+
+        currentSuperColumn = new SuperColumn(name, metadata.subcolumnComparator);
+        columnFamily.addColumn(currentSuperColumn);
+    }
+
+    // Add the column to the current super column for a super column family,
+    // directly to the current row otherwise.
+    private void addColumn(IColumn column)
+    {
+        checkRowStarted();
+        if (columnFamily.isSuper() && currentSuperColumn == null)
+            throw new IllegalStateException("Trying to add a column to a super column family, but no super column has been started.");
+
+        IColumnContainer container = columnFamily.isSuper() ? currentSuperColumn : columnFamily;
+        container.addColumn(column);
+    }
+
+    /**
+     * Insert a new "regular" column to the current row (and super column if applicable).
+     * @param name the column name
+     * @param value the column value
+     * @param timestamp the column timestamp
+     */
+    public void addColumn(ByteBuffer name, ByteBuffer value, long timestamp)
+    {
+        addColumn(new Column(name, value, timestamp));
+    }
+
+    /**
+     * Insert a new expiring column to the current row (and super column if applicable).
+     * @param name the column name
+     * @param value the column value
+     * @param timestamp the column timestamp
+     * @param ttl the column time to live in seconds
+     * @param expirationTimestampMS the local expiration timestamp in milliseconds. This is the server time timestamp used for actually
+     * expiring the column, and as a consequence should be synchronized with the cassandra servers time. If {@code timestamp} represents
+     * the insertion time in microseconds (which is not required), this should be {@code (timestamp / 1000) + (ttl * 1000)}.
+     */
+    public void addExpiringColumn(ByteBuffer name, ByteBuffer value, long timestamp, int ttl, long expirationTimestampMS)
+    {
+        // The column takes its local expiration time in seconds
+        addColumn(new ExpiringColumn(name, value, timestamp, ttl, (int)(expirationTimestampMS / 1000)));
+    }
+
+    /**
+     * Insert a new counter column to the current row (and super column if applicable).
+     * The column timestamp is the current system time in milliseconds.
+     * @param name the column name
+     * @param value the value of the counter
+     */
+    public void addCounterColumn(ByteBuffer name, long value)
+    {
+        addColumn(new CounterColumn(name, CounterContext.instance().create(nodeid, 1L, value, false), System.currentTimeMillis()));
+    }
+
+    /**
+     * Close this writer.
+     * This method should be called, otherwise the produced sstables are not
+     * guaranteed to be complete (and won't be in practice).
+     */
+    public abstract void close() throws IOException;
+
+    /**
+     * Handle a completed row.
+     * @param key the decorated key of the row
+     * @param columnFamily the content of the row
+     */
+    protected abstract void writeRow(DecoratedKey key, ColumnFamily columnFamily) throws IOException;
+}
Added: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java?rev=1148267&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java (added)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java Tue Jul 19 10:44:48 2011
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.service.StorageService;
+
+/**
+ * A SSTable writer that doesn't assume rows are in sorted order.
+ * This writer buffers rows in memory and then writes them all in sorted order.
+ * To avoid loading the entire data set in memory, the amount of data buffered
+ * is configurable. Each time the threshold is exceeded, one SSTable will be
+ * created (and the buffer will be reset).
+ *
+ * @see AbstractSSTableSimpleWriter
+ */
+public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
+{
+    // Buffered rows; the TreeMap keeps them in key order so that sync() appends them sorted
+    private final Map<DecoratedKey, ColumnFamily> keys = new TreeMap<DecoratedKey, ColumnFamily>();
+    // Estimated data size (in bytes) above which the buffer is written out
+    private final long bufferSize;
+    // Running estimate (in bytes) of the size of the buffered rows
+    private long currentSize;
+
+    /**
+     * Create a new buffering writer.
+     * @param directory the directory where to write the sstables
+     * @param keyspace the keyspace name
+     * @param columnFamily the column family name
+     * @param comparator the column family comparator
+     * @param subComparator the column family subComparator or null if not a Super column family.
+     * @param bufferSizeInMB the data size in MB after which a sstable is written and the buffer reset. This corresponds roughly to the written
+     * data size (i.e. the size of the created sstable). The actual size used in memory will be higher (by how much depends on the size of the
+     * columns you add). For 1GB of heap, a 128 bufferSizeInMB is probably a reasonable choice. If you experience OOM, this value should be lowered.
+     */
+    public SSTableSimpleUnsortedWriter(File directory,
+                                       String keyspace,
+                                       String columnFamily,
+                                       AbstractType comparator,
+                                       AbstractType subComparator,
+                                       int bufferSizeInMB) throws IOException
+    {
+        super(directory, new CFMetaData(keyspace, columnFamily, subComparator == null ? ColumnFamilyType.Standard : ColumnFamilyType.Super, comparator, subComparator));
+        // Multiply as long: an int product overflows for bufferSizeInMB >= 2048
+        this.bufferSize = bufferSizeInMB * 1024L * 1024L;
+    }
+
+    /**
+     * Buffer the row, merging it with any row already buffered for the same key,
+     * and write the buffer out once the estimated size exceeds the threshold.
+     */
+    protected void writeRow(DecoratedKey key, ColumnFamily columnFamily) throws IOException
+    {
+        ColumnFamily previous = keys.put(key, columnFamily);
+        // The serialized size is padded by 20% for the estimate
+        currentSize += key.key.remaining() + columnFamily.serializedSize() * 1.2;
+
+        // Note that if the row was existing already, our size estimation will be slightly off
+        // since we'll be counting the key multiple times.
+        if (previous != null)
+            columnFamily.addAll(previous);
+
+        if (currentSize > bufferSize)
+            sync();
+    }
+
+    /**
+     * Buffer the row still being built (if any) and write all buffered rows to a sstable.
+     */
+    public void close() throws IOException
+    {
+        // newRow() only hands a row over when the next one is started, so the last row is
+        // still pending at this point and must be buffered before the final sync
+        if (currentKey != null && !columnFamily.isEmpty())
+            writeRow(currentKey, columnFamily);
+        // Make sure the same row is not buffered again should close() be called twice
+        currentKey = null;
+
+        sync();
+    }
+
+    // Write all the buffered rows, in key order, to a new sstable and reset the buffer.
+    private void sync() throws IOException
+    {
+        if (keys.isEmpty())
+            return;
+
+        SSTableWriter writer = getWriter();
+        for (Map.Entry<DecoratedKey, ColumnFamily> entry : keys.entrySet())
+        {
+            writer.append(entry.getKey(), entry.getValue());
+        }
+        writer.closeAndOpenReader();
+        currentSize = 0;
+        keys.clear();
+    }
+}
Added: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java?rev=1148267&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java (added)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java Tue Jul 19 10:44:48 2011
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+
+/**
+ * A SSTable writer that assumes rows are in (partitioner) sorted order.
+ * Contrary to SSTableSimpleUnsortedWriter, this writer does not buffer
+ * anything into memory, however it assumes that rows are added in sorted order
+ * (an exception will be thrown otherwise), which for the RandomPartitioner
+ * means that rows should be added by increasing md5 of the row key. This is
+ * rarely possible and SSTableSimpleUnsortedWriter should most of the time be
+ * preferred.
+ *
+ * @see AbstractSSTableSimpleWriter
+ */
+public class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
+{
+    // The single sstable writer every row is appended to
+    private final SSTableWriter writer;
+
+    /**
+     * Create a new writer.
+     * @param directory the directory where to write the sstable
+     * @param keyspace the keyspace name
+     * @param columnFamily the column family name
+     * @param comparator the column family comparator
+     * @param subComparator the column family subComparator or null if not a Super column family.
+     */
+    public SSTableSimpleWriter(File directory,
+                               String keyspace,
+                               String columnFamily,
+                               AbstractType comparator,
+                               AbstractType subComparator) throws IOException
+    {
+        this(directory,
+             new CFMetaData(keyspace, columnFamily, subComparator == null ? ColumnFamilyType.Standard : ColumnFamilyType.Super, comparator, subComparator));
+    }
+
+    /**
+     * Create a new writer.
+     * @param directory the directory where to write the sstable
+     * @param metadata the metadata of the column family to write the sstable for
+     */
+    public SSTableSimpleWriter(File directory, CFMetaData metadata) throws IOException
+    {
+        super(directory, metadata);
+        writer = getWriter();
+    }
+
+    /**
+     * Append the row still being built (if any and if not empty) and close the sstable.
+     */
+    public void close() throws IOException
+    {
+        // Same guard as newRow(): a row that was started but received no column is not written
+        if (currentKey != null && !columnFamily.isEmpty())
+            writeRow(currentKey, columnFamily);
+        writer.closeAndOpenReader();
+    }
+
+    // Rows are appended to the sstable right away; nothing is buffered.
+    protected void writeRow(DecoratedKey key, ColumnFamily columnFamily) throws IOException
+    {
+        writer.append(key, columnFamily);
+    }
+}