You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2009/03/02 07:13:17 UTC
svn commit: r749207 [5/12] - in
/incubator/cassandra/src/org/apache/cassandra: loader/ locator/ net/
net/http/ net/io/ net/sink/ procedures/ service/ test/ tools/
Added: incubator/cassandra/src/org/apache/cassandra/net/io/FastSerializer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/io/FastSerializer.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/io/FastSerializer.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/io/FastSerializer.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.io.IOException;
+
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.net.Message;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class FastSerializer implements ISerializer
+{
+ public byte[] serialize(Message message) throws IOException
+ {
+ DataOutputBuffer buffer = new DataOutputBuffer();
+ Message.serializer().serialize(message, buffer);
+ return buffer.getData();
+ }
+
+ public Message deserialize(byte[] bytes) throws IOException
+ {
+ DataInputBuffer bufIn = new DataInputBuffer();
+ bufIn.reset(bytes, bytes.length);
+ return Message.serializer().deserialize(bufIn);
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/net/io/ISerializer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/io/ISerializer.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/io/ISerializer.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/io/ISerializer.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.io.IOException;
+
+import org.apache.cassandra.net.Message;
+/**
+ * Contract for converting a {@link Message} to a byte array and back.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public interface ISerializer
+{
+    /**
+     * Encodes the given message as a byte array.
+     *
+     * @throws IOException if the message cannot be written
+     */
+    byte[] serialize(Message message) throws IOException;
+
+    /**
+     * Rebuilds a message from a byte array.
+     *
+     * @throws IOException if the bytes cannot be read as a message
+     */
+    Message deserialize(byte[] bytes) throws IOException;
+}
Added: incubator/cassandra/src/org/apache/cassandra/net/io/IStreamComplete.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/io/IStreamComplete.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/io/IStreamComplete.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/io/IStreamComplete.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.io.IOException;
+
+import org.apache.cassandra.net.EndPoint;
+
+/**
+ * Callback interface for stream-completion notifications. Handlers are
+ * registered per key with the StreamContextManager.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public interface IStreamComplete
+{
+    /**
+     * Invoked, when registered with the StreamContextManager, once the
+     * stream from a host has been completely handled.
+     *
+     * @param from          the host the stream came from
+     * @param streamContext context of the stream that was handled
+     * @param streamStatus  status recorded for that stream
+     * @throws IOException if the handler fails while processing the completion
+     */
+    void onStreamCompletion(String from, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus) throws IOException;
+}
Added: incubator/cassandra/src/org/apache/cassandra/net/io/ProtocolHeaderState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/io/ProtocolHeaderState.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/io/ProtocolHeaderState.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/io/ProtocolHeaderState.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import org.apache.cassandra.utils.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.io.IOException;
+import org.apache.cassandra.net.MessagingService;
+
+/**
+ * Reader state that consumes the 4-byte protocol header, decodes its fields
+ * into the reader's protocol header object and then hands over to either the
+ * content-stream state or the content-length state.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class ProtocolHeaderState extends StartState
+{
+    /* Holds the 4 header bytes; cleared after a successful transition so the state can be reused. */
+    private ByteBuffer buffer_;
+
+    public ProtocolHeaderState(TcpReader stream)
+    {
+        super(stream);
+        buffer_ = ByteBuffer.allocate(4);
+    }
+
+    public byte[] read() throws IOException, ReadNotCompleteException
+    {
+        return doRead(buffer_);
+    }
+
+    /**
+     * Decodes the header that has just been read and selects the next state.
+     * The header fields are recorded before the version is checked, so they
+     * are populated even when the version is rejected.
+     *
+     * @throws IOException if the header's version is greater than
+     *         MessagingService.getVersion()
+     */
+    public void morphState() throws IOException
+    {
+        int header = MessagingService.byteArrayToInt(buffer_.array());
+
+        int serializerType = MessagingService.getBits(header, 1, 2);
+        stream_.getProtocolHeader().serializerType_ = serializerType;
+
+        boolean streaming = ( MessagingService.getBits(header, 3, 1) == 1 );
+        stream_.getProtocolHeader().isStreamingMode_ = streaming;
+        if ( streaming )
+            MessagingService.setStreamingMode(true);
+
+        boolean listening = ( MessagingService.getBits(header, 4, 1) == 1 );
+        stream_.getProtocolHeader().isListening_ = listening;
+
+        int version = MessagingService.getBits(header, 15, 8);
+        stream_.getProtocolHeader().version_ = version;
+
+        if ( version > MessagingService.getVersion() )
+        {
+            throw new IOException("Invalid version in message. Scram.");
+        }
+
+        /* Streaming connections read raw content; everything else reads a length prefix next. */
+        TcpReader.TcpReaderState nextKey = streaming
+                                         ? TcpReader.TcpReaderState.CONTENT_STREAM
+                                         : TcpReader.TcpReaderState.CONTENT_LENGTH;
+
+        StartState nextState = stream_.getSocketState(nextKey);
+        if ( nextState == null )
+        {
+            /* First use on this reader: create the state and cache it. */
+            if ( streaming )
+                nextState = new ContentStreamState(stream_);
+            else
+                nextState = new ContentLengthState(stream_);
+            stream_.putSocketState( nextKey, nextState );
+        }
+        stream_.morphState( nextState );
+        buffer_.clear();
+    }
+
+    public void setContextData(Object data)
+    {
+        throw new UnsupportedOperationException("This method is not supported in the ProtocolHeaderState");
+    }
+}
+
+
Added: incubator/cassandra/src/org/apache/cassandra/net/io/ProtocolState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/io/ProtocolState.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/io/ProtocolState.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/io/ProtocolState.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import org.apache.cassandra.utils.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.io.IOException;
+import org.apache.cassandra.net.MessagingService;
+
+/**
+ * Reader state that consumes the 16-byte protocol preamble, validates it via
+ * MessagingService.isProtocolValid and then hands over to the
+ * {@link ProtocolHeaderState}.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class ProtocolState extends StartState
+{
+    /* Holds the 16 preamble bytes; cleared after a successful transition. */
+    private ByteBuffer buffer_;
+
+    public ProtocolState(TcpReader stream)
+    {
+        super(stream);
+        buffer_ = ByteBuffer.allocate(16);
+    }
+
+    public byte[] read() throws IOException, ReadNotCompleteException
+    {
+        return doRead(buffer_);
+    }
+
+    /**
+     * Validates the preamble and moves the reader to the protocol-header
+     * state, creating and caching that state on first use.
+     *
+     * @throws IOException if the preamble is not a valid protocol marker
+     */
+    public void morphState() throws IOException
+    {
+        byte[] preamble = buffer_.array();
+        if ( !MessagingService.isProtocolValid(preamble) )
+        {
+            throw new IOException("Invalid protocol header. The preamble seems to be messed up.");
+        }
+
+        StartState headerState = stream_.getSocketState(TcpReader.TcpReaderState.PROTOCOL);
+        if ( headerState == null )
+        {
+            headerState = new ProtocolHeaderState(stream_);
+            stream_.putSocketState( TcpReader.TcpReaderState.PROTOCOL, headerState );
+        }
+        stream_.morphState( headerState );
+        buffer_.clear();
+    }
+
+    public void setContextData(Object data)
+    {
+        throw new UnsupportedOperationException("This method is not supported in the ProtocolState");
+    }
+}
+
Added: incubator/cassandra/src/org/apache/cassandra/net/io/ReadNotCompleteException.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/io/ReadNotCompleteException.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/io/ReadNotCompleteException.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/io/ReadNotCompleteException.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+/**
+ * Checked exception signalling that a read did not deliver all of the bytes
+ * that were expected. StartState.doRead throws it when the target buffer
+ * still has room after a read, and TcpReader.read catches it to stop its
+ * read loop.
+ *
+ * The constructor is package-private, so only classes in this package can
+ * raise it.
+ *
+ * Originally created by lakshman, Aug 22, 2005.
+ */
+public class ReadNotCompleteException extends Exception
+{
+    /**
+     * @param message description of the incomplete read
+     */
+    ReadNotCompleteException(String message)
+    {
+        super(message);
+    }
+}
Added: incubator/cassandra/src/org/apache/cassandra/net/io/SerializerAttribute.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/io/SerializerAttribute.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/io/SerializerAttribute.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/io/SerializerAttribute.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.lang.annotation.*;
+
+/**
+ * Annotation carrying a {@link SerializerType}. It is retained at runtime,
+ * so it can be read reflectively.
+ */
+@Retention(value = RetentionPolicy.RUNTIME)
+public @interface SerializerAttribute
+{
+    /** The serializer type attached by this annotation. */
+    SerializerType value();
+}
Added: incubator/cassandra/src/org/apache/cassandra/net/io/SerializerType.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/io/SerializerType.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/io/SerializerType.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/io/SerializerType.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+/**
+ * The kinds of serializer that can be named by a {@code SerializerAttribute}.
+ *
+ * NOTE(review): keep the declaration order stable in case ordinals are used
+ * elsewhere -- confirm before reordering.
+ */
+public enum SerializerType
+{
+    BINARY, JAVA, XML, JSON
+}
Added: incubator/cassandra/src/org/apache/cassandra/net/io/StartState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/io/StartState.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/io/StartState.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/io/StartState.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.nio.channels.SocketChannel;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+/**
+ * Base class for the states of the {@link TcpReader} state machine. Each
+ * state reads a fixed amount of data from the reader's socket channel and,
+ * once that data is complete, moves the reader on to the next state.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public abstract class StartState
+{
+    /* The reader this state belongs to; supplies the socket channel and receives state changes. */
+    protected TcpReader stream_;
+
+    public StartState(TcpReader stream)
+    {
+        stream_ = stream;
+    }
+
+    /** Reads this state's data from the socket channel. */
+    public abstract byte[] read() throws IOException, ReadNotCompleteException;
+
+    /** Called once this state's data is complete, to move the reader on. */
+    public abstract void morphState() throws IOException;
+
+    /** Supplies state-specific context; states that need none may reject the call. */
+    public abstract void setContextData(Object data);
+
+    /**
+     * Performs one read from the reader's socket channel into the given
+     * buffer and calls {@link #morphState()} once the buffer is full.
+     *
+     * @param buffer destination buffer; it is "complete" when no space remains
+     * @return always an empty array once the buffer has been filled
+     * @throws IOException if the channel reports end-of-stream (-1) while the
+     *         buffer still has room, or if the underlying read fails
+     * @throws ReadNotCompleteException if the buffer is not yet full after
+     *         this read
+     */
+    protected byte[] doRead(ByteBuffer buffer) throws IOException, ReadNotCompleteException
+    {
+        SocketChannel socketChannel = stream_.getStream();
+        int bytesRead = socketChannel.read(buffer);
+
+        /* -1 means the peer closed the stream; that is only an error if we still expected data. */
+        if ( bytesRead == -1 && buffer.hasRemaining() )
+        {
+            throw new IOException("Reached EOF or something bizarre occurred. Reading from: " + socketChannel.socket().getInetAddress() + " BufferSizeRemaining: " + buffer.remaining());
+        }
+
+        if ( buffer.hasRemaining() )
+        {
+            throw new ReadNotCompleteException("Specified number of bytes have not been read from the Socket Channel");
+        }
+
+        morphState();
+        return new byte[0];
+    }
+}
Added: incubator/cassandra/src/org/apache/cassandra/net/io/StreamContextManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/io/StreamContextManager.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/io/StreamContextManager.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/io/StreamContextManager.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
+import javax.xml.bind.annotation.XmlElement;
+
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.dht.BootstrapInitiateMessage;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class StreamContextManager
+{
+ private static Logger logger_ = Logger.getLogger(StreamContextManager.class);
+
+    /**
+     * Action associated with a stream status. The ordinal of each constant is
+     * what StreamStatusSerializer writes out, so the declaration order must
+     * not change.
+     */
+    public static enum StreamCompletionAction
+    {
+        DELETE, STREAM
+    }
+
+    /**
+     * Describes one file expected from a streaming source: the target file
+     * name and the number of bytes expected for it.
+     *
+     * Two contexts are equal when their target files are equal; the expected
+     * byte count does not take part in equality. hashCode() is derived from
+     * the same field so that equal contexts always share a hash code.
+     */
+    public static class StreamContext implements Serializable
+    {
+        private static Logger logger_ = Logger.getLogger(StreamContextManager.StreamContext.class);
+        private static ICompactSerializer<StreamContext> serializer_;
+
+        static
+        {
+            serializer_ = new StreamContextSerializer();
+        }
+
+        /** Returns the shared compact serializer for stream contexts. */
+        public static ICompactSerializer<StreamContext> serializer()
+        {
+            return serializer_;
+        }
+
+        private String targetFile_;
+        private long expectedBytes_;
+
+        public StreamContext(String targetFile, long expectedBytes)
+        {
+            targetFile_ = targetFile;
+            expectedBytes_ = expectedBytes;
+        }
+
+        public String getTargetFile()
+        {
+            return targetFile_;
+        }
+
+        public void setTargetFile(String file)
+        {
+            targetFile_ = file;
+        }
+
+        public long getExpectedBytes()
+        {
+            return expectedBytes_;
+        }
+
+        /**
+         * Compares by target file only. Null target files are tolerated:
+         * two contexts with a null target file are considered equal.
+         */
+        public boolean equals(Object o)
+        {
+            if ( this == o )
+                return true;
+            if ( !(o instanceof StreamContext) )
+                return false;
+
+            StreamContext rhs = (StreamContext)o;
+            if ( targetFile_ == null )
+                return rhs.targetFile_ == null;
+            return targetFile_.equals(rhs.targetFile_);
+        }
+
+        /**
+         * Hashes the target file only, matching equals(). Hashing toString()
+         * would also mix in the expected byte count and let equal contexts
+         * end up with different hash codes.
+         */
+        public int hashCode()
+        {
+            return ( targetFile_ == null ) ? 0 : targetFile_.hashCode();
+        }
+
+        public String toString()
+        {
+            return targetFile_ + ":" + expectedBytes_;
+        }
+    }
+
+    /**
+     * Compact serializer for {@link StreamContext}: the target file as a UTF
+     * string followed by the expected byte count as a long.
+     */
+    public static class StreamContextSerializer implements ICompactSerializer<StreamContext>
+    {
+        /** Writes the target file, then the expected byte count. */
+        public void serialize(StreamContextManager.StreamContext sc, DataOutputStream dos) throws IOException
+        {
+            String file = sc.targetFile_;
+            dos.writeUTF(file);
+            long size = sc.expectedBytes_;
+            dos.writeLong(size);
+        }
+
+        /** Reads the fields back in the order serialize() wrote them. */
+        public StreamContextManager.StreamContext deserialize(DataInputStream dis) throws IOException
+        {
+            String file = dis.readUTF();
+            long size = dis.readLong();
+            StreamContext restored = new StreamContext(file, size);
+            return restored;
+        }
+    }
+
+    /**
+     * Status of a single streamed file: its name, the number of bytes
+     * expected for it, and the completion action. The action starts out as
+     * {@link StreamCompletionAction#DELETE}.
+     */
+    public static class StreamStatus
+    {
+        private static ICompactSerializer<StreamStatus> serializer_;
+
+        static
+        {
+            serializer_ = new StreamStatusSerializer();
+        }
+
+        /** Returns the shared compact serializer for stream statuses. */
+        public static ICompactSerializer<StreamStatus> serializer()
+        {
+            return serializer_;
+        }
+
+        private String file_;
+        private long expectedBytes_;
+        private StreamCompletionAction action_;
+
+        public StreamStatus(String file, long expectedBytes)
+        {
+            this.file_ = file;
+            this.expectedBytes_ = expectedBytes;
+            /* DELETE is the default until setAction() says otherwise. */
+            this.action_ = StreamContextManager.StreamCompletionAction.DELETE;
+        }
+
+        public String getFile()
+        {
+            return this.file_;
+        }
+
+        public long getExpectedBytes()
+        {
+            return this.expectedBytes_;
+        }
+
+        /* Package-private: only code in this package may change the action. */
+        void setAction(StreamContextManager.StreamCompletionAction action)
+        {
+            this.action_ = action;
+        }
+
+        public StreamContextManager.StreamCompletionAction getAction()
+        {
+            return this.action_;
+        }
+    }
+
+    /**
+     * Compact serializer for {@link StreamStatus}: file name (UTF), expected
+     * byte count (long) and the ordinal of the completion action (int).
+     */
+    public static class StreamStatusSerializer implements ICompactSerializer<StreamStatus>
+    {
+        /** Writes file, expected bytes and action ordinal, in that order. */
+        public void serialize(StreamStatus streamStatus, DataOutputStream dos) throws IOException
+        {
+            String file = streamStatus.getFile();
+            dos.writeUTF(file);
+            long size = streamStatus.getExpectedBytes();
+            dos.writeLong(size);
+            int actionOrdinal = streamStatus.getAction().ordinal();
+            dos.writeInt(actionOrdinal);
+        }
+
+        /**
+         * Reads the fields back in the order serialize() wrote them.
+         *
+         * NOTE(review): an ordinal that matches no known action is silently
+         * ignored, leaving the status with its constructor default of DELETE.
+         */
+        public StreamStatus deserialize(DataInputStream dis) throws IOException
+        {
+            String file = dis.readUTF();
+            long size = dis.readLong();
+            StreamStatus restored = new StreamStatus(file, size);
+
+            int actionOrdinal = dis.readInt();
+            StreamCompletionAction[] known = StreamCompletionAction.values();
+            if ( 0 <= actionOrdinal && actionOrdinal < known.length )
+            {
+                restored.setAction(known[actionOrdinal]);
+            }
+
+            return restored;
+        }
+    }
+
+    /**
+     * Wrapper around a {@link StreamStatus}, with a helper that packages the
+     * serialized wrapper into a {@link Message}.
+     */
+    public static class StreamStatusMessage implements Serializable
+    {
+        private static ICompactSerializer<StreamStatusMessage> serializer_;
+
+        static
+        {
+            serializer_ = new StreamStatusMessageSerializer();
+        }
+
+        /** Returns the shared compact serializer for stream status messages. */
+        public static ICompactSerializer<StreamStatusMessage> serializer()
+        {
+            return serializer_;
+        }
+
+        /**
+         * Serializes the given status message into a byte array and wraps it
+         * as the single body element of a Message that originates from the
+         * local storage endpoint and is addressed to
+         * StorageService.bootStrapTerminateVerbHandler_.
+         */
+        public static Message makeStreamStatusMessage(StreamStatusMessage streamStatusMessage) throws IOException
+        {
+            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+            DataOutputStream out = new DataOutputStream( bytes );
+            StreamStatusMessage.serializer().serialize(streamStatusMessage, out);
+            return new Message(StorageService.getLocalStorageEndPoint(),
+                               "",
+                               StorageService.bootStrapTerminateVerbHandler_,
+                               new Object[]{bytes.toByteArray()});
+        }
+
+        protected StreamContextManager.StreamStatus streamStatus_;
+
+        public StreamStatusMessage(StreamContextManager.StreamStatus streamStatus)
+        {
+            this.streamStatus_ = streamStatus;
+        }
+
+        public StreamContextManager.StreamStatus getStreamStatus()
+        {
+            return this.streamStatus_;
+        }
+    }
+
+    /**
+     * Compact serializer for {@link StreamStatusMessage}. The wire form is
+     * exactly that of the wrapped {@link StreamStatus}.
+     */
+    public static class StreamStatusMessageSerializer implements ICompactSerializer<StreamStatusMessage>
+    {
+        /** Writes the wrapped status using the StreamStatus serializer. */
+        public void serialize(StreamStatusMessage streamStatusMessage, DataOutputStream dos) throws IOException
+        {
+            ICompactSerializer<StreamStatus> statusSerializer = StreamStatus.serializer();
+            statusSerializer.serialize(streamStatusMessage.streamStatus_, dos);
+        }
+
+        /** Reads one status and wraps it in a new message object. */
+        public StreamStatusMessage deserialize(DataInputStream dis) throws IOException
+        {
+            ICompactSerializer<StreamStatus> statusSerializer = StreamStatus.serializer();
+            StreamContextManager.StreamStatus restored = statusSerializer.deserialize(dis);
+            return new StreamStatusMessage(restored);
+        }
+    }
+
+ /* Pending stream contexts keyed by source host, consumed in insertion order; a key is removed once its list drains. */
+ public static Map<String, List<StreamContext>> ctxBag_ = new Hashtable<String, List<StreamContext>>();
+ /* Stream statuses to be sent back to the source, keyed and drained the same way as the stream contexts. */
+ public static Map<String, List<StreamStatus>> streamStatusBag_ = new Hashtable<String, List<StreamStatus>>();
+ /* One completion callback per key, used to notify the app that a stream from that endpoint has been handled. */
+ public static Map<String, IStreamComplete> streamNotificationHandlers_ = new HashMap<String, IStreamComplete>();
+
+    /**
+     * Removes and returns the oldest pending stream context for the key.
+     * When that was the last one, the key itself is dropped from the bag.
+     *
+     * @throws IllegalStateException if nothing is registered for the key
+     */
+    public synchronized static StreamContext getStreamContext(String key)
+    {
+        List<StreamContext> pending = ctxBag_.get(key);
+        if ( pending == null )
+        {
+            throw new IllegalStateException("Streaming context has not been set.");
+        }
+
+        StreamContext oldest = pending.remove(0);
+        if ( pending.size() == 0 )
+        {
+            ctxBag_.remove(key);
+        }
+        return oldest;
+    }
+
+    /**
+     * Removes and returns the oldest pending stream status for the key.
+     * When that was the last one, the key itself is dropped from the bag.
+     *
+     * @throws IllegalStateException if nothing is registered for the key
+     */
+    public synchronized static StreamStatus getStreamStatus(String key)
+    {
+        List<StreamStatus> pending = streamStatusBag_.get(key);
+        if ( pending == null )
+        {
+            throw new IllegalStateException("Streaming status has not been set.");
+        }
+
+        StreamStatus oldest = pending.remove(0);
+        if ( pending.size() == 0 )
+        {
+            streamStatusBag_.remove(key);
+        }
+        return oldest;
+    }
+
+    /*
+     * Tells whether the StreamCompletionHandler needs to be invoked for the
+     * data being streamed from a source: true once no stream contexts remain
+     * registered under the key.
+     */
+    public synchronized static boolean isDone(String key)
+    {
+        List<StreamContext> pending = ctxBag_.get(key);
+        return pending == null;
+    }
+
+    /** Looks up the completion callback registered for the key; null when there is none. */
+    public synchronized static IStreamComplete getStreamCompletionHandler(String key)
+    {
+        IStreamComplete handler = streamNotificationHandlers_.get(key);
+        return handler;
+    }
+
+    /** Unregisters the completion callback for the key, if one is registered. */
+    public synchronized static void removeStreamCompletionHandler(String key)
+    {
+        streamNotificationHandlers_.remove( key );
+    }
+
+    /** Registers the completion callback for the key, replacing any previous one. */
+    public synchronized static void registerStreamCompletionHandler(String key, IStreamComplete streamComplete)
+    {
+        streamNotificationHandlers_.put( key, streamComplete );
+    }
+
+    /**
+     * Queues a stream context and its matching stream status under the key,
+     * creating the per-key lists on first use. Both are appended, so they are
+     * later handed out in the order they were added.
+     */
+    public synchronized static void addStreamContext(String key, StreamContext streamContext, StreamStatus streamStatus)
+    {
+        /* Stream contexts for this key */
+        List<StreamContext> contexts = ctxBag_.get(key);
+        if ( contexts == null )
+        {
+            contexts = new ArrayList<StreamContext>();
+            ctxBag_.put(key, contexts);
+        }
+        contexts.add(streamContext);
+
+        /* Stream statuses for this key, kept in step with the contexts */
+        List<StreamStatus> statuses = streamStatusBag_.get(key);
+        if ( statuses == null )
+        {
+            statuses = new ArrayList<StreamStatus>();
+            streamStatusBag_.put(key, statuses);
+        }
+        statuses.add(streamStatus);
+    }
+}
Added: incubator/cassandra/src/org/apache/cassandra/net/io/TcpReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/io/TcpReader.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/io/TcpReader.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/io/TcpReader.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+import java.util.*;
+
+import org.apache.cassandra.net.ProtocolHeader;
+import org.apache.cassandra.net.TcpConnection;
+
+/**
+ * State machine that reads from a {@link TcpConnection}'s socket channel.
+ * The current state does the reading; states are cached per
+ * {@link TcpReaderState} so they can be reused.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class TcpReader
+{
+    /** Keys under which state objects are cached. */
+    public static enum TcpReaderState
+    {
+        START, PREAMBLE, PROTOCOL, CONTENT_LENGTH, CONTENT, CONTENT_STREAM, DONE
+    }
+
+    private Map<TcpReaderState, StartState> stateMap_ = new HashMap<TcpReaderState, StartState>();
+    private TcpConnection connection_;
+    private StartState socketState_;
+    private ProtocolHeader protocolHeader_;
+
+    public TcpReader(TcpConnection connection)
+    {
+        this.connection_ = connection;
+    }
+
+    /** Returns the cached state for the key, or null if none has been stored. */
+    public StartState getSocketState(TcpReaderState state)
+    {
+        return this.stateMap_.get(state);
+    }
+
+    /** Caches a state object under the key. */
+    public void putSocketState(TcpReaderState state, StartState socketState)
+    {
+        this.stateMap_.put(state, socketState);
+    }
+
+    /**
+     * Puts the reader back at the preamble state, creating and caching a
+     * {@link ProtocolState} for it on first use.
+     */
+    public void resetState()
+    {
+        StartState preamble = this.stateMap_.get(TcpReaderState.PREAMBLE);
+        if ( preamble == null )
+        {
+            preamble = new ProtocolState(this);
+            this.stateMap_.put(TcpReaderState.PREAMBLE, preamble);
+        }
+        this.socketState_ = preamble;
+    }
+
+    /**
+     * Makes the given state current. The protocol header object is created
+     * lazily on the first transition.
+     */
+    public void morphState(StartState state)
+    {
+        this.socketState_ = state;
+        if ( this.protocolHeader_ == null )
+        {
+            this.protocolHeader_ = new ProtocolHeader();
+        }
+    }
+
+    public ProtocolHeader getProtocolHeader()
+    {
+        return this.protocolHeader_;
+    }
+
+    public SocketChannel getStream()
+    {
+        return this.connection_.getSocketChannel();
+    }
+
+    /**
+     * Keeps asking the current state to read for as long as there is a
+     * current state. Stops as soon as a state reports an incomplete read and
+     * returns whatever the last successful read produced (an empty array if
+     * there was none).
+     */
+    public byte[] read() throws IOException
+    {
+        byte[] lastRead = new byte[0];
+        try
+        {
+            while ( this.socketState_ != null )
+            {
+                lastRead = this.socketState_.read();
+            }
+        }
+        catch ( ReadNotCompleteException incomplete )
+        {
+            /* Not enough bytes available yet: leave the loop and return what we have. */
+        }
+        return lastRead;
+    }
+
+    /* Ad-hoc check that prints the class names of two states stored in a map. */
+    public static void main(String[] args) throws Throwable
+    {
+        Map<TcpReaderState, StartState> stateMap = new HashMap<TcpReaderState, StartState>();
+        stateMap.put(TcpReaderState.CONTENT, new ContentState(null, 10));
+        stateMap.put(TcpReaderState.START, new ProtocolState(null));
+        stateMap.put(TcpReaderState.CONTENT_LENGTH, new ContentLengthState(null));
+
+        TcpReaderState[] lookups = { TcpReaderState.CONTENT, TcpReaderState.CONTENT_LENGTH };
+        for ( TcpReaderState key : lookups )
+        {
+            StartState state = stateMap.get(key);
+            System.out.println( state.getClass().getName() );
+        }
+    }
+}
Added: incubator/cassandra/src/org/apache/cassandra/net/sink/IMessageSink.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/sink/IMessageSink.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/sink/IMessageSink.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/sink/IMessageSink.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.sink;
+
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+ /**
+  * A link in the message sink chain that is run by SinkManager.
+  */
+ public interface IMessageSink
+ {
+     /**
+      * Handles one message as it passes through this sink.
+      *
+      * @param message the message handed to this sink
+      * @return the message to hand to the next sink; when a sink returns
+      *         null, SinkManager skips the remaining sinks and returns null
+      */
+     Message handleMessage(Message message);
+ }
Added: incubator/cassandra/src/org/apache/cassandra/net/sink/SinkManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/sink/SinkManager.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/sink/SinkManager.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/sink/SinkManager.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.sink;
+
+import java.util.*;
+import java.io.IOException;
+
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+ /**
+  * Static registry of {@link IMessageSink}s. Sinks are kept in the order
+  * they were added; one entry point runs them first-to-last and the other
+  * last-to-first. No synchronization is performed in this class.
+  */
+ public class SinkManager
+ {
+     /** Registered sinks, oldest registration first. */
+     private static LinkedList<IMessageSink> messageSinks_ = new LinkedList<IMessageSink>();
+
+     /**
+      * @return true when at least one sink is registered
+      */
+     public static boolean isInitialized()
+     {
+         return !messageSinks_.isEmpty();
+     }
+
+     /**
+      * Appends a sink to the end of the chain.
+      *
+      * @param ms sink to register
+      */
+     public static void addMessageSink(IMessageSink ms)
+     {
+         messageSinks_.add(ms);
+     }
+
+     /**
+      * Removes every registered sink.
+      */
+     public static void clearSinks()
+     {
+         messageSinks_.clear();
+     }
+
+     /**
+      * Runs the message through the sinks in registration order, feeding
+      * each sink the result of the previous one.
+      *
+      * @param message message handed to the first sink
+      * @return the result of the last sink, the unchanged message when no
+      *         sink is registered, or null as soon as any sink returns null
+      */
+     public static Message processClientMessageSink(Message message)
+     {
+         Message current = message;
+         for ( IMessageSink sink : messageSinks_ )
+         {
+             current = sink.handleMessage(current);
+             if ( current == null )
+             {
+                 // A sink dropped the message; the remaining sinks are not run.
+                 return null;
+             }
+         }
+         return current;
+     }
+
+     /**
+      * Runs the message through the sinks in reverse registration order,
+      * feeding each sink the result of the previous one.
+      *
+      * @param message message handed to the most recently added sink
+      * @return the result of the last sink run, the unchanged message when
+      *         no sink is registered, or null as soon as any sink returns null
+      */
+     public static Message processServerMessageSink(Message message)
+     {
+         Message current = message;
+         // Start past the last element so previous() walks the list backwards.
+         for ( ListIterator<IMessageSink> cursor = messageSinks_.listIterator(messageSinks_.size()); cursor.hasPrevious(); )
+         {
+             current = cursor.previous().handleMessage(current);
+             if ( current == null )
+             {
+                 return null;
+             }
+         }
+         return current;
+     }
+ }
Added: incubator/cassandra/src/org/apache/cassandra/procedures/GroovyScriptRunner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/procedures/GroovyScriptRunner.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/procedures/GroovyScriptRunner.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/procedures/GroovyScriptRunner.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,13 @@
+package org.apache.cassandra.procedures;
+
+import groovy.lang.GroovyShell;
+
+ /**
+  * Evaluates Groovy script text through a single shared {@link GroovyShell}.
+  */
+ public class GroovyScriptRunner
+ {
+     private static GroovyShell groovyShell_ = new GroovyShell();
+
+     /**
+      * Evaluates the given script and returns its result as text.
+      *
+      * @param script Groovy source text to evaluate
+      * @return the string form of the script's result, or the string
+      *         "null" when the script produces no value
+      */
+     public static String evaluateString(String script)
+     {
+         // NOTE(review): the script text is executed as-is; confirm that
+         // callers only pass input from a trusted source.
+         Object result = groovyShell_.evaluate(script);
+
+         // A script can evaluate to null; String.valueOf avoids the
+         // NullPointerException that calling toString() on it would raise.
+         return String.valueOf(result);
+     }
+ }
Added: incubator/cassandra/src/org/apache/cassandra/service/BootstrapAndLbHelper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/service/BootstrapAndLbHelper.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/service/BootstrapAndLbHelper.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/service/BootstrapAndLbHelper.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,64 @@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.math.BigInteger;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.cassandra.io.SSTable;
+import org.apache.log4j.Logger;
+
+ /**
+  * Helper that derives a token from the indexed keys for which this node
+  * is the primary.
+  */
+ public final class BootstrapAndLbHelper
+ {
+     private static final Logger logger_ = Logger.getLogger(BootstrapAndLbHelper.class);
+
+     /**
+      * Returns the keys reported by SSTable.getIndexedKeys() after removing
+      * every key for which StorageService.isPrimary() is false.
+      *
+      * NOTE(review): the removal happens in place on the list returned by
+      * SSTable.getIndexedKeys() - confirm that list is not shared.
+      *
+      * @return the indexed keys this node is primary for
+      */
+     private static List<String> getIndexedPrimaryKeys()
+     {
+         List<String> indexedPrimaryKeys = SSTable.getIndexedKeys();
+         Iterator<String> it = indexedPrimaryKeys.iterator();
+
+         while ( it.hasNext() )
+         {
+             String key = it.next();
+             if ( !StorageService.instance().isPrimary(key) )
+             {
+                 it.remove();
+             }
+         }
+         return indexedPrimaryKeys;
+     }
+
+     /**
+      * Picks the indexed primary key at position
+      * keyCount / SSTable.indexInterval() and returns its hash. A position
+      * past the end of the list is clamped to the last key.
+      *
+      * NOTE(review): this presumably assumes the indexed keys are sorted
+      * and spaced indexInterval keys apart, so the chosen key approximates
+      * the keyCount-th primary key - confirm against SSTable.
+      *
+      * @param keyCount number of keys after which token is required.
+      * @return token.
+      * @throws IndexOutOfBoundsException if there are no indexed primary
+      *         keys, or if keyCount maps to a negative position
+      */
+     public static BigInteger getTokenBasedOnPrimaryCount(int keyCount)
+     {
+         List<String> indexedPrimaryKeys = getIndexedPrimaryKeys();
+         if ( indexedPrimaryKeys.isEmpty() )
+         {
+             // Previously this fell through to get(-1) and failed with an
+             // index error that said nothing about the cause.
+             throw new IndexOutOfBoundsException("No indexed primary keys available; cannot derive a token for key count " + keyCount);
+         }
+
+         int lastIndex = indexedPrimaryKeys.size() - 1;
+         int index = Math.min( keyCount / SSTable.indexInterval(), lastIndex );
+         String key = indexedPrimaryKeys.get(index);
+
+         // Skip building the message when debug logging is switched off.
+         if ( logger_.isDebugEnabled() )
+         {
+             logger_.debug("Hashing key " + key + " ...");
+         }
+         return StorageService.instance().hash(key);
+     }
+ }