You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/07/30 17:30:27 UTC
svn commit: r799331 [21/29] - in /incubator/cassandra/trunk: ./
src/java/org/apache/cassandra/concurrent/
src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/gms/ src/...
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MultiAsyncResult.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MultiAsyncResult.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MultiAsyncResult.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MultiAsyncResult.java Thu Jul 30 15:30:21 2009
@@ -1,133 +1,133 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
-
-public class MultiAsyncResult implements IAsyncResult
-{
- private static Logger logger_ = Logger.getLogger( AsyncResult.class );
- private int expectedResults_;
- private List<byte[]> result_ = new ArrayList<byte[]>();
- private AtomicBoolean done_ = new AtomicBoolean(false);
- private Lock lock_ = new ReentrantLock();
- private Condition condition_;
-
- MultiAsyncResult(int expectedResults)
- {
- expectedResults_ = expectedResults;
- condition_ = lock_.newCondition();
- }
-
- public byte[] get()
- {
- throw new UnsupportedOperationException("This operation is not supported in the AsyncResult abstraction.");
- }
-
- public byte[] get(long timeout, TimeUnit tu) throws TimeoutException
- {
- throw new UnsupportedOperationException("This operation is not supported in the AsyncResult abstraction.");
- }
-
- public List<byte[]> multiget()
- {
- lock_.lock();
- try
- {
- if ( !done_.get() )
- {
- condition_.await();
- }
- }
- catch ( InterruptedException ex )
- {
- logger_.warn( LogUtil.throwableToString(ex) );
- }
- finally
- {
- lock_.unlock();
- }
- return result_;
- }
-
- public boolean isDone()
- {
- return done_.get();
- }
-
- public List<byte[]> multiget(long timeout, TimeUnit tu) throws TimeoutException
- {
- lock_.lock();
- try
- {
- boolean bVal = true;
- try
- {
- if ( !done_.get() )
- {
- bVal = condition_.await(timeout, tu);
- }
- }
- catch ( InterruptedException ex )
- {
- logger_.warn( LogUtil.throwableToString(ex) );
- }
-
- if ( !bVal && !done_.get() )
- {
- throw new TimeoutException("Operation timed out.");
- }
- }
- finally
- {
- lock_.unlock();
- }
- return result_;
- }
-
- public void result(Message result)
- {
- try
- {
- lock_.lock();
- if ( !done_.get() )
- {
- result_.add(result.getMessageBody());
- if ( result_.size() == expectedResults_ )
- {
- done_.set(true);
- condition_.signal();
- }
- }
- }
- finally
- {
- lock_.unlock();
- }
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+public class MultiAsyncResult implements IAsyncResult
+{
+ private static Logger logger_ = Logger.getLogger( AsyncResult.class );
+ private int expectedResults_;
+ private List<byte[]> result_ = new ArrayList<byte[]>();
+ private AtomicBoolean done_ = new AtomicBoolean(false);
+ private Lock lock_ = new ReentrantLock();
+ private Condition condition_;
+
+ MultiAsyncResult(int expectedResults)
+ {
+ expectedResults_ = expectedResults;
+ condition_ = lock_.newCondition();
+ }
+
+ public byte[] get()
+ {
+ throw new UnsupportedOperationException("This operation is not supported in the AsyncResult abstraction.");
+ }
+
+ public byte[] get(long timeout, TimeUnit tu) throws TimeoutException
+ {
+ throw new UnsupportedOperationException("This operation is not supported in the AsyncResult abstraction.");
+ }
+
+ public List<byte[]> multiget()
+ {
+ lock_.lock();
+ try
+ {
+ if ( !done_.get() )
+ {
+ condition_.await();
+ }
+ }
+ catch ( InterruptedException ex )
+ {
+ logger_.warn( LogUtil.throwableToString(ex) );
+ }
+ finally
+ {
+ lock_.unlock();
+ }
+ return result_;
+ }
+
+ public boolean isDone()
+ {
+ return done_.get();
+ }
+
+ public List<byte[]> multiget(long timeout, TimeUnit tu) throws TimeoutException
+ {
+ lock_.lock();
+ try
+ {
+ boolean bVal = true;
+ try
+ {
+ if ( !done_.get() )
+ {
+ bVal = condition_.await(timeout, tu);
+ }
+ }
+ catch ( InterruptedException ex )
+ {
+ logger_.warn( LogUtil.throwableToString(ex) );
+ }
+
+ if ( !bVal && !done_.get() )
+ {
+ throw new TimeoutException("Operation timed out.");
+ }
+ }
+ finally
+ {
+ lock_.unlock();
+ }
+ return result_;
+ }
+
+ public void result(Message result)
+ {
+ try
+ {
+ lock_.lock();
+ if ( !done_.get() )
+ {
+ result_.add(result.getMessageBody());
+ if ( result_.size() == expectedResults_ )
+ {
+ done_.set(true);
+ condition_.signal();
+ }
+ }
+ }
+ finally
+ {
+ lock_.unlock();
+ }
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ProtocolHeader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ProtocolHeader.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ProtocolHeader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ProtocolHeader.java Thu Jul 30 15:30:21 2009
@@ -1,36 +1,36 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public final class ProtocolHeader
-{
- public static final String SERIALIZER = "SERIALIZER";
- public static final String COMPRESSION = "COMPRESSION";
- public static final String VERSION = "VERSION";
-
- public int serializerType_;
- public boolean isCompressed_;
- public boolean isStreamingMode_;
- public boolean isListening_;
- public int version_;
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public final class ProtocolHeader
+{
+ public static final String SERIALIZER = "SERIALIZER";
+ public static final String COMPRESSION = "COMPRESSION";
+ public static final String VERSION = "VERSION";
+
+ public int serializerType_;
+ public boolean isCompressed_;
+ public boolean isStreamingMode_;
+ public boolean isListening_;
+ public int version_;
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java Thu Jul 30 15:30:21 2009
@@ -1,52 +1,52 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net;
-
-import org.apache.log4j.Logger;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-class ResponseVerbHandler implements IVerbHandler
-{
- private static final Logger logger_ = Logger.getLogger( ResponseVerbHandler.class );
-
- public void doVerb(Message message)
- {
- String messageId = message.getMessageId();
- IAsyncCallback cb = MessagingService.getRegisteredCallback(messageId);
- if ( cb != null )
- {
- if (logger_.isDebugEnabled())
- logger_.debug("Processing response on a callback from " + message.getMessageId() + "@" + message.getFrom());
- cb.response(message);
- }
- else
- {
- IAsyncResult ar = MessagingService.getAsyncResult(messageId);
- if ( ar != null )
- {
- if (logger_.isDebugEnabled())
- logger_.debug("Processing response on an async result from " + message.getMessageId() + "@" + message.getFrom());
- ar.result(message);
- }
- }
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class ResponseVerbHandler implements IVerbHandler
+{
+ private static final Logger logger_ = Logger.getLogger( ResponseVerbHandler.class );
+
+ public void doVerb(Message message)
+ {
+ String messageId = message.getMessageId();
+ IAsyncCallback cb = MessagingService.getRegisteredCallback(messageId);
+ if ( cb != null )
+ {
+ if (logger_.isDebugEnabled())
+ logger_.debug("Processing response on a callback from " + message.getMessageId() + "@" + message.getFrom());
+ cb.response(message);
+ }
+ else
+ {
+ IAsyncResult ar = MessagingService.getAsyncResult(messageId);
+ if ( ar != null )
+ {
+ if (logger_.isDebugEnabled())
+ logger_.debug("Processing response on an async result from " + message.getMessageId() + "@" + message.getFrom());
+ ar.result(message);
+ }
+ }
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/SelectionKeyHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/SelectionKeyHandler.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/SelectionKeyHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/SelectionKeyHandler.java Thu Jul 30 15:30:21 2009
@@ -1,84 +1,84 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net;
-
-import java.nio.channels.*;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class SelectionKeyHandler
-{
- /**
- * Method which is called when the key becomes acceptable.
- *
- * @param key The key which is acceptable.
- */
- public void accept(SelectionKey key)
- {
- throw new UnsupportedOperationException("accept() cannot be called on " + getClass().getName() + "!");
- }
-
- /**
- * Method which is called when the key becomes connectable.
- *
- * @param key The key which is connectable.
- */
- public void connect(SelectionKey key)
- {
- throw new UnsupportedOperationException("connect() cannot be called on " + getClass().getName() + "!");
- }
-
- /**
- * Method which is called when the key becomes readable.
- *
- * @param key The key which is readable.
- */
- public void read(SelectionKey key)
- {
- throw new UnsupportedOperationException("read() cannot be called on " + getClass().getName() + "!");
- }
-
- /**
- * Method which is called when the key becomes writable.
- *
- * @param key The key which is writable.
- */
- public void write(SelectionKey key)
- {
- throw new UnsupportedOperationException("write() cannot be called on " + getClass().getName() + "!");
- }
-
- protected static void turnOnInterestOps(SelectionKey key, int ops)
- {
- synchronized(key)
- {
- key.interestOps(key.interestOps() | ops);
- }
- }
-
- protected static void turnOffInterestOps(SelectionKey key, int ops)
- {
- synchronized(key)
- {
- key.interestOps(key.interestOps() & (~ops) );
- }
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.nio.channels.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class SelectionKeyHandler
+{
+ /**
+ * Method which is called when the key becomes acceptable.
+ *
+ * @param key The key which is acceptable.
+ */
+ public void accept(SelectionKey key)
+ {
+ throw new UnsupportedOperationException("accept() cannot be called on " + getClass().getName() + "!");
+ }
+
+ /**
+ * Method which is called when the key becomes connectable.
+ *
+ * @param key The key which is connectable.
+ */
+ public void connect(SelectionKey key)
+ {
+ throw new UnsupportedOperationException("connect() cannot be called on " + getClass().getName() + "!");
+ }
+
+ /**
+ * Method which is called when the key becomes readable.
+ *
+ * @param key The key which is readable.
+ */
+ public void read(SelectionKey key)
+ {
+ throw new UnsupportedOperationException("read() cannot be called on " + getClass().getName() + "!");
+ }
+
+ /**
+ * Method which is called when the key becomes writable.
+ *
+ * @param key The key which is writable.
+ */
+ public void write(SelectionKey key)
+ {
+ throw new UnsupportedOperationException("write() cannot be called on " + getClass().getName() + "!");
+ }
+
+ protected static void turnOnInterestOps(SelectionKey key, int ops)
+ {
+ synchronized(key)
+ {
+ key.interestOps(key.interestOps() | ops);
+ }
+ }
+
+ protected static void turnOffInterestOps(SelectionKey key, int ops)
+ {
+ synchronized(key)
+ {
+ key.interestOps(key.interestOps() & (~ops) );
+ }
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/SelectorManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/SelectorManager.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/SelectorManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/SelectorManager.java Thu Jul 30 15:30:21 2009
@@ -1,178 +1,178 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net;
-
-import java.io.IOException;
-import java.nio.channels.SelectableChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-
-import org.apache.log4j.Logger;
-
-public class SelectorManager extends Thread
-{
- private static final Logger logger = Logger.getLogger(SelectorManager.class);
-
- // the underlying selector used
- protected Selector selector;
-
- // workaround JDK select/register bug
- Object gate = new Object();
-
- // The static selector manager which is used by all applications
- private static SelectorManager manager;
-
- // The static UDP selector manager which is used by all applications
- private static SelectorManager udpManager;
-
- private SelectorManager(String name)
- {
- super(name);
-
- try
- {
- selector = Selector.open();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
-
- setDaemon(false);
- }
-
- /**
- * Registers a new channel with the selector, and attaches the given
- * SelectionKeyHandler as the handler for the newly created key. Operations
- * which the handler is interested in will be called as available.
- *
- * @param channel
- * The channel to register with the selector
- * @param handler
- * The handler to use for the callbacks
- * @param ops
- * The initial interest operations
- * @return The SelectionKey which uniquely identifies this channel
- * @exception IOException if the channel is closed
- */
- public SelectionKey register(SelectableChannel channel,
- SelectionKeyHandler handler, int ops) throws IOException
- {
- assert channel != null;
- assert handler != null;
-
- synchronized(gate)
- {
- selector.wakeup();
- return channel.register(selector, ops, handler);
- }
- }
-
- /**
- * This method starts the socket manager listening for events. It is
- * designed to be started when this thread's start() method is invoked.
- */
- public void run()
- {
- while (true)
- {
- try
- {
- selector.select(1);
- doProcess();
- synchronized(gate) {}
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
- }
-
- protected void doProcess() throws IOException
- {
- SelectionKey[] keys = selector.selectedKeys().toArray(new SelectionKey[0]);
-
- for (SelectionKey key : keys)
- {
- selector.selectedKeys().remove(key);
-
- synchronized (key)
- {
- SelectionKeyHandler skh = (SelectionKeyHandler) key.attachment();
-
- if (skh != null)
- {
- // accept
- if (key.isValid() && key.isAcceptable())
- {
- skh.accept(key);
- }
-
- // connect
- if (key.isValid() && key.isConnectable())
- {
- skh.connect(key);
- }
-
- // read
- if (key.isValid() && key.isReadable())
- {
- skh.read(key);
- }
-
- // write
- if (key.isValid() && key.isWritable())
- {
- skh.write(key);
- }
- }
- }
- }
- }
-
- /**
- * Returns the SelectorManager applications should use.
- *
- * @return The SelectorManager which applications should use
- */
- public static SelectorManager getSelectorManager()
- {
- synchronized (SelectorManager.class)
- {
- if (manager == null)
- {
- manager = new SelectorManager("TCP Selector Manager");
- }
- }
- return manager;
- }
-
- public static SelectorManager getUdpSelectorManager()
- {
- synchronized (SelectorManager.class)
- {
- if (udpManager == null)
- {
- udpManager = new SelectorManager("UDP Selector Manager");
- }
- }
- return udpManager;
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+
+import org.apache.log4j.Logger;
+
+public class SelectorManager extends Thread
+{
+ private static final Logger logger = Logger.getLogger(SelectorManager.class);
+
+ // the underlying selector used
+ protected Selector selector;
+
+ // workaround JDK select/register bug
+ Object gate = new Object();
+
+ // The static selector manager which is used by all applications
+ private static SelectorManager manager;
+
+ // The static UDP selector manager which is used by all applications
+ private static SelectorManager udpManager;
+
+ private SelectorManager(String name)
+ {
+ super(name);
+
+ try
+ {
+ selector = Selector.open();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ setDaemon(false);
+ }
+
+ /**
+ * Registers a new channel with the selector, and attaches the given
+ * SelectionKeyHandler as the handler for the newly created key. Operations
+ * which the handler is interested in will be called as available.
+ *
+ * @param channel
+ * The channel to register with the selector
+ * @param handler
+ * The handler to use for the callbacks
+ * @param ops
+ * The initial interest operations
+ * @return The SelectionKey which uniquely identifies this channel
+ * @exception IOException if the channel is closed
+ */
+ public SelectionKey register(SelectableChannel channel,
+ SelectionKeyHandler handler, int ops) throws IOException
+ {
+ assert channel != null;
+ assert handler != null;
+
+ synchronized(gate)
+ {
+ selector.wakeup();
+ return channel.register(selector, ops, handler);
+ }
+ }
+
+ /**
+ * This method starts the socket manager listening for events. It is
+ * designed to be started when this thread's start() method is invoked.
+ */
+ public void run()
+ {
+ while (true)
+ {
+ try
+ {
+ selector.select(1);
+ doProcess();
+ synchronized(gate) {}
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ protected void doProcess() throws IOException
+ {
+ SelectionKey[] keys = selector.selectedKeys().toArray(new SelectionKey[0]);
+
+ for (SelectionKey key : keys)
+ {
+ selector.selectedKeys().remove(key);
+
+ synchronized (key)
+ {
+ SelectionKeyHandler skh = (SelectionKeyHandler) key.attachment();
+
+ if (skh != null)
+ {
+ // accept
+ if (key.isValid() && key.isAcceptable())
+ {
+ skh.accept(key);
+ }
+
+ // connect
+ if (key.isValid() && key.isConnectable())
+ {
+ skh.connect(key);
+ }
+
+ // read
+ if (key.isValid() && key.isReadable())
+ {
+ skh.read(key);
+ }
+
+ // write
+ if (key.isValid() && key.isWritable())
+ {
+ skh.write(key);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns the SelectorManager applications should use.
+ *
+ * @return The SelectorManager which applications should use
+ */
+ public static SelectorManager getSelectorManager()
+ {
+ synchronized (SelectorManager.class)
+ {
+ if (manager == null)
+ {
+ manager = new SelectorManager("TCP Selector Manager");
+ }
+ }
+ return manager;
+ }
+
+ public static SelectorManager getUdpSelectorManager()
+ {
+ synchronized (SelectorManager.class)
+ {
+ if (udpManager == null)
+ {
+ udpManager = new SelectorManager("UDP Selector Manager");
+ }
+ }
+ return udpManager;
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java Thu Jul 30 15:30:21 2009
@@ -1,520 +1,520 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net;
-
-import java.io.*;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.net.io.FastSerializer;
-import org.apache.cassandra.net.io.ISerializer;
-import org.apache.cassandra.net.io.ProtocolState;
-import org.apache.cassandra.net.io.StartState;
-import org.apache.cassandra.net.io.TcpReader;
-import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class TcpConnection extends SelectionKeyHandler implements Comparable
-{
- // logging and profiling.
- private static Logger logger_ = Logger.getLogger(TcpConnection.class);
- private static ISerializer serializer_ = new FastSerializer();
- private SocketChannel socketChannel_;
- private SelectionKey key_;
- private TcpConnectionManager pool_;
- private boolean isIncoming_ = false;
- private TcpReader tcpReader_;
- private ReadWorkItem readWork_ = new ReadWorkItem();
- private Queue<ByteBuffer> pendingWrites_ = new ConcurrentLinkedQueue<ByteBuffer>();
- private EndPoint localEp_;
- private EndPoint remoteEp_;
- boolean inUse_ = false;
-
- /*
- * Added for streaming support. We need the boolean
- * to indicate that this connection is used for
- * streaming. The Condition and the Lock are used
- * to signal the stream() that it can continue
- * streaming when the socket becomes writable.
- */
- private boolean bStream_ = false;
- private Lock lock_;
- private Condition condition_;
-
- // used from getConnection - outgoing
- TcpConnection(TcpConnectionManager pool, EndPoint from, EndPoint to) throws IOException
- {
- socketChannel_ = SocketChannel.open();
- socketChannel_.configureBlocking(false);
- pool_ = pool;
-
- localEp_ = from;
- remoteEp_ = to;
-
- if ( !socketChannel_.connect( remoteEp_.getInetAddress() ) )
- {
- key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_CONNECT);
- }
- else
- {
- key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_READ);
- }
- }
-
- /*
- * Used for streaming purposes has no pooling semantics.
- */
- TcpConnection(EndPoint from, EndPoint to) throws IOException
- {
- socketChannel_ = SocketChannel.open();
- socketChannel_.configureBlocking(false);
-
- localEp_ = from;
- remoteEp_ = to;
-
- if ( !socketChannel_.connect( remoteEp_.getInetAddress() ) )
- {
- key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_CONNECT);
- }
- else
- {
- key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_READ);
- }
- bStream_ = true;
- lock_ = new ReentrantLock();
- condition_ = lock_.newCondition();
- }
-
- /*
- * This method is invoked by the TcpConnectionHandler to accept incoming TCP connections.
- * Accept the connection and then register interest for reads.
- */
- static void acceptConnection(SocketChannel socketChannel, EndPoint localEp, boolean isIncoming) throws IOException
- {
- TcpConnection tcpConnection = new TcpConnection(socketChannel, localEp, true);
- tcpConnection.registerReadInterest();
- }
-
- private void registerReadInterest() throws IOException
- {
- key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_READ);
- }
-
- // used for incoming connections
- TcpConnection(SocketChannel socketChannel, EndPoint localEp, boolean isIncoming) throws IOException
- {
- socketChannel_ = socketChannel;
- socketChannel_.configureBlocking(false);
- isIncoming_ = isIncoming;
- localEp_ = localEp;
- }
-
- EndPoint getLocalEp()
- {
- return localEp_;
- }
-
- public void setLocalEp(EndPoint localEp)
- {
- localEp_ = localEp;
- }
-
- public EndPoint getEndPoint()
- {
- return remoteEp_;
- }
-
- public boolean isIncoming()
- {
- return isIncoming_;
- }
-
- public SocketChannel getSocketChannel()
- {
- return socketChannel_;
- }
-
- public void write(Message message) throws IOException
- {
- byte[] data = serializer_.serialize(message);
- if ( data.length > 0 )
- {
- boolean listening = !message.getFrom().equals(EndPoint.sentinelLocalEndPoint_);
- ByteBuffer buffer = MessagingService.packIt( data , false, false, listening);
- synchronized(this)
- {
- if (!pendingWrites_.isEmpty() || !socketChannel_.isConnected())
- {
- pendingWrites_.add(buffer);
- return;
- }
-
- socketChannel_.write(buffer);
-
- if (buffer.remaining() > 0)
- {
- pendingWrites_.add(buffer);
- turnOnInterestOps(key_, SelectionKey.OP_WRITE);
- }
- }
- }
- }
-
- public void stream(File file, long startPosition, long endPosition) throws IOException, InterruptedException
- {
- if ( !bStream_ )
- throw new IllegalStateException("Cannot stream since we are not set up to stream data.");
-
- lock_.lock();
- try
- {
- /* transfer 64MB in each attempt */
- int limit = 64*1024*1024;
- long total = endPosition - startPosition;
- /* keeps track of total number of bytes transferred */
- long bytesWritten = 0L;
- RandomAccessFile raf = new RandomAccessFile(file, "r");
- FileChannel fc = raf.getChannel();
-
- /*
- * If the connection is not yet established then wait for
- * the timeout period of 2 seconds. Attempt to reconnect 3 times and then
- * bail with an IOException.
- */
- long waitTime = 2;
- int retry = 0;
- while (!socketChannel_.isConnected())
- {
- if ( retry == 3 )
- throw new IOException("Unable to connect to " + remoteEp_ + " after " + retry + " attempts.");
- condition_.await(waitTime, TimeUnit.SECONDS);
- ++retry;
- }
-
- while ( bytesWritten < total )
- {
- if ( startPosition == 0 )
- {
- ByteBuffer buffer = MessagingService.constructStreamHeader(false, true);
- socketChannel_.write(buffer);
- if (buffer.remaining() > 0)
- {
- pendingWrites_.add(buffer);
- turnOnInterestOps(key_, SelectionKey.OP_WRITE);
- condition_.await();
- }
- }
-
- /* returns the number of bytes transferred from file to the socket */
- long bytesTransferred = fc.transferTo(startPosition, limit, socketChannel_);
- if (logger_.isDebugEnabled())
- logger_.debug("Bytes transferred " + bytesTransferred);
- bytesWritten += bytesTransferred;
- startPosition += bytesTransferred;
- /*
- * If the number of bytes transferred is less than intended
- * then we need to wait till socket becomes writeable again.
- */
- if ( bytesTransferred < limit && bytesWritten != total )
- {
- turnOnInterestOps(key_, SelectionKey.OP_WRITE);
- condition_.await();
- }
- }
- }
- finally
- {
- lock_.unlock();
- }
- }
-
- private void resumeStreaming()
- {
- /* if not in streaming mode do nothing */
- if ( !bStream_ )
- return;
-
- lock_.lock();
- try
- {
- condition_.signal();
- }
- finally
- {
- lock_.unlock();
- }
- }
-
- public void close()
- {
- inUse_ = false;
- if ( pool_.contains(this) )
- pool_.decUsed();
- }
-
- public boolean isConnected()
- {
- return socketChannel_.isConnected();
- }
-
- public boolean equals(Object o)
- {
- if ( !(o instanceof TcpConnection) )
- return false;
-
- TcpConnection rhs = (TcpConnection)o;
- if ( localEp_.equals(rhs.localEp_) && remoteEp_.equals(rhs.remoteEp_) )
- return true;
- else
- return false;
- }
-
- public int hashCode()
- {
- return (localEp_ + ":" + remoteEp_).hashCode();
- }
-
- public String toString()
- {
- return socketChannel_.toString();
- }
-
- void closeSocket()
- {
- logger_.warn("Closing down connection " + socketChannel_ + " with " + pendingWrites_.size() + " writes remaining.");
- if ( pool_ != null )
- {
- pool_.removeConnection(this);
- }
- cancel(key_);
- pendingWrites_.clear();
- }
-
- void errorClose()
- {
- logger_.warn("Closing down connection " + socketChannel_);
- pendingWrites_.clear();
- cancel(key_);
- pendingWrites_.clear();
- if ( pool_ != null )
- {
- pool_.removeConnection(this);
- }
- }
-
- private void cancel(SelectionKey key)
- {
- if ( key != null )
- {
- key.cancel();
- try
- {
- key.channel().close();
- }
- catch (IOException e) {}
- }
- }
-
- // called in the selector thread
- public void connect(SelectionKey key)
- {
- turnOffInterestOps(key, SelectionKey.OP_CONNECT);
- try
- {
- if (socketChannel_.finishConnect())
- {
- turnOnInterestOps(key, SelectionKey.OP_READ);
-
- synchronized(this)
- {
- // this will flush the pending
- if (!pendingWrites_.isEmpty())
- {
- turnOnInterestOps(key_, SelectionKey.OP_WRITE);
- }
- }
- resumeStreaming();
- }
- else
- {
- logger_.error("Closing connection because socket channel could not finishConnect.");;
- errorClose();
- }
- }
- catch(IOException e)
- {
- logger_.error("Encountered IOException on connection: " + socketChannel_, e);
- errorClose();
- }
- }
-
- // called in the selector thread
- public void write(SelectionKey key)
- {
- turnOffInterestOps(key, SelectionKey.OP_WRITE);
- doPendingWrites();
- /*
- * This is executed only if we are in streaming mode.
- * Idea is that we read a chunk of data from a source
- * and wait to read the next from the source until we
- * are siganlled to do so from here.
- */
- resumeStreaming();
- }
-
- void doPendingWrites()
- {
- synchronized(this)
- {
- try
- {
- while(!pendingWrites_.isEmpty())
- {
- ByteBuffer buffer = pendingWrites_.peek();
- socketChannel_.write(buffer);
- if (buffer.remaining() > 0)
- {
- break;
- }
- pendingWrites_.remove();
- }
-
- }
- catch(IOException ex)
- {
- logger_.error(LogUtil.throwableToString(ex));
- // This is to fix the wierd Linux bug with NIO.
- errorClose();
- }
- finally
- {
- if (!pendingWrites_.isEmpty())
- {
- turnOnInterestOps(key_, SelectionKey.OP_WRITE);
- }
- }
- }
- }
-
- // called in the selector thread
- public void read(SelectionKey key)
- {
- turnOffInterestOps(key, SelectionKey.OP_READ);
- // publish this event onto to the TCPReadEvent Queue.
- MessagingService.getReadExecutor().execute(readWork_);
- }
-
- class ReadWorkItem implements Runnable
- {
- // called from the TCP READ thread pool
- public void run()
- {
- if ( tcpReader_ == null )
- {
- tcpReader_ = new TcpReader(TcpConnection.this);
- StartState nextState = tcpReader_.getSocketState(TcpReader.TcpReaderState.PREAMBLE);
- if ( nextState == null )
- {
- nextState = new ProtocolState(tcpReader_);
- tcpReader_.putSocketState(TcpReader.TcpReaderState.PREAMBLE, nextState);
- }
- tcpReader_.morphState(nextState);
- }
-
- try
- {
- byte[] bytes = new byte[0];
- while ( (bytes = tcpReader_.read()).length > 0 )
- {
- ProtocolHeader pH = tcpReader_.getProtocolHeader();
- if ( !pH.isStreamingMode_ )
- {
- /* first message received */
- if (remoteEp_ == null)
- {
- int port = ( pH.isListening_ ) ? DatabaseDescriptor.getStoragePort() : EndPoint.sentinelPort_;
- remoteEp_ = new EndPoint( socketChannel_.socket().getInetAddress().getHostAddress(), port );
- // put connection into pool if possible
- pool_ = MessagingService.getConnectionPool(localEp_, remoteEp_);
- pool_.addToPool(TcpConnection.this);
- }
-
- /* Deserialize and handle the message */
- MessagingService.getDeserilizationExecutor().submit( new MessageDeserializationTask(pH.serializerType_, bytes) );
- tcpReader_.resetState();
- }
- else
- {
- MessagingService.setStreamingMode(false);
- /* Close this socket connection used for streaming */
- closeSocket();
- }
- }
- }
- catch ( IOException ex )
- {
- handleException(ex);
- }
- catch ( Throwable th )
- {
- handleException(th);
- }
- finally
- {
- turnOnInterestOps(key_, SelectionKey.OP_READ);
- }
- }
-
- private void handleException(Throwable th)
- {
- logger_.warn("Problem reading from socket connected to : " + socketChannel_);
- logger_.warn(LogUtil.throwableToString(th));
- // This is to fix the weird Linux bug with NIO.
- errorClose();
- }
- }
-
- public int pending()
- {
- return pendingWrites_.size();
- }
-
- public int compareTo(Object o)
- {
- if (o instanceof TcpConnection)
- {
- return pendingWrites_.size() - ((TcpConnection) o).pendingWrites_.size();
- }
-
- throw new IllegalArgumentException();
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.io.FastSerializer;
+import org.apache.cassandra.net.io.ISerializer;
+import org.apache.cassandra.net.io.ProtocolState;
+import org.apache.cassandra.net.io.StartState;
+import org.apache.cassandra.net.io.TcpReader;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class TcpConnection extends SelectionKeyHandler implements Comparable
+{
+ // logging and profiling.
+ private static Logger logger_ = Logger.getLogger(TcpConnection.class);
+ private static ISerializer serializer_ = new FastSerializer();
+ private SocketChannel socketChannel_;
+ private SelectionKey key_;
+ private TcpConnectionManager pool_;
+ private boolean isIncoming_ = false;
+ private TcpReader tcpReader_;
+ private ReadWorkItem readWork_ = new ReadWorkItem();
+ private Queue<ByteBuffer> pendingWrites_ = new ConcurrentLinkedQueue<ByteBuffer>();
+ private EndPoint localEp_;
+ private EndPoint remoteEp_;
+ boolean inUse_ = false;
+
+ /*
+ * Added for streaming support. We need the boolean
+ * to indicate that this connection is used for
+ * streaming. The Condition and the Lock are used
+ * to signal the stream() that it can continue
+ * streaming when the socket becomes writable.
+ */
+ private boolean bStream_ = false;
+ private Lock lock_;
+ private Condition condition_;
+
+ /*
+  * Outgoing, pooled connection (used from getConnection). Opens a non-blocking
+  * channel, starts connecting to the remote end point and registers with the
+  * TCP selector: for OP_CONNECT while the connect is still in progress, or
+  * straight for OP_READ if it completed immediately.
+  */
+ TcpConnection(TcpConnectionManager pool, EndPoint from, EndPoint to) throws IOException
+ {
+     socketChannel_ = SocketChannel.open();
+     socketChannel_.configureBlocking(false);
+     pool_ = pool;
+
+     localEp_ = from;
+     remoteEp_ = to;
+
+     boolean connectedImmediately = socketChannel_.connect( remoteEp_.getInetAddress() );
+     if ( connectedImmediately )
+     {
+         key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_READ);
+     }
+     else
+     {
+         key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_CONNECT);
+     }
+ }
+
+ /*
+  * Used for streaming purposes has no pooling semantics.
+  * Opens a non-blocking channel, starts connecting to the remote end point
+  * and registers with the TCP selector (OP_CONNECT while the connect is
+  * pending, OP_READ if it completed immediately).
+  */
+ TcpConnection(EndPoint from, EndPoint to) throws IOException
+ {
+     socketChannel_ = SocketChannel.open();
+     socketChannel_.configureBlocking(false);
+
+     localEp_ = from;
+     remoteEp_ = to;
+
+     /*
+      * Set up the streaming state BEFORE this object is handed to the selector.
+      * connect(SelectionKey) and write(SelectionKey) call resumeStreaming(),
+      * which dereferences lock_ and condition_ as soon as bStream_ is true, so
+      * the lock must exist before the flag is raised and before registration.
+      */
+     lock_ = new ReentrantLock();
+     condition_ = lock_.newCondition();
+     bStream_ = true;
+
+     if ( !socketChannel_.connect( remoteEp_.getInetAddress() ) )
+     {
+         key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_CONNECT);
+     }
+     else
+     {
+         key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_READ);
+     }
+ }
+
+ /*
+  * This method is invoked by the TcpConnectionHandler to accept incoming TCP connections.
+  * Accept the connection and then register interest for reads.
+  *
+  * socketChannel - the freshly accepted client channel
+  * localEp       - local end point recorded on the new connection
+  * isIncoming    - recorded on the new connection and reported by isIncoming();
+  *                 it is now passed through instead of being replaced by a
+  *                 hard-coded true
+  */
+ static void acceptConnection(SocketChannel socketChannel, EndPoint localEp, boolean isIncoming) throws IOException
+ {
+     TcpConnection tcpConnection = new TcpConnection(socketChannel, localEp, isIncoming);
+     tcpConnection.registerReadInterest();
+ }
+
+ /* Registers this connection's channel with the TCP selector for OP_READ and keeps the resulting key. */
+ private void registerReadInterest() throws IOException
+ {
+     SelectorManager tcpSelector = SelectorManager.getSelectorManager();
+     key_ = tcpSelector.register(socketChannel_, this, SelectionKey.OP_READ);
+ }
+
+ /*
+  * Used for incoming connections: wraps an already accepted channel and
+  * switches it to non-blocking mode. No selector registration happens here;
+  * remoteEp_ and pool_ stay unset until ReadWorkItem sees the first message.
+  */
+ TcpConnection(SocketChannel socketChannel, EndPoint localEp, boolean isIncoming) throws IOException
+ {
+     this.socketChannel_ = socketChannel;
+     this.socketChannel_.configureBlocking(false);
+     this.localEp_ = localEp;
+     this.isIncoming_ = isIncoming;
+ }
+
+ /** @return the local end point of this connection */
+ EndPoint getLocalEp()
+ {
+     return this.localEp_;
+ }
+
+ /** Replaces the local end point of this connection. */
+ public void setLocalEp(EndPoint localEp)
+ {
+     this.localEp_ = localEp;
+ }
+
+ /** @return the remote end point; null on an incoming connection until its first message was read */
+ public EndPoint getEndPoint()
+ {
+     return this.remoteEp_;
+ }
+
+ /** @return the isIncoming flag given at construction (false for outgoing connections) */
+ public boolean isIncoming()
+ {
+     return this.isIncoming_;
+ }
+
+ /** @return the underlying socket channel */
+ public SocketChannel getSocketChannel()
+ {
+     return this.socketChannel_;
+ }
+
+ /**
+  * Serializes and sends a message on this connection.
+  * If earlier writes are still queued, or the channel is not connected yet,
+  * the packed buffer is only queued. Otherwise it is written directly, and
+  * whatever the socket did not take is queued with OP_WRITE switched on.
+  * A message that serializes to zero bytes is dropped.
+  *
+  * @param message the message to send
+  * @throws IOException if serialization or the socket write fails
+  */
+ public void write(Message message) throws IOException
+ {
+     byte[] payload = serializer_.serialize(message);
+     if ( payload.length == 0 )
+     {
+         return;
+     }
+
+     boolean listening = !message.getFrom().equals(EndPoint.sentinelLocalEndPoint_);
+     ByteBuffer packed = MessagingService.packIt( payload , false, false, listening);
+     synchronized(this)
+     {
+         boolean mustQueue = !pendingWrites_.isEmpty() || !socketChannel_.isConnected();
+         if (mustQueue)
+         {
+             // keep ordering: never overtake buffers that are already waiting
+             pendingWrites_.add(packed);
+             return;
+         }
+
+         socketChannel_.write(packed);
+
+         if (packed.hasRemaining())
+         {
+             // partial write: the selector thread finishes it in doPendingWrites()
+             pendingWrites_.add(packed);
+             turnOnInterestOps(key_, SelectionKey.OP_WRITE);
+         }
+     }
+ }
+
+ /**
+  * Streams the byte range [startPosition, endPosition) of a file over this
+  * connection using FileChannel.transferTo, in chunks of at most 64MB.
+  * Whenever the socket accepts fewer bytes than requested, OP_WRITE is
+  * switched on and this method waits on condition_ until resumeStreaming()
+  * signals it from the selector thread.
+  *
+  * Differences from the previous revision:
+  *  - the file is closed when streaming ends, normally or exceptionally;
+  *  - the stream header is sent at most once (it used to be re-sent while
+  *    startPosition was still 0 after a transferTo that moved no bytes);
+  *  - no bytes beyond endPosition are requested from transferTo.
+  *
+  * @param file          file to stream
+  * @param startPosition offset of the first byte to send
+  * @param endPosition   offset just past the last byte to send
+  * @throws IllegalStateException if this connection was not created for streaming
+  * @throws IOException           if the connection is not established after 3 waits of
+  *                               2 seconds, or on any I/O failure
+  * @throws InterruptedException  if interrupted while waiting on the condition
+  */
+ public void stream(File file, long startPosition, long endPosition) throws IOException, InterruptedException
+ {
+     if ( !bStream_ )
+         throw new IllegalStateException("Cannot stream since we are not set up to stream data.");
+
+     lock_.lock();
+     try
+     {
+         /* transfer at most 64MB in each attempt */
+         final int limit = 64*1024*1024;
+         long total = endPosition - startPosition;
+         /* keeps track of total number of bytes transferred */
+         long bytesWritten = 0L;
+         /* the header is only sent when streaming starts at offset 0, and only once */
+         boolean headerPending = ( startPosition == 0 );
+
+         RandomAccessFile raf = new RandomAccessFile(file, "r");
+         try
+         {
+             FileChannel fc = raf.getChannel();
+
+             /*
+              * If the connection is not yet established then wait for
+              * the timeout period of 2 seconds. Wait up to 3 times and then
+              * bail with an IOException.
+              */
+             long waitTime = 2;
+             int retry = 0;
+             while (!socketChannel_.isConnected())
+             {
+                 if ( retry == 3 )
+                     throw new IOException("Unable to connect to " + remoteEp_ + " after " + retry + " attempts.");
+                 condition_.await(waitTime, TimeUnit.SECONDS);
+                 ++retry;
+             }
+
+             while ( bytesWritten < total )
+             {
+                 if ( headerPending )
+                 {
+                     headerPending = false;
+                     ByteBuffer buffer = MessagingService.constructStreamHeader(false, true);
+                     socketChannel_.write(buffer);
+                     if (buffer.remaining() > 0)
+                     {
+                         /* let the selector thread flush the rest of the header */
+                         pendingWrites_.add(buffer);
+                         turnOnInterestOps(key_, SelectionKey.OP_WRITE);
+                         condition_.await();
+                     }
+                 }
+
+                 /* never ask for more than what is left of the requested range */
+                 long chunk = Math.min(limit, total - bytesWritten);
+                 /* returns the number of bytes transferred from file to the socket */
+                 long bytesTransferred = fc.transferTo(startPosition, chunk, socketChannel_);
+                 if (logger_.isDebugEnabled())
+                     logger_.debug("Bytes transferred " + bytesTransferred);
+                 bytesWritten += bytesTransferred;
+                 startPosition += bytesTransferred;
+                 /*
+                  * If the number of bytes transferred is less than intended
+                  * then we need to wait till socket becomes writeable again.
+                  */
+                 if ( bytesTransferred < chunk )
+                 {
+                     turnOnInterestOps(key_, SelectionKey.OP_WRITE);
+                     condition_.await();
+                 }
+             }
+         }
+         finally
+         {
+             /* closing the RandomAccessFile also closes its FileChannel */
+             raf.close();
+         }
+     }
+     finally
+     {
+         lock_.unlock();
+     }
+ }
+
+ /*
+  * Signals condition_ under lock_ so that a stream() call waiting on it can
+  * continue. Does nothing on connections that are not in streaming mode.
+  */
+ private void resumeStreaming()
+ {
+     if ( bStream_ )
+     {
+         lock_.lock();
+         try
+         {
+             condition_.signal();
+         }
+         finally
+         {
+             lock_.unlock();
+         }
+     }
+ }
+
+ /**
+  * Marks this connection as no longer in use and, if it belongs to a pool
+  * that still contains it, decrements that pool's used count.
+  * Connections without a pool (streaming connections, or incoming ones that
+  * have not been pooled yet) are handled without a NullPointerException,
+  * matching the null checks in closeSocket() and errorClose().
+  */
+ public void close()
+ {
+     inUse_ = false;
+     if ( pool_ != null && pool_.contains(this) )
+         pool_.decUsed();
+ }
+
+ public boolean isConnected()
+ {
+ return socketChannel_.isConnected();
+ }
+
+ public boolean equals(Object o)
+ {
+ if ( !(o instanceof TcpConnection) )
+ return false;
+
+ TcpConnection rhs = (TcpConnection)o;
+ if ( localEp_.equals(rhs.localEp_) && remoteEp_.equals(rhs.remoteEp_) )
+ return true;
+ else
+ return false;
+ }
+
+ /** @return the hash of the string "local:remote" built from the two end points */
+ public int hashCode()
+ {
+     String endPoints = localEp_ + ":" + remoteEp_;
+     return endPoints.hashCode();
+ }
+
+ /** @return the string form of the underlying socket channel */
+ public String toString()
+ {
+     String description = socketChannel_.toString();
+     return description;
+ }
+
+ /*
+  * Regular shutdown: logs how many writes are still queued, removes this
+  * connection from its pool (if any), cancels the key and closes the
+  * channel, and finally drops the queued writes.
+  */
+ void closeSocket()
+ {
+     int unsent = pendingWrites_.size();
+     logger_.warn("Closing down connection " + socketChannel_ + " with " + unsent + " writes remaining.");
+     if ( pool_ != null )
+     {
+         pool_.removeConnection(this);
+     }
+     cancel(key_);
+     pendingWrites_.clear();
+ }
+
+ /*
+  * Shutdown after an error: drops the queued writes, cancels the key and
+  * closes the channel, drops the queue again, and only then removes this
+  * connection from its pool (if any).
+  */
+ void errorClose()
+ {
+     logger_.warn("Closing down connection " + socketChannel_);
+     pendingWrites_.clear();
+     cancel(key_);
+     // second clear is kept: it discards anything queued while the key was being cancelled
+     pendingWrites_.clear();
+     if ( pool_ == null )
+     {
+         return;
+     }
+     pool_.removeConnection(this);
+ }
+
+ /*
+  * Cancels the given selection key and closes its channel. A null key is
+  * ignored. Closing is best effort: a failure is logged (it used to be
+  * swallowed silently) and never propagated to the caller.
+  */
+ private void cancel(SelectionKey key)
+ {
+     if ( key != null )
+     {
+         key.cancel();
+         try
+         {
+             key.channel().close();
+         }
+         catch (IOException e)
+         {
+             logger_.warn("Failed to close channel " + key.channel(), e);
+         }
+     }
+ }
+
+ // called in the selector thread
+ public void connect(SelectionKey key)
+ {
+ turnOffInterestOps(key, SelectionKey.OP_CONNECT);
+ try
+ {
+ if (socketChannel_.finishConnect())
+ {
+ turnOnInterestOps(key, SelectionKey.OP_READ);
+
+ synchronized(this)
+ {
+ // this will flush the pending
+ if (!pendingWrites_.isEmpty())
+ {
+ turnOnInterestOps(key_, SelectionKey.OP_WRITE);
+ }
+ }
+ resumeStreaming();
+ }
+ else
+ {
+ logger_.error("Closing connection because socket channel could not finishConnect.");;
+ errorClose();
+ }
+ }
+ catch(IOException e)
+ {
+ logger_.error("Encountered IOException on connection: " + socketChannel_, e);
+ errorClose();
+ }
+ }
+
+ /*
+  * Called in the selector thread when the channel becomes writable.
+  * OP_WRITE is switched off first; doPendingWrites() switches it back on
+  * if buffers are still queued after flushing.
+  */
+ public void write(SelectionKey key)
+ {
+     turnOffInterestOps(key, SelectionKey.OP_WRITE);
+     doPendingWrites();
+     /*
+      * Only has an effect in streaming mode: stream() sends one chunk
+      * from the source and then waits until it is signalled from here
+      * before it sends the next one.
+      */
+     resumeStreaming();
+ }
+
+ /*
+  * Flushes queued buffers to the socket in order, under this connection's
+  * monitor. Stops at the first buffer the socket does not take completely.
+  * An IOException closes the connection via errorClose(). If anything is
+  * still queued afterwards, OP_WRITE is switched on again.
+  */
+ void doPendingWrites()
+ {
+     synchronized(this)
+     {
+         try
+         {
+             while (!pendingWrites_.isEmpty())
+             {
+                 ByteBuffer head = pendingWrites_.peek();
+                 socketChannel_.write(head);
+                 if (head.hasRemaining())
+                 {
+                     // socket is full; keep the partially written buffer at the head
+                     break;
+                 }
+                 pendingWrites_.remove();
+             }
+         }
+         catch(IOException ioe)
+         {
+             logger_.error(LogUtil.throwableToString(ioe));
+             // This is to fix the weird Linux bug with NIO.
+             errorClose();
+         }
+         finally
+         {
+             boolean stillQueued = !pendingWrites_.isEmpty();
+             if (stillQueued)
+             {
+                 turnOnInterestOps(key_, SelectionKey.OP_WRITE);
+             }
+         }
+     }
+ }
+
+ /*
+  * Called in the selector thread when the channel becomes readable.
+  * Read interest is switched off and the actual reading is handed to the
+  * read executor; ReadWorkItem switches OP_READ back on when it is done.
+  */
+ public void read(SelectionKey key)
+ {
+     turnOffInterestOps(key, SelectionKey.OP_READ);
+     // hand the shared work item to the read executor
+     MessagingService.getReadExecutor().execute(readWork_);
+ }
+
+ /*
+  * Work item run on the read executor after read(SelectionKey) fired.
+  * Reads complete messages from the socket and submits them for
+  * deserialization; a streaming-mode header closes the connection.
+  * OP_READ is switched back on when the run finishes.
+  */
+ class ReadWorkItem implements Runnable
+ {
+     // called from the TCP READ thread pool
+     public void run()
+     {
+         if ( tcpReader_ == null )
+         {
+             // first run on this connection: create the reader and put it in its PREAMBLE state
+             tcpReader_ = new TcpReader(TcpConnection.this);
+             StartState preamble = tcpReader_.getSocketState(TcpReader.TcpReaderState.PREAMBLE);
+             if ( preamble == null )
+             {
+                 preamble = new ProtocolState(tcpReader_);
+                 tcpReader_.putSocketState(TcpReader.TcpReaderState.PREAMBLE, preamble);
+             }
+             tcpReader_.morphState(preamble);
+         }
+
+         try
+         {
+             byte[] bytes;
+             while ( (bytes = tcpReader_.read()).length > 0 )
+             {
+                 ProtocolHeader pH = tcpReader_.getProtocolHeader();
+                 if ( pH.isStreamingMode_ )
+                 {
+                     MessagingService.setStreamingMode(false);
+                     /* Close this socket connection used for streaming */
+                     closeSocket();
+                     continue;
+                 }
+
+                 /* first message received */
+                 if (remoteEp_ == null)
+                 {
+                     int port = ( pH.isListening_ ) ? DatabaseDescriptor.getStoragePort() : EndPoint.sentinelPort_;
+                     remoteEp_ = new EndPoint( socketChannel_.socket().getInetAddress().getHostAddress(), port );
+                     // put connection into pool if possible
+                     pool_ = MessagingService.getConnectionPool(localEp_, remoteEp_);
+                     pool_.addToPool(TcpConnection.this);
+                 }
+
+                 /* Deserialize and handle the message */
+                 MessagingService.getDeserilizationExecutor().submit( new MessageDeserializationTask(pH.serializerType_, bytes) );
+                 tcpReader_.resetState();
+             }
+         }
+         catch ( Throwable th )
+         {
+             // IOExceptions and everything else are handled the same way
+             handleException(th);
+         }
+         finally
+         {
+             turnOnInterestOps(key_, SelectionKey.OP_READ);
+         }
+     }
+
+     /* Logs the failure and closes the connection via errorClose(). */
+     private void handleException(Throwable th)
+     {
+         logger_.warn("Problem reading from socket connected to : " + socketChannel_);
+         logger_.warn(LogUtil.throwableToString(th));
+         // This is to fix the weird Linux bug with NIO.
+         errorClose();
+     }
+ }
+
+ public int pending()
+ {
+ return pendingWrites_.size();
+ }
+
+ /**
+  * Orders connections by the number of queued writes, fewest first.
+  *
+  * @param o the connection to compare with
+  * @return this connection's queue size minus the other connection's queue size
+  * @throws IllegalArgumentException if o is not a TcpConnection
+  */
+ public int compareTo(Object o)
+ {
+     if (!(o instanceof TcpConnection))
+     {
+         throw new IllegalArgumentException();
+     }
+
+     TcpConnection other = (TcpConnection) o;
+     return pendingWrites_.size() - other.pendingWrites_.size();
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionHandler.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionHandler.java Thu Jul 30 15:30:21 2009
@@ -1,60 +1,60 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net;
-
-import java.nio.channels.*;
-import java.io.IOException;
-import java.net.*;
-
-import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class TcpConnectionHandler extends SelectionKeyHandler
-{
- private static Logger logger_ = Logger.getLogger(TcpConnectionHandler.class);
- EndPoint localEp_;
-
- public TcpConnectionHandler(EndPoint localEp)
- {
- localEp_ = localEp;
- }
-
- public void accept(SelectionKey key)
- {
- try
- {
- ServerSocketChannel serverChannel = (ServerSocketChannel)key.channel();
- SocketChannel client = serverChannel.accept();
-
- if ( client != null )
- {
- //new TcpConnection(client, localEp_, true);
- TcpConnection.acceptConnection(client, localEp_, true);
- }
- }
- catch(IOException e)
- {
- logger_.warn(LogUtil.throwableToString(e));
- }
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.nio.channels.*;
+import java.io.IOException;
+import java.net.*;
+
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+ /**
+  * Selection key handler for the listening socket: accepts incoming TCP
+  * connections and hands each one to TcpConnection.acceptConnection.
+  */
+ public class TcpConnectionHandler extends SelectionKeyHandler
+ {
+     private static Logger logger_ = Logger.getLogger(TcpConnectionHandler.class);
+     /* local end point passed on to every accepted connection */
+     EndPoint localEp_;
+
+     public TcpConnectionHandler(EndPoint localEp)
+     {
+         localEp_ = localEp;
+     }
+
+     /**
+      * Accepts one pending connection on the server channel behind the key.
+      * An IOException is logged rather than propagated. If setting up the
+      * TcpConnection fails after the client was accepted, the client channel
+      * is closed so the socket is not leaked.
+      *
+      * @param key selection key of the ServerSocketChannel that is ready to accept
+      */
+     public void accept(SelectionKey key)
+     {
+         SocketChannel client = null;
+         try
+         {
+             ServerSocketChannel serverChannel = (ServerSocketChannel)key.channel();
+             client = serverChannel.accept();
+
+             if ( client != null )
+             {
+                 TcpConnection.acceptConnection(client, localEp_, true);
+             }
+         }
+         catch(IOException e)
+         {
+             logger_.warn(LogUtil.throwableToString(e));
+             if ( client != null )
+             {
+                 // accepted but never handed over to a working TcpConnection
+                 try
+                 {
+                     client.close();
+                 }
+                 catch (IOException closeFailure)
+                 {
+                     logger_.warn(LogUtil.throwableToString(closeFailure));
+                 }
+             }
+         }
+     }
+ }
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java Thu Jul 30 15:30:21 2009
@@ -1,217 +1,217 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.locks.*;
-
-import org.apache.log4j.Logger;
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-class TcpConnectionManager
-{
- private Lock lock_ = new ReentrantLock();
- private List<TcpConnection> allConnections_;
- private EndPoint localEp_;
- private EndPoint remoteEp_;
- private int initialSize_;
- private int growthFactor_;
- private int maxSize_;
- private long lastTimeUsed_;
- private boolean isShut_;
-
- private int inUse_;
-
- TcpConnectionManager(int initialSize, int growthFactor, int maxSize, EndPoint localEp, EndPoint remoteEp)
- {
- initialSize_ = initialSize;
- growthFactor_ = growthFactor;
- maxSize_ = maxSize;
- localEp_ = localEp;
- remoteEp_ = remoteEp;
- isShut_ = false;
- lastTimeUsed_ = System.currentTimeMillis();
- allConnections_ = new Vector<TcpConnection>();
- }
-
- TcpConnection getConnection() throws IOException
- {
- lock_.lock();
- try
- {
- if (allConnections_.isEmpty())
- {
- TcpConnection conn = new TcpConnection(this, localEp_, remoteEp_);
- addToPool(conn);
- conn.inUse_ = true;
- incUsed();
- return conn;
- }
-
- TcpConnection least = getLeastLoaded();
-
- if ( (least != null && least.pending() == 0) || allConnections_.size() == maxSize_) {
- least.inUse_ = true;
- incUsed();
- return least;
- }
-
- TcpConnection connection = new TcpConnection(this, localEp_, remoteEp_);
- if ( connection != null && !contains(connection) )
- {
- addToPool(connection);
- connection.inUse_ = true;
- incUsed();
- return connection;
- }
- else
- {
- if ( connection != null )
- {
- connection.closeSocket();
- }
- return getLeastLoaded();
- }
- }
- finally
- {
- lock_.unlock();
- }
- }
-
- protected TcpConnection getLeastLoaded()
- {
- TcpConnection connection = null;
- lock_.lock();
- try
- {
- Collections.sort(allConnections_);
- connection = (allConnections_.size() > 0 ) ? allConnections_.get(0) : null;
- }
- finally
- {
- lock_.unlock();
- }
- return connection;
- }
-
- void removeConnection(TcpConnection connection)
- {
- allConnections_.remove(connection);
- }
-
- void incUsed()
- {
- inUse_++;
- }
-
- void decUsed()
- {
- inUse_--;
- }
-
- int getConnectionsInUse()
- {
- return inUse_;
- }
-
- void addToPool(TcpConnection connection)
- {
-
- if ( contains(connection) )
- return;
-
- lock_.lock();
- try
- {
- if ( allConnections_.size() < maxSize_ )
- {
- allConnections_.add(connection);
- }
- else
- {
- connection.closeSocket();
- }
- }
- finally
- {
- lock_.unlock();
- }
- }
-
- void shutdown()
- {
- lock_.lock();
- try
- {
- while ( allConnections_.size() > 0 )
- {
- TcpConnection connection = allConnections_.remove(0);
- connection.closeSocket();
- }
- }
- finally
- {
- lock_.unlock();
- }
- isShut_ = true;
- }
-
- int getPoolSize()
- {
- return allConnections_.size();
- }
-
- EndPoint getLocalEndPoint()
- {
- return localEp_;
- }
-
- EndPoint getRemoteEndPoint()
- {
- return remoteEp_;
- }
-
- int getPendingWrites()
- {
- int total = 0;
- lock_.lock();
- try
- {
- for ( TcpConnection connection : allConnections_ )
- {
- total += connection.pending();
- }
- }
- finally
- {
- lock_.unlock();
- }
- return total;
- }
-
- boolean contains(TcpConnection connection)
- {
- return allConnections_.contains(connection);
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.*;
+
+import org.apache.log4j.Logger;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+/**
+ * Pool of {@link TcpConnection}s from one local endpoint to one remote endpoint.
+ *
+ * Structural changes to the pool are made while holding lock_. The in-use
+ * counter is a plain int updated by incUsed()/decUsed() without any locking of
+ * its own. NOTE(review): confirm that callers of decUsed() are serialized.
+ */
+class TcpConnectionManager
+{
+    /** Guards compound operations on allConnections_. */
+    private final Lock lock_ = new ReentrantLock();
+    /** Pooled connections. Backed by a Vector, so single calls such as size() or contains() are synchronized by the list itself. */
+    private final List<TcpConnection> allConnections_;
+    private final EndPoint localEp_;
+    private final EndPoint remoteEp_;
+    // initialSize_, growthFactor_ and lastTimeUsed_ are stored by the constructor
+    // but are not read anywhere in this class.
+    private int initialSize_;
+    private int growthFactor_;
+    /** Upper bound on the number of pooled connections (enforced by addToPool). */
+    private final int maxSize_;
+    private long lastTimeUsed_;
+    /** Set by shutdown(); not read anywhere in this class. */
+    private boolean isShut_;
+
+    /** Counter maintained by incUsed()/decUsed(); getConnection() increments it for every connection it returns. */
+    private int inUse_;
+
+    /**
+     * @param initialSize  stored only; not used by this class
+     * @param growthFactor stored only; not used by this class
+     * @param maxSize      maximum number of connections kept in the pool
+     * @param localEp      local endpoint used when opening connections
+     * @param remoteEp     remote endpoint used when opening connections
+     */
+    TcpConnectionManager(int initialSize, int growthFactor, int maxSize, EndPoint localEp, EndPoint remoteEp)
+    {
+        initialSize_ = initialSize;
+        growthFactor_ = growthFactor;
+        maxSize_ = maxSize;
+        localEp_ = localEp;
+        remoteEp_ = remoteEp;
+        isShut_ = false;
+        lastTimeUsed_ = System.currentTimeMillis();
+        allConnections_ = new Vector<TcpConnection>();
+    }
+
+    /**
+     * Returns a connection to the remote endpoint, opening a new one when needed.
+     *
+     * Selection order:
+     * 1. the least loaded pooled connection, if it has nothing pending or the
+     *    pool is already at its maximum size;
+     * 2. otherwise a newly opened connection, which is added to the pool;
+     * 3. if contains() reports the new connection as already pooled, the new
+     *    socket is closed and the least loaded pooled connection is used.
+     *
+     * Every connection returned from here is flagged inUse_ and counted through
+     * incUsed(), including the one returned in case 3.
+     *
+     * @return the selected connection
+     * @throws IOException propagated from creating a TcpConnection (or from any other
+     *                     call in this method that declares it)
+     */
+    TcpConnection getConnection() throws IOException
+    {
+        lock_.lock();
+        try
+        {
+            // null exactly when the pool is empty
+            TcpConnection least = getLeastLoaded();
+            if ( least != null && (least.pending() == 0 || allConnections_.size() >= maxSize_) )
+            {
+                return markInUse(least);
+            }
+
+            // Pool is empty, or every pooled connection is busy and there is room to grow.
+            TcpConnection fresh = new TcpConnection(this, localEp_, remoteEp_);
+            if ( !contains(fresh) )
+            {
+                addToPool(fresh);
+                return markInUse(fresh);
+            }
+
+            // contains() depends on TcpConnection.equals (not visible here): an equal
+            // connection is already pooled, so drop the new socket and reuse a pooled one.
+            fresh.closeSocket();
+            // Re-evaluate: loads may have shifted while the new socket was being opened.
+            TcpConnection fallback = getLeastLoaded();
+            return markInUse(fallback != null ? fallback : least);
+        }
+        finally
+        {
+            lock_.unlock();
+        }
+    }
+
+    /** Flags the connection as in use, bumps the in-use counter and returns the connection. */
+    private TcpConnection markInUse(TcpConnection connection)
+    {
+        connection.inUse_ = true;
+        incUsed();
+        return connection;
+    }
+
+    /**
+     * Returns the smallest pooled connection according to TcpConnection's natural
+     * ordering, or null when the pool is empty. For elements that compare as
+     * equal, the first one in the list wins, which is the element a stable sort
+     * would have placed at index 0.
+     */
+    protected TcpConnection getLeastLoaded()
+    {
+        lock_.lock();
+        try
+        {
+            return allConnections_.isEmpty() ? null : Collections.min(allConnections_);
+        }
+        finally
+        {
+            lock_.unlock();
+        }
+    }
+
+    /**
+     * Removes the connection from the pool if present. Taken under lock_ so that
+     * it cannot interleave with the iteration in getPendingWrites() or the scan
+     * in getLeastLoaded().
+     */
+    void removeConnection(TcpConnection connection)
+    {
+        lock_.lock();
+        try
+        {
+            allConnections_.remove(connection);
+        }
+        finally
+        {
+            lock_.unlock();
+        }
+    }
+
+    /** Increments the in-use counter. */
+    void incUsed()
+    {
+        inUse_++;
+    }
+
+    /** Decrements the in-use counter. */
+    void decUsed()
+    {
+        inUse_--;
+    }
+
+    /** @return current value of the in-use counter */
+    int getConnectionsInUse()
+    {
+        return inUse_;
+    }
+
+    /**
+     * Adds the connection to the pool unless it is already there. When the pool
+     * already holds maxSize_ connections, the given connection's socket is closed
+     * instead of being added.
+     */
+    void addToPool(TcpConnection connection)
+    {
+        lock_.lock();
+        try
+        {
+            // The membership test is done under the lock so that check and add are one atomic step.
+            if ( contains(connection) )
+            {
+                return;
+            }
+
+            if ( allConnections_.size() < maxSize_ )
+            {
+                allConnections_.add(connection);
+            }
+            else
+            {
+                connection.closeSocket();
+            }
+        }
+        finally
+        {
+            lock_.unlock();
+        }
+    }
+
+    /** Removes every connection from the pool, closing each socket, then marks the manager as shut. */
+    void shutdown()
+    {
+        lock_.lock();
+        try
+        {
+            // Each connection is detached from the list before its socket is closed, so the
+            // loop does not depend on what closeSocket() does to the pool (not visible here).
+            while ( allConnections_.size() > 0 )
+            {
+                TcpConnection connection = allConnections_.remove(0);
+                connection.closeSocket();
+            }
+        }
+        finally
+        {
+            lock_.unlock();
+        }
+        isShut_ = true;
+    }
+
+    /** @return number of connections currently in the pool */
+    int getPoolSize()
+    {
+        return allConnections_.size();
+    }
+
+    /** @return the local endpoint given at construction */
+    EndPoint getLocalEndPoint()
+    {
+        return localEp_;
+    }
+
+    /** @return the remote endpoint given at construction */
+    EndPoint getRemoteEndPoint()
+    {
+        return remoteEp_;
+    }
+
+    /** @return sum of pending() over all pooled connections, computed under lock_ */
+    int getPendingWrites()
+    {
+        int total = 0;
+        lock_.lock();
+        try
+        {
+            for ( TcpConnection connection : allConnections_ )
+            {
+                total += connection.pending();
+            }
+        }
+        finally
+        {
+            lock_.unlock();
+        }
+        return total;
+    }
+
+    /** @return true if the pool holds a connection equal to the given one */
+    boolean contains(TcpConnection connection)
+    {
+        return allConnections_.contains(connection);
+    }
+}