You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@labs.apache.org by be...@apache.org on 2007/10/06 15:54:54 UTC

svn commit: r582495 - in /labs/vysper/src: main/java/org/apache/vysper/mina/codec/ main/java/org/apache/vysper/xmpp/parser/ test/java/org/apache/vysper/mina/ test/java/org/apache/vysper/mina/codec/

Author: berndf
Date: Sat Oct  6 06:54:54 2007
New Revision: 582495

URL: http://svn.apache.org/viewvc?rev=582495&view=rev
Log:
[vysper] decoding xml stream at element boundaries

Added:
    labs/vysper/src/test/java/org/apache/vysper/mina/
    labs/vysper/src/test/java/org/apache/vysper/mina/codec/
    labs/vysper/src/test/java/org/apache/vysper/mina/codec/XMPPProtocolDecoderTestCase.java
Modified:
    labs/vysper/src/main/java/org/apache/vysper/mina/codec/XMPPProtocolDecoder.java
    labs/vysper/src/main/java/org/apache/vysper/xmpp/parser/AbstractNekopullStreamParser.java

Modified: labs/vysper/src/main/java/org/apache/vysper/mina/codec/XMPPProtocolDecoder.java
URL: http://svn.apache.org/viewvc/labs/vysper/src/main/java/org/apache/vysper/mina/codec/XMPPProtocolDecoder.java?rev=582495&r1=582494&r2=582495&view=diff
==============================================================================
--- labs/vysper/src/main/java/org/apache/vysper/mina/codec/XMPPProtocolDecoder.java (original)
+++ labs/vysper/src/main/java/org/apache/vysper/mina/codec/XMPPProtocolDecoder.java Sat Oct  6 06:54:54 2007
@@ -16,32 +16,100 @@
  ***********************************************************************/
 package org.apache.vysper.mina.codec;
 
-import org.apache.mina.filter.codec.ProtocolDecoder;
-import org.apache.mina.filter.codec.ProtocolDecoderOutput;
-import org.apache.mina.common.IoSession;
 import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
 import org.apache.vysper.xmpp.parser.AbstractNekopullStreamParser;
 import org.apache.vysper.xmpp.stanza.Stanza;
 import org.apache.xerces.xni.parser.XMLInputSource;
+import org.cyberneko.pull.parsers.Xerces2;
+
+import java.io.InputStream;
+import java.nio.charset.Charset;
 
 /**
  */
-public class XMPPProtocolDecoder implements ProtocolDecoder {
+public class XMPPProtocolDecoder extends CumulativeProtocolDecoder {
     
     public static final String ATTRIBUTE_VYSPER_XLM_PARSER = "vysperXMLParser";
 
-    public void decode(IoSession ioSession, ByteBuffer byteBuffer, ProtocolDecoderOutput protocolDecoderOutput) throws Exception {
+    public boolean doDecode(IoSession ioSession, ByteBuffer byteBuffer, ProtocolDecoderOutput protocolDecoderOutput) throws Exception { // handles at most one stanza slice per call
 
-        Parser parser = (Parser) ioSession.getAttribute(ATTRIBUTE_VYSPER_XLM_PARSER);
-        if (parser == null) {
-            parser = new Parser();
-            ioSession.setAttribute(ATTRIBUTE_VYSPER_XLM_PARSER, parser);
-        }
+        ByteBuffer stanzaBuffer = collectFullStanza(byteBuffer); // slice with one balanced '<'...'>' run, or null if none is complete yet
+        if (stanzaBuffer == null) return false; // incomplete: collectFullStanza has rewound byteBuffer, wait for more bytes
+
+//        Parser parser = (Parser) ioSession.getAttribute(ATTRIBUTE_VYSPER_XLM_PARSER);
+//        if (parser == null) {
+//            parser = new Parser(null);
+//            ioSession.setAttribute(ATTRIBUTE_VYSPER_XLM_PARSER, parser);
+//        }
+//        parser.copyLatest(stanzaBuffer);
+
+        // TODO do not use a fresh parser for every new decode, use a session-persistent one
+        Parser parser = new Parser(stanzaBuffer);
+
+        Stanza nextStanza = null;
+        do { // forward every stanza the parser can produce from this slice
+            nextStanza = parser.getNextStanza();
+            if (nextStanza != null) protocolDecoderOutput.write(nextStanza);
+        } while (nextStanza != null);
+        return true; // slice consumed; NOTE(review): presumably makes CumulativeProtocolDecoder call doDecode again for the remaining bytes - confirm against MINA docs
+    }
+
+    private ByteBuffer collectFullStanza(ByteBuffer byteBuffer) throws Exception { // returns a slice spanning one balanced '<'...'>' run and moves byteBuffer past it; null if none is complete
+        int startPosition = byteBuffer.position();
         
-        parser.copyLatest(byteBuffer);
+        // skip leading NUL bytes and ASCII whitespace (space, CR, LF)
+        int leadingGarbage = 0;
+        while (byteBuffer.remaining() > 0) {
+            byte b = byteBuffer.get();
+            if (b == 0 || b == 32 || b == 13 || b == 10) {
+                leadingGarbage++;
+            } else break;
+        }
+        startPosition += leadingGarbage; // the skipped prefix stays skipped even when null is returned below
+        byteBuffer.position(startPosition);
+
+        String DEBUG_VORDERBAND = byteBuffer.duplicate().getString(Charset.forName("UTF-8").newDecoder()); // debug aid, never read; NOTE(review): may throw on a split multi-byte sequence - confirm
 
-        Stanza nextStanza = parser.getNextStanza();
-        if (nextStanza != null) protocolDecoderOutput.write(nextStanza);
+        // track nesting of '<' and '>' bytes (no awareness of quotes, comments or CDATA)
+        int opened = 0;
+        boolean openedOnce = false;
+        while (byteBuffer.remaining() > 0) {
+            char aChar = (char)byteBuffer.get(); // compared byte by byte; only ASCII '<' and '>' are of interest
+            if (aChar == '<') {
+                opened++;
+                openedOnce = true;
+            }  
+            if (aChar == '>') {
+                if (opened < 1) throw new Exception("mismatching closing brace '>'");
+                opened--;
+                
+                // the nesting count is back at zero: cut the run out as its own buffer
+                if (openedOnce && opened == 0) {
+                    int endPosition = byteBuffer.position();
+                    int limit = byteBuffer.limit();
+                    ByteBuffer stanzaBuffer = null;
+                    try {
+                        // narrow position/limit to [startPosition, endPosition) so slice() covers exactly the run
+                        byteBuffer.position(startPosition);
+                        byteBuffer.limit(endPosition);
+                        stanzaBuffer = byteBuffer.slice(); 
+                    } finally {
+                        // restore the limit and leave the position just behind the sliced run
+                        byteBuffer.position(endPosition);
+                        byteBuffer.limit(limit);
+                    }
+                    String DEBUG_HINTERBAND = stanzaBuffer.duplicate().getString(Charset.forName("UTF-8").newDecoder()); // debug aid, never read
+                    String DEBUG_REMAINS = byteBuffer.duplicate().getString(Charset.forName("UTF-8").newDecoder()); // debug aid, never read
+                    return stanzaBuffer;
+                }
+            } 
+        }
+        // no complete stanza found. rewind to the first non-skipped byte for the next read
+        byteBuffer.position(startPosition);
+        return null;
     }
 
     public void finishDecode(IoSession ioSession, ProtocolDecoderOutput protocolDecoderOutput) throws Exception {
@@ -51,24 +119,28 @@
     public void dispose(IoSession ioSession) throws Exception {
 
     }
-
-    // this is not a lightweight implementation!! should be replaced by something more seamless
+    
     class Parser extends AbstractNekopullStreamParser {
 
+        protected Xerces2 createParser() { // overrides the base-class hook with the same body
+            return new Xerces2();
+        }
+
         private ByteBuffer permanentByteBuffer;
 
-        public Parser() {
-            this.permanentByteBuffer = ByteBuffer.allocate(16).setAutoExpand(true);
+        public Parser(ByteBuffer byteBuffer) { // null means "no input yet": start with an empty auto-expanding buffer
+            if (byteBuffer == null) this.permanentByteBuffer = ByteBuffer.allocate(16, false).setAutoExpand(true);
+            else this.permanentByteBuffer = byteBuffer; // otherwise read straight from the caller's buffer, no copy
         }
 
         protected XMLInputSource getInputSource() {
-            return new XMLInputSource(null, null, null, permanentByteBuffer.asInputStream(), null);
+            InputStream inputStream = permanentByteBuffer.asInputStream(); // stream view over the buffer's content
+            return new XMLInputSource(null, null, null, inputStream, null);
         }
 
         public void copyLatest(ByteBuffer byteBuffer) {
             permanentByteBuffer.put(byteBuffer);
+            permanentByteBuffer.flip(); // switch from writing to reading: limit = bytes written, position = 0
         }
     }
-    
-    
 }

Modified: labs/vysper/src/main/java/org/apache/vysper/xmpp/parser/AbstractNekopullStreamParser.java
URL: http://svn.apache.org/viewvc/labs/vysper/src/main/java/org/apache/vysper/xmpp/parser/AbstractNekopullStreamParser.java?rev=582495&r1=582494&r2=582495&view=diff
==============================================================================
--- labs/vysper/src/main/java/org/apache/vysper/xmpp/parser/AbstractNekopullStreamParser.java (original)
+++ labs/vysper/src/main/java/org/apache/vysper/xmpp/parser/AbstractNekopullStreamParser.java Sat Oct  6 06:54:54 2007
@@ -69,7 +69,7 @@
      */
     private void open() throws ParsingException {
         if (parser != null) throw new RuntimeException("cannot reopen stream");
-        XMLPullParser parserTemp = new Xerces2();
+        XMLPullParser parserTemp = createParser();
         try {
             parserTemp.setInputSource(getInputSource());
         } catch (IOException e) {
@@ -81,6 +81,10 @@
         if (event.type != XMLEvent.DOCUMENT) throw new ParsingException("XML document event expected, but was type = " + event.type);
     }
 
+    protected Xerces2 createParser() { // factory hook: open() obtains its pull parser here, so subclasses can supply their own
+        return new Xerces2();
+    }
+
     /**
      * blocking operation until next stanza is ready
      * @return
@@ -122,7 +126,9 @@
 
         List<XMLFragment> xmlFragments = new ArrayList<XMLFragment>();
 
-        if (!fillDeep(startElementEvent.element.rawname, xmlFragments)) throw new ParsingException("XML end element not found as expected");
+        if (!"stream".equals(name)) {
+            if (!fillDeep(startElementEvent.element.rawname, xmlFragments)) throw new ParsingException("XML end element not found as expected");
+        }
 
         return new Stanza(name, namespaceURI, stanzaAttributes, xmlFragments);
     }
@@ -182,14 +188,14 @@
     }
 
     private XMLEvent getNextXMLEvent() throws ParsingException {
-        XMLEvent event;
+        XMLEvent event = null; // stays null when nextEvent() fails with an XMLParseException
         try {
             event = parser.nextEvent();
         } catch (IOException e) {
             throw new ParsingException("could not step to next XML element", e);
         } catch (XMLParseException e) {
             if (e.getMessage().contains("XML document structures must start and end within the same entity")) return null; // end of file
-            throw new ParsingException("could not step to next XML element", e);
+            //throw new ParsingException("could not step to next XML element", e); // NOTE(review): while disabled, every other XMLParseException is swallowed and null is returned
         }
         return event;
     }

Added: labs/vysper/src/test/java/org/apache/vysper/mina/codec/XMPPProtocolDecoderTestCase.java
URL: http://svn.apache.org/viewvc/labs/vysper/src/test/java/org/apache/vysper/mina/codec/XMPPProtocolDecoderTestCase.java?rev=582495&view=auto
==============================================================================
--- labs/vysper/src/test/java/org/apache/vysper/mina/codec/XMPPProtocolDecoderTestCase.java (added)
+++ labs/vysper/src/test/java/org/apache/vysper/mina/codec/XMPPProtocolDecoderTestCase.java Sat Oct  6 06:54:54 2007
@@ -0,0 +1,191 @@
+package org.apache.vysper.mina.codec;
+
+import junit.framework.TestCase;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoService;
+import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.common.IoSessionConfig;
+import org.apache.mina.common.TransportType;
+import org.apache.mina.common.support.BaseIoSession;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+
+import java.net.SocketAddress;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetEncoder;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/** Exercises XMPPProtocolDecoder.decode() with XML input delivered whole and split across several buffers.
+ */
+public class XMPPProtocolDecoderTestCase extends TestCase {
+    private static final CharsetEncoder CHARSET_ENCODER_UTF8 = Charset.forName("UTF-8").newEncoder();
+
+    public void testDecoderSimple() throws Exception { // several complete elements arriving in one buffer
+        XMPPProtocolDecoder decoder = new XMPPProtocolDecoder();
+        MockIoSession session = new MockIoSession();
+        ByteBuffer firstByteBuffer = createByteBuffer();
+        ByteBuffer secondByteBuffer = createByteBuffer();
+        
+        String stanza = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n\r" + 
+                "<stream:stream to='example.com' xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams' version='1.0'>" +
+                "<trailing-stanza/>";
+
+        firstByteBuffer.putString(stanza, CHARSET_ENCODER_UTF8).flip();
+        
+        secondByteBuffer.putString("<next></next>", CHARSET_ENCODER_UTF8).flip();
+        
+        MockProtocolDecoderOutput protocolDecoderOutput = new MockProtocolDecoderOutput();
+        decoder.decode(session, firstByteBuffer, protocolDecoderOutput);
+        assertEquals(2, protocolDecoderOutput.size()); // expects two stanzas (presumably the stream header and <trailing-stanza/>)
+        
+        decoder.decode(session, secondByteBuffer, protocolDecoderOutput);
+        assertEquals(3, protocolDecoderOutput.size()); // expects exactly one more stanza from <next></next>
+
+        ByteBuffer emptyBuffer = createByteBuffer().putString("eee", CHARSET_ENCODER_UTF8).flip(); // text without any '<' never forms a stanza
+        decoder.decode(session, emptyBuffer, protocolDecoderOutput);
+        assertEquals(3, protocolDecoderOutput.size()); // count must stay unchanged
+        
+    }
+
+    public void testDecoderPartial() throws Exception { // one element split across two reads
+        XMPPProtocolDecoder decoder = new XMPPProtocolDecoder();
+        MockIoSession session = new MockIoSession();
+        ByteBuffer firstByteBuffer = createByteBuffer();
+        ByteBuffer secondByteBuffer = createByteBuffer();
+        
+        String stanzaPart1 = "<stream:stream to='example.com' xmlns='jabber:client' xmlns:stream='ht"; // cut in the middle of an attribute value
+        String stanzaPart2 = "tp://etherx.jabber.org/streams' version='1.0'>";
+
+        MockProtocolDecoderOutput protocolDecoderOutput = new MockProtocolDecoderOutput();
+
+        ByteBuffer prolog = createByteBuffer();
+        prolog.putString("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n\r", CHARSET_ENCODER_UTF8).flip();
+        decoder.decode(session, prolog, protocolDecoderOutput);
+        
+        firstByteBuffer.putString(stanzaPart1, CHARSET_ENCODER_UTF8).flip();
+        decoder.decode(session, firstByteBuffer, protocolDecoderOutput);
+        assertEquals(0, protocolDecoderOutput.size()); // expects no output after the prolog and the first half of the tag
+        
+        secondByteBuffer.putString(stanzaPart2, CHARSET_ENCODER_UTF8).flip();
+        decoder.decode(session, secondByteBuffer, protocolDecoderOutput);
+        assertEquals(1, protocolDecoderOutput.size()); // expects one stanza once the closing '>' has arrived
+    }
+
+    private ByteBuffer createByteBuffer() { // fresh zero-capacity buffer that grows automatically on write
+        return ByteBuffer.allocate(0, false).setAutoExpand(true).clear();
+    }
+
+    private class MockIoSession extends BaseIoSession // stub session: accessors return null or 0, attribute methods delegate to BaseIoSession
+    {
+
+        protected void updateTrafficMask()
+        {
+        }
+
+        public IoSessionConfig getConfig()
+        {
+            return null;
+        }
+
+        public IoFilterChain getFilterChain()
+        {
+            return null;
+        }
+
+        public IoHandler getHandler()
+        {
+            return null;
+        }
+
+        public SocketAddress getLocalAddress()
+        {
+            return null;
+        }
+
+        public SocketAddress getRemoteAddress()
+        {
+            return null;
+        }
+
+        public int getScheduledWriteBytes()
+        {
+            return 0;
+        }
+
+        public int getScheduledWriteRequests()
+        {
+            return 0;
+        }
+
+        public IoService getService()
+        {
+            return null;
+        }
+
+        public SocketAddress getServiceAddress()
+        {
+            return null;
+        }
+
+        public IoServiceConfig getServiceConfig()
+        {
+            return null;
+        }
+
+        public TransportType getTransportType()
+        {
+            return null;
+        }
+
+        public Object getAttribute(String s) {
+            return super.getAttribute(s);
+        }
+
+        public Object setAttribute(String s, Object o) {
+            return super.setAttribute(s, o);
+        }
+
+        public Object setAttribute(String s) {
+            return super.setAttribute(s);
+        }
+
+        public Object removeAttribute(String s) {
+            return super.removeAttribute(s);
+        }
+
+        public boolean containsAttribute(String s) {
+            return super.containsAttribute(s);
+        }
+
+        public Set<String> getAttributeKeys() {
+            return super.getAttributeKeys();
+        }
+    }
+    
+    private static class MockProtocolDecoderOutput implements ProtocolDecoderOutput // records every written message so the tests can count them
+    {
+        private List<Object> result = new ArrayList<Object>();
+
+        public void flush()
+        {
+        }
+
+        public void write( Object message )
+        {
+            result.add( message );
+        }
+        
+        public Iterator<Object> iterator() {
+            return result.iterator();
+        }
+
+        public int size() {
+            return result.size();
+        }
+    }
+    
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@labs.apache.org
For additional commands, e-mail: commits-help@labs.apache.org