You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 07:04:22 UTC
svn commit: r749202 [5/6] - in /incubator/cassandra/src: ./ org/ org/apache/
org/apache/cassandra/ org/apache/cassandra/db/
Added: incubator/cassandra/src/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/db/RowMutation.java?rev=749202&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/db/RowMutation.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/db/RowMutation.java Mon Mar 2 06:04:20 2009
@@ -0,0 +1,333 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.*;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.cassandra.io.ICompactSerializer;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class RowMutation implements Serializable
+{
+ private static ICompactSerializer<RowMutation> serializer_;
+
+ static
+ {
+ serializer_ = new RowMutationSerializer();
+ }
+
+ static ICompactSerializer<RowMutation> serializer()
+ {
+ return serializer_;
+ }
+
+ private String table_;
+ private String key_;
+ protected Map<String, ColumnFamily> modifications_ = new HashMap<String, ColumnFamily>();
+ protected Map<String, ColumnFamily> deletions_ = new HashMap<String, ColumnFamily>();
+
+ /* Ctor for JAXB */
+ private RowMutation()
+ {
+ }
+
+ public RowMutation(String table, String key)
+ {
+ table_ = table;
+ key_ = key;
+ }
+
+ public RowMutation(String table, Row row)
+ {
+ table_ = table;
+ key_ = row.key();
+ Map<String, ColumnFamily> cfSet = row.getColumnFamilies();
+ Set<String> keyset = cfSet.keySet();
+ for(String cfName : keyset)
+ {
+ add(cfName, cfSet.get(cfName));
+ }
+ }
+
+ protected RowMutation(String table, String key, Map<String, ColumnFamily> modifications, Map<String, ColumnFamily> deletions)
+ {
+ table_ = table;
+ key_ = key;
+ modifications_ = modifications;
+ deletions_ = deletions;
+ }
+
+ public static String[] getColumnAndColumnFamily(String cf)
+ {
+ return cf.split(":");
+ }
+
+ String table()
+ {
+ return table_;
+ }
+
+ public String key()
+ {
+ return key_;
+ }
+
+ void addHints(String hint) throws IOException, ColumnFamilyNotDefinedException
+ {
+ String cfName = Table.hints_ + ":" + hint;
+ add(cfName, new byte[0]);
+ }
+
+ /*
+ * Specify a column family name and the corresponding column
+ * family object.
+ * param @ cf - column family name
+ * param @ columnFamily - the column family.
+ */
+ public void add(String cf, ColumnFamily columnFamily)
+ {
+ modifications_.put(cf, columnFamily);
+ }
+
+ /*
+ * Specify a column name and a corresponding value for
+ * the column. Column name is specified as <column family>:column.
+ * This will result in a ColumnFamily associated with
+ * <column family> as name and a Column with <column>
+ * as name.
+ *
+ * param @ cf - column name as <column family>:<column>
+ * param @ value - value associated with the column
+ */
+ public void add(String cf, byte[] value) throws IOException, ColumnFamilyNotDefinedException
+ {
+ add(cf, value, 0);
+ }
+
+ /*
+ * Specify a column name and a corresponding value for
+ * the column. Column name is specified as <column family>:column.
+ * This will result in a ColumnFamily associated with
+ * <column family> as name and a Column with <column>
+ * as name. The columan can be further broken up
+ * as super column name : columnname in case of super columns
+ *
+ * param @ cf - column name as <column family>:<column>
+ * param @ value - value associated with the column
+ * param @ timestamp - ts associated with this data.
+ */
+ public void add(String cf, byte[] value, long timestamp)
+ {
+ String[] values = RowMutation.getColumnAndColumnFamily(cf);
+
+ if ( values.length == 0 || values.length == 1 || values.length > 3 )
+ throw new IllegalArgumentException("Column Family " + cf + " in invalid format. Must be in <column family>:<column> format.");
+
+ ColumnFamily columnFamily = modifications_.get(values[0]);
+ if( values.length == 2 )
+ {
+ if ( columnFamily == null )
+ {
+ columnFamily = new ColumnFamily(values[0], ColumnFamily.getColumnType("Standard"));
+ }
+ columnFamily.createColumn(values[1], value, timestamp);
+ }
+ if( values.length == 3 )
+ {
+ if ( columnFamily == null )
+ {
+ columnFamily = new ColumnFamily(values[0], ColumnFamily.getColumnType("Super"));
+ }
+ columnFamily.createColumn(values[1]+ ":" + values[2], value, timestamp);
+ }
+ modifications_.put(values[0], columnFamily);
+ }
+
+ /*
+ * Specify a column name to be deleted. Column name is
+ * specified as <column family>:column. This will result
+ * in a ColumnFamily associated with <column family> as
+ * name and perhaps Column with <column> as name being
+ * marked as deleted.
+ * TODO : Delete is NOT correct as we do not know
+ * the CF type so we need to fix that.
+ * param @ cf - column name as <column family>:<column>
+ */
+ public void delete(String cf)
+ {
+ String[] values = RowMutation.getColumnAndColumnFamily(cf);
+
+ if ( values.length == 0 || values.length > 3 )
+ throw new IllegalArgumentException("Column Family " + cf + " in invalid format. Must be in <column family>:<column> format.");
+
+ ColumnFamily columnFamily = modifications_.get(values[0]);
+ if ( columnFamily == null )
+ columnFamily = new ColumnFamily(values[0]);
+ if(values.length == 2 )
+ {
+ columnFamily.createColumn( values[1]);
+ }
+ if(values.length == 3 )
+ {
+ columnFamily.createColumn( values[1] + ":" + values[2]);
+ }
+ deletions_.put(values[0], columnFamily);
+ }
+
+ /*
+ * This is equivalent to calling commit. Applies the changes to
+ * to the table that is obtained by calling Table.open().
+ */
+ public void apply() throws IOException, ColumnFamilyNotDefinedException
+ {
+ Row row = new Row(key_);
+ Table table = Table.open(table_);
+ Set<String> cfNames = modifications_.keySet();
+ for (String cfName : cfNames )
+ {
+ if ( !table.isValidColumnFamily(cfName) )
+ throw new ColumnFamilyNotDefinedException("Column Family " + cfName + " has not been defined.");
+ row.addColumnFamily( modifications_.get(cfName) );
+ }
+ table.apply(row);
+
+ Set<String> cfNames2 = deletions_.keySet();
+ for (String cfName : cfNames2 )
+ {
+ if ( !table.isValidColumnFamily(cfName) )
+ throw new ColumnFamilyNotDefinedException("Column Family " + cfName + " has not been defined.");
+ row.addColumnFamily( deletions_.get(cfName) );
+ }
+ if ( deletions_.size() > 0 )
+ table.delete(row);
+ }
+
+ /*
+ * This is equivalent to calling commit. Applies the changes to
+ * to the table that is obtained by calling Table.open().
+ */
+ void apply(Row row) throws IOException, ColumnFamilyNotDefinedException
+ {
+ Table table = Table.open(table_);
+ Set<String> cfNames = modifications_.keySet();
+ for (String cfName : cfNames )
+ {
+ if ( !table.isValidColumnFamily(cfName) )
+ throw new ColumnFamilyNotDefinedException("Column Family " + cfName + " has not been defined.");
+ row.addColumnFamily( modifications_.get(cfName) );
+ }
+ table.apply(row);
+
+ Set<String> cfNames2 = deletions_.keySet();
+ for (String cfName : cfNames2 )
+ {
+ if ( !table.isValidColumnFamily(cfName) )
+ throw new ColumnFamilyNotDefinedException("Column Family " + cfName + " has not been defined.");
+ row.addColumnFamily( deletions_.get(cfName) );
+ }
+ if ( deletions_.size() > 0 )
+ table.delete(row);
+ }
+
+ /*
+ * This is equivalent to calling commit. Applies the changes to
+ * to the table that is obtained by calling Table.open().
+ */
+ void load(Row row) throws IOException, ColumnFamilyNotDefinedException
+ {
+ Table table = Table.open(table_);
+ Set<String> cfNames = modifications_.keySet();
+ for (String cfName : cfNames )
+ {
+ if ( !table.isValidColumnFamily(cfName) )
+ throw new ColumnFamilyNotDefinedException("Column Family " + cfName + " has not been defined.");
+ row.addColumnFamily( modifications_.get(cfName) );
+ }
+ table.load(row);
+ }
+}
+
+class RowMutationSerializer implements ICompactSerializer<RowMutation>
+{
+ private void freezeTheMaps(Map<String, ColumnFamily> map, DataOutputStream dos) throws IOException
+ {
+ int size = map.size();
+ dos.writeInt(size);
+ if ( size > 0 )
+ {
+ Set<String> keys = map.keySet();
+ for( String key : keys )
+ {
+ dos.writeUTF(key);
+ ColumnFamily cf = map.get(key);
+ if ( cf != null )
+ {
+ ColumnFamily.serializer().serialize(cf, dos);
+ }
+ }
+ }
+ }
+
+ public void serialize(RowMutation rm, DataOutputStream dos) throws IOException
+ {
+ dos.writeUTF(rm.table());
+ dos.writeUTF(rm.key());
+
+ /* serialize the modifications_ in the mutation */
+ freezeTheMaps(rm.modifications_, dos);
+
+ /* serialize the deletions_ in the mutation */
+ freezeTheMaps(rm.deletions_, dos);
+ }
+
+ private Map<String, ColumnFamily> defreezeTheMaps(DataInputStream dis) throws IOException
+ {
+ Map<String, ColumnFamily> map = new HashMap<String, ColumnFamily>();
+ int size = dis.readInt();
+ for ( int i = 0; i < size; ++i )
+ {
+ String key = dis.readUTF();
+ ColumnFamily cf = ColumnFamily.serializer().deserialize(dis);
+ map.put(key, cf);
+ }
+ return map;
+ }
+
+ public RowMutation deserialize(DataInputStream dis) throws IOException
+ {
+ String table = dis.readUTF();
+ String key = dis.readUTF();
+
+ /* Defreeze the modifications_ map */
+ Map<String, ColumnFamily> modifications = defreezeTheMaps(dis);
+
+ /* Defreeze the deletions_ map */
+ Map<String, ColumnFamily> deletions = defreezeTheMaps(dis);
+
+ return new RowMutation(table, key, modifications, deletions);
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/db/RowMutationMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/db/RowMutationMessage.java?rev=749202&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/db/RowMutationMessage.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/db/RowMutationMessage.java Mon Mar 2 06:04:20 2009
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.*;
+
+import javax.xml.bind.annotation.XmlElement;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class RowMutationMessage implements Serializable
+{
+ public static final String hint_ = "HINT";
+ private static ICompactSerializer<RowMutationMessage> serializer_;
+
+ static
+ {
+ serializer_ = new RowMutationMessageSerializer();
+ }
+
+ static ICompactSerializer<RowMutationMessage> serializer()
+ {
+ return serializer_;
+ }
+
+ public static Message makeRowMutationMessage(RowMutationMessage rowMutationMessage) throws IOException
+ {
+ return makeRowMutationMessage(rowMutationMessage, StorageService.mutationVerbHandler_);
+ }
+
+ public static Message makeRowMutationMessage(RowMutationMessage rowMutationMessage, String verbHandlerName) throws IOException
+ {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream( bos );
+ RowMutationMessage.serializer().serialize(rowMutationMessage, dos);
+ EndPoint local = StorageService.getLocalStorageEndPoint();
+ EndPoint from = ( local != null ) ? local : new EndPoint(FBUtilities.getHostName(), 7000);
+ Message message = new Message(from, StorageService.mutationStage_, verbHandlerName, new Object[]{bos.toByteArray()});
+ return message;
+ }
+
+ @XmlElement(name="RowMutation")
+ private RowMutation rowMutation_;
+
+ private RowMutationMessage()
+ {}
+
+ public RowMutationMessage(RowMutation rowMutation)
+ {
+ rowMutation_ = rowMutation;
+ }
+
+ public RowMutation getRowMutation()
+ {
+ return rowMutation_;
+ }
+}
+
+class RowMutationMessageSerializer implements ICompactSerializer<RowMutationMessage>
+{
+ public void serialize(RowMutationMessage rm, DataOutputStream dos) throws IOException
+ {
+ RowMutation.serializer().serialize(rm.getRowMutation(), dos);
+ }
+
+ public RowMutationMessage deserialize(DataInputStream dis) throws IOException
+ {
+ RowMutation rm = RowMutation.serializer().deserialize(dis);
+ return new RowMutationMessage(rm);
+ }
+}
\ No newline at end of file
Added: incubator/cassandra/src/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=749202&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/db/RowMutationVerbHandler.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/db/RowMutationVerbHandler.java Mon Mar 2 06:04:20 2009
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.io.*;
+
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.service.*;
+import org.apache.cassandra.utils.*;
+import org.apache.cassandra.concurrent.*;
+import org.apache.cassandra.net.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class RowMutationVerbHandler implements IVerbHandler
+{
+ protected static class RowMutationContext
+ {
+ protected Row row_ = new Row();
+ protected DataInputBuffer buffer_ = new DataInputBuffer();
+ }
+
+ private static Logger logger_ = Logger.getLogger(RowMutationVerbHandler.class);
+ /* We use this so that we can reuse the same row mutation context for the mutation. */
+ private static ThreadLocal<RowMutationContext> tls_ = new InheritableThreadLocal<RowMutationContext>();
+
+ public void doVerb(Message message)
+ {
+ /* For DEBUG only. Printing queue length */
+ logger_.info( "ROW MUTATION STAGE: " + StageManager.getStageTaskCount(StorageService.mutationStage_) );
+ /* END DEBUG */
+
+ byte[] bytes = (byte[])message.getMessageBody()[0];
+ /* Obtain a Row Mutation Context from TLS */
+ RowMutationContext rowMutationCtx = tls_.get();
+ if ( rowMutationCtx == null )
+ {
+ rowMutationCtx = new RowMutationContext();
+ tls_.set(rowMutationCtx);
+ }
+
+ rowMutationCtx.buffer_.reset(bytes, bytes.length);
+
+ try
+ {
+ RowMutationMessage rmMsg = RowMutationMessage.serializer().deserialize(rowMutationCtx.buffer_);
+ RowMutation rm = rmMsg.getRowMutation();
+ /* Check if there were any hints in this message */
+ byte[] hintedBytes = message.getHeader(RowMutationMessage.hint_);
+ if ( hintedBytes != null && hintedBytes.length > 0 )
+ {
+ EndPoint hint = EndPoint.fromBytes(hintedBytes);
+ /* add necessary hints to this mutation */
+ try
+ {
+ RowMutation hintedMutation = new RowMutation(rm.table(), HintedHandOffManager.key_);
+ hintedMutation.addHints(rm.key() + ":" + hint.getHost());
+ hintedMutation.apply();
+ }
+ catch ( ColumnFamilyNotDefinedException ex )
+ {
+ logger_.debug(LogUtil.throwableToString(ex));
+ }
+ }
+
+ long start = System.currentTimeMillis();
+
+ rowMutationCtx.row_.key(rm.key());
+ rm.apply(rowMutationCtx.row_);
+
+ long end = System.currentTimeMillis();
+ logger_.info("ROW MUTATION APPLY: " + (end - start) + " ms.");
+
+ /*WriteResponseMessage writeResponseMessage = new WriteResponseMessage(rm.table(), rm.key(), true);
+ Message response = message.getReply( StorageService.getLocalStorageEndPoint(), new Object[]{writeResponseMessage} );
+ logger_.debug("Sending teh response to " + message.getFrom() + " for key :" + rm.key());
+ MessagingService.getMessagingInstance().sendOneWay(response, message.getFrom()); */
+ }
+ catch( ColumnFamilyNotDefinedException ex )
+ {
+ logger_.debug(LogUtil.throwableToString(ex));
+ }
+ catch ( IOException e )
+ {
+ logger_.debug(LogUtil.throwableToString(e));
+ }
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/db/Scanner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/db/Scanner.java?rev=749202&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/db/Scanner.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/db/Scanner.java Mon Mar 2 06:04:20 2009
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.*;
+import java.io.IOException;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+
+/**
+ * This class is used to loop through a retrieved column family
+ * to get all columns in Iterator style. Usage is as follows:
+ * Scanner scanner = new Scanner("table");
+ * scanner.fetchColumnfamily(key, "column-family");
+ *
+ * while ( scanner.hasNext() )
+ * {
+ * Column column = scanner.next();
+ * // Do something with the column
+ * }
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class Scanner implements IScanner<IColumn>
+{
+ /* Table over which we are scanning. */
+ private String table_;
+ /* Iterator when iterating over the columns of a given key in a column family */
+ private Iterator<IColumn> columnIt_;
+
+ public Scanner(String table)
+ {
+ table_ = table;
+ }
+
+ /**
+ * Fetch the columns associated with this key for the specified column family.
+ * This method basically sets up an iterator internally and then provides an
+ * iterator like interface to iterate over the columns.
+ * @param key key we are interested in.
+ * @param cf column family we are interested in.
+ * @throws IOException
+ * @throws ColumnFamilyNotDefinedException
+ */
+ public void fetch(String key, String cf) throws IOException, ColumnFamilyNotDefinedException
+ {
+ if ( cf != null )
+ {
+ Table table = Table.open(table_);
+ ColumnFamily columnFamily = table.get(key, cf);
+ if ( columnFamily != null )
+ {
+ Collection<IColumn> columns = columnFamily.getAllColumns();
+ columnIt_ = columns.iterator();
+ }
+ }
+ }
+
+ public boolean hasNext() throws IOException
+ {
+ return columnIt_.hasNext();
+ }
+
+ public IColumn next()
+ {
+ return columnIt_.next();
+ }
+
+ public void close() throws IOException
+ {
+ throw new UnsupportedOperationException("This operation is not supported in the Scanner");
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/db/SuperColumn.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/db/SuperColumn.java?rev=749202&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/db/SuperColumn.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/db/SuperColumn.java Mon Mar 2 06:04:20 2009
@@ -0,0 +1,492 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.HashingSchemes;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public final class SuperColumn implements IColumn, Serializable
+{
+ private static Logger logger_ = Logger.getLogger(SuperColumn.class);
+ private static ICompactSerializer2<IColumn> serializer_;
+ private final static String seperator_ = ":";
+
+ static
+ {
+ serializer_ = new SuperColumnSerializer();
+ }
+
+ static ICompactSerializer2<IColumn> serializer()
+ {
+ return serializer_;
+ }
+
+ private String name_;
+ private EfficientBidiMap columns_ = new EfficientBidiMap(ColumnComparatorFactory.getComparator(ColumnComparatorFactory.ComparatorType.TIMESTAMP));
+ private AtomicBoolean isMarkedForDelete_ = new AtomicBoolean(false);
+ private AtomicInteger size_ = new AtomicInteger(0);
+
+ SuperColumn()
+ {
+ }
+
+ SuperColumn(String name)
+ {
+ name_ = name;
+ }
+
+ public boolean isMarkedForDelete()
+ {
+ return isMarkedForDelete_.get();
+ }
+
+ public String name()
+ {
+ return name_;
+ }
+
+ public Collection<IColumn> getSubColumns()
+ {
+ return columns_.getSortedColumns();
+ }
+
+ public IColumn getSubColumn( String columnName )
+ {
+ IColumn column = columns_.get(columnName);
+ if ( column instanceof SuperColumn )
+ throw new UnsupportedOperationException("A super column cannot hold other super columns.");
+ return column;
+ }
+
+ public int compareTo(IColumn superColumn)
+ {
+ return (name_.compareTo(superColumn.name()));
+ }
+
+
+ public int size()
+ {
+ /*
+ * return the size of the individual columns
+ * that make up the super column. This is an
+ * APPROXIMATION of the size used only from the
+ * Memtable.
+ */
+ return size_.get();
+ }
+
+ /**
+ * This returns the size of the super-column when serialized.
+ * @see org.apache.cassandra.db.IColumn#serializedSize()
+ */
+ public int serializedSize()
+ {
+ /*
+ * Size of a super-column is =
+ * size of a name (UtfPrefix + length of the string)
+ * + 1 byte to indicate if the super-column has been deleted
+ * + 4 bytes for size of the sub-columns
+ * + 4 bytes for the number of sub-columns
+ * + size of all the sub-columns.
+ */
+
+ /*
+ * We store the string as UTF-8 encoded, so when we calculate the length, it
+ * should be converted to UTF-8.
+ */
+ /*
+ * We need to keep the way we are calculating the column size in sync with the
+ * way we are calculating the size for the column family serializer.
+ */
+ return IColumn.UtfPrefix_ + FBUtilities.getUTF8Length(name_) + DBConstants.boolSize_ + DBConstants.intSize_ + DBConstants.intSize_ + getSizeOfAllColumns();
+ }
+
+ /**
+ * This calculates the exact size of the sub columns on the fly
+ */
+ int getSizeOfAllColumns()
+ {
+ int size = 0;
+ Collection<IColumn> subColumns = getSubColumns();
+ for ( IColumn subColumn : subColumns )
+ {
+ size += subColumn.serializedSize();
+ }
+ return size;
+ }
+
+ protected void remove(String columnName)
+ {
+ columns_.remove(columnName);
+ }
+
+ public long timestamp()
+ {
+ throw new UnsupportedOperationException("This operation is not supported for Super Columns.");
+ }
+
+ public long timestamp(String key)
+ {
+ IColumn column = columns_.get(key);
+ if ( column instanceof SuperColumn )
+ throw new UnsupportedOperationException("A super column cannot hold other super columns.");
+ if ( column != null )
+ return column.timestamp();
+ throw new IllegalArgumentException("Timestamp was requested for a column that does not exist.");
+ }
+
+ public byte[] value()
+ {
+ throw new UnsupportedOperationException("This operation is not supported for Super Columns.");
+ }
+
+ public byte[] value(String key)
+ {
+ IColumn column = columns_.get(key);
+ if ( column instanceof SuperColumn )
+ throw new UnsupportedOperationException("A super column cannot hold other super columns.");
+ if ( column != null )
+ return column.value();
+ throw new IllegalArgumentException("Value was requested for a column that does not exist.");
+ }
+
+ public void addColumn(String name, IColumn column)
+ {
+ if ( column instanceof SuperColumn )
+ throw new UnsupportedOperationException("A super column cannot hold other super columns.");
+ IColumn oldColumn = columns_.get(name);
+ if ( oldColumn == null )
+ {
+ columns_.put(name, column);
+ size_.addAndGet(column.size());
+ }
+ else
+ {
+ if ( oldColumn.timestamp() <= column.timestamp() )
+ {
+ columns_.put(name, column);
+ int delta = (-1)*oldColumn.size();
+ /* subtract the size of the oldColumn */
+ size_.addAndGet(delta);
+ /* add the size of the new column */
+ size_.addAndGet(column.size());
+ }
+ }
+ }
+
+ /*
+ * Go through each sub column if it exists then as it to resolve itself
+ * if the column does not exist then create it.
+ */
+ public boolean putColumn(IColumn column)
+ {
+ if ( !(column instanceof SuperColumn))
+ throw new UnsupportedOperationException("Only Super column objects should be put here");
+ if( !name_.equals(column.name()))
+ throw new IllegalArgumentException("The name should match the name of the current column or super column");
+ Collection<IColumn> columns = column.getSubColumns();
+
+ for ( IColumn subColumn : columns )
+ {
+ addColumn(subColumn.name(), subColumn);
+ }
+ return false;
+ }
+
+ public int getObjectCount()
+ {
+ return 1 + columns_.size();
+ }
+
+ public void delete()
+ {
+ columns_.clear();
+ isMarkedForDelete_.set(true);
+ }
+
+ int getColumnCount()
+ {
+ return columns_.size();
+ }
+
+ public void repair(IColumn column)
+ {
+ Collection<IColumn> columns = column.getSubColumns();
+
+ for ( IColumn subColumn : columns )
+ {
+ IColumn columnInternal = columns_.get(subColumn.name());
+ if( columnInternal == null )
+ columns_.put(subColumn.name(), subColumn);
+ else
+ columnInternal.repair(subColumn);
+ }
+ }
+
+
+ public IColumn diff(IColumn column)
+ {
+ IColumn columnDiff = new SuperColumn(column.name());
+ Collection<IColumn> columns = column.getSubColumns();
+
+ for ( IColumn subColumn : columns )
+ {
+ IColumn columnInternal = columns_.get(subColumn.name());
+ if(columnInternal == null )
+ {
+ columnDiff.addColumn(subColumn.name(), subColumn);
+ }
+ else
+ {
+ IColumn subColumnDiff = columnInternal.diff(subColumn);
+ if(subColumnDiff != null)
+ {
+ columnDiff.addColumn(subColumn.name(), subColumnDiff);
+ }
+ }
+ }
+ if(columnDiff.getSubColumns().size() != 0)
+ return columnDiff;
+ else
+ return null;
+ }
+
+ public byte[] digest()
+ {
+ Set<IColumn> columns = columns_.getSortedColumns();
+ byte[] xorHash = new byte[0];
+ if(name_ == null)
+ return xorHash;
+ xorHash = name_.getBytes();
+ for(IColumn column : columns)
+ {
+ xorHash = FBUtilities.xor(xorHash, column.digest());
+ }
+ return xorHash;
+ }
+
+
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append(name_);
+ sb.append(":");
+ sb.append(isMarkedForDelete());
+ sb.append(":");
+
+ Collection<IColumn> columns = getSubColumns();
+ sb.append(columns.size());
+ sb.append(":");
+ sb.append(size());
+ sb.append(":");
+ for ( IColumn subColumn : columns )
+ {
+ sb.append(subColumn.toString());
+ }
+ sb.append(":");
+ return sb.toString();
+ }
+}
+
+class SuperColumnSerializer implements ICompactSerializer2<IColumn>
+{
+ public void serialize(IColumn column, DataOutputStream dos) throws IOException
+ {
+ SuperColumn superColumn = (SuperColumn)column;
+ dos.writeUTF(superColumn.name());
+ dos.writeBoolean(superColumn.isMarkedForDelete());
+
+ Collection<IColumn> columns = column.getSubColumns();
+ int size = columns.size();
+ dos.writeInt(size);
+
+ /*
+ * Add the total size of the columns. This is useful
+ * to skip over all the columns in this super column
+ * if we are not interested in this super column.
+ */
+ dos.writeInt(superColumn.getSizeOfAllColumns());
+ // dos.writeInt(superColumn.size());
+
+ for ( IColumn subColumn : columns )
+ {
+ Column.serializer().serialize(subColumn, dos);
+ }
+ }
+
+ /*
+ * Use this method to create a bare bones Super Column. This super column
+ * does not have any of the Column information.
+ */
+ private SuperColumn defreezeSuperColumn(DataInputStream dis) throws IOException
+ {
+ String name = dis.readUTF();
+ boolean delete = dis.readBoolean();
+ SuperColumn superColumn = new SuperColumn(name);
+ if ( delete )
+ superColumn.delete();
+ return superColumn;
+ }
+
+ public IColumn deserialize(DataInputStream dis) throws IOException
+ {
+ SuperColumn superColumn = defreezeSuperColumn(dis);
+ if ( !superColumn.isMarkedForDelete() )
+ fillSuperColumn(superColumn, dis);
+ return superColumn;
+ }
+
+ public void skip(DataInputStream dis) throws IOException
+ {
+ defreezeSuperColumn(dis);
+ /* read the number of columns stored */
+ dis.readInt();
+ /* read the size of all columns to skip */
+ int size = dis.readInt();
+ dis.skip(size);
+ }
+
+ private void fillSuperColumn(IColumn superColumn, DataInputStream dis) throws IOException
+ {
+ if ( dis.available() == 0 )
+ return;
+
+ /* read the number of columns */
+ int size = dis.readInt();
+ /* read the size of all columns */
+ dis.readInt();
+ for ( int i = 0; i < size; ++i )
+ {
+ IColumn subColumn = Column.serializer().deserialize(dis);
+ superColumn.addColumn(subColumn.name(), subColumn);
+ }
+ }
+
+ public IColumn deserialize(DataInputStream dis, IFilter filter) throws IOException
+ {
+ if ( dis.available() == 0 )
+ return null;
+
+ IColumn superColumn = defreezeSuperColumn(dis);
+ superColumn = filter.filter(superColumn, dis);
+ if(superColumn != null)
+ {
+ if ( !superColumn.isMarkedForDelete() )
+ fillSuperColumn(superColumn, dis);
+ return superColumn;
+ }
+ else
+ {
+ /* read the number of columns stored */
+ dis.readInt();
+ /* read the size of all columns to skip */
+ int size = dis.readInt();
+ dis.skip(size);
+ return null;
+ }
+ }
+
+ /*
+ * Deserialize a particular column since the name is in the form of
+ * superColumn:column.
+ */
+ public IColumn deserialize(DataInputStream dis, String name, IFilter filter) throws IOException
+ {
+ if ( dis.available() == 0 )
+ return null;
+
+ String[] names = RowMutation.getColumnAndColumnFamily(name);
+ if ( names.length == 1 )
+ {
+ IColumn superColumn = defreezeSuperColumn(dis);
+ if(name.equals(superColumn.name()))
+ {
+ if ( !superColumn.isMarkedForDelete() )
+ {
+ /* read the number of columns stored */
+ int size = dis.readInt();
+ /* read the size of all columns */
+ dis.readInt();
+ IColumn column = null;
+ for ( int i = 0; i < size; ++i )
+ {
+ column = Column.serializer().deserialize(dis, filter);
+ if(column != null)
+ {
+ superColumn.addColumn(column.name(), column);
+ column = null;
+ if(filter.isDone())
+ {
+ break;
+ }
+ }
+ }
+ }
+ return superColumn;
+ }
+ else
+ {
+ /* read the number of columns stored */
+ dis.readInt();
+ /* read the size of all columns to skip */
+ int size = dis.readInt();
+ dis.skip(size);
+ return null;
+ }
+ }
+
+ SuperColumn superColumn = defreezeSuperColumn(dis);
+ if ( !superColumn.isMarkedForDelete() )
+ {
+ int size = dis.readInt();
+ /* skip the size of the columns */
+ dis.readInt();
+ if ( size > 0 )
+ {
+ for ( int i = 0; i < size; ++i )
+ {
+ IColumn subColumn = Column.serializer().deserialize(dis, names[1], filter);
+ if ( subColumn != null )
+ {
+ superColumn.addColumn(subColumn.name(), subColumn);
+ break;
+ }
+ }
+ }
+ }
+ return superColumn;
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/db/SystemTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/db/SystemTable.java?rev=749202&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/db/SystemTable.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/db/SystemTable.java Mon Mar 2 06:04:20 2009
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.io.IFileReader;
+import org.apache.cassandra.io.IFileWriter;
+import org.apache.cassandra.io.SequenceFile;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.io.*;
+import org.apache.cassandra.utils.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class SystemTable
+{
+ private static Logger logger_ = Logger.getLogger(SystemTable.class);
+ private static Map<String, SystemTable> instances_ = new HashMap<String, SystemTable>();
+
+ /* Name of the SystemTable */
+ public static final String name_ = "System";
+ /* Name of the only column family in the Table */
+ static final String cfName_ = "LocationInfo";
+ /* Name of columns in this table */
+ static final String generation_ = "Generation";
+ static final String token_ = "Token";
+
+ /* The ID associated with this column family */
+ static final int cfId_ = -1;
+
+ /* Table name. */
+ private String table_;
+ /* after the header position */
+ private long startPosition_ = 0L;
+ /* Cache the SystemRow that we read. */
+ private Row systemRow_;
+
+ /* Use the following writer/reader to write/read to System table */
+ private IFileWriter writer_;
+ private IFileReader reader_;
+
+ public static SystemTable openSystemTable(String tableName) throws IOException
+ {
+ SystemTable table = instances_.get("System");
+ if ( table == null )
+ {
+ table = new SystemTable(tableName);
+ instances_.put(tableName, table);
+ }
+ return table;
+ }
+
+ SystemTable(String table) throws IOException
+ {
+ table_ = table;
+ String systemTable = getFileName();
+ writer_ = SequenceFile.writer(systemTable);
+ reader_ = SequenceFile.reader(systemTable);
+ }
+
+ private String getFileName()
+ {
+ return DatabaseDescriptor.getMetadataDirectory() + System.getProperty("file.separator") + table_ + ".db";
+ }
+
+ /*
+ * Selects the row associated with the given key.
+ */
+ public Row get(String key) throws IOException
+ {
+ String file = getFileName();
+ DataOutputBuffer bufOut = new DataOutputBuffer();
+ reader_.next(bufOut);
+
+ if ( bufOut.getLength() > 0 )
+ {
+ DataInputBuffer bufIn = new DataInputBuffer();
+ bufIn.reset(bufOut.getData(), bufOut.getLength());
+ /*
+ * This buffer contains key and value so we need to strip
+ * certain parts
+ */
+ // read the key
+ bufIn.readUTF();
+ // read the data length and then deserialize
+ bufIn.readInt();
+ try
+ {
+ systemRow_ = Row.serializer().deserialize(bufIn);
+ }
+ catch ( IOException e )
+ {
+ logger_.debug( LogUtil.throwableToString(e) );
+ }
+ }
+ return systemRow_;
+ }
+
+ /*
+ * This is a one time thing and hence we do not need
+ * any commit log related activity. Just write in an
+ * atomic fashion to the underlying SequenceFile.
+ */
+ void apply(Row row) throws IOException
+ {
+ systemRow_ = row;
+ String file = getFileName();
+ long currentPos = writer_.getCurrentPosition();
+ DataOutputBuffer bufOut = new DataOutputBuffer();
+ Row.serializer().serialize(row, bufOut);
+ try
+ {
+ writer_.append(row.key(), bufOut);
+ }
+ catch ( IOException e )
+ {
+ writer_.seek(currentPos);
+ throw e;
+ }
+ }
+
+ /*
+ * This method is used to update the SystemTable with the
+ * new token.
+ */
+ public void updateToken(BigInteger token) throws IOException
+ {
+ if ( systemRow_ != null )
+ {
+ Map<String, ColumnFamily> columnFamilies = systemRow_.getColumnFamilies();
+ /* Retrieve the "LocationInfo" column family */
+ ColumnFamily columnFamily = columnFamilies.get(SystemTable.cfName_);
+ long oldTokenColumnTimestamp = columnFamily.getColumn(SystemTable.token_).timestamp();
+ /* create the "Token" whose value is the new token. */
+ IColumn tokenColumn = new Column(SystemTable.token_, token.toByteArray(), oldTokenColumnTimestamp + 1);
+ /* replace the old "Token" column with this new one. */
+ logger_.debug("Replacing old token " + new BigInteger( columnFamily.getColumn(SystemTable.token_).value() ).toString() + " with token " + token.toString());
+ columnFamily.addColumn(SystemTable.token_, tokenColumn);
+ reset(systemRow_);
+ }
+ }
+
+ public void reset(Row row) throws IOException
+ {
+ writer_.seek(startPosition_);
+ apply(row);
+ }
+
+ void delete(Row row) throws IOException
+ {
+ throw new UnsupportedOperationException("This operation is not supported for System tables");
+ }
+
+ public static void main(String[] args) throws Throwable
+ {
+ LogUtil.init();
+ StorageService.instance().start();
+ SystemTable.openSystemTable(SystemTable.cfName_).updateToken( StorageService.hash("503545744:0") );
+ System.out.println("Done");
+
+ /*
+ BigInteger hash = StorageService.hash("304700067:0");
+ List<Range> ranges = new ArrayList<Range>();
+ ranges.add( new Range(new BigInteger("1218069462158869448693347920504606362273788442553"), new BigInteger("1092770595533781724218060956188429069")) );
+ if ( Range.isKeyInRanges(ranges, "304700067:0") )
+ {
+ System.out.println("Done");
+ }
+ */
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/db/Table.java?rev=749202&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/db/Table.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/db/Table.java Mon Mar 2 06:04:20 2009
@@ -0,0 +1,967 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.*;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.File;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.cassandra.analytics.DBAnalyticsSource;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.continuations.Suspendable;
+import org.apache.cassandra.dht.BootstrapInitiateMessage;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.IFileReader;
+import org.apache.cassandra.io.IFileWriter;
+import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SequenceFile;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.io.IStreamComplete;
+import org.apache.cassandra.net.io.StreamContextManager;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.BasicUtilities;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.FileUtils;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.io.*;
+import org.apache.cassandra.utils.*;
+import org.apache.cassandra.service.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+*/
+
+public class Table
+{
+ /*
+ * This class represents the metadata of this Table. The metadata
+ * is basically the column family name and the ID associated with
+ * this column family. We use this ID in the Commit Log header to
+ * determine when a log file that has been rolled can be deleted.
+ */
+ public static class TableMetadata
+ {
+ /* Name of the column family */
+ public final static String cfName_ = "TableMetadata";
+ /*
+ * Name of one of the columns. The other columns are the individual
+ * column families in the system.
+ */
+ public static final String cardinality_ = "PrimaryCardinality";
+ private static ICompactSerializer<TableMetadata> serializer_;
+ static
+ {
+ serializer_ = new TableMetadataSerializer();
+ }
+
+ private static TableMetadata tableMetadata_;
+ /* Use the following writer/reader to write/read to Metadata table */
+ private static IFileWriter writer_;
+ private static IFileReader reader_;
+
+ public static Table.TableMetadata instance() throws IOException
+ {
+ if ( tableMetadata_ == null )
+ {
+ String file = getFileName();
+ writer_ = SequenceFile.writer(file);
+ reader_ = SequenceFile.reader(file);
+ Table.TableMetadata.load();
+ if ( tableMetadata_ == null )
+ tableMetadata_ = new Table.TableMetadata();
+ }
+ return tableMetadata_;
+ }
+
+ static ICompactSerializer<TableMetadata> serializer()
+ {
+ return serializer_;
+ }
+
+ private static void load() throws IOException
+ {
+ String file = Table.TableMetadata.getFileName();
+ File f = new File(file);
+ if ( f.exists() )
+ {
+ DataOutputBuffer bufOut = new DataOutputBuffer();
+ DataInputBuffer bufIn = new DataInputBuffer();
+
+ if ( reader_ == null )
+ {
+ reader_ = SequenceFile.reader(file);
+ }
+
+ while ( !reader_.isEOF() )
+ {
+ /* Read the metadata info. */
+ reader_.next(bufOut);
+ bufIn.reset(bufOut.getData(), bufOut.getLength());
+
+ /* The key is the table name */
+ String key = bufIn.readUTF();
+ /* read the size of the data we ignore this value */
+ bufIn.readInt();
+ tableMetadata_ = Table.TableMetadata.serializer().deserialize(bufIn);
+ break;
+ }
+ }
+ }
+
+ /* The mapping between column family and the column type. */
+ private Map<String, String> cfTypeMap_ = new HashMap<String, String>();
+ private Map<String, Integer> cfIdMap_ = new HashMap<String, Integer>();
+ private Map<Integer, String> idCfMap_ = new HashMap<Integer, String>();
+
+ private static String getFileName()
+ {
+ String table = DatabaseDescriptor.getTables().get(0);
+ return DatabaseDescriptor.getMetadataDirectory() + System.getProperty("file.separator") + table + "-Metadata.db";
+ }
+
+ public void add(String cf, int id)
+ {
+ add(cf, id, "Standard");
+ }
+
+ public void add(String cf, int id, String type)
+ {
+ cfIdMap_.put(cf, id);
+ idCfMap_.put(id, cf);
+ cfTypeMap_.put(cf, type);
+ }
+
+ boolean isEmpty()
+ {
+ return cfIdMap_.isEmpty();
+ }
+
+ int getColumnFamilyId(String columnFamily)
+ {
+ return cfIdMap_.get(columnFamily);
+ }
+
+ String getColumnFamilyName(int id)
+ {
+ return idCfMap_.get(id);
+ }
+
+ String getColumnFamilyType(String cfName)
+ {
+ return cfTypeMap_.get(cfName);
+ }
+
+ void setColumnFamilyType(String cfName, String type)
+ {
+ cfTypeMap_.put(cfName, type);
+ }
+
+ Set<String> getColumnFamilies()
+ {
+ return cfIdMap_.keySet();
+ }
+
+ int size()
+ {
+ return cfIdMap_.size();
+ }
+
+ boolean isValidColumnFamily(String cfName)
+ {
+ return cfIdMap_.containsKey(cfName);
+ }
+
+ void apply() throws IOException
+ {
+ String table = DatabaseDescriptor.getTables().get(0);
+ DataOutputBuffer bufOut = new DataOutputBuffer();
+ Table.TableMetadata.serializer_.serialize(this, bufOut);
+ try
+ {
+ writer_.append(table, bufOut);
+ }
+ catch ( IOException ex )
+ {
+ writer_.seek(0L);
+ logger_.debug(LogUtil.throwableToString(ex));
+ }
+ }
+
+ public void reset() throws IOException
+ {
+ writer_.seek(0L);
+ apply();
+ }
+
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder("");
+ Set<String> cfNames = cfIdMap_.keySet();
+
+ for ( String cfName : cfNames )
+ {
+ sb.append(cfName);
+ sb.append("---->");
+ sb.append(cfIdMap_.get(cfName));
+ sb.append(System.getProperty("line.separator"));
+ }
+
+ return sb.toString();
+ }
+ }
+
+ static class TableMetadataSerializer implements ICompactSerializer<TableMetadata>
+ {
+ public void serialize(TableMetadata tmetadata, DataOutputStream dos) throws IOException
+ {
+ int size = tmetadata.cfIdMap_.size();
+ dos.writeInt(size);
+ Set<String> cfNames = tmetadata.cfIdMap_.keySet();
+
+ for ( String cfName : cfNames )
+ {
+ dos.writeUTF(cfName);
+ dos.writeInt( tmetadata.cfIdMap_.get(cfName).intValue() );
+ dos.writeUTF(tmetadata.getColumnFamilyType(cfName));
+ }
+ }
+
+ public TableMetadata deserialize(DataInputStream dis) throws IOException
+ {
+ TableMetadata tmetadata = new TableMetadata();
+ int size = dis.readInt();
+ for( int i = 0; i < size; ++i )
+ {
+ String cfName = dis.readUTF();
+ int id = dis.readInt();
+ String type = dis.readUTF();
+ tmetadata.add(cfName, id, type);
+ }
+ return tmetadata;
+ }
+ }
+
+ /**
+ * This is the callback handler that is invoked when we have
+ * completely been bootstrapped for a single file by a remote host.
+ */
+ public static class BootstrapCompletionHandler implements IStreamComplete
+ {
+ public void onStreamCompletion(String host, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus) throws IOException
+ {
+ /* Parse the stream context and the file to the list of SSTables in the associated Column Family Store. */
+ if ( streamContext.getTargetFile().indexOf("-Data.db") != -1 )
+ {
+ File file = new File( streamContext.getTargetFile() );
+ String fileName = file.getName();
+ /*
+ * If the file is a Data File we need to load the indicies associated
+ * with this file. We also need to cache the file name in the SSTables
+ * list of the associated Column Family. Also merge the CBF into the
+ * sampler.
+ */
+ SSTable ssTable = new SSTable(streamContext.getTargetFile() );
+ ssTable.close();
+ logger_.debug("Merging the counting bloom filter in the sampler ...");
+ String[] peices = FBUtilities.strip(fileName, "-");
+ Table.open(peices[0]).getColumnFamilyStore(peices[1]).addToList(streamContext.getTargetFile());
+ }
+
+ EndPoint to = new EndPoint(host, DatabaseDescriptor.getStoragePort());
+ logger_.debug("Sending a bootstrap terminate message with " + streamStatus + " to " + to);
+ /* Send a StreamStatusMessage object which may require the source node to re-stream certain files. */
+ StreamContextManager.StreamStatusMessage streamStatusMessage = new StreamContextManager.StreamStatusMessage(streamStatus);
+ Message message = StreamContextManager.StreamStatusMessage.makeStreamStatusMessage(streamStatusMessage);
+ MessagingService.getMessagingInstance().sendOneWay(message, to);
+ }
+ }
+
+ public static class BootStrapInitiateVerbHandler implements IVerbHandler
+ {
+ /*
+ * Here we handle the BootstrapInitiateMessage. Here we get the
+ * array of StreamContexts. We get file names for the column
+ * families associated with the files and replace them with the
+ * file names as obtained from the column family store on the
+ * receiving end.
+ */
+ public void doVerb(Message message)
+ {
+ byte[] body = (byte[])message.getMessageBody()[0];
+ DataInputBuffer bufIn = new DataInputBuffer();
+ bufIn.reset(body, body.length);
+
+ try
+ {
+ BootstrapInitiateMessage biMsg = BootstrapInitiateMessage.serializer().deserialize(bufIn);
+ StreamContextManager.StreamContext[] streamContexts = biMsg.getStreamContext();
+
+ Map<String, String> fileNames = getNewNames(streamContexts);
+ /*
+ * For each of stream context's in the incoming message
+ * generate the new file names and store the new file names
+ * in the StreamContextManager.
+ */
+ for (StreamContextManager.StreamContext streamContext : streamContexts )
+ {
+ StreamContextManager.StreamStatus streamStatus = new StreamContextManager.StreamStatus(streamContext.getTargetFile(), streamContext.getExpectedBytes() );
+ File sourceFile = new File( streamContext.getTargetFile() );
+ String[] peices = FBUtilities.strip(sourceFile.getName(), "-");
+ String newFileName = fileNames.get( peices[1] + "-" + peices[2] );
+
+ String file = new String(DatabaseDescriptor.getDataFileLocation() + System.getProperty("file.separator") + newFileName + "-Data.db");
+ logger_.debug("Received Data from : " + message.getFrom() + " " + streamContext.getTargetFile() + " " + file);
+ streamContext.setTargetFile(file);
+ addStreamContext(message.getFrom().getHost(), streamContext, streamStatus);
+ }
+
+ StreamContextManager.registerStreamCompletionHandler(message.getFrom().getHost(), new Table.BootstrapCompletionHandler());
+ /* Send a bootstrap initiation done message to execute on default stage. */
+ logger_.debug("Sending a bootstrap initiate done message ...");
+ Message doneMessage = new Message( StorageService.getLocalStorageEndPoint(), "", StorageService.bootStrapInitiateDoneVerbHandler_, new Object[]{new byte[0]} );
+ MessagingService.getMessagingInstance().sendOneWay(doneMessage, message.getFrom());
+ }
+ catch ( IOException ex )
+ {
+ logger_.info(LogUtil.throwableToString(ex));
+ }
+ }
+
+ private Map<String, String> getNewNames(StreamContextManager.StreamContext[] streamContexts)
+ {
+ /*
+ * Mapping for each file with unique CF-i ---> new file name. For eg.
+ * for a file with name <Table>-<CF>-<i>-Data.db there is a corresponding
+ * <Table>-<CF>-<i>-Index.db. We maintain a mapping from <CF>-<i> to a newly
+ * generated file name.
+ */
+ Map<String, String> fileNames = new HashMap<String, String>();
+ /* Get the distinct entries from StreamContexts i.e have one entry per Data/Index file combination */
+ Set<String> distinctEntries = new HashSet<String>();
+ for ( StreamContextManager.StreamContext streamContext : streamContexts )
+ {
+ String[] peices = FBUtilities.strip(streamContext.getTargetFile(), "-");
+ distinctEntries.add(peices[1] + "-" + peices[2]);
+ }
+
+ /* Generate unique file names per entry */
+ Table table = Table.open( DatabaseDescriptor.getTables().get(0) );
+ Map<String, ColumnFamilyStore> columnFamilyStores = table.getColumnFamilyStores();
+
+ for ( String distinctEntry : distinctEntries )
+ {
+ String[] peices = FBUtilities.strip(distinctEntry, "-");
+ ColumnFamilyStore cfStore = columnFamilyStores.get(peices[0]);
+ logger_.debug("Generating file name for " + distinctEntry + " ...");
+ fileNames.put(distinctEntry, cfStore.getNextFileName());
+ }
+
+ return fileNames;
+ }
+
+ private boolean isStreamContextForThisColumnFamily(StreamContextManager.StreamContext streamContext, String cf)
+ {
+ String[] peices = FBUtilities.strip(streamContext.getTargetFile(), "-");
+ return peices[1].equals(cf);
+ }
+
+ private String getColumnFamilyFromFile(String file)
+ {
+ String[] peices = FBUtilities.strip(file, "-");
+ return peices[1];
+ }
+
+ private void addStreamContext(String host, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus)
+ {
+ logger_.debug("Adding stream context " + streamContext + " for " + host + " ...");
+ StreamContextManager.addStreamContext(host, streamContext, streamStatus);
+ }
+ }
+
+ private static Logger logger_ = Logger.getLogger(Table.class);
+ public static final String newLine_ = System.getProperty("line.separator");
+ public static final String recycleBin_ = "RecycleColumnFamily";
+ public static final String hints_ = "HintsColumnFamily";
+
+ /* Used to lock the factory for creation of Table instance */
+ private static Lock createLock_ = new ReentrantLock();
+ private static Map<String, Table> instances_ = new HashMap<String, Table>();
+ /* Table name. */
+ private String table_;
+ /* Handle to the Table Metadata */
+ private Table.TableMetadata tableMetadata_;
+ /* ColumnFamilyStore per column family */
+ private Map<String, ColumnFamilyStore> columnFamilyStores_ = new HashMap<String, ColumnFamilyStore>();
+ /* The AnalyticsSource instance which keeps track of statistics reported to Ganglia. */
+ private DBAnalyticsSource dbAnalyticsSource_;
+
+ public static Table open(String table)
+ {
+ Table tableInstance = instances_.get(table);
+ /*
+ * Read the config and figure the column families for this table.
+ * Set the isConfigured flag so that we do not read config all the
+ * time.
+ */
+ if ( tableInstance == null )
+ {
+ Table.createLock_.lock();
+ try
+ {
+ if ( tableInstance == null )
+ {
+ tableInstance = new Table(table);
+ instances_.put(table, tableInstance);
+ }
+ }
+ finally
+ {
+ createLock_.unlock();
+ }
+ }
+ return tableInstance;
+ }
+
+ public Set<String> getColumnFamilies()
+ {
+ return tableMetadata_.getColumnFamilies();
+ }
+
+ Map<String, ColumnFamilyStore> getColumnFamilyStores()
+ {
+ return columnFamilyStores_;
+ }
+
+ ColumnFamilyStore getColumnFamilyStore(String cfName)
+ {
+ return columnFamilyStores_.get(cfName);
+ }
+
+ String getColumnFamilyType(String cfName)
+ {
+ String cfType = null;
+ if ( tableMetadata_ != null )
+ cfType = tableMetadata_.getColumnFamilyType(cfName);
+ return cfType;
+ }
+
+ public void setColumnFamilyType(String cfName, String type)
+ {
+ tableMetadata_.setColumnFamilyType(cfName, type);
+ }
+
+ /*
+ * This method is called to obtain statistics about
+ * the table. It will return statistics about all
+ * the column families that make up this table.
+ */
+ public String tableStats(String newLineSeparator, java.text.DecimalFormat df)
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append(table_ + " statistics :");
+ sb.append(newLineSeparator);
+ int oldLength = sb.toString().length();
+
+ Set<String> cfNames = columnFamilyStores_.keySet();
+ for ( String cfName : cfNames )
+ {
+ ColumnFamilyStore cfStore = columnFamilyStores_.get(cfName);
+ sb.append(cfStore.cfStats(newLineSeparator, df));
+ }
+ int newLength = sb.toString().length();
+
+ /* Don't show anything if there is nothing to show. */
+ if ( newLength == oldLength )
+ return "";
+
+ return sb.toString();
+ }
+
+ void onStart() throws IOException
+ {
+ /* Cache the callouts if any */
+ CalloutManager.instance().onStart();
+ Set<String> columnFamilies = tableMetadata_.getColumnFamilies();
+ for ( String columnFamily : columnFamilies )
+ {
+ ColumnFamilyStore cfStore = columnFamilyStores_.get( columnFamily );
+ if ( cfStore != null )
+ cfStore.onStart();
+ }
+ }
+
+ /**
+ * Do a cleanup of keys that do not belong locally.
+ */
+ public void doGC()
+ {
+ Set<String> columnFamilies = tableMetadata_.getColumnFamilies();
+ for ( String columnFamily : columnFamilies )
+ {
+ ColumnFamilyStore cfStore = columnFamilyStores_.get( columnFamily );
+ if ( cfStore != null )
+ cfStore.forceCleanup();
+ }
+ }
+
+
+ /*
+ * This method is used to ensure that all keys
+ * prior to the specified key, as dtermined by
+ * the SSTable index bucket it falls in, are in
+ * buffer cache.
+ */
+ public void touch(String key, boolean fData) throws IOException
+ {
+ Set<String> columnFamilies = tableMetadata_.getColumnFamilies();
+ for ( String columnFamily : columnFamilies )
+ {
+ if ( DatabaseDescriptor.isApplicationColumnFamily(columnFamily) )
+ {
+ ColumnFamilyStore cfStore = columnFamilyStores_.get( columnFamily );
+ if ( cfStore != null )
+ cfStore.touch(key, fData);
+ }
+ }
+ }
+
+ /*
+ * Take a snapshot of the entire set of column families.
+ */
+ public void snapshot( String clientSuppliedName ) throws IOException
+ {
+ String snapshotDirectory = DatabaseDescriptor.getSnapshotDirectory();
+ File snapshotDir = new File(snapshotDirectory);
+ if( !snapshotDir.exists() )
+ snapshotDir.mkdir();
+
+ String currentSnapshotDir = null;
+ if( clientSuppliedName != null && !clientSuppliedName.equals("") )
+ currentSnapshotDir = snapshotDirectory + System.getProperty("file.separator") + System.currentTimeMillis() + "-" + clientSuppliedName;
+ else
+ currentSnapshotDir = snapshotDirectory + System.getProperty("file.separator") + System.currentTimeMillis();
+
+ /* First take a snapshot of the commit logs */
+ CommitLog.open(table_).snapshot( currentSnapshotDir );
+ /* force roll over the commit log */
+ CommitLog.open(table_).setForcedRollOver();
+ /* Now take a snapshot of all columnfamily stores */
+ Set<String> columnFamilies = tableMetadata_.getColumnFamilies();
+ for ( String columnFamily : columnFamilies )
+ {
+ ColumnFamilyStore cfStore = columnFamilyStores_.get( columnFamily );
+ if ( cfStore != null )
+ cfStore.snapshot( currentSnapshotDir );
+ }
+ }
+
+ /*
+ * Clear the existing snapshots in the system
+ */
+ public void clearSnapshot()
+ {
+ String snapshotDir = DatabaseDescriptor.getSnapshotDirectory();
+ File snapshot = new File(snapshotDir);
+ FileUtils.deleteDir(snapshot);
+ }
+
+ /*
+ * This method is invoked only during a bootstrap process. We basically
+ * do a complete compaction since we can figure out based on the ranges
+ * whether the files need to be split.
+ */
+ public boolean forceCompaction(List<Range> ranges, EndPoint target, List<String> fileList) throws IOException
+ {
+ boolean result = true;
+ Set<String> columnFamilies = tableMetadata_.getColumnFamilies();
+ for ( String columnFamily : columnFamilies )
+ {
+ if ( !isApplicationColumnFamily(columnFamily) )
+ continue;
+
+ ColumnFamilyStore cfStore = columnFamilyStores_.get( columnFamily );
+ if ( cfStore != null )
+ {
+ /* Counting Bloom Filter for the Column Family */
+ cfStore.forceCompaction(ranges, target, 0, fileList);
+ }
+ }
+ return result;
+ }
+
+ /*
+ * This method is an ADMIN operation to force compaction
+ * of all SSTables on disk.
+ */
+ public void forceCompaction() throws IOException
+ {
+ Set<String> columnFamilies = tableMetadata_.getColumnFamilies();
+ for ( String columnFamily : columnFamilies )
+ {
+ ColumnFamilyStore cfStore = columnFamilyStores_.get( columnFamily );
+ if ( cfStore != null )
+ MinorCompactionManager.instance().submitMajor(cfStore, null, 0);
+ }
+ }
+
+ /*
+ * Get the list of all SSTables on disk.
+ */
+ public List<String> getAllSSTablesOnDisk()
+ {
+ List<String> list = new ArrayList<String>();
+ Set<String> columnFamilies = tableMetadata_.getColumnFamilies();
+ for ( String columnFamily : columnFamilies )
+ {
+ ColumnFamilyStore cfStore = columnFamilyStores_.get( columnFamily );
+ if ( cfStore != null )
+ list.addAll( cfStore.getAllSSTablesOnDisk() );
+ }
+ return list;
+ }
+
+ private Table(String table)
+ {
+ table_ = table;
+ dbAnalyticsSource_ = new DBAnalyticsSource();
+ try
+ {
+ tableMetadata_ = Table.TableMetadata.instance();
+ Set<String> columnFamilies = tableMetadata_.getColumnFamilies();
+ for ( String columnFamily : columnFamilies )
+ {
+ columnFamilyStores_.put( columnFamily, new ColumnFamilyStore(table, columnFamily) );
+ }
+ }
+ catch ( IOException ex )
+ {
+ logger_.info(LogUtil.throwableToString(ex));
+ }
+ }
+
+ String getTableName()
+ {
+ return table_;
+ }
+
+ boolean isApplicationColumnFamily(String columnFamily)
+ {
+ return DatabaseDescriptor.isApplicationColumnFamily(columnFamily);
+ }
+
+ int getNumberOfColumnFamilies()
+ {
+ return tableMetadata_.size();
+ }
+
+ int getColumnFamilyId(String columnFamily)
+ {
+ return tableMetadata_.getColumnFamilyId(columnFamily);
+ }
+
+ String getColumnFamilyName(int id)
+ {
+ return tableMetadata_.getColumnFamilyName(id);
+ }
+
+ boolean isValidColumnFamily(String columnFamily)
+ {
+ return tableMetadata_.isValidColumnFamily(columnFamily);
+ }
+
+ /**
+ * Selects the row associated with the given key.
+ */
+ public Row get(String key) throws IOException
+ {
+ Row row = new Row(key);
+ Set<String> columnFamilies = tableMetadata_.getColumnFamilies();
+ long start = System.currentTimeMillis();
+ for ( String columnFamily : columnFamilies )
+ {
+ ColumnFamilyStore cfStore = columnFamilyStores_.get(columnFamily);
+ if ( cfStore != null )
+ {
+ ColumnFamily cf = cfStore.getColumnFamily(key, columnFamily, new IdentityFilter());
+ if ( cf != null )
+ row.addColumnFamily(cf);
+ }
+ }
+
+ long timeTaken = System.currentTimeMillis() - start;
+ dbAnalyticsSource_.updateReadStatistics(timeTaken);
+ return row;
+ }
+
+ public Row getRowFromMemory(String key)
+ {
+ Row row = new Row(key);
+ Set<String> columnFamilies = tableMetadata_.getColumnFamilies();
+ long start = System.currentTimeMillis();
+ for ( String columnFamily : columnFamilies )
+ {
+ ColumnFamilyStore cfStore = columnFamilyStores_.get(columnFamily);
+ if ( cfStore != null )
+ {
+ ColumnFamily cf = cfStore.getColumnFamilyFromMemory(key, columnFamily, new IdentityFilter());
+ if ( cf != null )
+ row.addColumnFamily(cf);
+ }
+ }
+ long timeTaken = System.currentTimeMillis() - start;
+ dbAnalyticsSource_.updateReadStatistics(timeTaken);
+ return row;
+ }
+
+
+ /**
+ * Selects the specified column family for the specified key.
+ */
+ public ColumnFamily get(String key, String cf) throws ColumnFamilyNotDefinedException, IOException
+ {
+ String[] values = RowMutation.getColumnAndColumnFamily(cf);
+ long start = System.currentTimeMillis();
+ ColumnFamilyStore cfStore = columnFamilyStores_.get(values[0]);
+ if ( cfStore != null )
+ {
+ ColumnFamily columnFamily = cfStore.getColumnFamily(key, cf, new IdentityFilter());
+ long timeTaken = System.currentTimeMillis() - start;
+ dbAnalyticsSource_.updateReadStatistics(timeTaken);
+ return columnFamily;
+ }
+ else
+ {
+ throw new ColumnFamilyNotDefinedException("Column family " + cf + " has not been defined");
+ }
+ }
+
+ /**
+ * Selects only the specified column family for the specified key.
+ */
+ public Row getRow(String key, String cf) throws ColumnFamilyNotDefinedException, IOException
+ {
+ Row row = new Row(key);
+ ColumnFamily columnFamily = get(key, cf);
+ if ( columnFamily != null )
+ row.addColumnFamily(columnFamily);
+ return row;
+ }
+
+ /**
+ * Selects only the specified column family for the specified key.
+ */
+ public Row getRow(String key, String cf, int start, int count) throws ColumnFamilyNotDefinedException, IOException
+ {
+ Row row = new Row(key);
+ String[] values = RowMutation.getColumnAndColumnFamily(cf);
+ ColumnFamilyStore cfStore = columnFamilyStores_.get(values[0]);
+ long start1 = System.currentTimeMillis();
+ if ( cfStore != null )
+ {
+ ColumnFamily columnFamily = cfStore.getColumnFamily(key, cf, new CountFilter(count));
+ if ( columnFamily != null )
+ row.addColumnFamily(columnFamily);
+ long timeTaken = System.currentTimeMillis() - start1;
+ dbAnalyticsSource_.updateReadStatistics(timeTaken);
+ return row;
+ }
+ else
+ throw new ColumnFamilyNotDefinedException("Column family " + cf + " has not been defined");
+ }
+
+ public Row getRow(String key, String cf, long sinceTimeStamp) throws ColumnFamilyNotDefinedException, IOException
+ {
+ Row row = new Row(key);
+ String[] values = RowMutation.getColumnAndColumnFamily(cf);
+ ColumnFamilyStore cfStore = columnFamilyStores_.get(values[0]);
+ long start1 = System.currentTimeMillis();
+ if ( cfStore != null )
+ {
+ ColumnFamily columnFamily = cfStore.getColumnFamily(key, cf, new TimeFilter(sinceTimeStamp));
+ if ( columnFamily != null )
+ row.addColumnFamily(columnFamily);
+ long timeTaken = System.currentTimeMillis() - start1;
+ dbAnalyticsSource_.updateReadStatistics(timeTaken);
+ return row;
+ }
+ else
+ throw new ColumnFamilyNotDefinedException("Column family " + cf + " has not been defined");
+ }
+
+ /**
+ * This method returns the specified columns for the specified
+ * column family.
+ *
+ * param @ key - key for which data is requested.
+ * param @ cf - column family we are interested in.
+ * param @ columns - columns that are part of the above column family.
+ */
+ public Row getRow(String key, String cf, List<String> columns) throws IOException
+ {
+ Row row = new Row(key);
+ String[] values = RowMutation.getColumnAndColumnFamily(cf);
+ ColumnFamilyStore cfStore = columnFamilyStores_.get(values[0]);
+
+ if ( cfStore != null )
+ {
+ ColumnFamily columnFamily = cfStore.getColumnFamily(key, cf, new NamesFilter(new ArrayList<String>(columns)));
+ if ( columnFamily != null )
+ row.addColumnFamily(columnFamily);
+ }
+ return row;
+ }
+
+ /**
+ * This method adds the row to the Commit Log associated with this table.
+ * Once this happens the data associated with the individual column families
+ * is also written to the column family store's memtable.
+ */
+ public void apply(Row row) throws IOException
+ {
+ String key = row.key();
+ /* Add row to the commit log. */
+ long start = System.currentTimeMillis();
+
+ CommitLog.CommitLogContext cLogCtx = CommitLog.open(table_).add(row);
+ Map<String, ColumnFamily> columnFamilies = row.getColumnFamilies();
+ Set<String> cNames = columnFamilies.keySet();
+ for ( String cName : cNames )
+ {
+ ColumnFamily columnFamily = columnFamilies.get(cName);
+ ColumnFamilyStore cfStore = columnFamilyStores_.get(columnFamily.name());
+ cfStore.apply( key, columnFamily, cLogCtx);
+ }
+ row.clear();
+ long timeTaken = System.currentTimeMillis() - start;
+ dbAnalyticsSource_.updateWriteStatistics(timeTaken);
+ }
+
+ void applyNow(Row row) throws IOException
+ {
+ String key = row.key();
+ Map<String, ColumnFamily> columnFamilies = row.getColumnFamilies();
+
+ Set<String> cNames = columnFamilies.keySet();
+ for ( String cName : cNames )
+ {
+ ColumnFamily columnFamily = columnFamilies.get(cName);
+ ColumnFamilyStore cfStore = columnFamilyStores_.get(columnFamily.name());
+ cfStore.applyNow( key, columnFamily );
+ }
+ }
+
+ public void flush(boolean fRecovery) throws IOException
+ {
+ Set<String> cfNames = columnFamilyStores_.keySet();
+ for ( String cfName : cfNames )
+ {
+ columnFamilyStores_.get(cfName).forceFlush(fRecovery);
+ }
+ }
+
+ void delete(Row row) throws IOException
+ {
+ String key = row.key();
+ Map<String, ColumnFamily> columnFamilies = row.getColumnFamilies();
+
+ /* Add row to commit log */
+ CommitLog.open(table_).add(row);
+ Set<String> cNames = columnFamilies.keySet();
+
+ for ( String cName : cNames )
+ {
+ ColumnFamily columnFamily = columnFamilies.get(cName);
+ ColumnFamilyStore cfStore = columnFamilyStores_.get(columnFamily.name());
+ cfStore.delete( key, columnFamily );
+ }
+ }
+
+ void load(Row row) throws IOException
+ {
+ String key = row.key();
+ /* Add row to the commit log. */
+ long start = System.currentTimeMillis();
+
+ Map<String, ColumnFamily> columnFamilies = row.getColumnFamilies();
+ Set<String> cNames = columnFamilies.keySet();
+ for ( String cName : cNames )
+ {
+ if( cName.equals(Table.recycleBin_))
+ {
+ ColumnFamily columnFamily = columnFamilies.get(cName);
+ Collection<IColumn> columns = columnFamily.getAllColumns();
+ for(IColumn column : columns)
+ {
+ ColumnFamilyStore cfStore = columnFamilyStores_.get(column.name());
+ if(column.timestamp() == 1)
+ {
+ cfStore.forceFlushBinary();
+ }
+ else if(column.timestamp() == 2)
+ {
+ cfStore.forceCompaction(null, null, BasicUtilities.byteArrayToLong(column.value()), null);
+ }
+ else if(column.timestamp() == 3)
+ {
+ cfStore.forceFlush(false);
+ }
+ else if(column.timestamp() == 4)
+ {
+ cfStore.forceCleanup();
+ }
+ else if(column.timestamp() == 5)
+ {
+ cfStore.snapshot( new String(column.value()) );
+ }
+ else
+ {
+ cfStore.applyBinary(key, column.value());
+ }
+ }
+ }
+ }
+ row.clear();
+ long timeTaken = System.currentTimeMillis() - start;
+ dbAnalyticsSource_.updateWriteStatistics(timeTaken);
+ }
+
+ public static void main(String[] args) throws Throwable
+ {
+ StorageService service = StorageService.instance();
+ service.start();
+ Table table = Table.open("Mailbox");
+ Row row = table.get("35300190:1");
+ System.out.println( row.key() );
+ }
+}