You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by ed...@apache.org on 2008/07/20 01:19:14 UTC
svn commit: r678238 - in
/mina/trunk/core/src/main/java/org/apache/mina/filter/buffer: ./
BufferedWriteFilter.java
Author: edeoliveira
Date: Sat Jul 19 16:19:14 2008
New Revision: 678238
URL: http://svn.apache.org/viewvc?rev=678238&view=rev
Log:
New IoFilter that implements DIRMINA-519
Added:
mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/
mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java (with props)
Added: mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java?rev=678238&view=auto
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java Sat Jul 19 16:19:14 2008
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.filter.buffer;
+
+import java.io.BufferedOutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.filterchain.IoFilter;
+import org.apache.mina.core.filterchain.IoFilterAdapter;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.core.write.DefaultWriteRequest;
+import org.apache.mina.core.write.WriteRequest;
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
+
+/**
+ * An {@link IoFilter} implementation used to buffer outgoing {@link WriteRequest} almost
+ * like what {@link BufferedOutputStream} does. Using this filter allows to be less dependent
+ * from network latency. It is also useful when a session is generating very small messages
+ * too frequently and consequently generating unnecessary traffic overhead.
+ *
+ * Please note that it should always be placed before the {@link ProtocolCodecFilter}
+ * as it only handles {@link WriteRequest}'s carrying {@link IoBuffer} objects.
+ *
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev: $, $Date: $
+ * @since MINA 2.0.0-M2
+ */
+public final class BufferedWriteFilter extends IoFilterAdapter {
+
+ /**
+ * Default buffer size value in bytes.
+ */
+ public final static int DEFAULT_BUFFER_SIZE = 8192;
+
+ /**
+ * The buffer size allocated for each new session's buffer.
+ */
+ private int bufferSize = DEFAULT_BUFFER_SIZE;
+
+ /**
+ * The map that matches an {@link IoSession} and it's {@link IoBuffer}
+ * buffer.
+ */
+ protected Map<IoSession, IoBuffer> buffersMap = new HashMap<IoSession, IoBuffer>();
+
+ /**
+ * Default constructor. Sets buffer size to {@link #DEFAULT_BUFFER_SIZE}
+ * bytes.
+ */
+ public BufferedWriteFilter() {
+ this(DEFAULT_BUFFER_SIZE);
+ }
+
+ /**
+ * Constructor which sets buffer size to <code>bufferSize</code>.
+ *
+ * @param bufferSize the new buffer size
+ */
+ public BufferedWriteFilter(int bufferSize) {
+ super();
+ this.bufferSize = bufferSize;
+ }
+
+ /**
+ * Returns buffer size.
+ */
+ public int getBufferSize() {
+ return bufferSize;
+ }
+
+ /**
+ * Sets the buffer size but only for the newly created buffers.
+ *
+ * @param bufferSize the new buffer size
+ */
+ public void setBufferSize(int bufferSize) {
+ this.bufferSize = bufferSize;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @throws Exception if <code>writeRequest.message</code> isn't an
+ * {@link IoBuffer} instance.
+ */
+ @Override
+ public void filterWrite(NextFilter nextFilter, IoSession session,
+ WriteRequest writeRequest) throws Exception {
+
+ Object data = writeRequest.getMessage();
+
+ if (data instanceof IoBuffer) {
+ write(session, (IoBuffer) data);
+ } else {
+ throw new IllegalArgumentException(
+ "This filter should only buffer IoBuffer objects");
+ }
+ }
+
+ /**
+ * Writes an {@link IoBuffer} to the session's buffer.
+ *
+ * @param session the session to which a write is requested
+ * @param data the data to buffer
+ */
+ private void write(IoSession session, IoBuffer data) {
+ IoBuffer dest = null;
+ synchronized (buffersMap) {
+ dest = buffersMap.get(session);
+ if (dest == null) {
+ // Enforce the creation of a non-expandable buffer
+ dest = IoBuffer.allocate(bufferSize).setAutoExpand(false);
+ buffersMap.put(session, dest);
+ }
+ }
+
+ write(session, data, dest);
+ }
+
+ /**
+ * Writes <code>data</code> {@link IoBuffer} to the <code>buf</code>
+ * {@link IoBuffer} which buffers write requests for the
+ * <code>session</code> {@ link IoSession} until buffer is full
+ * or manually flushed.
+ *
+ * @param session the session where buffer will be written
+ * @param data the data to buffer
+ * @param buf the buffer where data will be temporarily written
+ */
+ private void write(IoSession session, IoBuffer data, IoBuffer buf) {
+ synchronized (buf) {
+ try {
+ int len = data.remaining();
+ if (len >= buf.capacity()) {
+ /*
+ * If the request length exceeds the size of the output buffer,
+ * flush the output buffer and then write the data directly.
+ */
+ NextFilter nextFilter = session.getFilterChain()
+ .getNextFilter(this);
+ internalFlush(nextFilter, session, buf);
+ nextFilter.filterWrite(session,
+ new DefaultWriteRequest(buf));
+ return;
+ }
+ if (len > (buf.limit() - buf.position())) {
+ internalFlush(session.getFilterChain().getNextFilter(this),
+ session, buf);
+ }
+ buf.put(data);
+ } catch (Throwable e) {
+ session.getFilterChain().fireExceptionCaught(e);
+ }
+ }
+ }
+
+ /**
+ * Internal method that actually flushes the buffered data.
+ *
+ * @param nextFilter the {@link NextFilter} of this filter
+ * @param session the session where buffer will be written
+ * @param data the data to write
+ * @throws Exception if a write operation fails
+ */
+ private void internalFlush(NextFilter nextFilter, IoSession session,
+ IoBuffer data) throws Exception {
+ if (data != null) {
+ nextFilter.filterWrite(session, new DefaultWriteRequest(data));
+ data.clear();
+ }
+ }
+
+ /**
+ * Flushes the buffered data.
+ *
+ * @param session the session where buffer will be written
+ */
+ public void flush(IoSession session) {
+ try {
+ IoBuffer data = null;
+ synchronized (session) {
+ data = buffersMap.get(session);
+ }
+ internalFlush(session.getFilterChain().getNextFilter(this),
+ session, data);
+
+ } catch (Throwable e) {
+ session.getFilterChain().fireExceptionCaught(e);
+ }
+ }
+
+ /**
+ * Internal method that actually frees the {@link IoBuffer} that contains
+ * the buffered data that has not been flushed.
+ *
+ * @param session the session we operate on
+ */
+ private void clear(IoSession session) {
+ synchronized (session) {
+ IoBuffer buf = buffersMap.remove(session);
+ if (buf != null) {
+ buf.free();
+ }
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void exceptionCaught(NextFilter nextFilter, IoSession session,
+ Throwable cause) throws Exception {
+ clear(session);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void sessionClosed(NextFilter nextFilter, IoSession session)
+ throws Exception {
+ clear(session);
+ }
+}
\ No newline at end of file
Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Re: svn commit: r678238 - in /mina/trunk/core/src/main/java/org/apache/mina/filter/buffer: ./ BufferedWriteFilter.java
Posted by Mark Webb <el...@gmail.com>.
I have a question on this class.
Why not use ConcurrentHashMap for this instead of synchronizing? Or
CopyOnWriteMap?
On Sat, Jul 19, 2008 at 7:19 PM, <ed...@apache.org> wrote:
> Author: edeoliveira
> Date: Sat Jul 19 16:19:14 2008
> New Revision: 678238
>
> URL: http://svn.apache.org/viewvc?rev=678238&view=rev
> Log:
> New IoFilter that implements DIRMINA-519
>
> Added:
> mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/
>
> mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java
> (with props)
>
> Added:
> mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java
> URL:
> http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java?rev=678238&view=auto
>
> ==============================================================================
> ---
> mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java
> (added)
> +++
> mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java
> Sat Jul 19 16:19:14 2008
> @@ -0,0 +1,243 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements. See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership. The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing,
> + * software distributed under the License is distributed on an
> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> + * KIND, either express or implied. See the License for the
> + * specific language governing permissions and limitations
> + * under the License.
> + *
> + */
> +package org.apache.mina.filter.buffer;
> +
> +import java.io.BufferedOutputStream;
> +import java.util.HashMap;
> +import java.util.Map;
> +
> +import org.apache.mina.core.buffer.IoBuffer;
> +import org.apache.mina.core.filterchain.IoFilter;
> +import org.apache.mina.core.filterchain.IoFilterAdapter;
> +import org.apache.mina.core.session.IoSession;
> +import org.apache.mina.core.write.DefaultWriteRequest;
> +import org.apache.mina.core.write.WriteRequest;
> +import org.apache.mina.filter.codec.ProtocolCodecFilter;
> +
> +/**
> + * An {@link IoFilter} implementation used to buffer outgoing {@link
> WriteRequest} almost
> + * like what {@link BufferedOutputStream} does. Using this filter allows
> to be less dependent
> + * from network latency. It is also useful when a session is generating
> very small messages
> + * too frequently and consequently generating unnecessary traffic
> overhead.
> + *
> + * Please note that it should always be placed before the {@link
> ProtocolCodecFilter}
> + * as it only handles {@link WriteRequest}'s carrying {@link IoBuffer}
> objects.
> + *
> + * @author The Apache MINA Project (dev@mina.apache.org)
> + * @version $Rev: $, $Date: $
> + * @since MINA 2.0.0-M2
> + */
> +public final class BufferedWriteFilter extends IoFilterAdapter {
> +
> + /**
> + * Default buffer size value in bytes.
> + */
> + public final static int DEFAULT_BUFFER_SIZE = 8192;
> +
> + /**
> + * The buffer size allocated for each new session's buffer.
> + */
> + private int bufferSize = DEFAULT_BUFFER_SIZE;
> +
> + /**
> + * The map that matches an {@link IoSession} and it's {@link IoBuffer}
> + * buffer.
> + */
> + protected Map<IoSession, IoBuffer> buffersMap = new HashMap<IoSession,
> IoBuffer>();
> +
> + /**
> + * Default constructor. Sets buffer size to {@link
> #DEFAULT_BUFFER_SIZE}
> + * bytes.
> + */
> + public BufferedWriteFilter() {
> + this(DEFAULT_BUFFER_SIZE);
> + }
> +
> + /**
> + * Constructor which sets buffer size to <code>bufferSize</code>.
> + *
> + * @param bufferSize the new buffer size
> + */
> + public BufferedWriteFilter(int bufferSize) {
> + super();
> + this.bufferSize = bufferSize;
> + }
> +
> + /**
> + * Returns buffer size.
> + */
> + public int getBufferSize() {
> + return bufferSize;
> + }
> +
> + /**
> + * Sets the buffer size but only for the newly created buffers.
> + *
> + * @param bufferSize the new buffer size
> + */
> + public void setBufferSize(int bufferSize) {
> + this.bufferSize = bufferSize;
> + }
> +
> + /**
> + * {@inheritDoc}
> + *
> + * @throws Exception if <code>writeRequest.message</code> isn't an
> + * {@link IoBuffer} instance.
> + */
> + @Override
> + public void filterWrite(NextFilter nextFilter, IoSession session,
> + WriteRequest writeRequest) throws Exception {
> +
> + Object data = writeRequest.getMessage();
> +
> + if (data instanceof IoBuffer) {
> + write(session, (IoBuffer) data);
> + } else {
> + throw new IllegalArgumentException(
> + "This filter should only buffer IoBuffer objects");
> + }
> + }
> +
> + /**
> + * Writes an {@link IoBuffer} to the session's buffer.
> + *
> + * @param session the session to which a write is requested
> + * @param data the data to buffer
> + */
> + private void write(IoSession session, IoBuffer data) {
> + IoBuffer dest = null;
> + synchronized (buffersMap) {
> + dest = buffersMap.get(session);
> + if (dest == null) {
> + // Enforce the creation of a non-expandable buffer
> + dest = IoBuffer.allocate(bufferSize).setAutoExpand(false);
> + buffersMap.put(session, dest);
> + }
> + }
> +
> + write(session, data, dest);
> + }
> +
> + /**
> + * Writes <code>data</code> {@link IoBuffer} to the <code>buf</code>
> + * {@link IoBuffer} which buffers write requests for the
> + * <code>session</code> {@ link IoSession} until buffer is full
> + * or manually flushed.
> + *
> + * @param session the session where buffer will be written
> + * @param data the data to buffer
> + * @param buf the buffer where data will be temporarily written
> + */
> + private void write(IoSession session, IoBuffer data, IoBuffer buf) {
> + synchronized (buf) {
> + try {
> + int len = data.remaining();
> + if (len >= buf.capacity()) {
> + /*
> + * If the request length exceeds the size of the
> output buffer,
> + * flush the output buffer and then write the data
> directly.
> + */
> + NextFilter nextFilter = session.getFilterChain()
> + .getNextFilter(this);
> + internalFlush(nextFilter, session, buf);
> + nextFilter.filterWrite(session,
> + new DefaultWriteRequest(buf));
> + return;
> + }
> + if (len > (buf.limit() - buf.position())) {
> +
> internalFlush(session.getFilterChain().getNextFilter(this),
> + session, buf);
> + }
> + buf.put(data);
> + } catch (Throwable e) {
> + session.getFilterChain().fireExceptionCaught(e);
> + }
> + }
> + }
> +
> + /**
> + * Internal method that actually flushes the buffered data.
> + *
> + * @param nextFilter the {@link NextFilter} of this filter
> + * @param session the session where buffer will be written
> + * @param data the data to write
> + * @throws Exception if a write operation fails
> + */
> + private void internalFlush(NextFilter nextFilter, IoSession session,
> + IoBuffer data) throws Exception {
> + if (data != null) {
> + nextFilter.filterWrite(session, new
> DefaultWriteRequest(data));
> + data.clear();
> + }
> + }
> +
> + /**
> + * Flushes the buffered data.
> + *
> + * @param session the session where buffer will be written
> + */
> + public void flush(IoSession session) {
> + try {
> + IoBuffer data = null;
> + synchronized (session) {
> + data = buffersMap.get(session);
> + }
> + internalFlush(session.getFilterChain().getNextFilter(this),
> + session, data);
> +
> + } catch (Throwable e) {
> + session.getFilterChain().fireExceptionCaught(e);
> + }
> + }
> +
> + /**
> + * Internal method that actually frees the {@link IoBuffer} that
> contains
> + * the buffered data that has not been flushed.
> + *
> + * @param session the session we operate on
> + */
> + private void clear(IoSession session) {
> + synchronized (session) {
> + IoBuffer buf = buffersMap.remove(session);
> + if (buf != null) {
> + buf.free();
> + }
> + }
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + @Override
> + public void exceptionCaught(NextFilter nextFilter, IoSession session,
> + Throwable cause) throws Exception {
> + clear(session);
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + @Override
> + public void sessionClosed(NextFilter nextFilter, IoSession session)
> + throws Exception {
> + clear(session);
> + }
> +}
> \ No newline at end of file
>
> Propchange:
> mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java
>
> ------------------------------------------------------------------------------
> svn:eol-style = native
>
> Propchange:
> mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java
>
> ------------------------------------------------------------------------------
> svn:mime-type = text/plain
>
>
>