You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by pr...@apache.org on 2007/09/02 19:01:53 UTC
svn commit: r572034 - in /mina/branches/1.1/core/src:
main/java/org/apache/mina/transport/vmpipe/
main/java/org/apache/mina/transport/vmpipe/support/
test/java/org/apache/mina/transport/vmpipe/
Author: proyal
Date: Sun Sep 2 10:01:53 2007
New Revision: 572034
URL: http://svn.apache.org/viewvc?rev=572034&view=rev
Log:
DIRMINA-431 - Test and fixes. Keep track of when the session is opened in VmPipeFC to ensure proper event ordering. Use a re-entrant lock, so we can try and lock, and put in data queue if we can't.
Added:
mina/branches/1.1/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeSessionCrossCommunicationTest.java (with props)
Modified:
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java
Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java?rev=572034&r1=572033&r2=572034&view=diff
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java Sun Sep 2 10:01:53 2007
@@ -88,9 +88,11 @@
}
DefaultConnectFuture future = new DefaultConnectFuture();
- VmPipeSessionImpl localSession = new VmPipeSessionImpl(this, config,
- getListeners(), new Object(), // lock
- new AnonymousSocketAddress(), handler, entry);
+ VmPipeSessionImpl localSession = new VmPipeSessionImpl(this,
+ config,getListeners(),
+ new AnonymousSocketAddress(),
+ handler,
+ entry);
// initialize connector session
try {
@@ -100,8 +102,7 @@
config.getThreadModel().buildFilterChain(filterChain);
// The following sentences don't throw any exceptions.
- localSession.setAttribute(AbstractIoFilterChain.CONNECT_FUTURE,
- future);
+ localSession.setAttribute(AbstractIoFilterChain.CONNECT_FUTURE, future);
getListeners().fireSessionCreated(localSession);
VmPipeIdleStatusChecker.getInstance().addSession(localSession);
} catch (Throwable t) {
@@ -127,6 +128,9 @@
remoteSession.close();
}
+
+ // Start chains, and then allow and messages read/written to be processed. This is to ensure that
+ // sessionOpened gets received before a messageReceived
((VmPipeFilterChain) localSession.getFilterChain()).start();
((VmPipeFilterChain) remoteSession.getFilterChain()).start();
Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java?rev=572034&r1=572033&r2=572034&view=diff
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java Sun Sep 2 10:01:53 2007
@@ -6,16 +6,16 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
- *
+ * under the License.
+ *
*/
package org.apache.mina.transport.vmpipe.support;
@@ -36,7 +36,8 @@
private final Queue<Event> eventQueue = new ConcurrentLinkedQueue<Event>();
- private boolean flushEnabled;
+ private volatile boolean flushEnabled;
+ private volatile boolean sessionOpened;
public VmPipeFilterChain(IoSession session) {
super(session);
@@ -45,11 +46,12 @@
public void start() {
flushEnabled = true;
flushEvents();
+ flushPendingDataQueues( (VmPipeSessionImpl) getSession() );
}
private void pushEvent(Event e) {
eventQueue.offer(e);
- if (flushEnabled) {
+ if ( flushEnabled ) {
flushEvents();
}
}
@@ -68,10 +70,9 @@
if (type == EventType.RECEIVED) {
VmPipeSessionImpl s = (VmPipeSessionImpl) session;
- synchronized (s.lock) {
- if (!s.getTrafficMask().isReadable()) {
- s.pendingDataQueue.offer(data);
- } else {
+
+ if( sessionOpened && s.getTrafficMask().isReadable() && s.getLock().tryLock()) {
+ try {
int byteCount = 1;
if (data instanceof ByteBuffer) {
byteCount = ((ByteBuffer) data).remaining();
@@ -80,7 +81,13 @@
s.increaseReadBytes(byteCount);
super.fireMessageReceived(s, data);
+ } finally {
+ s.getLock().unlock();
}
+
+ flushPendingDataQueues( s );
+ } else {
+ s.pendingDataQueue.add(data);
}
} else if (type == EventType.WRITE) {
super.fireFilterWrite(session, (WriteRequest) data);
@@ -92,6 +99,7 @@
super.fireSessionIdle(session, (IdleStatus) data);
} else if (type == EventType.OPENED) {
super.fireSessionOpened(session);
+ sessionOpened = true;
} else if (type == EventType.CREATED) {
super.fireSessionCreated(session);
} else if (type == EventType.CLOSED) {
@@ -101,6 +109,11 @@
}
}
+ private static void flushPendingDataQueues( VmPipeSessionImpl s ) {
+ s.updateTrafficMask();
+ s.getRemoteSession().updateTrafficMask();
+ }
+
@Override
public void fireFilterClose(IoSession session) {
pushEvent(new Event(EventType.CLOSE, null));
@@ -149,12 +162,9 @@
@Override
protected void doWrite(IoSession session, WriteRequest writeRequest) {
VmPipeSessionImpl s = (VmPipeSessionImpl) session;
- synchronized (s.lock) {
- if (s.isConnected()) {
-
- if (!s.getTrafficMask().isWritable()) {
- s.pendingDataQueue.offer(writeRequest);
- } else {
+ if (s.isConnected()) {
+ if ( s.getTrafficMask().isWritable() && s.getLock().tryLock()) {
+ try {
Object message = writeRequest.getMessage();
int byteCount = 1;
@@ -174,23 +184,34 @@
s.increaseWrittenMessages();
s.getRemoteSession().getFilterChain().fireMessageReceived(
- s.getRemoteSession(), messageCopy);
+ s.getRemoteSession(), messageCopy);
s.getFilterChain().fireMessageSent(s, writeRequest);
+ } finally {
+ s.getLock().unlock();
}
+
+ flushPendingDataQueues( s );
} else {
- writeRequest.getFuture().setWritten(false);
+ s.pendingDataQueue.add(writeRequest);
}
+ } else {
+ writeRequest.getFuture().setWritten(false);
}
}
@Override
protected void doClose(IoSession session) {
VmPipeSessionImpl s = (VmPipeSessionImpl) session;
- synchronized (s.lock) {
+
+ try {
+ s.getLock().lock();
+
if (!session.getCloseFuture().isClosed()) {
s.getServiceListeners().fireSessionDestroyed(s);
s.getRemoteSession().close();
}
+ } finally {
+ s.getLock().unlock();
}
}
@@ -220,6 +241,7 @@
this.value = value;
}
+ @Override
public String toString() {
return value;
}
@@ -230,7 +252,7 @@
private final Object data;
- public Event(EventType type, Object data) {
+ private Event(EventType type, Object data) {
this.type = type;
this.data = data;
}
Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java?rev=572034&r1=572033&r2=572034&view=diff
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java Sun Sep 2 10:01:53 2007
@@ -24,6 +24,8 @@
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.mina.common.IoFilter.WriteRequest;
import org.apache.mina.common.IoFilterChain;
@@ -65,20 +67,20 @@
private final VmPipeSessionImpl remoteSession;
- final Object lock;
+ private final Lock lock;
final BlockingQueue<Object> pendingDataQueue;
- /**
+ /*
* Constructor for client-side session.
*/
- public VmPipeSessionImpl(IoService service, IoServiceConfig serviceConfig,
- IoServiceListenerSupport serviceListeners, Object lock,
- SocketAddress localAddress, IoHandler handler, VmPipe remoteEntry) {
+ public VmPipeSessionImpl( IoService service, IoServiceConfig serviceConfig,
+ IoServiceListenerSupport serviceListeners,
+ SocketAddress localAddress, IoHandler handler, VmPipe remoteEntry ) {
this.service = service;
this.serviceConfig = serviceConfig;
this.serviceListeners = serviceListeners;
- this.lock = lock;
+ this.lock = new ReentrantLock();
this.localAddress = localAddress;
this.remoteAddress = this.serviceAddress = remoteEntry.getAddress();
this.handler = handler;
@@ -88,7 +90,7 @@
remoteSession = new VmPipeSessionImpl(this, remoteEntry);
}
- /**
+ /*
* Constructor for server-side session.
*/
private VmPipeSessionImpl(VmPipeSessionImpl remoteSession, VmPipe entry) {
@@ -188,5 +190,9 @@
}
}
}
+ }
+
+ Lock getLock() {
+ return lock;
}
}
Added: mina/branches/1.1/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeSessionCrossCommunicationTest.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeSessionCrossCommunicationTest.java?rev=572034&view=auto
==============================================================================
--- mina/branches/1.1/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeSessionCrossCommunicationTest.java (added)
+++ mina/branches/1.1/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeSessionCrossCommunicationTest.java Sun Sep 2 10:01:53 2007
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.mina.transport.vmpipe;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoAcceptorConfig;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.ThreadModel;
+
+import junit.framework.TestCase;
+
+/**
+ * @author Apache Mina Project (dev@mina.apache.org)
+ * @version $Rev: $, $Date: $
+ */
+public class VmPipeSessionCrossCommunicationTest extends TestCase {
+ public void testOneSessionTalkingBackAndForthDoesNotDeadlock() throws Exception {
+ final VmPipeAddress address = new VmPipeAddress( 1 );
+ final IoConnector connector = new VmPipeConnector();
+ final AtomicReference<IoSession> c1 = new AtomicReference<IoSession>();
+ final CountDownLatch latch = new CountDownLatch( 1 );
+ final CountDownLatch messageCount = new CountDownLatch( 2 );
+ IoAcceptor acceptor = new VmPipeAcceptor();
+
+ acceptor.bind( address, new IoHandlerAdapter() {
+ @Override
+ public void messageReceived( IoSession session, Object message ) throws Exception {
+ System.out.println( Thread.currentThread().getName() + ": " + message );
+
+ if ( "start".equals( message ) ) {
+ session.write( "open new" );
+ } else if ( "re-use c1".equals( message ) ) {
+ session.write( "tell me something on c1 now" );
+ } else if ( ( (String) message ).startsWith( "please don't deadlock" ) ) {
+ messageCount.countDown();
+ } else {
+ fail( "unexpected message received " + message );
+ }
+ }
+ } );
+
+ connector.getDefaultConfig().setThreadModel( ThreadModel.MANUAL );
+
+ ConnectFuture future = connector.connect( address, new IoHandlerAdapter() {
+ @Override
+ public void messageReceived( IoSession session, Object message ) throws Exception {
+ System.out.println( Thread.currentThread().getName() + ": " + message );
+
+ if ( "open new".equals( message ) ) {
+ System.out.println( "opening c2 from " + Thread.currentThread().getName() );
+
+ ConnectFuture c2Future = connector.connect( address, new IoHandlerAdapter() {
+ @Override
+ public void sessionOpened( IoSession session ) throws Exception {
+ session.write( "re-use c1" );
+ }
+
+ @Override
+ public void messageReceived( IoSession session, Object message ) throws Exception {
+ System.out.println( Thread.currentThread().getName() + ": " + message );
+
+ if ( "tell me something on c1 now".equals( message ) ) {
+ latch.countDown();
+ c1.get().write( "please don't deadlock via c1" );
+ } else {
+ fail( "unexpected message received " + message );
+ }
+ }
+ } );
+
+ c2Future.join();
+
+ latch.await();
+
+ c2Future.getSession().write( "please don't deadlock via c2" );
+ } else {
+ fail( "unexpeced message received " + message );
+ }
+ }
+ } );
+
+ future.join();
+
+ c1.set( future.getSession() );
+ c1.get().write( "start" );
+
+ ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
+
+ while ( !messageCount.await( 100, TimeUnit.MILLISECONDS ) ) {
+ long[] threads = threadMXBean.findMonitorDeadlockedThreads();
+
+ if ( null != threads ) {
+ StringBuffer sb = new StringBuffer( 256 );
+ ThreadInfo[] infos = threadMXBean.getThreadInfo( threads, Integer.MAX_VALUE );
+
+ for ( ThreadInfo info : infos ) {
+ sb.append( info.getThreadName() )
+ .append( " blocked on " )
+ .append( info.getLockName() )
+ .append( " owned by " )
+ .append( info.getLockOwnerName() )
+ .append( "\n" );
+ }
+
+ for ( ThreadInfo info : infos ) {
+ sb.append( "\nStack for " ).append( info.getThreadName() ).append( "\n" );
+ for ( StackTraceElement element : info.getStackTrace() ) {
+ sb.append( "\t" ).append( element ).append( "\n" );
+ }
+ }
+
+ fail( "deadlocked! \n" + sb );
+ }
+ }
+
+ ( (IoAcceptorConfig) acceptor.getDefaultConfig() ).setDisconnectOnUnbind( false );
+ acceptor.unbindAll();
+ }
+}
Propchange: mina/branches/1.1/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeSessionCrossCommunicationTest.java
------------------------------------------------------------------------------
svn:eol-style = native