You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/07/30 17:30:27 UTC
svn commit: r799331 [8/29] - in /incubator/cassandra/trunk: ./
src/java/org/apache/cassandra/concurrent/
src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/gms/
src/j...
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java Thu Jul 30 15:30:21 2009
@@ -1,59 +1,59 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db;
-
-import java.io.*;
-
-import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
-import org.apache.cassandra.service.*;
-import org.apache.cassandra.utils.*;
-import org.apache.cassandra.concurrent.*;
-import org.apache.cassandra.net.*;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class ReadRepairVerbHandler implements IVerbHandler
-{
- private static Logger logger_ = Logger.getLogger(ReadRepairVerbHandler.class);
-
- public void doVerb(Message message)
- {
- byte[] body = message.getMessageBody();
- DataInputBuffer buffer = new DataInputBuffer();
- buffer.reset(body, body.length);
-
- try
- {
- RowMutationMessage rmMsg = RowMutationMessage.serializer().deserialize(buffer);
- RowMutation rm = rmMsg.getRowMutation();
- rm.apply();
- }
- catch ( IOException e )
- {
- if (logger_.isDebugEnabled())
- logger_.debug(LogUtil.throwableToString(e));
- }
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.*;
+
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.service.*;
+import org.apache.cassandra.utils.*;
+import org.apache.cassandra.concurrent.*;
+import org.apache.cassandra.net.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class ReadRepairVerbHandler implements IVerbHandler
+{
+ private static Logger logger_ = Logger.getLogger(ReadRepairVerbHandler.class);
+
+ public void doVerb(Message message)
+ {
+ byte[] body = message.getMessageBody();
+ DataInputBuffer buffer = new DataInputBuffer();
+ buffer.reset(body, body.length);
+
+ try
+ {
+ RowMutationMessage rmMsg = RowMutationMessage.serializer().deserialize(buffer);
+ RowMutation rm = rmMsg.getRowMutation();
+ rm.apply();
+ }
+ catch ( IOException e )
+ {
+ if (logger_.isDebugEnabled())
+ logger_.debug(LogUtil.throwableToString(e));
+ }
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java Thu Jul 30 15:30:21 2009
@@ -1,137 +1,137 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.StorageService;
-import org.apache.commons.lang.ArrayUtils;
-
-
-/*
- * The read response message is sent by the server when reading data
- * this encapsulates the tablename and the row that has been read.
- * The table name is needed so that we can use it to create repairs.
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-public class ReadResponse implements Serializable
-{
-private static ICompactSerializer<ReadResponse> serializer_;
-
- static
- {
- serializer_ = new ReadResponseSerializer();
- }
-
- public static ICompactSerializer<ReadResponse> serializer()
- {
- return serializer_;
- }
-
- public static Message makeReadResponseMessage(ReadResponse readResponse) throws IOException
- {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream( bos );
- ReadResponse.serializer().serialize(readResponse, dos);
- Message message = new Message(StorageService.getLocalStorageEndPoint(), MessagingService.responseStage_, MessagingService.responseVerbHandler_, bos.toByteArray());
- return message;
- }
-
- private Row row_;
- private byte[] digest_ = ArrayUtils.EMPTY_BYTE_ARRAY;
- private boolean isDigestQuery_ = false;
-
- public ReadResponse(byte[] digest )
- {
- assert digest != null;
- digest_= digest;
- }
-
- public ReadResponse(Row row)
- {
- row_ = row;
- }
-
- public Row row()
- {
- return row_;
- }
-
- public byte[] digest()
- {
- return digest_;
- }
-
- public boolean isDigestQuery()
- {
- return isDigestQuery_;
- }
-
- public void setIsDigestQuery(boolean isDigestQuery)
- {
- isDigestQuery_ = isDigestQuery;
- }
-}
-
-class ReadResponseSerializer implements ICompactSerializer<ReadResponse>
-{
- public void serialize(ReadResponse rm, DataOutputStream dos) throws IOException
- {
- dos.writeInt(rm.digest().length);
- dos.write(rm.digest());
- dos.writeBoolean(rm.isDigestQuery());
-
- if( !rm.isDigestQuery() && rm.row() != null )
- {
- Row.serializer().serialize(rm.row(), dos);
- }
- }
-
- public ReadResponse deserialize(DataInputStream dis) throws IOException
- {
- int digestSize = dis.readInt();
- byte[] digest = new byte[digestSize];
- dis.read(digest, 0 , digestSize);
- boolean isDigest = dis.readBoolean();
-
- Row row = null;
- if ( !isDigest )
- {
- row = Row.serializer().deserialize(dis);
- }
-
- ReadResponse rmsg = null;
- if( isDigest )
- {
- rmsg = new ReadResponse(digest);
- }
- else
- {
- rmsg = new ReadResponse(row);
- }
- rmsg.setIsDigestQuery(isDigest);
- return rmsg;
- }
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.commons.lang.ArrayUtils;
+
+
+/*
+ * The read response message is sent by the server when reading data
+ * this encapsulates the tablename and the row that has been read.
+ * The table name is needed so that we can use it to create repairs.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class ReadResponse implements Serializable
+{
+private static ICompactSerializer<ReadResponse> serializer_;
+
+ static
+ {
+ serializer_ = new ReadResponseSerializer();
+ }
+
+ public static ICompactSerializer<ReadResponse> serializer()
+ {
+ return serializer_;
+ }
+
+ public static Message makeReadResponseMessage(ReadResponse readResponse) throws IOException
+ {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream( bos );
+ ReadResponse.serializer().serialize(readResponse, dos);
+ Message message = new Message(StorageService.getLocalStorageEndPoint(), MessagingService.responseStage_, MessagingService.responseVerbHandler_, bos.toByteArray());
+ return message;
+ }
+
+ private Row row_;
+ private byte[] digest_ = ArrayUtils.EMPTY_BYTE_ARRAY;
+ private boolean isDigestQuery_ = false;
+
+ public ReadResponse(byte[] digest )
+ {
+ assert digest != null;
+ digest_= digest;
+ }
+
+ public ReadResponse(Row row)
+ {
+ row_ = row;
+ }
+
+ public Row row()
+ {
+ return row_;
+ }
+
+ public byte[] digest()
+ {
+ return digest_;
+ }
+
+ public boolean isDigestQuery()
+ {
+ return isDigestQuery_;
+ }
+
+ public void setIsDigestQuery(boolean isDigestQuery)
+ {
+ isDigestQuery_ = isDigestQuery;
+ }
+}
+
+class ReadResponseSerializer implements ICompactSerializer<ReadResponse>
+{
+ public void serialize(ReadResponse rm, DataOutputStream dos) throws IOException
+ {
+ dos.writeInt(rm.digest().length);
+ dos.write(rm.digest());
+ dos.writeBoolean(rm.isDigestQuery());
+
+ if( !rm.isDigestQuery() && rm.row() != null )
+ {
+ Row.serializer().serialize(rm.row(), dos);
+ }
+ }
+
+ public ReadResponse deserialize(DataInputStream dis) throws IOException
+ {
+ int digestSize = dis.readInt();
+ byte[] digest = new byte[digestSize];
+ dis.read(digest, 0 , digestSize);
+ boolean isDigest = dis.readBoolean();
+
+ Row row = null;
+ if ( !isDigest )
+ {
+ row = Row.serializer().deserialize(dis);
+ }
+
+ ReadResponse rmsg = null;
+ if( isDigest )
+ {
+ rmsg = new ReadResponse(digest);
+ }
+ else
+ {
+ rmsg = new ReadResponse(row);
+ }
+ rmsg.setIsDigestQuery(isDigest);
+ return rmsg;
+ }
}
\ No newline at end of file
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java Thu Jul 30 15:30:21 2009
@@ -1,123 +1,123 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.io.DataOutputBuffer;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class ReadVerbHandler implements IVerbHandler
-{
- protected static class ReadContext
- {
- protected DataInputBuffer bufIn_ = new DataInputBuffer();
- protected DataOutputBuffer bufOut_ = new DataOutputBuffer();
- }
-
- private static Logger logger_ = Logger.getLogger( ReadVerbHandler.class );
- /* We use this so that we can reuse the same row mutation context for the mutation. */
- private static ThreadLocal<ReadVerbHandler.ReadContext> tls_ = new InheritableThreadLocal<ReadVerbHandler.ReadContext>();
-
- protected static ReadVerbHandler.ReadContext getCurrentReadContext()
- {
- return tls_.get();
- }
-
- protected static void setCurrentReadContext(ReadVerbHandler.ReadContext readContext)
- {
- tls_.set(readContext);
- }
-
- public void doVerb(Message message)
- {
- byte[] body = message.getMessageBody();
- /* Obtain a Read Context from TLS */
- ReadContext readCtx = tls_.get();
- if ( readCtx == null )
- {
- readCtx = new ReadContext();
- tls_.set(readCtx);
- }
- readCtx.bufIn_.reset(body, body.length);
-
- try
- {
- ReadCommand readCommand = ReadCommand.serializer().deserialize(readCtx.bufIn_);
- Table table = Table.open(readCommand.table);
- Row row = null;
- row = readCommand.getRow(table);
- ReadResponse readResponse = null;
- if (readCommand.isDigestQuery())
- {
- readResponse = new ReadResponse(row.digest());
- }
- else
- {
- readResponse = new ReadResponse(row);
- }
- readResponse.setIsDigestQuery(readCommand.isDigestQuery());
- /* serialize the ReadResponseMessage. */
- readCtx.bufOut_.reset();
-
- ReadResponse.serializer().serialize(readResponse, readCtx.bufOut_);
-
- byte[] bytes = new byte[readCtx.bufOut_.getLength()];
- System.arraycopy(readCtx.bufOut_.getData(), 0, bytes, 0, bytes.length);
-
- Message response = message.getReply(StorageService.getLocalStorageEndPoint(), bytes);
- if (logger_.isDebugEnabled())
- logger_.debug("Read key " + readCommand.key + "; sending response to " + message.getMessageId() + "@" + message.getFrom());
- MessagingService.getMessagingInstance().sendOneWay(response, message.getFrom());
-
- /* Do read repair if header of the message says so */
- if (message.getHeader(ReadCommand.DO_REPAIR) != null)
- {
- doReadRepair(row, readCommand);
- }
- }
- catch (IOException ex)
- {
- throw new RuntimeException(ex);
- }
- }
-
- private void doReadRepair(Row row, ReadCommand readCommand)
- {
- List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(readCommand.key);
- /* Remove the local storage endpoint from the list. */
- endpoints.remove( StorageService.getLocalStorageEndPoint() );
-
- if (endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
- StorageService.instance().doConsistencyCheck(row, endpoints, readCommand);
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class ReadVerbHandler implements IVerbHandler
+{
+ protected static class ReadContext
+ {
+ protected DataInputBuffer bufIn_ = new DataInputBuffer();
+ protected DataOutputBuffer bufOut_ = new DataOutputBuffer();
+ }
+
+ private static Logger logger_ = Logger.getLogger( ReadVerbHandler.class );
+ /* We use this so that we can reuse the same row mutation context for the mutation. */
+ private static ThreadLocal<ReadVerbHandler.ReadContext> tls_ = new InheritableThreadLocal<ReadVerbHandler.ReadContext>();
+
+ protected static ReadVerbHandler.ReadContext getCurrentReadContext()
+ {
+ return tls_.get();
+ }
+
+ protected static void setCurrentReadContext(ReadVerbHandler.ReadContext readContext)
+ {
+ tls_.set(readContext);
+ }
+
+ public void doVerb(Message message)
+ {
+ byte[] body = message.getMessageBody();
+ /* Obtain a Read Context from TLS */
+ ReadContext readCtx = tls_.get();
+ if ( readCtx == null )
+ {
+ readCtx = new ReadContext();
+ tls_.set(readCtx);
+ }
+ readCtx.bufIn_.reset(body, body.length);
+
+ try
+ {
+ ReadCommand readCommand = ReadCommand.serializer().deserialize(readCtx.bufIn_);
+ Table table = Table.open(readCommand.table);
+ Row row = null;
+ row = readCommand.getRow(table);
+ ReadResponse readResponse = null;
+ if (readCommand.isDigestQuery())
+ {
+ readResponse = new ReadResponse(row.digest());
+ }
+ else
+ {
+ readResponse = new ReadResponse(row);
+ }
+ readResponse.setIsDigestQuery(readCommand.isDigestQuery());
+ /* serialize the ReadResponseMessage. */
+ readCtx.bufOut_.reset();
+
+ ReadResponse.serializer().serialize(readResponse, readCtx.bufOut_);
+
+ byte[] bytes = new byte[readCtx.bufOut_.getLength()];
+ System.arraycopy(readCtx.bufOut_.getData(), 0, bytes, 0, bytes.length);
+
+ Message response = message.getReply(StorageService.getLocalStorageEndPoint(), bytes);
+ if (logger_.isDebugEnabled())
+ logger_.debug("Read key " + readCommand.key + "; sending response to " + message.getMessageId() + "@" + message.getFrom());
+ MessagingService.getMessagingInstance().sendOneWay(response, message.getFrom());
+
+ /* Do read repair if header of the message says so */
+ if (message.getHeader(ReadCommand.DO_REPAIR) != null)
+ {
+ doReadRepair(row, readCommand);
+ }
+ }
+ catch (IOException ex)
+ {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ private void doReadRepair(Row row, ReadCommand readCommand)
+ {
+ List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(readCommand.key);
+ /* Remove the local storage endpoint from the list. */
+ endpoints.remove( StorageService.getLocalStorageEndPoint() );
+
+ if (endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
+ StorageService.instance().doConsistencyCheck(row, endpoints, readCommand);
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java Thu Jul 30 15:30:21 2009
@@ -1,66 +1,66 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db;
-
-import java.util.*;
-import java.io.*;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.utils.FileUtils;
-import org.apache.log4j.Logger;
-import org.apache.commons.lang.StringUtils;
-
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class RecoveryManager
-{
- private static RecoveryManager instance_;
- private static Logger logger_ = Logger.getLogger(RecoveryManager.class);
-
- public synchronized static RecoveryManager instance() throws IOException
- {
- if (instance_ == null)
- {
- instance_ = new RecoveryManager();
- }
- return instance_;
- }
-
- public static File[] getListofCommitLogs()
- {
- String directory = DatabaseDescriptor.getLogFileLocation();
- File file = new File(directory);
- return file.listFiles();
- }
-
- public static void doRecovery() throws IOException
- {
- File[] files = getListofCommitLogs();
- if (files.length == 0)
- return;
-
- Arrays.sort(files, new FileUtils.FileComparator());
- logger_.info("Replaying " + StringUtils.join(files, ", "));
- new CommitLog(true).recover(files);
- FileUtils.delete(files);
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.*;
+import java.io.*;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.FileUtils;
+import org.apache.log4j.Logger;
+import org.apache.commons.lang.StringUtils;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class RecoveryManager
+{
+ private static RecoveryManager instance_;
+ private static Logger logger_ = Logger.getLogger(RecoveryManager.class);
+
+ public synchronized static RecoveryManager instance() throws IOException
+ {
+ if (instance_ == null)
+ {
+ instance_ = new RecoveryManager();
+ }
+ return instance_;
+ }
+
+ public static File[] getListofCommitLogs()
+ {
+ String directory = DatabaseDescriptor.getLogFileLocation();
+ File file = new File(directory);
+ return file.listFiles();
+ }
+
+ public static void doRecovery() throws IOException
+ {
+ File[] files = getListofCommitLogs();
+ if (files.length == 0)
+ return;
+
+ Arrays.sort(files, new FileUtils.FileComparator());
+ logger_.info("Replaying " + StringUtils.join(files, ", "));
+ new CommitLog(true).recover(files);
+ FileUtils.delete(files);
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java Thu Jul 30 15:30:21 2009
@@ -1,231 +1,231 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.Arrays;
-
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.log4j.Logger;
-
-import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.utils.FBUtilities;
-
-public class Row
-{
- private static Logger logger_ = Logger.getLogger(Row.class);
- private String table_;
- private static RowSerializer serializer = new RowSerializer();
-
- static RowSerializer serializer()
- {
- return serializer;
- }
-
- public Row(String table, String key) {
- assert table != null;
- this.table_ = table;
- this.key_ = key;
- }
-
- // only for use by RMVH
- Row()
- {
- }
-
- public String getTable() {
- return table_;
- }
-
- private String key_;
-
- private Map<String, ColumnFamily> columnFamilies_ = new HashMap<String, ColumnFamily>();
-
- public String key()
- {
- return key_;
- }
-
- void setKey(String key)
- {
- key_ = key;
- }
-
- public void setTable(String table)
- {
- table_ = table;
- }
-
- public Set<String> getColumnFamilyNames()
- {
- return columnFamilies_.keySet();
- }
-
- public Collection<ColumnFamily> getColumnFamilies()
- {
- return columnFamilies_.values();
- }
-
- public ColumnFamily getColumnFamily(String cfName)
- {
- return columnFamilies_.get(cfName);
- }
-
- void addColumnFamily(ColumnFamily columnFamily)
- {
- columnFamilies_.put(columnFamily.name(), columnFamily);
- }
-
- void removeColumnFamily(ColumnFamily columnFamily)
- {
- columnFamilies_.remove(columnFamily.name());
- int delta = (-1) * columnFamily.size();
- }
-
- public boolean isEmpty()
- {
- return (columnFamilies_.size() == 0);
- }
-
- /*
- * This function will repair the current row with the input row
- * what that means is that if there are any differences between the 2 rows then
- * this function will make the current row take the latest changes.
- */
- public void repair(Row rowOther)
- {
- for (ColumnFamily cfOld : rowOther.getColumnFamilies())
- {
- ColumnFamily cf = columnFamilies_.get(cfOld.name());
- if (cf == null)
- {
- addColumnFamily(cfOld);
- }
- else
- {
- columnFamilies_.remove(cf.name());
- addColumnFamily(ColumnFamily.resolve(Arrays.asList(cfOld, cf)));
- }
- }
- }
-
- /*
- * This function will calculate the difference between 2 rows
- * and return the resultant row. This assumes that the row that
- * is being submitted is a super set of the current row so
- * it only calculates additional
- * difference and does not take care of what needs to be removed from the current row to make
- * it same as the input row.
- */
- public Row diff(Row rowComposite)
- {
- Row rowDiff = new Row(table_, key_);
-
- for (ColumnFamily cfComposite : rowComposite.getColumnFamilies())
- {
- ColumnFamily cf = columnFamilies_.get(cfComposite.name());
- if (cf == null)
- rowDiff.addColumnFamily(cfComposite);
- else
- {
- ColumnFamily cfDiff = cf.diff(cfComposite);
- if (cfDiff != null)
- rowDiff.addColumnFamily(cfDiff);
- }
- }
- if (rowDiff.getColumnFamilies().isEmpty())
- return null;
- else
- return rowDiff;
- }
-
- public Row cloneMe()
- {
- Row row = new Row(table_, key_);
- row.columnFamilies_ = new HashMap<String, ColumnFamily>(columnFamilies_);
- return row;
- }
-
- public byte[] digest()
- {
- Set<String> cfamilies = columnFamilies_.keySet();
- byte[] xorHash = ArrayUtils.EMPTY_BYTE_ARRAY;
- for (String cFamily : cfamilies)
- {
- if (xorHash.length == 0)
- {
- xorHash = columnFamilies_.get(cFamily).digest();
- }
- else
- {
- xorHash = FBUtilities.xor(xorHash, columnFamilies_.get(cFamily).digest());
- }
- }
- return xorHash;
- }
-
- void clear()
- {
- columnFamilies_.clear();
- }
-
- public String toString()
- {
- return "Row(" + key_ + " [" + StringUtils.join(columnFamilies_.values(), ", ") + ")]";
- }
-}
-
-class RowSerializer implements ICompactSerializer<Row>
-{
- public void serialize(Row row, DataOutputStream dos) throws IOException
- {
- dos.writeUTF(row.getTable());
- dos.writeUTF(row.key());
- Collection<ColumnFamily> columnFamilies = row.getColumnFamilies();
- int size = columnFamilies.size();
- dos.writeInt(size);
-
- for (ColumnFamily cf : columnFamilies)
- {
- ColumnFamily.serializer().serialize(cf, dos);
- }
- }
-
- public Row deserialize(DataInputStream dis) throws IOException
- {
- String table = dis.readUTF();
- String key = dis.readUTF();
- Row row = new Row(table, key);
- int size = dis.readInt();
-
- for (int i = 0; i < size; ++i)
- {
- ColumnFamily cf = ColumnFamily.serializer().deserialize(dis);
- row.addColumnFamily(cf);
- }
- return row;
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.Arrays;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class Row
+{
+ private static Logger logger_ = Logger.getLogger(Row.class);
+ private String table_;
+ private static RowSerializer serializer = new RowSerializer();
+
+ static RowSerializer serializer()
+ {
+ return serializer;
+ }
+
+ public Row(String table, String key) {
+ assert table != null;
+ this.table_ = table;
+ this.key_ = key;
+ }
+
+ // only for use by RMVH
+ Row()
+ {
+ }
+
+ public String getTable() {
+ return table_;
+ }
+
+ private String key_;
+
+ private Map<String, ColumnFamily> columnFamilies_ = new HashMap<String, ColumnFamily>();
+
+ public String key()
+ {
+ return key_;
+ }
+
+ void setKey(String key)
+ {
+ key_ = key;
+ }
+
+ public void setTable(String table)
+ {
+ table_ = table;
+ }
+
+ public Set<String> getColumnFamilyNames()
+ {
+ return columnFamilies_.keySet();
+ }
+
+ public Collection<ColumnFamily> getColumnFamilies()
+ {
+ return columnFamilies_.values();
+ }
+
+ public ColumnFamily getColumnFamily(String cfName)
+ {
+ return columnFamilies_.get(cfName);
+ }
+
+ void addColumnFamily(ColumnFamily columnFamily)
+ {
+ columnFamilies_.put(columnFamily.name(), columnFamily);
+ }
+
+ void removeColumnFamily(ColumnFamily columnFamily)
+ {
+ columnFamilies_.remove(columnFamily.name());
+ int delta = (-1) * columnFamily.size();
+ }
+
+ public boolean isEmpty()
+ {
+ return (columnFamilies_.size() == 0);
+ }
+
+ /*
+ * This function will repair the current row with the input row
+ * what that means is that if there are any differences between the 2 rows then
+ * this function will make the current row take the latest changes.
+ */
+ public void repair(Row rowOther)
+ {
+ for (ColumnFamily cfOld : rowOther.getColumnFamilies())
+ {
+ ColumnFamily cf = columnFamilies_.get(cfOld.name());
+ if (cf == null)
+ {
+ addColumnFamily(cfOld);
+ }
+ else
+ {
+ columnFamilies_.remove(cf.name());
+ addColumnFamily(ColumnFamily.resolve(Arrays.asList(cfOld, cf)));
+ }
+ }
+ }
+
+ /*
+ * This function will calculate the difference between 2 rows
+ * and return the resultant row. This assumes that the row that
+ * is being submitted is a super set of the current row so
+ * it only calculates additional
+ * difference and does not take care of what needs to be removed from the current row to make
+ * it same as the input row.
+ */
+ public Row diff(Row rowComposite)
+ {
+ Row rowDiff = new Row(table_, key_);
+
+ for (ColumnFamily cfComposite : rowComposite.getColumnFamilies())
+ {
+ ColumnFamily cf = columnFamilies_.get(cfComposite.name());
+ if (cf == null)
+ rowDiff.addColumnFamily(cfComposite);
+ else
+ {
+ ColumnFamily cfDiff = cf.diff(cfComposite);
+ if (cfDiff != null)
+ rowDiff.addColumnFamily(cfDiff);
+ }
+ }
+ if (rowDiff.getColumnFamilies().isEmpty())
+ return null;
+ else
+ return rowDiff;
+ }
+
+ public Row cloneMe()
+ {
+ Row row = new Row(table_, key_);
+ row.columnFamilies_ = new HashMap<String, ColumnFamily>(columnFamilies_);
+ return row;
+ }
+
+ public byte[] digest()
+ {
+ Set<String> cfamilies = columnFamilies_.keySet();
+ byte[] xorHash = ArrayUtils.EMPTY_BYTE_ARRAY;
+ for (String cFamily : cfamilies)
+ {
+ if (xorHash.length == 0)
+ {
+ xorHash = columnFamilies_.get(cFamily).digest();
+ }
+ else
+ {
+ xorHash = FBUtilities.xor(xorHash, columnFamilies_.get(cFamily).digest());
+ }
+ }
+ return xorHash;
+ }
+
+ void clear()
+ {
+ columnFamilies_.clear();
+ }
+
+ public String toString()
+ {
+ return "Row(" + key_ + " [" + StringUtils.join(columnFamilies_.values(), ", ") + ")]";
+ }
+}
+
+class RowSerializer implements ICompactSerializer<Row>
+{
+ public void serialize(Row row, DataOutputStream dos) throws IOException
+ {
+ dos.writeUTF(row.getTable());
+ dos.writeUTF(row.key());
+ Collection<ColumnFamily> columnFamilies = row.getColumnFamilies();
+ int size = columnFamilies.size();
+ dos.writeInt(size);
+
+ for (ColumnFamily cf : columnFamilies)
+ {
+ ColumnFamily.serializer().serialize(cf, dos);
+ }
+ }
+
+ public Row deserialize(DataInputStream dis) throws IOException
+ {
+ String table = dis.readUTF();
+ String key = dis.readUTF();
+ Row row = new Row(table, key);
+ int size = dis.readInt();
+
+ for (int i = 0; i < size; ++i)
+ {
+ ColumnFamily cf = ColumnFamily.serializer().deserialize(dis);
+ row.addColumnFamily(cf);
+ }
+ return row;
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Thu Jul 30 15:30:21 2009
@@ -1,345 +1,345 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.nio.ByteBuffer;
-
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.lang.StringUtils;
-
-import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.BatchMutationSuper;
-import org.apache.cassandra.service.BatchMutation;
-import org.apache.cassandra.service.InvalidRequestException;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.db.filter.QueryPath;
-import org.apache.cassandra.db.marshal.MarshalException;
-import org.apache.cassandra.config.DatabaseDescriptor;
-
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class RowMutation implements Serializable
-{
- private static ICompactSerializer<RowMutation> serializer_;
- public static final String HINT = "HINT";
-
- static
- {
- serializer_ = new RowMutationSerializer();
- }
-
- static ICompactSerializer<RowMutation> serializer()
- {
- return serializer_;
- }
-
- private String table_;
- private String key_;
- protected Map<String, ColumnFamily> modifications_ = new HashMap<String, ColumnFamily>();
-
- public RowMutation(String table, String key)
- {
- table_ = table;
- key_ = key;
- }
-
- public RowMutation(String table, Row row)
- {
- table_ = table;
- key_ = row.key();
- for (ColumnFamily cf : row.getColumnFamilies())
- {
- add(cf);
- }
- }
-
- protected RowMutation(String table, String key, Map<String, ColumnFamily> modifications)
- {
- table_ = table;
- key_ = key;
- modifications_ = modifications;
- }
-
- public String table()
- {
- return table_;
- }
-
- public String key()
- {
- return key_;
- }
-
- public Set<String> columnFamilyNames()
- {
- return modifications_.keySet();
- }
-
- void addHints(String key, String host) throws IOException
- {
- QueryPath path = new QueryPath(HintedHandOffManager.HINTS_CF, key.getBytes("UTF-8"), host.getBytes("UTF-8"));
- add(path, ArrayUtils.EMPTY_BYTE_ARRAY, System.currentTimeMillis());
- }
-
- /*
- * Specify a column family name and the corresponding column
- * family object.
- * param @ cf - column family name
- * param @ columnFamily - the column family.
- */
- public void add(ColumnFamily columnFamily)
- {
- if (modifications_.containsKey(columnFamily.name()))
- {
- throw new IllegalArgumentException("ColumnFamily " + columnFamily.name() + " is already being modified");
- }
- modifications_.put(columnFamily.name(), columnFamily);
- }
-
- /*
- * Specify a column name and a corresponding value for
- * the column. Column name is specified as <column family>:column.
- * This will result in a ColumnFamily associated with
- * <column family> as name and a Column with <column>
- * as name. The column can be further broken up
- * as super column name : columnname in case of super columns
- *
- * param @ cf - column name as <column family>:<column>
- * param @ value - value associated with the column
- * param @ timestamp - timestamp associated with this data.
- */
- public void add(QueryPath path, byte[] value, long timestamp)
- {
- ColumnFamily columnFamily = modifications_.get(path.columnFamilyName);
- if (columnFamily == null)
- {
- columnFamily = ColumnFamily.create(table_, path.columnFamilyName);
- }
- columnFamily.addColumn(path, value, timestamp);
- modifications_.put(path.columnFamilyName, columnFamily);
- }
-
- public void delete(QueryPath path, long timestamp)
- {
- assert path.columnFamilyName != null;
- String cfName = path.columnFamilyName;
-
- if (modifications_.containsKey(cfName))
- {
- throw new IllegalArgumentException("ColumnFamily " + cfName + " is already being modified");
- }
-
- int localDeleteTime = (int) (System.currentTimeMillis() / 1000);
-
- ColumnFamily columnFamily = modifications_.get(cfName);
- if (columnFamily == null)
- columnFamily = ColumnFamily.create(table_, cfName);
-
- if (path.superColumnName == null && path.columnName == null)
- {
- columnFamily.delete(localDeleteTime, timestamp);
- }
- else if (path.columnName == null)
- {
- SuperColumn sc = new SuperColumn(path.superColumnName, DatabaseDescriptor.getSubComparator(table_, cfName));
- sc.markForDeleteAt(localDeleteTime, timestamp);
- columnFamily.addColumn(sc);
- }
- else
- {
- ByteBuffer bytes = ByteBuffer.allocate(4);
- bytes.putInt(localDeleteTime);
- columnFamily.addColumn(path, bytes.array(), timestamp, true);
- }
-
- modifications_.put(cfName, columnFamily);
- }
-
- /*
- * This is equivalent to calling commit. Applies the changes to
- * to the table that is obtained by calling Table.open().
- */
- public void apply() throws IOException
- {
- Row row = new Row(table_, key_);
- apply(row);
- }
-
- /*
- * Allows RowMutationVerbHandler to optimize by re-using a single Row object.
- */
- void apply(Row emptyRow) throws IOException
- {
- assert emptyRow.getColumnFamilies().size() == 0;
- Table table = Table.open(table_);
- for (String cfName : modifications_.keySet())
- {
- assert table.isValidColumnFamily(cfName);
- emptyRow.addColumnFamily(modifications_.get(cfName));
- }
- table.apply(emptyRow);
- }
-
- /*
- * This is equivalent to calling commit. Applies the changes to
- * to the table that is obtained by calling Table.open().
- */
- void applyBinary(Row emptyRow) throws IOException, ExecutionException, InterruptedException
- {
- assert emptyRow.getColumnFamilies().size() == 0;
- Table table = Table.open(table_);
- Set<String> cfNames = modifications_.keySet();
- for (String cfName : cfNames)
- {
- assert table.isValidColumnFamily(cfName);
- emptyRow.addColumnFamily(modifications_.get(cfName));
- }
- table.load(emptyRow);
- }
-
- public Message makeRowMutationMessage() throws IOException
- {
- return makeRowMutationMessage(StorageService.mutationVerbHandler_);
- }
-
- public Message makeRowMutationMessage(String verbHandlerName) throws IOException
- {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(bos);
- serializer().serialize(this, dos);
- EndPoint local = StorageService.getLocalStorageEndPoint();
- EndPoint from = (local != null) ? local : new EndPoint(FBUtilities.getHostAddress(), 7000);
- return new Message(from, StorageService.mutationStage_, verbHandlerName, bos.toByteArray());
- }
-
- public static RowMutation getRowMutation(String table, BatchMutation batchMutation)
- {
- RowMutation rm = new RowMutation(table, batchMutation.key.trim());
- for (String cfname : batchMutation.cfmap.keySet())
- {
- List<org.apache.cassandra.service.Column> list = batchMutation.cfmap.get(cfname);
- for (org.apache.cassandra.service.Column column : list)
- {
- rm.add(new QueryPath(cfname, null, column.name), column.value, column.timestamp);
- }
- }
- return rm;
- }
-
- public static RowMutation getRowMutation(String table, BatchMutationSuper batchMutationSuper) throws InvalidRequestException
- {
- RowMutation rm = new RowMutation(table, batchMutationSuper.key.trim());
- for (String cfName : batchMutationSuper.cfmap.keySet())
- {
- for (org.apache.cassandra.service.SuperColumn super_column : batchMutationSuper.cfmap.get(cfName))
- {
- for (org.apache.cassandra.service.Column column : super_column.columns)
- {
- try
- {
- rm.add(new QueryPath(cfName, super_column.name, column.name), column.value, column.timestamp);
- }
- catch (MarshalException e)
- {
- throw new InvalidRequestException(e.getMessage());
- }
- }
- }
- }
- return rm;
- }
-
- public String toString()
- {
- return "RowMutation(" +
- "table='" + table_ + '\'' +
- ", key='" + key_ + '\'' +
- ", modifications=[" + StringUtils.join(modifications_.values(), ", ") + "]" +
- ')';
- }
-}
-
-class RowMutationSerializer implements ICompactSerializer<RowMutation>
-{
- private void freezeTheMaps(Map<String, ColumnFamily> map, DataOutputStream dos) throws IOException
- {
- int size = map.size();
- dos.writeInt(size);
- if (size > 0)
- {
- Set<String> keys = map.keySet();
- for (String key : keys)
- {
- dos.writeUTF(key);
- ColumnFamily cf = map.get(key);
- if (cf != null)
- {
- ColumnFamily.serializer().serialize(cf, dos);
- }
- }
- }
- }
-
- public void serialize(RowMutation rm, DataOutputStream dos) throws IOException
- {
- dos.writeUTF(rm.table());
- dos.writeUTF(rm.key());
-
- /* serialize the modifications_ in the mutation */
- freezeTheMaps(rm.modifications_, dos);
- }
-
- private Map<String, ColumnFamily> defreezeTheMaps(DataInputStream dis) throws IOException
- {
- Map<String, ColumnFamily> map = new HashMap<String, ColumnFamily>();
- int size = dis.readInt();
- for (int i = 0; i < size; ++i)
- {
- String key = dis.readUTF();
- ColumnFamily cf = ColumnFamily.serializer().deserialize(dis);
- map.put(key, cf);
- }
- return map;
- }
-
- public RowMutation deserialize(DataInputStream dis) throws IOException
- {
- String table = dis.readUTF();
- String key = dis.readUTF();
- Map<String, ColumnFamily> modifications = defreezeTheMaps(dis);
- return new RowMutation(table, key, modifications);
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.StringUtils;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.BatchMutationSuper;
+import org.apache.cassandra.service.BatchMutation;
+import org.apache.cassandra.service.InvalidRequestException;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.marshal.MarshalException;
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class RowMutation implements Serializable
+{
+ private static ICompactSerializer<RowMutation> serializer_;
+ public static final String HINT = "HINT";
+
+ static
+ {
+ serializer_ = new RowMutationSerializer();
+ }
+
+ static ICompactSerializer<RowMutation> serializer()
+ {
+ return serializer_;
+ }
+
+ private String table_;
+ private String key_;
+ protected Map<String, ColumnFamily> modifications_ = new HashMap<String, ColumnFamily>();
+
+ public RowMutation(String table, String key)
+ {
+ table_ = table;
+ key_ = key;
+ }
+
+ public RowMutation(String table, Row row)
+ {
+ table_ = table;
+ key_ = row.key();
+ for (ColumnFamily cf : row.getColumnFamilies())
+ {
+ add(cf);
+ }
+ }
+
+ protected RowMutation(String table, String key, Map<String, ColumnFamily> modifications)
+ {
+ table_ = table;
+ key_ = key;
+ modifications_ = modifications;
+ }
+
+ public String table()
+ {
+ return table_;
+ }
+
+ public String key()
+ {
+ return key_;
+ }
+
+ public Set<String> columnFamilyNames()
+ {
+ return modifications_.keySet();
+ }
+
+ void addHints(String key, String host) throws IOException
+ {
+ QueryPath path = new QueryPath(HintedHandOffManager.HINTS_CF, key.getBytes("UTF-8"), host.getBytes("UTF-8"));
+ add(path, ArrayUtils.EMPTY_BYTE_ARRAY, System.currentTimeMillis());
+ }
+
+ /*
+ * Specify a column family name and the corresponding column
+ * family object.
+ * param @ cf - column family name
+ * param @ columnFamily - the column family.
+ */
+ public void add(ColumnFamily columnFamily)
+ {
+ if (modifications_.containsKey(columnFamily.name()))
+ {
+ throw new IllegalArgumentException("ColumnFamily " + columnFamily.name() + " is already being modified");
+ }
+ modifications_.put(columnFamily.name(), columnFamily);
+ }
+
+ /*
+ * Specify a column name and a corresponding value for
+ * the column. Column name is specified as <column family>:column.
+ * This will result in a ColumnFamily associated with
+ * <column family> as name and a Column with <column>
+ * as name. The column can be further broken up
+ * as super column name : columnname in case of super columns
+ *
+ * param @ cf - column name as <column family>:<column>
+ * param @ value - value associated with the column
+ * param @ timestamp - timestamp associated with this data.
+ */
+ public void add(QueryPath path, byte[] value, long timestamp)
+ {
+ ColumnFamily columnFamily = modifications_.get(path.columnFamilyName);
+ if (columnFamily == null)
+ {
+ columnFamily = ColumnFamily.create(table_, path.columnFamilyName);
+ }
+ columnFamily.addColumn(path, value, timestamp);
+ modifications_.put(path.columnFamilyName, columnFamily);
+ }
+
+ public void delete(QueryPath path, long timestamp)
+ {
+ assert path.columnFamilyName != null;
+ String cfName = path.columnFamilyName;
+
+ if (modifications_.containsKey(cfName))
+ {
+ throw new IllegalArgumentException("ColumnFamily " + cfName + " is already being modified");
+ }
+
+ int localDeleteTime = (int) (System.currentTimeMillis() / 1000);
+
+ ColumnFamily columnFamily = modifications_.get(cfName);
+ if (columnFamily == null)
+ columnFamily = ColumnFamily.create(table_, cfName);
+
+ if (path.superColumnName == null && path.columnName == null)
+ {
+ columnFamily.delete(localDeleteTime, timestamp);
+ }
+ else if (path.columnName == null)
+ {
+ SuperColumn sc = new SuperColumn(path.superColumnName, DatabaseDescriptor.getSubComparator(table_, cfName));
+ sc.markForDeleteAt(localDeleteTime, timestamp);
+ columnFamily.addColumn(sc);
+ }
+ else
+ {
+ ByteBuffer bytes = ByteBuffer.allocate(4);
+ bytes.putInt(localDeleteTime);
+ columnFamily.addColumn(path, bytes.array(), timestamp, true);
+ }
+
+ modifications_.put(cfName, columnFamily);
+ }
+
+ /*
+ * This is equivalent to calling commit. Applies the changes to
+ * to the table that is obtained by calling Table.open().
+ */
+ public void apply() throws IOException
+ {
+ Row row = new Row(table_, key_);
+ apply(row);
+ }
+
+ /*
+ * Allows RowMutationVerbHandler to optimize by re-using a single Row object.
+ */
+ void apply(Row emptyRow) throws IOException
+ {
+ assert emptyRow.getColumnFamilies().size() == 0;
+ Table table = Table.open(table_);
+ for (String cfName : modifications_.keySet())
+ {
+ assert table.isValidColumnFamily(cfName);
+ emptyRow.addColumnFamily(modifications_.get(cfName));
+ }
+ table.apply(emptyRow);
+ }
+
+ /*
+ * This is equivalent to calling commit. Applies the changes to
+ * to the table that is obtained by calling Table.open().
+ */
+ void applyBinary(Row emptyRow) throws IOException, ExecutionException, InterruptedException
+ {
+ assert emptyRow.getColumnFamilies().size() == 0;
+ Table table = Table.open(table_);
+ Set<String> cfNames = modifications_.keySet();
+ for (String cfName : cfNames)
+ {
+ assert table.isValidColumnFamily(cfName);
+ emptyRow.addColumnFamily(modifications_.get(cfName));
+ }
+ table.load(emptyRow);
+ }
+
+ public Message makeRowMutationMessage() throws IOException
+ {
+ return makeRowMutationMessage(StorageService.mutationVerbHandler_);
+ }
+
+ public Message makeRowMutationMessage(String verbHandlerName) throws IOException
+ {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+ serializer().serialize(this, dos);
+ EndPoint local = StorageService.getLocalStorageEndPoint();
+ EndPoint from = (local != null) ? local : new EndPoint(FBUtilities.getHostAddress(), 7000);
+ return new Message(from, StorageService.mutationStage_, verbHandlerName, bos.toByteArray());
+ }
+
+ public static RowMutation getRowMutation(String table, BatchMutation batchMutation)
+ {
+ RowMutation rm = new RowMutation(table, batchMutation.key.trim());
+ for (String cfname : batchMutation.cfmap.keySet())
+ {
+ List<org.apache.cassandra.service.Column> list = batchMutation.cfmap.get(cfname);
+ for (org.apache.cassandra.service.Column column : list)
+ {
+ rm.add(new QueryPath(cfname, null, column.name), column.value, column.timestamp);
+ }
+ }
+ return rm;
+ }
+
+ public static RowMutation getRowMutation(String table, BatchMutationSuper batchMutationSuper) throws InvalidRequestException
+ {
+ RowMutation rm = new RowMutation(table, batchMutationSuper.key.trim());
+ for (String cfName : batchMutationSuper.cfmap.keySet())
+ {
+ for (org.apache.cassandra.service.SuperColumn super_column : batchMutationSuper.cfmap.get(cfName))
+ {
+ for (org.apache.cassandra.service.Column column : super_column.columns)
+ {
+ try
+ {
+ rm.add(new QueryPath(cfName, super_column.name, column.name), column.value, column.timestamp);
+ }
+ catch (MarshalException e)
+ {
+ throw new InvalidRequestException(e.getMessage());
+ }
+ }
+ }
+ }
+ return rm;
+ }
+
+ public String toString()
+ {
+ return "RowMutation(" +
+ "table='" + table_ + '\'' +
+ ", key='" + key_ + '\'' +
+ ", modifications=[" + StringUtils.join(modifications_.values(), ", ") + "]" +
+ ')';
+ }
+}
+
+class RowMutationSerializer implements ICompactSerializer<RowMutation>
+{
+ private void freezeTheMaps(Map<String, ColumnFamily> map, DataOutputStream dos) throws IOException
+ {
+ int size = map.size();
+ dos.writeInt(size);
+ if (size > 0)
+ {
+ Set<String> keys = map.keySet();
+ for (String key : keys)
+ {
+ dos.writeUTF(key);
+ ColumnFamily cf = map.get(key);
+ if (cf != null)
+ {
+ ColumnFamily.serializer().serialize(cf, dos);
+ }
+ }
+ }
+ }
+
+ public void serialize(RowMutation rm, DataOutputStream dos) throws IOException
+ {
+ dos.writeUTF(rm.table());
+ dos.writeUTF(rm.key());
+
+ /* serialize the modifications_ in the mutation */
+ freezeTheMaps(rm.modifications_, dos);
+ }
+
+ private Map<String, ColumnFamily> defreezeTheMaps(DataInputStream dis) throws IOException
+ {
+ Map<String, ColumnFamily> map = new HashMap<String, ColumnFamily>();
+ int size = dis.readInt();
+ for (int i = 0; i < size; ++i)
+ {
+ String key = dis.readUTF();
+ ColumnFamily cf = ColumnFamily.serializer().deserialize(dis);
+ map.put(key, cf);
+ }
+ return map;
+ }
+
+ public RowMutation deserialize(DataInputStream dis) throws IOException
+ {
+ String table = dis.readUTF();
+ String key = dis.readUTF();
+ Map<String, ColumnFamily> modifications = defreezeTheMaps(dis);
+ return new RowMutation(table, key, modifications);
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java Thu Jul 30 15:30:21 2009
@@ -1,95 +1,95 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-
-import javax.xml.bind.annotation.XmlElement;
-
-import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.FBUtilities;
-
-
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class RowMutationMessage implements Serializable
-{
- public static final String hint_ = "HINT";
- private static RowMutationMessageSerializer serializer_ = new RowMutationMessageSerializer();
-
- static RowMutationMessageSerializer serializer()
- {
- return serializer_;
- }
-
- public Message makeRowMutationMessage() throws IOException
- {
- return makeRowMutationMessage(StorageService.mutationVerbHandler_);
- }
-
- public Message makeRowMutationMessage(String verbHandlerName) throws IOException
- {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream( bos );
- RowMutationMessage.serializer().serialize(this, dos);
- EndPoint local = StorageService.getLocalStorageEndPoint();
- EndPoint from = ( local != null ) ? local : new EndPoint(FBUtilities.getHostAddress(), 7000);
- return new Message(from, StorageService.mutationStage_, verbHandlerName, bos.toByteArray());
- }
-
- @XmlElement(name="RowMutation")
- private RowMutation rowMutation_;
-
- private RowMutationMessage()
- {}
-
- public RowMutationMessage(RowMutation rowMutation)
- {
- rowMutation_ = rowMutation;
- }
-
- public RowMutation getRowMutation()
- {
- return rowMutation_;
- }
-}
-
-class RowMutationMessageSerializer implements ICompactSerializer<RowMutationMessage>
-{
- public void serialize(RowMutationMessage rm, DataOutputStream dos) throws IOException
- {
- RowMutation.serializer().serialize(rm.getRowMutation(), dos);
- }
-
- public RowMutationMessage deserialize(DataInputStream dis) throws IOException
- {
- RowMutation rm = RowMutation.serializer().deserialize(dis);
- return new RowMutationMessage(rm);
- }
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+import javax.xml.bind.annotation.XmlElement;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class RowMutationMessage implements Serializable
+{
+ public static final String hint_ = "HINT";
+ private static RowMutationMessageSerializer serializer_ = new RowMutationMessageSerializer();
+
+ static RowMutationMessageSerializer serializer()
+ {
+ return serializer_;
+ }
+
+ public Message makeRowMutationMessage() throws IOException
+ {
+ return makeRowMutationMessage(StorageService.mutationVerbHandler_);
+ }
+
+ public Message makeRowMutationMessage(String verbHandlerName) throws IOException
+ {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream( bos );
+ RowMutationMessage.serializer().serialize(this, dos);
+ EndPoint local = StorageService.getLocalStorageEndPoint();
+ EndPoint from = ( local != null ) ? local : new EndPoint(FBUtilities.getHostAddress(), 7000);
+ return new Message(from, StorageService.mutationStage_, verbHandlerName, bos.toByteArray());
+ }
+
+ @XmlElement(name="RowMutation")
+ private RowMutation rowMutation_;
+
+ private RowMutationMessage()
+ {}
+
+ public RowMutationMessage(RowMutation rowMutation)
+ {
+ rowMutation_ = rowMutation;
+ }
+
+ public RowMutation getRowMutation()
+ {
+ return rowMutation_;
+ }
+}
+
+class RowMutationMessageSerializer implements ICompactSerializer<RowMutationMessage>
+{
+ public void serialize(RowMutationMessage rm, DataOutputStream dos) throws IOException
+ {
+ RowMutation.serializer().serialize(rm.getRowMutation(), dos);
+ }
+
+ public RowMutationMessage deserialize(DataInputStream dis) throws IOException
+ {
+ RowMutation rm = RowMutation.serializer().deserialize(dis);
+ return new RowMutationMessage(rm);
+ }
}
\ No newline at end of file
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Thu Jul 30 15:30:21 2009
@@ -1,96 +1,96 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db;
-
-import java.io.*;
-
-import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
-
-import org.apache.log4j.Logger;
-
-import org.apache.cassandra.net.*;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class RowMutationVerbHandler implements IVerbHandler
-{
- protected static class RowMutationContext
- {
- protected Row row_ = new Row();
- protected DataInputBuffer buffer_ = new DataInputBuffer();
- }
-
- private static Logger logger_ = Logger.getLogger(RowMutationVerbHandler.class);
- /* We use this so that we can reuse the same row mutation context for the mutation. */
- private static ThreadLocal<RowMutationContext> tls_ = new InheritableThreadLocal<RowMutationContext>();
-
- public void doVerb(Message message)
- {
- byte[] bytes = message.getMessageBody();
- /* Obtain a Row Mutation Context from TLS */
- RowMutationContext rowMutationCtx = tls_.get();
- if ( rowMutationCtx == null )
- {
- rowMutationCtx = new RowMutationContext();
- tls_.set(rowMutationCtx);
- }
-
- rowMutationCtx.buffer_.reset(bytes, bytes.length);
-
- try
- {
- RowMutation rm = RowMutation.serializer().deserialize(rowMutationCtx.buffer_);
- if (logger_.isDebugEnabled())
- logger_.debug("Applying " + rm);
-
- /* Check if there were any hints in this message */
- byte[] hintedBytes = message.getHeader(RowMutation.HINT);
- if ( hintedBytes != null && hintedBytes.length > 0 )
- {
- EndPoint hint = EndPoint.fromBytes(hintedBytes);
- if (logger_.isDebugEnabled())
- logger_.debug("Adding hint for " + hint);
- /* add necessary hints to this mutation */
- RowMutation hintedMutation = new RowMutation(Table.SYSTEM_TABLE, rm.table());
- hintedMutation.addHints(rm.key(), hint.getHost());
- hintedMutation.apply();
- }
-
- rowMutationCtx.row_.clear();
- rowMutationCtx.row_.setTable(rm.table());
- rowMutationCtx.row_.setKey(rm.key());
- rm.apply(rowMutationCtx.row_);
-
- WriteResponse response = new WriteResponse(rm.table(), rm.key(), true);
- Message responseMessage = WriteResponse.makeWriteResponseMessage(message, response);
- if (logger_.isDebugEnabled())
- logger_.debug(rm + " applied. Sending response to " + message.getMessageId() + "@" + message.getFrom());
- MessagingService.getMessagingInstance().sendOneWay(responseMessage, message.getFrom());
- }
- catch (IOException e)
- {
- logger_.error("Error in row mutation", e);
- }
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.*;
+
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.net.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class RowMutationVerbHandler implements IVerbHandler
+{
+ protected static class RowMutationContext
+ {
+ protected Row row_ = new Row();
+ protected DataInputBuffer buffer_ = new DataInputBuffer();
+ }
+
+ private static Logger logger_ = Logger.getLogger(RowMutationVerbHandler.class);
+ /* We use this so that we can reuse the same row mutation context for the mutation. */
+ private static ThreadLocal<RowMutationContext> tls_ = new InheritableThreadLocal<RowMutationContext>();
+
+ public void doVerb(Message message)
+ {
+ byte[] bytes = message.getMessageBody();
+ /* Obtain a Row Mutation Context from TLS */
+ RowMutationContext rowMutationCtx = tls_.get();
+ if ( rowMutationCtx == null )
+ {
+ rowMutationCtx = new RowMutationContext();
+ tls_.set(rowMutationCtx);
+ }
+
+ rowMutationCtx.buffer_.reset(bytes, bytes.length);
+
+ try
+ {
+ RowMutation rm = RowMutation.serializer().deserialize(rowMutationCtx.buffer_);
+ if (logger_.isDebugEnabled())
+ logger_.debug("Applying " + rm);
+
+ /* Check if there were any hints in this message */
+ byte[] hintedBytes = message.getHeader(RowMutation.HINT);
+ if ( hintedBytes != null && hintedBytes.length > 0 )
+ {
+ EndPoint hint = EndPoint.fromBytes(hintedBytes);
+ if (logger_.isDebugEnabled())
+ logger_.debug("Adding hint for " + hint);
+ /* add necessary hints to this mutation */
+ RowMutation hintedMutation = new RowMutation(Table.SYSTEM_TABLE, rm.table());
+ hintedMutation.addHints(rm.key(), hint.getHost());
+ hintedMutation.apply();
+ }
+
+ rowMutationCtx.row_.clear();
+ rowMutationCtx.row_.setTable(rm.table());
+ rowMutationCtx.row_.setKey(rm.key());
+ rm.apply(rowMutationCtx.row_);
+
+ WriteResponse response = new WriteResponse(rm.table(), rm.key(), true);
+ Message responseMessage = WriteResponse.makeWriteResponseMessage(message, response);
+ if (logger_.isDebugEnabled())
+ logger_.debug(rm + " applied. Sending response to " + message.getMessageId() + "@" + message.getFrom());
+ MessagingService.getMessagingInstance().sendOneWay(responseMessage, message.getFrom());
+ }
+ catch (IOException e)
+ {
+ logger_.error("Error in row mutation", e);
+ }
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Scanner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Scanner.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Scanner.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Scanner.java Thu Jul 30 15:30:21 2009
@@ -1,87 +1,87 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db;
-
-import java.util.*;
-import java.io.IOException;
-
-
-/**
- * This class is used to loop through a retrieved column family
- * to get all columns in Iterator style. Usage is as follows:
- * Scanner scanner = new Scanner("table");
- * scanner.fetchColumnfamily(key, "column-family");
- *
- * while ( scanner.hasNext() )
- * {
- * Column column = scanner.next();
- * // Do something with the column
- * }
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class Scanner implements IScanner<IColumn>
-{
- /* Table over which we are scanning. */
- private String table_;
- /* Iterator when iterating over the columns of a given key in a column family */
- private Iterator<IColumn> columnIt_;
-
- public Scanner(String table)
- {
- table_ = table;
- }
-
- /**
- * Fetch the columns associated with this key for the specified column family.
- * This method basically sets up an iterator internally and then provides an
- * iterator like interface to iterate over the columns.
- * @param key key we are interested in.
- * @param cf column family we are interested in.
- * @throws IOException
- */
- public void fetch(String key, String cf) throws IOException
- {
- if ( cf != null )
- {
- Table table = Table.open(table_);
- ColumnFamily columnFamily = table.get(key, cf);
- if ( columnFamily != null )
- {
- Collection<IColumn> columns = columnFamily.getSortedColumns();
- columnIt_ = columns.iterator();
- }
- }
- }
-
- public boolean hasNext() throws IOException
- {
- return columnIt_.hasNext();
- }
-
- public IColumn next()
- {
- return columnIt_.next();
- }
-
- public void close() throws IOException
- {
- throw new UnsupportedOperationException("This operation is not supported in the Scanner");
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.*;
+import java.io.IOException;
+
+
+/**
+ * This class is used to loop through a retrieved column family
+ * to get all columns in Iterator style. Usage is as follows:
+ * Scanner scanner = new Scanner("table");
+ * scanner.fetchColumnfamily(key, "column-family");
+ *
+ * while ( scanner.hasNext() )
+ * {
+ * Column column = scanner.next();
+ * // Do something with the column
+ * }
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class Scanner implements IScanner<IColumn>
+{
+ /* Table over which we are scanning. */
+ private String table_;
+ /* Iterator when iterating over the columns of a given key in a column family */
+ private Iterator<IColumn> columnIt_;
+
+ public Scanner(String table)
+ {
+ table_ = table;
+ }
+
+ /**
+ * Fetch the columns associated with this key for the specified column family.
+ * This method basically sets up an iterator internally and then provides an
+ * iterator like interface to iterate over the columns.
+ * @param key key we are interested in.
+ * @param cf column family we are interested in.
+ * @throws IOException
+ */
+ public void fetch(String key, String cf) throws IOException
+ {
+ if ( cf != null )
+ {
+ Table table = Table.open(table_);
+ ColumnFamily columnFamily = table.get(key, cf);
+ if ( columnFamily != null )
+ {
+ Collection<IColumn> columns = columnFamily.getSortedColumns();
+ columnIt_ = columns.iterator();
+ }
+ }
+ }
+
+ public boolean hasNext() throws IOException
+ {
+ return columnIt_.hasNext();
+ }
+
+ public IColumn next()
+ {
+ return columnIt_.next();
+ }
+
+ public void close() throws IOException
+ {
+ throw new UnsupportedOperationException("This operation is not supported in the Scanner");
+ }
+}