You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by pr...@apache.org on 2007/09/04 06:47:06 UTC
svn commit: r572516 - in /mina/trunk/core/src:
main/java/org/apache/mina/transport/vmpipe/
test/java/org/apache/mina/transport/vmpipe/
Author: proyal
Date: Mon Sep 3 21:47:05 2007
New Revision: 572516
URL: http://svn.apache.org/viewvc?rev=572516&view=rev
Log:
port forward r572034 from 1.1 branch. fixes DIRMINA-431
Added:
mina/trunk/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeSessionCrossCommunicationTest.java
- copied, changed from r572034, mina/branches/1.1/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeSessionCrossCommunicationTest.java
Modified:
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java?rev=572516&r1=572515&r2=572516&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java Mon Sep 3 21:47:05 2007
@@ -6,16 +6,16 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
- *
+ * under the License.
+ *
*/
package org.apache.mina.transport.vmpipe;
@@ -24,22 +24,22 @@
import java.util.HashSet;
import java.util.Set;
-import org.apache.mina.common.AbstractIoFilterChain;
import org.apache.mina.common.AbstractIoConnector;
+import org.apache.mina.common.AbstractIoFilterChain;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.DefaultConnectFuture;
import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IdleStatusChecker;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoFuture;
import org.apache.mina.common.IoFutureListener;
import org.apache.mina.common.IoHandler;
-import org.apache.mina.common.IdleStatusChecker;
import org.apache.mina.common.TransportMetadata;
/**
* Connects to {@link IoHandler}s which are bound on the specified
* {@link VmPipeAddress}.
- *
+ *
* @author The Apache MINA Project (dev@mina.apache.org)
* @version $Rev$, $Date$
*/
@@ -63,7 +63,7 @@
@Override
protected ConnectFuture doConnect(SocketAddress remoteAddress,
- SocketAddress localAddress) {
+ SocketAddress localAddress) {
VmPipe entry = VmPipeAcceptor.boundHandlers.get(remoteAddress);
if (entry == null) {
return DefaultConnectFuture.newFailedFuture(new IOException(
@@ -92,8 +92,7 @@
this.getFilterChainBuilder().buildFilterChain(filterChain);
// The following sentences don't throw any exceptions.
- localSession.setAttribute(AbstractIoFilterChain.CONNECT_FUTURE,
- future);
+ localSession.setAttribute(AbstractIoFilterChain.CONNECT_FUTURE, future);
getListeners().fireSessionCreated(localSession);
IdleStatusChecker.getInstance().addSession(localSession);
} catch (Throwable t) {
@@ -116,6 +115,8 @@
remoteSession.close();
}
+ // Start chains, and then allow any messages read/written to be processed. This is to ensure that
+ // sessionOpened gets received before a messageReceived
((VmPipeFilterChain) localSession.getFilterChain()).start();
((VmPipeFilterChain) remoteSession.getFilterChain()).start();
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java?rev=572516&r1=572515&r2=572516&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java Mon Sep 3 21:47:05 2007
@@ -6,16 +6,16 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
- *
+ * under the License.
+ *
*/
package org.apache.mina.transport.vmpipe;
@@ -36,7 +36,8 @@
private final Queue<Event> eventQueue = new ConcurrentLinkedQueue<Event>();
- private boolean flushEnabled;
+ private volatile boolean flushEnabled;
+ private volatile boolean sessionOpened;
VmPipeFilterChain(IoSession session) {
super(session);
@@ -45,6 +46,7 @@
public void start() {
flushEnabled = true;
flushEvents();
+ flushPendingDataQueues((VmPipeSessionImpl) getSession());
}
private void pushEvent(Event e) {
@@ -68,19 +70,25 @@
if (type == EventType.RECEIVED) {
VmPipeSessionImpl s = (VmPipeSessionImpl) session;
- synchronized (s.lock) {
- if (!s.getTrafficMask().isReadable()) {
- s.pendingDataQueue.offer(data);
- } else {
- int byteCount = 1;
- if (data instanceof ByteBuffer) {
- byteCount = ((ByteBuffer) data).remaining();
- }
+ if (sessionOpened && s.getTrafficMask().isReadable() && s.getLock().tryLock()) {
+ try {
+ if (!s.getTrafficMask().isReadable()) {
+ s.pendingDataQueue.offer(data);
+ } else {
+ int byteCount = 1;
+ if (data instanceof ByteBuffer) {
+ byteCount = ((ByteBuffer) data).remaining();
+ }
- s.increaseReadBytes(byteCount);
+ s.increaseReadBytes(byteCount);
- super.fireMessageReceived(s, data);
+ super.fireMessageReceived(s, data);
+ }
+ } finally {
+ s.getLock().unlock();
}
+ } else {
+ s.pendingDataQueue.add(data);
}
} else if (type == EventType.WRITE) {
super.fireFilterWrite(session, (WriteRequest) data);
@@ -92,6 +100,7 @@
super.fireSessionIdle(session, (IdleStatus) data);
} else if (type == EventType.OPENED) {
super.fireSessionOpened(session);
+ sessionOpened = true;
} else if (type == EventType.CREATED) {
super.fireSessionCreated(session);
} else if (type == EventType.CLOSED) {
@@ -101,6 +110,11 @@
}
}
+ private static void flushPendingDataQueues(VmPipeSessionImpl s) {
+ s.updateTrafficMask();
+ s.getRemoteSession().updateTrafficMask();
+ }
+
@Override
public void fireFilterClose(IoSession session) {
pushEvent(new Event(EventType.CLOSE, null));
@@ -149,12 +163,10 @@
@Override
protected void doWrite(IoSession session, WriteRequest writeRequest) {
VmPipeSessionImpl s = (VmPipeSessionImpl) session;
- synchronized (s.lock) {
- if (s.isConnected()) {
+ if (s.isConnected()) {
- if (!s.getTrafficMask().isWritable()) {
- s.pendingDataQueue.offer(writeRequest);
- } else {
+ if (s.getTrafficMask().isWritable() && s.getLock().tryLock()) {
+ try {
Object message = writeRequest.getMessage();
int byteCount = 1;
@@ -176,21 +188,30 @@
s.getRemoteSession().getFilterChain().fireMessageReceived(
s.getRemoteSession(), messageCopy);
s.getFilterChain().fireMessageSent(s, writeRequest);
+ } finally {
+ s.getLock().unlock();
}
+
+ flushPendingDataQueues(s);
} else {
- writeRequest.getFuture().setWritten(false);
+ s.pendingDataQueue.offer(writeRequest);
}
+ } else {
+ writeRequest.getFuture().setWritten(false);
}
}
@Override
protected void doClose(IoSession session) {
VmPipeSessionImpl s = (VmPipeSessionImpl) session;
- synchronized (s.lock) {
+ try {
+ s.getLock().lock();
if (!session.getCloseFuture().isClosed()) {
s.getServiceListeners().fireSessionDestroyed(s);
s.getRemoteSession().close();
}
+ } finally {
+ s.getLock().unlock();
}
}
@@ -231,7 +252,7 @@
private final Object data;
- public Event(EventType type, Object data) {
+ private Event(EventType type, Object data) {
this.type = type;
this.data = data;
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java?rev=572516&r1=572515&r2=572516&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java Mon Sep 3 21:47:05 2007
@@ -23,6 +23,8 @@
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.mina.common.AbstractIoSession;
import org.apache.mina.common.DefaultTransportMetadata;
@@ -67,7 +69,7 @@
private final VmPipeSessionImpl remoteSession;
- final Object lock;
+ private final Lock lock;
final BlockingQueue<Object> pendingDataQueue;
@@ -79,7 +81,7 @@
VmPipeAddress localAddress, IoHandler handler, VmPipe remoteEntry) {
this.service = service;
this.serviceListeners = serviceListeners;
- this.lock = new Object();
+ this.lock = new ReentrantLock();
this.localAddress = localAddress;
this.remoteAddress = this.serviceAddress = remoteEntry.getAddress();
this.handler = handler;
@@ -185,5 +187,9 @@
}
}
}
+ }
+
+ Lock getLock() {
+ return lock;
}
}
Copied: mina/trunk/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeSessionCrossCommunicationTest.java (from r572034, mina/branches/1.1/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeSessionCrossCommunicationTest.java)
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeSessionCrossCommunicationTest.java?p2=mina/trunk/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeSessionCrossCommunicationTest.java&p1=mina/branches/1.1/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeSessionCrossCommunicationTest.java&r1=572034&r2=572516&rev=572516&view=diff
==============================================================================
--- mina/branches/1.1/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeSessionCrossCommunicationTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeSessionCrossCommunicationTest.java Mon Sep 3 21:47:05 2007
@@ -25,15 +25,13 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import junit.framework.TestCase;
+
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IoAcceptor;
-import org.apache.mina.common.IoAcceptorConfig;
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
-import org.apache.mina.common.ThreadModel;
-
-import junit.framework.TestCase;
/**
* @author Apache Mina Project (dev@mina.apache.org)
@@ -41,105 +39,110 @@
*/
public class VmPipeSessionCrossCommunicationTest extends TestCase {
public void testOneSessionTalkingBackAndForthDoesNotDeadlock() throws Exception {
- final VmPipeAddress address = new VmPipeAddress( 1 );
+ final VmPipeAddress address = new VmPipeAddress(1);
final IoConnector connector = new VmPipeConnector();
final AtomicReference<IoSession> c1 = new AtomicReference<IoSession>();
- final CountDownLatch latch = new CountDownLatch( 1 );
- final CountDownLatch messageCount = new CountDownLatch( 2 );
+ final CountDownLatch latch = new CountDownLatch(1);
+ final CountDownLatch messageCount = new CountDownLatch(2);
IoAcceptor acceptor = new VmPipeAcceptor();
- acceptor.bind( address, new IoHandlerAdapter() {
+ acceptor.setHandler(new IoHandlerAdapter() {
@Override
- public void messageReceived( IoSession session, Object message ) throws Exception {
- System.out.println( Thread.currentThread().getName() + ": " + message );
+ public void messageReceived(IoSession session, Object message) throws Exception {
+ System.out.println(Thread.currentThread().getName() + ": " + message);
- if ( "start".equals( message ) ) {
- session.write( "open new" );
- } else if ( "re-use c1".equals( message ) ) {
- session.write( "tell me something on c1 now" );
- } else if ( ( (String) message ).startsWith( "please don't deadlock" ) ) {
+ if ("start".equals(message)) {
+ session.write("open new");
+ } else if ("re-use c1".equals(message)) {
+ session.write("tell me something on c1 now");
+ } else if (((String) message).startsWith("please don't deadlock")) {
messageCount.countDown();
} else {
- fail( "unexpected message received " + message );
+ fail("unexpected message received " + message);
}
}
- } );
+ });
+ acceptor.setLocalAddress(address);
+ acceptor.bind();
- connector.getDefaultConfig().setThreadModel( ThreadModel.MANUAL );
-
- ConnectFuture future = connector.connect( address, new IoHandlerAdapter() {
+ connector.setHandler(new IoHandlerAdapter() {
@Override
- public void messageReceived( IoSession session, Object message ) throws Exception {
- System.out.println( Thread.currentThread().getName() + ": " + message );
-
- if ( "open new".equals( message ) ) {
- System.out.println( "opening c2 from " + Thread.currentThread().getName() );
+ public void messageReceived(IoSession session, Object message) throws Exception {
+ System.out.println(Thread.currentThread().getName() + ": " + message);
+
+ if ("open new".equals(message)) {
+ System.out.println("opening c2 from " + Thread.currentThread().getName());
- ConnectFuture c2Future = connector.connect( address, new IoHandlerAdapter() {
+ IoConnector c2 = new VmPipeConnector();
+ c2.setHandler(new IoHandlerAdapter() {
@Override
- public void sessionOpened( IoSession session ) throws Exception {
- session.write( "re-use c1" );
+ public void sessionOpened(IoSession session) throws Exception {
+ session.write("re-use c1");
}
@Override
- public void messageReceived( IoSession session, Object message ) throws Exception {
- System.out.println( Thread.currentThread().getName() + ": " + message );
+ public void messageReceived(IoSession session, Object message) throws Exception {
+ System.out.println(Thread.currentThread().getName() + ": " + message);
- if ( "tell me something on c1 now".equals( message ) ) {
+ if ("tell me something on c1 now".equals(message)) {
latch.countDown();
- c1.get().write( "please don't deadlock via c1" );
+ c1.get().write("please don't deadlock via c1");
} else {
- fail( "unexpected message received " + message );
+ fail("unexpected message received " + message);
}
}
- } );
+ });
- c2Future.join();
+ ConnectFuture c2Future = c2.connect(address);
+
+ c2Future.await();
latch.await();
- c2Future.getSession().write( "please don't deadlock via c2" );
+ c2Future.getSession().write("please don't deadlock via c2");
} else {
- fail( "unexpeced message received " + message );
+ fail("unexpeced message received " + message);
}
}
- } );
+ });
+
+ ConnectFuture future = connector.connect(address);
- future.join();
+ future.await();
- c1.set( future.getSession() );
- c1.get().write( "start" );
+ c1.set(future.getSession());
+ c1.get().write("start");
ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
- while ( !messageCount.await( 100, TimeUnit.MILLISECONDS ) ) {
+ while (!messageCount.await(100, TimeUnit.MILLISECONDS)) {
long[] threads = threadMXBean.findMonitorDeadlockedThreads();
- if ( null != threads ) {
- StringBuffer sb = new StringBuffer( 256 );
- ThreadInfo[] infos = threadMXBean.getThreadInfo( threads, Integer.MAX_VALUE );
-
- for ( ThreadInfo info : infos ) {
- sb.append( info.getThreadName() )
- .append( " blocked on " )
- .append( info.getLockName() )
- .append( " owned by " )
- .append( info.getLockOwnerName() )
- .append( "\n" );
+ if (null != threads) {
+ StringBuffer sb = new StringBuffer(256);
+ ThreadInfo[] infos = threadMXBean.getThreadInfo(threads, Integer.MAX_VALUE);
+
+ for (ThreadInfo info : infos) {
+ sb.append(info.getThreadName())
+ .append(" blocked on ")
+ .append(info.getLockName())
+ .append(" owned by ")
+ .append(info.getLockOwnerName())
+ .append("\n");
}
- for ( ThreadInfo info : infos ) {
- sb.append( "\nStack for " ).append( info.getThreadName() ).append( "\n" );
- for ( StackTraceElement element : info.getStackTrace() ) {
- sb.append( "\t" ).append( element ).append( "\n" );
+ for (ThreadInfo info : infos) {
+ sb.append("\nStack for ").append(info.getThreadName()).append("\n");
+ for (StackTraceElement element : info.getStackTrace()) {
+ sb.append("\t").append(element).append("\n");
}
}
- fail( "deadlocked! \n" + sb );
+ fail("deadlocked! \n" + sb);
}
}
- ( (IoAcceptorConfig) acceptor.getDefaultConfig() ).setDisconnectOnUnbind( false );
- acceptor.unbindAll();
+ acceptor.setDisconnectOnUnbind(false);
+ acceptor.unbind();
}
}