You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by jv...@apache.org on 2011/11/09 06:30:47 UTC
svn commit: r1199619 - in /mina/trunk/core/src:
main/java/org/apache/mina/filter/codec/
main/java/org/apache/mina/filter/logging/
main/java/org/apache/mina/filterchain/ main/java/org/apache/mina/service/
main/java/org/apache/mina/session/ main/java/org...
Author: jvermillard
Date: Wed Nov 9 05:30:46 2011
New Revision: 1199619
URL: http://svn.apache.org/viewvc?rev=1199619&view=rev
Log:
added some unit test, doco and started to think about write future and message sent
Added:
mina/trunk/core/src/test/java/org/apache/mina/filter/
mina/trunk/core/src/test/java/org/apache/mina/filter/codec/
mina/trunk/core/src/test/java/org/apache/mina/filter/codec/ProtocolCodecFilterTest.java (with props)
Modified:
mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFactory.java
mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolDecoder.java
mina/trunk/core/src/main/java/org/apache/mina/filter/logging/LoggingFilter.java
mina/trunk/core/src/main/java/org/apache/mina/filterchain/DefaultIoFilterController.java
mina/trunk/core/src/main/java/org/apache/mina/service/SelectorProcessor.java
mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFactory.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFactory.java?rev=1199619&r1=1199618&r2=1199619&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFactory.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFactory.java Wed Nov 9 05:30:46 2011
@@ -32,6 +32,7 @@ import org.apache.mina.api.IoSession;
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
public interface ProtocolCodecFactory {
+
/**
* Returns a new (or reusable) instance of {@link ProtocolEncoder} which
* encodes message objects into binary or protocol-specific data.
Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java?rev=1199619&r1=1199618&r2=1199619&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java Wed Nov 9 05:30:46 2011
@@ -21,7 +21,7 @@ package org.apache.mina.filter.codec;
import java.nio.ByteBuffer;
-import org.apache.mina.api.IdleStatus;
+import org.apache.mina.api.DefaultIoFilter;
import org.apache.mina.api.IoFilter;
import org.apache.mina.api.IoSession;
import org.apache.mina.filterchain.ReadFilterChainController;
@@ -36,14 +36,16 @@ import org.slf4j.LoggerFactory;
*
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
-public class ProtocolCodecFilter implements IoFilter {
+public class ProtocolCodecFilter extends DefaultIoFilter {
/** A logger for this class */
private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolCodecFilter.class);
private static final Class<?>[] EMPTY_PARAMS = new Class[0];
+ /** key for session attribute holding the encoder */
private final String ENCODER = ProtocolCodecFilter.class.getSimpleName() + "encoder";
+ /** key for session attribute holding the decoder */
private final String DECODER = ProtocolCodecFilter.class.getSimpleName() + "decoder";
/** The factory responsible for creating the encoder and decoder */
@@ -76,7 +78,7 @@ public class ProtocolCodecFilter impleme
if (encoder == null) {
throw new IllegalArgumentException("encoder");
}
-
+
if (decoder == null) {
throw new IllegalArgumentException("decoder");
}
@@ -95,7 +97,7 @@ public class ProtocolCodecFilter impleme
/**
* Creates a new instance of ProtocolCodecFilter, without any factory.
- * The encoder/decoder factory will be created as an inner class, using
+ * The encoder/decoder factory will be created as an anonymous class, using
* the two parameters (encoder and decoder), which are class names. Instances
* for those classes will be created in this constructor.
*
@@ -107,25 +109,25 @@ public class ProtocolCodecFilter impleme
if (encoderClass == null) {
throw new IllegalArgumentException("encoderClass");
}
-
+
if (decoderClass == null) {
throw new IllegalArgumentException("decoderClass");
}
-
+
if (!ProtocolEncoder.class.isAssignableFrom(encoderClass)) {
throw new IllegalArgumentException("encoderClass: " + encoderClass.getName());
}
-
+
if (!ProtocolDecoder.class.isAssignableFrom(decoderClass)) {
throw new IllegalArgumentException("decoderClass: " + decoderClass.getName());
}
-
+
try {
encoderClass.getConstructor(EMPTY_PARAMS);
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("encoderClass doesn't have a public default constructor.");
}
-
+
try {
decoderClass.getConstructor(EMPTY_PARAMS);
} catch (NoSuchMethodException e) {
@@ -184,17 +186,17 @@ public class ProtocolCodecFilter impleme
* Process the incoming message, calling the session decoder. As the incoming
* buffer might contains more than one messages, we have to loop until the decoder
* throws an exception.
- *
+ * <code>
* while ( buffer not empty )
* try
* decode ( buffer )
* catch
* break;
- *
+ * </code>
*/
@Override
public void messageReceived(IoSession session, Object message, ReadFilterChainController controller) {
- LOGGER.debug("Processing a MESSAGE_RECEIVED for session {}", session.getId());
+ LOGGER.debug("Processing a MESSAGE_RECEIVED for session {}", session);
if (!(message instanceof ByteBuffer)) {
controller.callReadNextFilter(session, message);
@@ -214,13 +216,21 @@ public class ProtocolCodecFilter impleme
}
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public void messageWriting(IoSession session, Object message, WriteFilterChainController controller) {
+ LOGGER.debug("Processing a MESSAGE_WRITTING for session {}", session);
+
ProtocolEncoder encoder = session.getAttribute(ENCODER);
encoder.encode(session, message, controller);
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public void sessionCreated(IoSession session) {
// Initialize the encoder and decoder if we use a factory
@@ -232,21 +242,14 @@ public class ProtocolCodecFilter impleme
}
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public void sessionClosed(IoSession session) {
disposeCodec(session);
}
- @Override
- public void sessionOpened(IoSession session) {
- // Nothing to do
- }
-
- @Override
- public void sessionIdle(IoSession session, IdleStatus status) {
- // Nothing to do
- }
-
//----------- Helper methods ---------------------------------------------
/**
* Dispose the encoder, decoder, and the callback for the decoded
@@ -295,4 +298,4 @@ public class ProtocolCodecFilter impleme
LOGGER.warn("Failed to dispose: " + decoder.getClass().getName() + " (" + decoder + ')');
}
}
-}
+}
\ No newline at end of file
Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolDecoder.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolDecoder.java?rev=1199619&r1=1199618&r2=1199619&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolDecoder.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolDecoder.java Wed Nov 9 05:30:46 2011
@@ -40,6 +40,7 @@ import org.apache.mina.filterchain.ReadF
* @see ProtocolDecoderException
*/
public interface ProtocolDecoder {
+
/**
* Decodes binary or protocol-specific content into higher-level message objects.
* MINA invokes {@link #decode(IoSession, IoBuffer, ProtocolDecoderOutput)}
Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/logging/LoggingFilter.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/logging/LoggingFilter.java?rev=1199619&r1=1199618&r2=1199619&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/logging/LoggingFilter.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/logging/LoggingFilter.java Wed Nov 9 05:30:46 2011
@@ -151,27 +151,42 @@ public class LoggingFilter implements Io
}
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public void sessionCreated(IoSession session) {
log(sessionCreatedLevel, "CREATED");
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public void sessionOpened(IoSession session) {
log(sessionOpenedLevel, "OPENED");
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public void sessionClosed(IoSession session) {
log(sessionClosedLevel, "CLOSED");
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public void sessionIdle(IoSession session, IdleStatus status) {
log(sessionIdleLevel, "IDLE");
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public void messageReceived(IoSession session, Object message, ReadFilterChainController controller) {
if (message instanceof ByteBuffer) {
@@ -183,6 +198,9 @@ public class LoggingFilter implements Io
controller.callReadNextFilter(session, message);
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public void messageWriting(IoSession session, Object message, WriteFilterChainController controller) {
if (message instanceof ByteBuffer) {
@@ -304,4 +322,4 @@ public class LoggingFilter implements Io
public LogLevel getSessionClosedLogLevel() {
return sessionClosedLevel;
}
-}
+}
\ No newline at end of file
Modified: mina/trunk/core/src/main/java/org/apache/mina/filterchain/DefaultIoFilterController.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filterchain/DefaultIoFilterController.java?rev=1199619&r1=1199618&r2=1199619&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filterchain/DefaultIoFilterController.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filterchain/DefaultIoFilterController.java Wed Nov 9 05:30:46 2011
@@ -19,12 +19,19 @@
*/
package org.apache.mina.filterchain;
+import java.nio.ByteBuffer;
+
import org.apache.mina.api.IoFilter;
import org.apache.mina.api.IoService;
import org.apache.mina.api.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * The default implementation of the {@link IoFilterController}
+ *
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ */
public class DefaultIoFilterController implements IoFilterController, ReadFilterChainController,
WriteFilterChainController {
@@ -45,14 +52,20 @@ public class DefaultIoFilterController i
this.chain = chain;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public void processSessionCreated(IoSession session) {
- LOG.debug("processing session created event");
+ LOG.debug("processing session created event for session {}", session);
for (IoFilter filter : chain) {
filter.sessionCreated(session);
}
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public void processSessionOpened(IoSession session) {
LOG.debug("processing session open event");
@@ -61,6 +74,9 @@ public class DefaultIoFilterController i
}
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public void processSessionClosed(IoSession session) {
LOG.debug("processing session closed event");
@@ -69,8 +85,12 @@ public class DefaultIoFilterController i
}
}
+ /** the current position in the read chain for this thread */
private static final ThreadLocal<Integer> readChainPosition = new ThreadLocal<Integer>();
+ /**
+ * {@inheritDoc}
+ */
@Override
public void processMessageReceived(IoSession session, Object message) {
LOG.debug("processing message '{}' received event ", message);
@@ -83,14 +103,24 @@ public class DefaultIoFilterController i
}
}
+ /** the current position in the write chain for this thread */
private static final ThreadLocal<Integer> writeChainPosition = new ThreadLocal<Integer>();
+ /** holds the currently encoded high-level message; will be used for the messageSent event */
+ private static final ThreadLocal<Object> writtenMessage = new ThreadLocal<Object>();
+
+ /**
+ * {@inheritDoc}
+ */
@Override
public void processMessageWriting(IoSession session, Object message) {
LOG.debug("processing message '{}' writing event ", message);
+
+ // save the high level message before chain of transformation
+ writtenMessage.set(message);
+
if (chain.length < 1) {
- LOG.debug("Nothing to do, the chain is empty, we just enqueue the message");
- session.enqueueWriteRequest(message);
+ enqueueFinalWriteMessage(session, message);
} else {
writeChainPosition.set(chain.length - 1);
@@ -99,6 +129,9 @@ public class DefaultIoFilterController i
}
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public void callWriteNextFilter(IoSession session, Object message) {
if (LOG.isDebugEnabled()) {
@@ -108,14 +141,25 @@ public class DefaultIoFilterController i
writeChainPosition.set(writeChainPosition.get() - 1);
if (writeChainPosition.get() < 0 || chain.length == 0) {
// end of chain processing
- LOG.debug("end of write chan we enqueue the message in the session : {}", message);
- session.enqueueWriteRequest(message);
+ enqueueFinalWriteMessage(session, message);
} else {
chain[writeChainPosition.get()].messageWriting(session, message, this);
}
writeChainPosition.set(writeChainPosition.get() + 1);
}
+ /**
+ * At the end of write chain processing, enqueue final encoded {@link ByteBuffer} message in the session
+ */
+ private void enqueueFinalWriteMessage(IoSession session, Object message) {
+ LOG.debug("end of write chan we enqueue the message in the session : {}", message);
+ session.enqueueWriteRequest(message);
+ // TODO : fire message written and handle the close the write future (if any)
+ }
+
+ /**
+ * {@inheritDoc}
+ */
@Override
public void callReadNextFilter(IoSession session, Object message) {
readChainPosition.set(readChainPosition.get() + 1);
@@ -127,6 +171,9 @@ public class DefaultIoFilterController i
readChainPosition.set(readChainPosition.get() - 1);
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public String toString() {
StringBuilder bldr = new StringBuilder("IoFilterChain {");
Modified: mina/trunk/core/src/main/java/org/apache/mina/service/SelectorProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/service/SelectorProcessor.java?rev=1199619&r1=1199618&r2=1199619&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/service/SelectorProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/service/SelectorProcessor.java Wed Nov 9 05:30:46 2011
@@ -34,30 +34,30 @@ import org.apache.mina.api.IoSession;
*
*/
public interface SelectorProcessor {
-
+
/**
* create a session for a freshly accepted client socket
* @param service
* @param clientChannel
*/
- void createSession(IoService service,Object clientSocket);
-
+ void createSession(IoService service, Object clientSocket);
+
/**
* Bind and start processing this new server address
* @param address local address to bind
* @throws IOException exception thrown if any problem occurs while binding
*/
void bindAndAcceptAddress(IoServer server, SocketAddress address) throws IOException;
-
+
/**
* Stop processing and unbind this server address
* @param address the local server address to unbind
* @throws IOException exception thrown if any problem occurs while unbinding
*/
void unbind(SocketAddress address) throws IOException;
-
+
/**
- * Schedule a session for flushing, to be called after a session write.
+ * Schedule a session for flushing, will be called after a {@link IoSession#write(Object)}.
* @param session the session to flush
*/
void flush(IoSession session);
Modified: mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java?rev=1199619&r1=1199618&r2=1199619&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java Wed Nov 9 05:30:46 2011
@@ -248,6 +248,9 @@ public abstract class AbstractIoSession
registeredForWrite.set(false);
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public IoFuture<Void> writeWithFuture(Object message) {
write(message);
@@ -255,11 +258,17 @@ public abstract class AbstractIoSession
return null;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public Queue<WriteRequest> getWriteQueue() {
return writeQueue;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public IoFilterController getFilterChain() {
return filterProcessor;
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java?rev=1199619&r1=1199618&r2=1199619&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java Wed Nov 9 05:30:46 2011
@@ -336,7 +336,9 @@ public class NioSelectorProcessor implem
}
if (key.isWritable()) {
- LOGGER.debug("writable session : {}", key.attachment());
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("writable session : {}", key.attachment());
+ }
NioTcpSession session = (NioTcpSession) key.attachment();
session.setNotRegisteredForWrite();
// write from the session write queue
@@ -348,8 +350,12 @@ public class NioSelectorProcessor implem
break;
}
ByteBuffer buf = (ByteBuffer) wreq.getMessage();
- int wrote = session.getSocketChannel().write(buf);
- LOGGER.debug("wrote {} bytes to {}", wrote, session);
+
+ // The socket write must happen whatever the log level is; only the log call is guarded.
+ int wrote = session.getSocketChannel().write(buf);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("wrote {} bytes to {}", wrote, session);
+ }
if (buf.remaining() == 0) {
// completed write request, let's remove
// it
@@ -433,6 +439,9 @@ public class NioSelectorProcessor implem
}
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public void flush(IoSession session) {
LOGGER.debug("scheduling session {} for writing", session.toString());
Added: mina/trunk/core/src/test/java/org/apache/mina/filter/codec/ProtocolCodecFilterTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/filter/codec/ProtocolCodecFilterTest.java?rev=1199619&view=auto
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/filter/codec/ProtocolCodecFilterTest.java (added)
+++ mina/trunk/core/src/test/java/org/apache/mina/filter/codec/ProtocolCodecFilterTest.java Wed Nov 9 05:30:46 2011
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.mina.filter.codec;
+
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+import org.apache.mina.api.IoSession;
+import org.junit.Test;
+
+/**
+ * Unit test for {@link ProtocolCodecFilter}: constructor argument validation and encoder/decoder lookup.
+ *
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ */
+public class ProtocolCodecFilterTest {
+
+ @Test
+ public void constructor_args() {
+ try {
+ new ProtocolCodecFilter(null);
+ fail();
+ } catch (IllegalArgumentException e) {
+ // expected: a null codec factory is rejected
+ } catch (Exception e2) {
+ fail();
+ }
+
+ try {
+ new ProtocolCodecFilter(null, mock(ProtocolDecoder.class));
+ fail();
+ } catch (IllegalArgumentException e) {
+ // expected: a null encoder instance is rejected
+ } catch (Exception e2) {
+ fail();
+ }
+
+ try {
+ new ProtocolCodecFilter(mock(ProtocolEncoder.class), null);
+ fail();
+ } catch (IllegalArgumentException e) {
+ // expected: a null decoder instance is rejected
+ } catch (Exception e2) {
+ fail();
+ }
+
+ try {
+ new ProtocolCodecFilter(null, ProtocolDecoder.class);
+ fail();
+ } catch (IllegalArgumentException e) {
+ // expected: a null encoder class is rejected
+ } catch (Exception e2) {
+ fail();
+ }
+
+ try {
+ new ProtocolCodecFilter(ProtocolEncoder.class, null);
+ fail();
+ } catch (IllegalArgumentException e) {
+ // expected: a null decoder class is rejected
+ } catch (Exception e2) {
+ fail();
+ }
+ }
+
+ @Test
+ public void codec_factory() {
+ ProtocolEncoder encoder = mock(ProtocolEncoder.class);
+ ProtocolDecoder decoder = mock(ProtocolDecoder.class);
+
+ ProtocolCodecFactory factory = mock(ProtocolCodecFactory.class);
+ when(factory.getEncoder(any(IoSession.class))).thenReturn(encoder);
+ when(factory.getDecoder(any(IoSession.class))).thenReturn(decoder);
+
+ ProtocolCodecFilter codec = new ProtocolCodecFilter(factory);
+ assertEquals(encoder, codec.getEncoder(null));
+ assertEquals(decoder, codec.getDecoder(null));
+
+ codec = new ProtocolCodecFilter(encoder, decoder);
+ assertEquals(encoder, codec.getEncoder(null));
+ assertEquals(decoder, codec.getDecoder(null));
+ }
+
+}
\ No newline at end of file
Propchange: mina/trunk/core/src/test/java/org/apache/mina/filter/codec/ProtocolCodecFilterTest.java
------------------------------------------------------------------------------
svn:mime-type = text/plain