You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 07:04:22 UTC
svn commit: r749202 [1/6] - in /incubator/cassandra/src: ./ org/ org/apache/
org/apache/cassandra/ org/apache/cassandra/db/
Author: pmalik
Date: Mon Mar 2 06:04:20 2009
New Revision: 749202
URL: http://svn.apache.org/viewvc?rev=749202&view=rev
Log:
Adding cassandra sources
Added:
incubator/cassandra/src/
incubator/cassandra/src/org/
incubator/cassandra/src/org/apache/
incubator/cassandra/src/org/apache/cassandra/
incubator/cassandra/src/org/apache/cassandra/db/
incubator/cassandra/src/org/apache/cassandra/db/AbstractColumnFactory.java
incubator/cassandra/src/org/apache/cassandra/db/BinaryMemtable.java
incubator/cassandra/src/org/apache/cassandra/db/BinaryMemtableManager.java
incubator/cassandra/src/org/apache/cassandra/db/BinaryVerbHandler.java
incubator/cassandra/src/org/apache/cassandra/db/CalloutDeployMessage.java
incubator/cassandra/src/org/apache/cassandra/db/CalloutDeployVerbHandler.java
incubator/cassandra/src/org/apache/cassandra/db/CalloutManager.java
incubator/cassandra/src/org/apache/cassandra/db/Column.java
incubator/cassandra/src/org/apache/cassandra/db/ColumnComparatorFactory.java
incubator/cassandra/src/org/apache/cassandra/db/ColumnFamily.java
incubator/cassandra/src/org/apache/cassandra/db/ColumnFamilyNotDefinedException.java
incubator/cassandra/src/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/src/org/apache/cassandra/db/ColumnIndexer.java
incubator/cassandra/src/org/apache/cassandra/db/CommitLog.java
incubator/cassandra/src/org/apache/cassandra/db/CommitLogEntry.java
incubator/cassandra/src/org/apache/cassandra/db/CommitLogHeader.java
incubator/cassandra/src/org/apache/cassandra/db/CompactSerializerInvocationHandler.java
incubator/cassandra/src/org/apache/cassandra/db/CountFilter.java
incubator/cassandra/src/org/apache/cassandra/db/DBConstants.java
incubator/cassandra/src/org/apache/cassandra/db/DBManager.java
incubator/cassandra/src/org/apache/cassandra/db/DataFileVerbHandler.java
incubator/cassandra/src/org/apache/cassandra/db/EfficientBidiMap.java
incubator/cassandra/src/org/apache/cassandra/db/FileNameComparator.java
incubator/cassandra/src/org/apache/cassandra/db/FileStruct.java
incubator/cassandra/src/org/apache/cassandra/db/FileStructComparator.java
incubator/cassandra/src/org/apache/cassandra/db/HintedHandOffManager.java
incubator/cassandra/src/org/apache/cassandra/db/IColumn.java
incubator/cassandra/src/org/apache/cassandra/db/ICompactSerializer2.java
incubator/cassandra/src/org/apache/cassandra/db/IFilter.java
incubator/cassandra/src/org/apache/cassandra/db/IScanner.java
incubator/cassandra/src/org/apache/cassandra/db/IdentityFilter.java
incubator/cassandra/src/org/apache/cassandra/db/LoadVerbHandler.java
incubator/cassandra/src/org/apache/cassandra/db/Memtable.java
incubator/cassandra/src/org/apache/cassandra/db/MemtableMBean.java
incubator/cassandra/src/org/apache/cassandra/db/MemtableManager.java
incubator/cassandra/src/org/apache/cassandra/db/MinorCompactionManager.java
incubator/cassandra/src/org/apache/cassandra/db/NamesFilter.java
incubator/cassandra/src/org/apache/cassandra/db/PrimaryKey.java
incubator/cassandra/src/org/apache/cassandra/db/ReadMessage.java
incubator/cassandra/src/org/apache/cassandra/db/ReadRepairVerbHandler.java
incubator/cassandra/src/org/apache/cassandra/db/ReadResponseMessage.java
incubator/cassandra/src/org/apache/cassandra/db/ReadVerbHandler.java
incubator/cassandra/src/org/apache/cassandra/db/RecoveryManager.java
incubator/cassandra/src/org/apache/cassandra/db/Row.java
incubator/cassandra/src/org/apache/cassandra/db/RowMutation.java
incubator/cassandra/src/org/apache/cassandra/db/RowMutationMessage.java
incubator/cassandra/src/org/apache/cassandra/db/RowMutationVerbHandler.java
incubator/cassandra/src/org/apache/cassandra/db/Scanner.java
incubator/cassandra/src/org/apache/cassandra/db/SuperColumn.java
incubator/cassandra/src/org/apache/cassandra/db/SystemTable.java
incubator/cassandra/src/org/apache/cassandra/db/Table.java
incubator/cassandra/src/org/apache/cassandra/db/TimeFilter.java
incubator/cassandra/src/org/apache/cassandra/db/TouchMessage.java
incubator/cassandra/src/org/apache/cassandra/db/TouchVerbHandler.java
incubator/cassandra/src/org/apache/cassandra/db/TypeInfo.java
incubator/cassandra/src/org/apache/cassandra/db/WriteResponseMessage.java
Added: incubator/cassandra/src/org/apache/cassandra/db/AbstractColumnFactory.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/db/AbstractColumnFactory.java?rev=749202&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/db/AbstractColumnFactory.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/db/AbstractColumnFactory.java Mon Mar 2 06:04:20 2009
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.StringTokenizer;
+
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+abstract class AbstractColumnFactory
+{
+ private static Map<String, AbstractColumnFactory> columnFactory_ = new HashMap<String, AbstractColumnFactory>();
+
+ static
+ {
+ columnFactory_.put(ColumnFamily.getColumnType("Standard"),new ColumnFactory());
+ columnFactory_.put(ColumnFamily.getColumnType("Super"),new SuperColumnFactory());
+ }
+
+ static AbstractColumnFactory getColumnFactory(String columnType)
+ {
+ /* Create based on the type required. */
+ if ( columnType == null || columnType.equals("Standard") )
+ return columnFactory_.get("Standard");
+ else
+ return columnFactory_.get("Super");
+ }
+
+ public abstract IColumn createColumn(String name);
+ public abstract IColumn createColumn(String name, byte[] value);
+ public abstract IColumn createColumn(String name, byte[] value, long timestamp);
+ public abstract ICompactSerializer2<IColumn> createColumnSerializer();
+}
+
+class ColumnFactory extends AbstractColumnFactory
+{
+ public IColumn createColumn(String name)
+ {
+ return new Column(name);
+ }
+
+ public IColumn createColumn(String name, byte[] value)
+ {
+ return new Column(name, value);
+ }
+
+ public IColumn createColumn(String name, byte[] value, long timestamp)
+ {
+ return new Column(name, value, timestamp);
+ }
+
+ public ICompactSerializer2<IColumn> createColumnSerializer()
+ {
+ return Column.serializer();
+ }
+}
+
+class SuperColumnFactory extends AbstractColumnFactory
+{
+ static String[] getSuperColumnAndColumn(String cName)
+ {
+ StringTokenizer st = new StringTokenizer(cName, ":");
+ String[] values = new String[st.countTokens()];
+ int i = 0;
+ while ( st.hasMoreElements() )
+ {
+ values[i++] = (String)st.nextElement();
+ }
+ return values;
+ }
+
+ public IColumn createColumn(String name)
+ {
+ String[] values = SuperColumnFactory.getSuperColumnAndColumn(name);
+ if ( values.length == 0 || values.length > 2 )
+ throw new IllegalArgumentException("Super Column " + name + " in invalid format. Must be in <super column name>:<column name> format.");
+ IColumn superColumn = new SuperColumn(values[0]);
+ if(values.length == 2)
+ {
+ IColumn subColumn = new Column(values[1]);
+ superColumn.addColumn(values[1], subColumn);
+ }
+ return superColumn;
+ }
+
+ public IColumn createColumn(String name, byte[] value)
+ {
+ String[] values = SuperColumnFactory.getSuperColumnAndColumn(name);
+ if ( values.length != 2 )
+ throw new IllegalArgumentException("Super Column " + name + " in invalid format. Must be in <super column name>:<column name> format.");
+ IColumn superColumn = new SuperColumn(values[0]);
+ IColumn subColumn = new Column(values[1], value);
+ superColumn.addColumn(values[1], subColumn);
+ return superColumn;
+ }
+
+ public IColumn createColumn(String name, byte[] value, long timestamp)
+ {
+ String[] values = SuperColumnFactory.getSuperColumnAndColumn(name);
+ if ( values.length != 2 )
+ throw new IllegalArgumentException("Super Column " + name + " in invalid format. Must be in <super column name>:<column name> format.");
+ IColumn superColumn = new SuperColumn(values[0]);
+ IColumn subColumn = new Column(values[1], value, timestamp);
+ superColumn.addColumn(values[1], subColumn);
+ return superColumn;
+ }
+
+ public ICompactSerializer2<IColumn> createColumnSerializer()
+ {
+ return SuperColumn.serializer();
+ }
+}
\ No newline at end of file
Added: incubator/cassandra/src/org/apache/cassandra/db/BinaryMemtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/db/BinaryMemtable.java?rev=749202&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/db/BinaryMemtable.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/db/BinaryMemtable.java Mon Mar 2 06:04:20 2009
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.utils.BloomFilter;
+import org.apache.log4j.Logger;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class BinaryMemtable implements MemtableMBean
+{
+ private static Logger logger_ = Logger.getLogger( Memtable.class );
+ private int threshold_ = 512*1024*1024;
+ private AtomicInteger currentSize_ = new AtomicInteger(0);
+
+ /* Table and ColumnFamily name are used to determine the ColumnFamilyStore */
+ private String table_;
+ private String cfName_;
+ private boolean isFrozen_ = false;
+ private Map<String, byte[]> columnFamilies_ = new NonBlockingHashMap<String, byte[]>();
+ /* Lock and Condition for notifying new clients about Memtable switches */
+ Lock lock_ = new ReentrantLock();
+ Condition condition_;
+
+ BinaryMemtable(String table, String cfName) throws IOException
+ {
+ condition_ = lock_.newCondition();
+ table_ = table;
+ cfName_ = cfName;
+ }
+
+ public int getMemtableThreshold()
+ {
+ return currentSize_.get();
+ }
+
+ void resolveSize(int oldSize, int newSize)
+ {
+ currentSize_.addAndGet(newSize - oldSize);
+ }
+
+
+ boolean isThresholdViolated()
+ {
+ if (currentSize_.get() >= threshold_ || columnFamilies_.size() > 50000)
+ {
+ logger_.debug("CURRENT SIZE:" + currentSize_.get());
+ return true;
+ }
+ return false;
+ }
+
+ String getColumnFamily()
+ {
+ return cfName_;
+ }
+
+ /*
+ * This version is used by the external clients to put data into
+ * the memtable. This version will respect the threshold and flush
+ * the memtable to disk when the size exceeds the threshold.
+ */
+ void put(String key, byte[] buffer) throws IOException
+ {
+ if (isThresholdViolated() )
+ {
+ lock_.lock();
+ try
+ {
+ ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
+ if (!isFrozen_)
+ {
+ isFrozen_ = true;
+ BinaryMemtableManager.instance().submit(cfStore.getColumnFamilyName(), this);
+ cfStore.switchBinaryMemtable(key, buffer);
+ }
+ else
+ {
+ cfStore.applyBinary(key, buffer);
+ }
+ }
+ finally
+ {
+ lock_.unlock();
+ }
+ }
+ else
+ {
+ resolve(key, buffer);
+ }
+ }
+
+ private void resolve(String key, byte[] buffer)
+ {
+ columnFamilies_.put(key, buffer);
+ currentSize_.addAndGet(buffer.length + key.length());
+ }
+
+
+ /*
+ *
+ */
+ void flush() throws IOException
+ {
+ if ( columnFamilies_.size() == 0 )
+ return;
+ ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
+ String directory = DatabaseDescriptor.getDataFileLocation();
+ String filename = cfStore.getNextFileName();
+
+ /*
+ * Use the SSTable to write the contents of the TreeMap
+ * to disk.
+ */
+ SSTable ssTable = new SSTable(directory, filename);
+ List<String> keys = new ArrayList<String>( columnFamilies_.keySet() );
+ Collections.sort(keys);
+ /* Use this BloomFilter to decide if a key exists in a SSTable */
+ BloomFilter bf = new BloomFilter(keys.size(), 8);
+ for ( String key : keys )
+ {
+ byte[] bytes = columnFamilies_.get(key);
+ if ( bytes.length > 0 )
+ {
+ /* Now write the key and value to disk */
+ ssTable.append(key, bytes);
+ bf.fill(key);
+ }
+ }
+ ssTable.close(bf);
+ cfStore.storeLocation( ssTable.getDataFileLocation(), bf );
+ columnFamilies_.clear();
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/db/BinaryMemtableManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/db/BinaryMemtableManager.java?rev=749202&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/db/BinaryMemtableManager.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/db/BinaryMemtableManager.java Mon Mar 2 06:04:20 2009
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.ThreadFactoryImpl;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class BinaryMemtableManager
+{
+ private static BinaryMemtableManager instance_;
+ private static Lock lock_ = new ReentrantLock();
+ private static Logger logger_ = Logger.getLogger(BinaryMemtableManager.class);
+
+ static BinaryMemtableManager instance()
+ {
+ if ( instance_ == null )
+ {
+ lock_.lock();
+ try
+ {
+ if ( instance_ == null )
+ instance_ = new BinaryMemtableManager();
+ }
+ finally
+ {
+ lock_.unlock();
+ }
+ }
+ return instance_;
+ }
+
+ class BinaryMemtableFlusher implements Runnable
+ {
+ private BinaryMemtable memtable_;
+
+ BinaryMemtableFlusher(BinaryMemtable memtable)
+ {
+ memtable_ = memtable;
+ }
+
+ public void run()
+ {
+ try
+ {
+ memtable_.flush();
+ }
+ catch (IOException e)
+ {
+ logger_.debug( LogUtil.throwableToString(e) );
+ }
+ }
+ }
+
+ private ExecutorService flusher_ = new DebuggableThreadPoolExecutor( 1,
+ 1,
+ Integer.MAX_VALUE,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ new ThreadFactoryImpl("BINARY-MEMTABLE-FLUSHER-POOL")
+ );
+
+ /* Submit memtables to be flushed to disk */
+ void submit(String cfName, BinaryMemtable memtbl)
+ {
+ flusher_.submit( new BinaryMemtableFlusher(memtbl) );
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/db/BinaryVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/db/BinaryVerbHandler.java?rev=749202&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/db/BinaryVerbHandler.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/db/BinaryVerbHandler.java Mon Mar 2 06:04:20 2009
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import org.apache.cassandra.db.RowMutationVerbHandler.RowMutationContext;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class BinaryVerbHandler implements IVerbHandler
+{
+ private static Logger logger_ = Logger.getLogger(BinaryVerbHandler.class);
+ /* We use this so that we can reuse the same row mutation context for the mutation. */
+ private static ThreadLocal<RowMutationContext> tls_ = new InheritableThreadLocal<RowMutationContext>();
+
+ public void doVerb(Message message)
+ {
+ byte[] bytes = (byte[])message.getMessageBody()[0];
+ /* Obtain a Row Mutation Context from TLS */
+ RowMutationContext rowMutationCtx = tls_.get();
+ if ( rowMutationCtx == null )
+ {
+ rowMutationCtx = new RowMutationContext();
+ tls_.set(rowMutationCtx);
+ }
+ rowMutationCtx.buffer_.reset(bytes, bytes.length);
+
+ try
+ {
+ RowMutationMessage rmMsg = RowMutationMessage.serializer().deserialize(rowMutationCtx.buffer_);
+ RowMutation rm = rmMsg.getRowMutation();
+ rowMutationCtx.row_.key(rm.key());
+ rm.load(rowMutationCtx.row_);
+
+ }
+ catch ( Exception e )
+ {
+ logger_.debug(LogUtil.throwableToString(e));
+ }
+ }
+
+}
Added: incubator/cassandra/src/org/apache/cassandra/db/CalloutDeployMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/db/CalloutDeployMessage.java?rev=749202&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/db/CalloutDeployMessage.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/db/CalloutDeployMessage.java Mon Mar 2 06:04:20 2009
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+
+
+public class CalloutDeployMessage
+{
+ private static ICompactSerializer<CalloutDeployMessage> serializer_;
+
+ static
+ {
+ serializer_ = new CalloutDeployMessageSerializer();
+ }
+
+ public static ICompactSerializer<CalloutDeployMessage> serializer()
+ {
+ return serializer_;
+ }
+
+ public static Message getCalloutDeployMessage(CalloutDeployMessage cdMessage) throws IOException
+ {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+ serializer_.serialize(cdMessage, dos);
+ Message message = new Message(StorageService.getLocalStorageEndPoint(), "", StorageService.calloutDeployVerbHandler_, new Object[]{bos.toByteArray()});
+ return message;
+ }
+
+ /* Name of the callout */
+ private String callout_;
+ /* The actual procedure */
+ private String script_;
+
+ public CalloutDeployMessage(String callout, String script)
+ {
+ callout_ = callout;
+ script_ = script;
+ }
+
+ String getCallout()
+ {
+ return callout_;
+ }
+
+ String getScript()
+ {
+ return script_;
+ }
+}
+
+class CalloutDeployMessageSerializer implements ICompactSerializer<CalloutDeployMessage>
+{
+ public void serialize(CalloutDeployMessage cdMessage, DataOutputStream dos) throws IOException
+ {
+ dos.writeUTF(cdMessage.getCallout());
+ dos.writeUTF(cdMessage.getScript());
+ }
+
+ public CalloutDeployMessage deserialize(DataInputStream dis) throws IOException
+ {
+ String callout = dis.readUTF();
+ String script = dis.readUTF();
+ return new CalloutDeployMessage(callout, script);
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/db/CalloutDeployVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/db/CalloutDeployVerbHandler.java?rev=749202&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/db/CalloutDeployVerbHandler.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/db/CalloutDeployVerbHandler.java Mon Mar 2 06:04:20 2009
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+public class CalloutDeployVerbHandler implements IVerbHandler
+{
+ private static Logger logger_ = Logger.getLogger(CalloutDeployVerbHandler.class);
+
+ public void doVerb(Message message)
+ {
+ Object[] body = message.getMessageBody();
+ byte[] bytes = (byte[])body[0];
+ DataInputBuffer bufIn = new DataInputBuffer();
+ bufIn.reset(bytes, bytes.length);
+ try
+ {
+ CalloutDeployMessage cdMessage = CalloutDeployMessage.serializer().deserialize(bufIn);
+ /* save the callout to callout cache and to disk. */
+ CalloutManager.instance().addCallout( cdMessage.getCallout(), cdMessage.getScript() );
+ }
+ catch ( IOException ex )
+ {
+ logger_.warn(LogUtil.throwableToString(ex));
+ }
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/db/CalloutManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/db/CalloutManager.java?rev=749202&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/db/CalloutManager.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/db/CalloutManager.java Mon Mar 2 06:04:20 2009
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.script.Bindings;
+import javax.script.Invocable;
+import javax.script.ScriptEngine;
+import javax.script.ScriptEngineManager;
+import javax.script.Compilable;
+import javax.script.CompiledScript;
+import javax.script.ScriptException;
+import javax.script.SimpleBindings;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.procedures.GroovyScriptRunner;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+public class CalloutManager
+{
+ private final static Logger logger_ = Logger.getLogger(CalloutManager.class);
+ private static final String extn_ = ".groovy";
+ /* Used to lock the factory for creation of CalloutManager instance */
+ private static Lock createLock_ = new ReentrantLock();
+ /* An instance of the CalloutManager */
+ private static CalloutManager instance_;
+
+ public static CalloutManager instance()
+ {
+ if ( instance_ == null )
+ {
+ CalloutManager.createLock_.lock();
+ try
+ {
+ if ( instance_ == null )
+ {
+ instance_ = new CalloutManager();
+ }
+ }
+ finally
+ {
+ CalloutManager.createLock_.unlock();
+ }
+ }
+ return instance_;
+ }
+
+ /* Map containing the name of callout as key and the callout script as value */
+ private Map<String, CompiledScript> calloutCache_ = new HashMap<String, CompiledScript>();
+ /* The Groovy Script compiler instance */
+ private Compilable compiler_;
+ /* The Groovy script invokable instance */
+ private Invocable invokable_;
+
+ private CalloutManager()
+ {
+ ScriptEngineManager scriptManager = new ScriptEngineManager();
+ ScriptEngine groovyEngine = scriptManager.getEngineByName("groovy");
+ compiler_ = (Compilable)groovyEngine;
+ invokable_ = (Invocable)groovyEngine;
+ }
+
+ /**
+ * Compile the script and cache the compiled script.
+ * @param script to be compiled
+ * @throws ScriptException
+ */
+ private void compileAndCache(String scriptId, String script) throws ScriptException
+ {
+ if ( compiler_ != null )
+ {
+ CompiledScript compiledScript = compiler_.compile(script);
+ calloutCache_.put(scriptId, compiledScript);
+ }
+ }
+
+ /**
+ * Invoked on start up to load all the stored callouts, compile
+ * and cache them.
+ *
+ * @throws IOException
+ */
+ void onStart() throws IOException
+ {
+ String location = DatabaseDescriptor.getCalloutLocation();
+ if ( location == null )
+ return;
+
+ File directory = new File(location);
+
+ if ( !directory.exists() )
+ directory.mkdir();
+
+ File[] files = directory.listFiles();
+
+ for ( File file : files )
+ {
+ String f = file.getName();
+ /* Get the callout name from the file */
+ String callout = f.split(extn_)[0];
+ FileInputStream fis = new FileInputStream(file);
+ byte[] bytes = new byte[fis.available()];
+ fis.read(bytes);
+ fis.close();
+ /* cache the callout after compiling it */
+ try
+ {
+ compileAndCache(callout, new String(bytes));
+ }
+ catch ( ScriptException ex )
+ {
+ logger_.warn(LogUtil.throwableToString(ex));
+ }
+ }
+ }
+
+ /**
+ * Store the callout in cache and write it out
+ * to disk.
+ * @param callout the name of the callout
+ * @param script actual implementation of the callout
+ */
+ public void addCallout(String callout, String script) throws IOException
+ {
+ /* cache the script */
+ /* cache the callout after compiling it */
+ try
+ {
+ compileAndCache(callout, script);
+ }
+ catch ( ScriptException ex )
+ {
+ logger_.warn(LogUtil.throwableToString(ex));
+ }
+ /* save the script to disk */
+ String scriptFile = DatabaseDescriptor.getCalloutLocation() + System.getProperty("file.separator") + callout + extn_;
+ File file = new File(scriptFile);
+ if ( file.exists() )
+ {
+ logger_.debug("Deleting the old script file ...");
+ file.delete();
+ }
+ FileOutputStream fos = new FileOutputStream(scriptFile);
+ fos.write(script.getBytes());
+ fos.close();
+ }
+
+ /**
+ * Remove the registered callout and delete the
+ * script on the disk.
+ * @param callout to be removed
+ */
+ public void removeCallout(String callout)
+ {
+ /* remove the script from cache */
+ calloutCache_.remove(callout);
+ String scriptFile = DatabaseDescriptor.getCalloutLocation() + System.getProperty("file.separator") + callout + ".grv";
+ File file = new File(scriptFile);
+ file.delete();
+ }
+
+ /**
+ * Execute the specified callout.
+ * @param callout to be executed.
+ * @params args arguments to be passed to the callouts.
+ */
+ public Object executeCallout(String callout, Object ... args)
+ {
+ Object result = null;
+ CompiledScript script = calloutCache_.get(callout);
+ if ( script != null )
+ {
+ try
+ {
+ Bindings binding = new SimpleBindings();
+ binding.put("args", args);
+ result = script.eval(binding);
+ }
+ catch(ScriptException ex)
+ {
+ logger_.warn(LogUtil.throwableToString(ex));
+ }
+ }
+ return result;
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/db/Column.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/db/Column.java?rev=749202&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/db/Column.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/db/Column.java Mon Mar 2 06:04:20 2009
@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.IFileReader;
+import org.apache.cassandra.io.IFileWriter;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.HashingSchemes;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public final class Column implements IColumn, Serializable
+{
+ private static Logger logger_ = Logger.getLogger(SuperColumn.class);
+ private static ICompactSerializer2<IColumn> serializer_;
+ private final static String seperator_ = ":";
+ static
+ {
+ serializer_ = new ColumnSerializer();
+ }
+
+ static ICompactSerializer2<IColumn> serializer()
+ {
+ return serializer_;
+ }
+
+ private String name_;
+ private byte[] value_ = new byte[0];
+ private long timestamp_ = 0;
+
+ private transient AtomicBoolean isMarkedForDelete_;
+
+ /* CTOR for JAXB */
+ Column()
+ {
+ }
+
+ Column(String name)
+ {
+ name_ = name;
+ }
+
+ Column(String name, byte[] value)
+ {
+ this(name, value, 0);
+ }
+
+ Column(String name, byte[] value, long timestamp)
+ {
+ this(name);
+ value_ = value;
+ timestamp_ = timestamp;
+ }
+
+ public String name()
+ {
+ return name_;
+ }
+
+ public byte[] value()
+ {
+ return value_;
+ }
+
+ public byte[] value(String key)
+ {
+ throw new UnsupportedOperationException("This operation is unsupported on simple columns.");
+ }
+
+ public Collection<IColumn> getSubColumns()
+ {
+ throw new UnsupportedOperationException("This operation is unsupported on simple columns.");
+ }
+
+ public IColumn getSubColumn( String columnName )
+ {
+ throw new UnsupportedOperationException("This operation is unsupported on simple columns.");
+ }
+
+ public int getObjectCount()
+ {
+ return 1;
+ }
+
+ public long timestamp()
+ {
+ return timestamp_;
+ }
+
+ public long timestamp(String key)
+ {
+ throw new UnsupportedOperationException("This operation is unsupported on simple columns.");
+ }
+
+ public boolean isMarkedForDelete()
+ {
+ return (isMarkedForDelete_ != null) ? isMarkedForDelete_.get() : false;
+ }
+
+ public int size()
+ {
+ /*
+ * Size of a column is =
+ * size of a name (UtfPrefix + length of the string)
+ * + 1 byte to indicate if the column has been deleted
+ * + 8 bytes for timestamp
+ * + 4 bytes which basically indicates the size of the byte array
+ * + entire byte array.
+ */
+
+ /*
+ * We store the string as UTF-8 encoded, so when we calculate the length, it
+ * should be converted to UTF-8.
+ */
+ return IColumn.UtfPrefix_ + FBUtilities.getUTF8Length(name_) + DBConstants.boolSize_ + DBConstants.tsSize_ + DBConstants.intSize_ + value_.length;
+ }
+
+ /*
+ * This returns the size of the column when serialized.
+ * @see com.facebook.infrastructure.db.IColumn#serializedSize()
+ */
+ public int serializedSize()
+ {
+ return size();
+ }
+
+ public void addColumn(String name, IColumn column)
+ {
+ throw new UnsupportedOperationException("This operation is not supported for simple columns.");
+ }
+
+ public void delete()
+ {
+ if ( isMarkedForDelete_ == null )
+ isMarkedForDelete_ = new AtomicBoolean(true);
+ else
+ isMarkedForDelete_.set(true);
+ value_ = new byte[0];
+ }
+
+ public void repair(IColumn column)
+ {
+ if( timestamp() < column.timestamp() )
+ {
+ value_ = column.value();
+ timestamp_ = column.timestamp();
+ }
+ }
+ public IColumn diff(IColumn column)
+ {
+ IColumn columnDiff = null;
+ if( timestamp() < column.timestamp() )
+ {
+ columnDiff = new Column(column.name(),column.value(),column.timestamp());
+ }
+ return columnDiff;
+ }
+
+ /*
+ * Resolve the column by comparing timestamps
+ * if a newer vaue is being input
+ * take the change else ignore .
+ *
+ */
+ public boolean putColumn(IColumn column)
+ {
+ if ( !(column instanceof Column))
+ throw new UnsupportedOperationException("Only Column objects should be put here");
+ if( !name_.equals(column.name()))
+ throw new IllegalArgumentException("The name should match the name of the current column or super column");
+ if(timestamp_ <= column.timestamp())
+ {
+ return true;
+ }
+ return false;
+ }
+
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append(name_);
+ sb.append(":");
+ sb.append(isMarkedForDelete());
+ sb.append(":");
+ sb.append(timestamp());
+ sb.append(":");
+ sb.append(value().length);
+ sb.append(":");
+ sb.append(value());
+ sb.append(":");
+ return sb.toString();
+ }
+
+ public byte[] digest()
+ {
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append(name_);
+ stringBuilder.append(seperator_);
+ stringBuilder.append(timestamp_);
+ return stringBuilder.toString().getBytes();
+ }
+
+ /**
+ * This method is basically implemented for Writable interface
+ * for M/R.
+ */
+ public void readFields(DataInput in) throws IOException
+ {
+ name_ = in.readUTF();
+ boolean delete = in.readBoolean();
+ long ts = in.readLong();
+ int size = in.readInt();
+ byte[] value = new byte[size];
+ in.readFully(value);
+ if ( delete )
+ delete();
+ }
+
+ /**
+ * This method is basically implemented for Writable interface
+ * for M/R.
+ */
+ public void write(DataOutput out) throws IOException
+ {
+ out.writeUTF(name_);
+ out.writeBoolean(isMarkedForDelete());
+ out.writeLong(timestamp_);
+ out.writeInt(value().length);
+ out.write(value());
+ }
+
+}
+
+class ColumnSerializer implements ICompactSerializer2<IColumn>
+{
+ public void serialize(IColumn column, DataOutputStream dos) throws IOException
+ {
+ dos.writeUTF(column.name());
+ dos.writeBoolean(column.isMarkedForDelete());
+ dos.writeLong(column.timestamp());
+ dos.writeInt(column.value().length);
+ dos.write(column.value());
+ }
+
+ private IColumn defreeze(DataInputStream dis, String name) throws IOException
+ {
+ IColumn column = null;
+ boolean delete = dis.readBoolean();
+ long ts = dis.readLong();
+ int size = dis.readInt();
+ byte[] value = new byte[size];
+ dis.readFully(value);
+ column = new Column(name, value, ts);
+ if ( delete )
+ column.delete();
+ return column;
+ }
+
+ public IColumn deserialize(DataInputStream dis) throws IOException
+ {
+ String name = dis.readUTF();
+ return defreeze(dis, name);
+ }
+
+ /**
+ * Here we need to get the column and apply the filter.
+ */
+ public IColumn deserialize(DataInputStream dis, IFilter filter) throws IOException
+ {
+ if ( dis.available() == 0 )
+ return null;
+
+ String name = dis.readUTF();
+ IColumn column = new Column(name);
+ column = filter.filter(column, dis);
+ if ( column != null )
+ {
+ column = defreeze(dis, name);
+ }
+ else
+ {
+ /* Skip a boolean and the timestamp */
+ dis.skip(DBConstants.boolSize_ + DBConstants.tsSize_);
+ int size = dis.readInt();
+ dis.skip(size);
+ }
+ return column;
+ }
+
+ /**
+ * We know the name of the column here so just return it.
+ * Filter is pretty much useless in this call and is ignored.
+ */
+ public IColumn deserialize(DataInputStream dis, String columnName, IFilter filter) throws IOException
+ {
+ if ( dis.available() == 0 )
+ return null;
+ IColumn column = null;
+ String name = dis.readUTF();
+ if ( name.equals(columnName) )
+ {
+ column = defreeze(dis, name);
+ if( filter instanceof IdentityFilter )
+ {
+ /*
+ * If this is being called with identity filter
+ * since a column name is passed in we know
+ * that this is a final call
+ * Hence if the column is found set the filter to done
+ * so that we do not look for the column in further files
+ */
+ IdentityFilter f = (IdentityFilter)filter;
+ f.setDone();
+ }
+ }
+ else
+ {
+ /* Skip a boolean and the timestamp */
+ dis.skip(DBConstants.boolSize_ + DBConstants.tsSize_);
+ int size = dis.readInt();
+ dis.skip(size);
+ }
+ return column;
+ }
+
+ public void skip(DataInputStream dis) throws IOException
+ {
+ /* read the column name */
+ dis.readUTF();
+ /* boolean indicating if the column is deleted */
+ dis.readBoolean();
+ /* timestamp associated with the column */
+ dis.readLong();
+ /* size of the column */
+ int size = dis.readInt();
+ dis.skip(size);
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/db/ColumnComparatorFactory.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/db/ColumnComparatorFactory.java?rev=749202&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/db/ColumnComparatorFactory.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/db/ColumnComparatorFactory.java Mon Mar 2 06:04:20 2009
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class ColumnComparatorFactory
+{
+ public static enum ComparatorType
+ {
+ NAME,
+ TIMESTAMP
+ }
+
+ private static Comparator<IColumn> nameComparator_ = new ColumnNameComparator();
+ private static Comparator<IColumn> timestampComparator_ = new ColumnTimestampComparator();
+
+ public static Comparator<IColumn> getComparator(ComparatorType comparatorType)
+ {
+ Comparator<IColumn> columnComparator = timestampComparator_;
+
+ switch(comparatorType)
+ {
+ case NAME:
+ columnComparator = nameComparator_;
+ break;
+
+ case TIMESTAMP:
+
+ default:
+ columnComparator = timestampComparator_;
+ break;
+ }
+
+ return columnComparator;
+ }
+
+ public static Comparator<IColumn> getComparator(int comparatorTypeInt)
+ {
+ ComparatorType comparatorType = ComparatorType.NAME;
+
+ if(comparatorTypeInt == ComparatorType.NAME.ordinal())
+ {
+ comparatorType = ComparatorType.NAME;
+ }
+ else if(comparatorTypeInt == ComparatorType.TIMESTAMP.ordinal())
+ {
+ comparatorType = ComparatorType.TIMESTAMP;
+ }
+ return getComparator(comparatorType);
+ }
+
+ public static void main(String[] args)
+ {
+ IColumn col1 = new Column("Column-9");
+ IColumn col2 = new Column("Column-10");
+ System.out.println("Result of compare: " + getComparator(ComparatorType.NAME).compare(col1, col2));
+ }
+}
+
+abstract class AbstractColumnComparator implements Comparator<IColumn>, Serializable
+{
+ protected ColumnComparatorFactory.ComparatorType comparatorType_;
+
+ public AbstractColumnComparator(ColumnComparatorFactory.ComparatorType comparatorType)
+ {
+ comparatorType_ = comparatorType;
+ }
+
+ ColumnComparatorFactory.ComparatorType getComparatorType()
+ {
+ return comparatorType_;
+ }
+}
+
+class ColumnTimestampComparator extends AbstractColumnComparator
+{
+ ColumnTimestampComparator()
+ {
+ super(ColumnComparatorFactory.ComparatorType.TIMESTAMP);
+ }
+
+ /* if the time-stamps are the same then sort by names */
+ public int compare(IColumn column1, IColumn column2)
+ {
+ /* inverse sort by time to get hte latest first */
+ long result = column2.timestamp() - column1.timestamp();
+ int finalResult = 0;
+ if(result == 0)
+ {
+ result = column1.name().compareTo(column2.name());
+ }
+ if(result > 0)
+ {
+ finalResult = 1;
+ }
+ if( result < 0 )
+ {
+ finalResult = -1;
+ }
+ return finalResult;
+ }
+}
+
+class ColumnNameComparator extends AbstractColumnComparator
+{
+ ColumnNameComparator()
+ {
+ super(ColumnComparatorFactory.ComparatorType.NAME);
+ }
+
+ /* if the names are the same then sort by time-stamps */
+ public int compare(IColumn column1, IColumn column2)
+ {
+ long result = column1.name().compareTo(column2.name());
+ int finalResult = 0;
+ if(result == 0)
+ {
+ /* inverse sort by time to get hte latest first */
+ result = column2.timestamp() - column1.timestamp();
+ }
+ if(result > 0)
+ {
+ finalResult = 1;
+ }
+ if( result < 0 )
+ {
+ finalResult = -1;
+ }
+ return finalResult;
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/db/ColumnFamily.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/db/ColumnFamily.java?rev=749202&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/db/ColumnFamily.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/db/ColumnFamily.java Mon Mar 2 06:04:20 2009
@@ -0,0 +1,614 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Proxy;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.HashingSchemes;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.io.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public final class ColumnFamily implements Serializable
+{
+ private static ICompactSerializer2<ColumnFamily> serializer_;
+ public static final short utfPrefix_ = 2;
+ /* The column serializer for this Column Family. Create based on config. */
+
+ private static Logger logger_ = Logger.getLogger( ColumnFamily.class );
+ private static Map<String, String> columnTypes_ = new HashMap<String, String>();
+ private static Map<String, String> indexTypes_ = new HashMap<String, String>();
+
+ static
+ {
+ serializer_ = new ColumnFamilySerializer();
+ /* TODO: These are the various column types. Hard coded for now. */
+ columnTypes_.put("Standard", "Standard");
+ columnTypes_.put("Super", "Super");
+
+ indexTypes_.put("Name", "Name");
+ indexTypes_.put("Time", "Time");
+ }
+
+ public static ICompactSerializer2<ColumnFamily> serializer()
+ {
+ return serializer_;
+ }
+
+ /*
+ * This method returns the serializer whose methods are
+ * preprocessed by a dynamic proxy.
+ */
+ public static ICompactSerializer2<ColumnFamily> serializer2()
+ {
+ return (ICompactSerializer2<ColumnFamily>)Proxy.newProxyInstance( ColumnFamily.class.getClassLoader(), new Class[]{ICompactSerializer2.class}, new CompactSerializerInvocationHandler<ColumnFamily>(serializer_) );
+ }
+
+ public static String getColumnType(String key)
+ {
+ if ( key == null )
+ return columnTypes_.get("Standard");
+ return columnTypes_.get(key);
+ }
+
+ public static String getColumnSortProperty(String columnIndexProperty)
+ {
+ if ( columnIndexProperty == null )
+ return indexTypes_.get("Time");
+ return indexTypes_.get(columnIndexProperty);
+ }
+
+ private transient AbstractColumnFactory columnFactory_;
+
+ private String name_;
+
+ private transient ICompactSerializer2<IColumn> columnSerializer_;
+ private transient AtomicBoolean isMarkedForDelete_;
+ private AtomicInteger size_ = new AtomicInteger(0);
+ private EfficientBidiMap columns_;
+
+ private Comparator<IColumn> columnComparator_;
+
+ private Comparator<IColumn> getColumnComparator(String cfName, String columnType)
+ {
+ if(columnComparator_ == null)
+ {
+ /*
+ * if this columnfamily has supercolumns or there is an index on the column name,
+ * then sort by name
+ */
+ if("Super".equals(columnType) || DatabaseDescriptor.isNameSortingEnabled(cfName))
+ {
+ columnComparator_ = ColumnComparatorFactory.getComparator(ColumnComparatorFactory.ComparatorType.NAME);
+ }
+ /* if this columnfamily has simple columns, and no index on name sort by timestamp */
+ else
+ {
+ columnComparator_ = ColumnComparatorFactory.getComparator(ColumnComparatorFactory.ComparatorType.TIMESTAMP);
+ }
+ }
+
+ return columnComparator_;
+ }
+
+ ColumnFamily()
+ {
+ }
+
+ public ColumnFamily(String cf)
+ {
+ name_ = cf;
+ createColumnFactoryAndColumnSerializer();
+ }
+
+ public ColumnFamily(String cf, String columnType)
+ {
+ name_ = cf;
+ createColumnFactoryAndColumnSerializer(columnType);
+ }
+
+ void createColumnFactoryAndColumnSerializer(String columnType)
+ {
+ if ( columnFactory_ == null )
+ {
+ columnFactory_ = AbstractColumnFactory.getColumnFactory(columnType);
+ columnSerializer_ = columnFactory_.createColumnSerializer();
+ if(columns_ == null)
+ columns_ = new EfficientBidiMap(getColumnComparator(name_, columnType));
+ }
+ }
+
+ void createColumnFactoryAndColumnSerializer()
+ {
+ String columnType = DatabaseDescriptor.getColumnFamilyType(name_);
+ if ( columnType == null )
+ {
+ List<String> tables = DatabaseDescriptor.getTables();
+ if ( tables.size() > 0 )
+ {
+ String table = tables.get(0);
+ columnType = Table.open(table).getColumnFamilyType(name_);
+ }
+ }
+ createColumnFactoryAndColumnSerializer(columnType);
+ }
+
+ ColumnFamily cloneMe()
+ {
+ ColumnFamily cf = new ColumnFamily(name_);
+ cf.isMarkedForDelete_ = isMarkedForDelete_;
+ cf.columns_ = columns_.cloneMe();
+ return cf;
+ }
+
+ public String name()
+ {
+ return name_;
+ }
+
+ /**
+ * We need to go through each column
+ * in the column family and resolve it before adding
+ */
+ void addColumns(ColumnFamily cf)
+ {
+ Map<String, IColumn> columns = cf.getColumns();
+ Set<String> cNames = columns.keySet();
+
+ for ( String cName : cNames )
+ {
+ addColumn(cName, columns.get(cName));
+ }
+ }
+
+ public ICompactSerializer2<IColumn> getColumnSerializer()
+ {
+ createColumnFactoryAndColumnSerializer();
+ return columnSerializer_;
+ }
+
+ public void createColumn(String name)
+ {
+ IColumn column = columnFactory_.createColumn(name);
+ addColumn(column.name(), column);
+ }
+
+ int getColumnCount()
+ {
+ int count = 0;
+ Map<String, IColumn> columns = columns_.getColumns();
+ if( columns != null )
+ {
+ if(!DatabaseDescriptor.getColumnType(name_).equals("Super"))
+ {
+ count = columns.size();
+ }
+ else
+ {
+ Collection<IColumn> values = columns.values();
+ for(IColumn column: values)
+ {
+ count += column.getObjectCount();
+ }
+ }
+ }
+ return count;
+ }
+
+ public void createColumn(String name, byte[] value)
+ {
+ IColumn column = columnFactory_.createColumn(name, value);
+ addColumn(column.name(), column);
+ }
+
+ public void createColumn(String name, byte[] value, long timestamp)
+ {
+ IColumn column = columnFactory_.createColumn(name, value, timestamp);
+ addColumn(column.name(), column);
+ }
+
+ void clear()
+ {
+ columns_.clear();
+ }
+
+ /*
+ * If we find an old column that has the same name
+ * the ask it to resolve itself else add the new column .
+ */
+ void addColumn(String name, IColumn column)
+ {
+ int newSize = 0;
+ IColumn oldColumn = columns_.get(name);
+ if ( oldColumn != null )
+ {
+ int oldSize = oldColumn.size();
+ if( oldColumn.putColumn(column))
+ {
+ // This will never be called for super column as put column always returns false.
+ columns_.put(name, column);
+ newSize = column.size();
+ }
+ else
+ {
+ newSize = oldColumn.size();
+ }
+ size_.addAndGet(newSize - oldSize);
+ }
+ else
+ {
+ newSize = column.size();
+ size_.addAndGet(newSize);
+ columns_.put(name, column);
+ }
+ }
+
+ public IColumn getColumn(String name)
+ {
+ return columns_.get( name );
+ }
+
+ public Collection<IColumn> getAllColumns()
+ {
+ return columns_.getSortedColumns();
+ }
+
+ Map<String, IColumn> getColumns()
+ {
+ return columns_.getColumns();
+ }
+
+ public void remove(String columnName)
+ {
+ columns_.remove(columnName);
+ }
+
+ void delete()
+ {
+ if ( isMarkedForDelete_ == null )
+ isMarkedForDelete_ = new AtomicBoolean(true);
+ else
+ isMarkedForDelete_.set(true);
+ }
+
+ boolean isMarkedForDelete()
+ {
+ return ( ( isMarkedForDelete_ == null ) ? false : isMarkedForDelete_.get() );
+ }
+
+ /*
+ * This is used as oldCf.merge(newCf). Basically we take the newCf
+ * and merge it into the oldCf.
+ */
+ void merge(ColumnFamily columnFamily)
+ {
+ Map<String, IColumn> columns = columnFamily.getColumns();
+ Set<String> cNames = columns.keySet();
+
+ for ( String cName : cNames )
+ {
+ columns_.put(cName, columns.get(cName));
+ }
+ }
+
+ /*
+ * This function will repair a list of columns
+ * If there are any columns in the external list which are not present
+ * internally then they are added ( this might have to change depending on
+ * how we implement delete repairs).
+ * Also if there are any columns in teh internal and not in the external
+ * they are kept intact.
+ * Else the one with the greatest timestamp is considered latest.
+ */
+ void repair(ColumnFamily columnFamily)
+ {
+ Map<String, IColumn> columns = columnFamily.getColumns();
+ Set<String> cNames = columns.keySet();
+
+ for ( String cName : cNames )
+ {
+ IColumn columnInternal = columns_.get(cName);
+ IColumn columnExternal = columns.get(cName);
+
+ if( columnInternal == null )
+ {
+ if(DatabaseDescriptor.getColumnFamilyType(name_).equals(ColumnFamily.getColumnType("Super")))
+ {
+ columnInternal = new SuperColumn(columnExternal.name());
+ columns_.put(cName, columnInternal);
+ }
+ if(DatabaseDescriptor.getColumnFamilyType(name_).equals(ColumnFamily.getColumnType("Standard")))
+ {
+ columnInternal = columnExternal;
+ columns_.put(cName, columnInternal);
+ }
+ }
+ columnInternal.repair(columnExternal);
+ }
+ }
+
+
+ /*
+ * This function will calculate the differnce between 2 column families
+ * the external input is considered the superset of internal
+ * so there are no deletes in the diff.
+ */
+ ColumnFamily diff(ColumnFamily columnFamily)
+ {
+ ColumnFamily cfDiff = new ColumnFamily(columnFamily.name());
+ Map<String, IColumn> columns = columnFamily.getColumns();
+ Set<String> cNames = columns.keySet();
+
+ for ( String cName : cNames )
+ {
+ IColumn columnInternal = columns_.get(cName);
+ IColumn columnExternal = columns.get(cName);
+ if( columnInternal == null )
+ {
+ cfDiff.addColumn(cName, columnExternal);
+ }
+ else
+ {
+ IColumn columnDiff = columnInternal.diff(columnExternal);
+ if(columnDiff != null)
+ {
+ cfDiff.addColumn(cName, columnDiff);
+ }
+ }
+ }
+ if(cfDiff.getColumns().size() != 0)
+ return cfDiff;
+ else
+ return null;
+ }
+
+ int size()
+ {
+ if ( size_.get() == 0 )
+ {
+ Set<String> cNames = columns_.getColumns().keySet();
+ for ( String cName : cNames )
+ {
+ size_.addAndGet(columns_.get(cName).size());
+ }
+ }
+ return size_.get();
+ }
+
+ public int hashCode()
+ {
+ return name().hashCode();
+ }
+
+ public boolean equals(Object o)
+ {
+ if ( !(o instanceof ColumnFamily) )
+ return false;
+ ColumnFamily cf = (ColumnFamily)o;
+ return name().equals(cf.name());
+ }
+
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append(name_);
+ sb.append(":");
+ sb.append(isMarkedForDelete());
+ sb.append(":");
+ Collection<IColumn> columns = getAllColumns();
+ sb.append(columns.size());
+ sb.append(":");
+
+ for ( IColumn column : columns )
+ {
+ sb.append(column.toString());
+ }
+ sb.append(":");
+ return sb.toString();
+ }
+
+ public byte[] digest()
+ {
+ Set<IColumn> columns = columns_.getSortedColumns();
+ byte[] xorHash = new byte[0];
+ byte[] tmpHash = new byte[0];
+ for(IColumn column : columns)
+ {
+ if(xorHash.length == 0)
+ {
+ xorHash = column.digest();
+ }
+ else
+ {
+ tmpHash = column.digest();
+ xorHash = FBUtilities.xor(xorHash, tmpHash);
+ }
+ }
+ return xorHash;
+ }
+}
+
+class ColumnFamilySerializer implements ICompactSerializer2<ColumnFamily>
+{
+ /*
+ * We are going to create indexes, and write out that information as well. The format
+ * of the data serialized is as follows.
+ *
+ * 1) Without indexes:
+ * // written by the data
+ * <boolean false (index is not present)>
+ * <column family id>
+ * <is marked for delete>
+ * <total number of columns>
+ * <columns data>
+
+ * <boolean true (index is present)>
+ *
+ * This part is written by the column indexer
+ * <size of index in bytes>
+ * <list of column names and their offsets relative to the first column>
+ *
+ * <size of the cf in bytes>
+ * <column family id>
+ * <is marked for delete>
+ * <total number of columns>
+ * <columns data>
+ */
+ public void serialize(ColumnFamily columnFamily, DataOutputStream dos) throws IOException
+ {
+ Collection<IColumn> columns = columnFamily.getAllColumns();
+
+ /* write the column family id */
+ dos.writeUTF(columnFamily.name());
+ /* write if this cf is marked for delete */
+ dos.writeBoolean(columnFamily.isMarkedForDelete());
+ /* write the size is the number of columns */
+ dos.writeInt(columns.size());
+
+ /* write the column data */
+ for ( IColumn column : columns )
+ {
+ columnFamily.getColumnSerializer().serialize(column, dos);
+ }
+ }
+
+ /*
+ * Use this method to create a bare bones Column Family. This column family
+ * does not have any of the Column information.
+ */
+ private ColumnFamily defreezeColumnFamily(DataInputStream dis) throws IOException
+ {
+ String name = dis.readUTF();
+ boolean delete = dis.readBoolean();
+ ColumnFamily cf = new ColumnFamily(name);
+ if ( delete )
+ cf.delete();
+ return cf;
+ }
+
+ /*
+ * This method fills the Column Family object with the column information
+ * from the DataInputStream. The "items" parameter tells us whether we need
+ * all the columns or just a subset of all the Columns that make up the
+ * Column Family. If "items" is -1 then we need all the columns if not we
+ * deserialize only as many columns as indicated by the "items" parameter.
+ */
+ private void fillColumnFamily(ColumnFamily cf, DataInputStream dis) throws IOException
+ {
+ int size = dis.readInt();
+ IColumn column = null;
+ for ( int i = 0; i < size; ++i )
+ {
+ column = cf.getColumnSerializer().deserialize(dis);
+ if(column != null)
+ {
+ cf.addColumn(column.name(), column);
+ }
+ }
+ }
+
+ public ColumnFamily deserialize(DataInputStream dis) throws IOException
+ {
+ ColumnFamily cf = defreezeColumnFamily(dis);
+ if ( !cf.isMarkedForDelete() )
+ fillColumnFamily(cf,dis);
+ return cf;
+ }
+
+ /*
+ * This version of deserialize is used when we need a specific set if columns for
+ * a column family specified in the name cfName parameter.
+ */
+ public ColumnFamily deserialize(DataInputStream dis, IFilter filter) throws IOException
+ {
+ ColumnFamily cf = defreezeColumnFamily(dis);
+ if ( !cf.isMarkedForDelete() )
+ {
+ int size = dis.readInt();
+ IColumn column = null;
+ for ( int i = 0; i < size; ++i )
+ {
+ column = cf.getColumnSerializer().deserialize(dis, filter);
+ if(column != null)
+ {
+ cf.addColumn(column.name(), column);
+ column = null;
+ if(filter.isDone())
+ {
+ break;
+ }
+ }
+ }
+ }
+ return cf;
+ }
+
+ /*
+ * Deserialize a particular column or super column or the entire columnfamily given a : seprated name
+ * name could be of the form cf:superColumn:column or cf:column or cf
+ */
+ public ColumnFamily deserialize(DataInputStream dis, String name, IFilter filter) throws IOException
+ {
+ String[] names = RowMutation.getColumnAndColumnFamily(name);
+ String columnName = "";
+ if ( names.length == 1 )
+ return deserialize(dis, filter);
+ if( names.length == 2 )
+ columnName = names[1];
+ if( names.length == 3 )
+ columnName = names[1]+ ":" + names[2];
+
+ ColumnFamily cf = defreezeColumnFamily(dis);
+ if ( !cf.isMarkedForDelete() )
+ {
+ /* read the number of columns */
+ int size = dis.readInt();
+ for ( int i = 0; i < size; ++i )
+ {
+ IColumn column = cf.getColumnSerializer().deserialize(dis, columnName, filter);
+ if ( column != null )
+ {
+ cf.addColumn(column.name(), column);
+ break;
+ }
+ }
+ }
+ return cf;
+ }
+
+ public void skip(DataInputStream dis) throws IOException
+ {
+ throw new UnsupportedOperationException("This operation is not yet supported.");
+ }
+
+}
Added: incubator/cassandra/src/org/apache/cassandra/db/ColumnFamilyNotDefinedException.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/db/ColumnFamilyNotDefinedException.java?rev=749202&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/db/ColumnFamilyNotDefinedException.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/db/ColumnFamilyNotDefinedException.java Mon Mar 2 06:04:20 2009
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class ColumnFamilyNotDefinedException extends Exception
+{
+ public ColumnFamilyNotDefinedException(String message)
+ {
+ super(message);
+ }
+}