You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2015/08/14 21:07:49 UTC

[16/50] [abbrv] qpid-proton git commit: Adding some experimental changes.

Adding some experimental changes.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/de8667c2
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/de8667c2
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/de8667c2

Branch: refs/heads/rajith-codec
Commit: de8667c2f1acf937bd775f263d499797da4c64ec
Parents: a7f13b7
Author: Rajith Attapattu <ra...@apache.org>
Authored: Thu Mar 19 19:34:08 2015 -0400
Committer: Rajith Attapattu <ra...@apache.org>
Committed: Thu Jul 9 09:12:38 2015 -0400

----------------------------------------------------------------------
 .../qpid/proton/engine/impl/FrameWriter2.java   | 235 +++++++++++++++++++
 .../qpid/proton/engine/impl/TransportImpl.java  |  25 +-
 2 files changed, 249 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/de8667c2/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter2.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter2.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter2.java
new file mode 100644
index 0000000..109fecf
--- /dev/null
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter2.java
@@ -0,0 +1,235 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.proton.engine.impl;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.transport.FrameBody;
+import org.apache.qpid.proton.codec2.ByteArrayEncoder;
+import org.apache.qpid.proton.codec2.CodecHelper;
+import org.apache.qpid.proton.codec2.Type;
+import org.apache.qpid.proton.framing.TransportFrame;
+
+/**
+ * FrameWriter2
+ * 
+ * Copied from FrameWriter.java and modified just enough to plug in the new codec.
+ * 
+ */
+
+class FrameWriter2
+{
+
+    static final byte AMQP_FRAME_TYPE = 0;
+
+    static final byte SASL_FRAME_TYPE = (byte) 1;
+
+    static final int DEFUALT_BUFFER_SIZE = Integer.getInteger("proton.encoder.buffer_size", 1024);
+
+    private ByteArrayEncoder _encoder;
+
+    private byte[] _buffer = new byte[DEFUALT_BUFFER_SIZE];
+
+    private int _maxFrameSize;
+
+    private byte _frameType;
+
+    final private Ref<ProtocolTracer> _protocolTracer;
+
+    private TransportImpl _transport;
+
+    private int _frameStart = 0;
+
+    private int _payloadStart;
+
+    private int _performativeSize;
+
+    private int _position = 0;
+
+    private int _read = 0;
+
+    /**
+     * Creates a frame writer that encodes into its own growable byte array.
+     *
+     * @param encoder        encoder used to serialise frame bodies; it is
+     *                       initialised here over this writer's internal buffer
+     * @param maxFrameSize   limit applied to performative plus payload when
+     *                       positive; a non-positive value disables the limit
+     * @param frameType      type byte written into every frame header
+     * @param protocolTracer tracer reference; currently only stored, because the
+     *                       tracing code in writeFrame is commented out
+     * @param transport      owning transport; currently only stored as well
+     */
+    FrameWriter2(ByteArrayEncoder encoder, int maxFrameSize, byte frameType, Ref<ProtocolTracer> protocolTracer,
+            TransportImpl transport)
+    {
+        _maxFrameSize = maxFrameSize;
+        _frameType = frameType;
+        _protocolTracer = protocolTracer;
+        _transport = transport;
+
+        // Hand the whole internal buffer to the encoder, starting at offset 0.
+        _encoder = encoder;
+        encoder.init(_buffer, 0, _buffer.length);
+    }
+
+    /**
+     * Replaces the frame-size limit used by subsequent writeFrame calls.
+     *
+     * @param maxFrameSize new limit in bytes; a non-positive value means no limit
+     */
+    void setMaxFrameSize(int maxFrameSize)
+    {
+        this._maxFrameSize = maxFrameSize;
+    }
+
+    /**
+     * Doubles the backing array, keeps the bytes committed so far and points
+     * the encoder at the new array.
+     */
+    private void grow()
+    {
+        byte[] old = _buffer;
+        _buffer = new byte[old.length * 2];
+        // Only bytes below _position are committed; anything the encoder wrote
+        // beyond that belongs to an attempt that the caller will redo.
+        System.arraycopy(old, 0, _buffer, 0, _position);
+
+        // Initialise the encoder the same way the constructor does: offset 0
+        // and the real array length. The size must not exceed _buffer.length,
+        // otherwise the encoder is told about capacity that does not exist.
+        // NOTE(review): assumes init's third argument is the usable length, as
+        // in the constructor call -- confirm against ByteArrayEncoder.
+        _encoder.init(_buffer, 0, _buffer.length);
+        // Restore the write cursor to the committed position.
+        _encoder.setPosition(_position);
+    }
+
+    /**
+     * Appends the given header bytes verbatim to the output buffer.
+     *
+     * @param header bytes to copy; must not be null
+     */
+    void writeHeader(byte[] header)
+    {
+        // Make room first so a small buffer cannot make arraycopy throw.
+        while (_buffer.length - _position < header.length)
+        {
+            grow();
+        }
+
+        // Append at the current position instead of offset 0 so bytes that are
+        // still waiting to be read are not overwritten. With an empty buffer
+        // (_position == 0) this is the same as writing at the start.
+        System.arraycopy(header, 0, _buffer, _position, header.length);
+        _position += header.length;
+    }
+
+    /** Marks the current write position as the first byte of the next frame. */
+    private void startFrame()
+    {
+        this._frameStart = this._position;
+    }
+
+    private void writePerformative(Object frameBody)
+    {
+        while (_buffer.length - _position < 8)
+        {
+            grow();
+        }
+
+        while (true)
+        {
+            try
+            {
+                _encoder.setPosition(_frameStart + 8);
+                CodecHelper.encodeObject(_encoder,frameBody);
+                _position = _encoder.getPosition();
+                break;
+            }
+            catch (IndexOutOfBoundsException e)
+            {
+                grow();
+            }
+        }
+
+        _payloadStart = _position;
+        _performativeSize = _payloadStart - _frameStart;
+    }
+
+    private void endFrame(int channel)
+    {
+        int frameSize = _position - _frameStart;
+        _encoder.setPosition(_frameStart);
+        _encoder.putInt(frameSize);
+        _encoder.putByte((byte) 2);
+        _encoder.putByte(_frameType);
+        _encoder.putShort((short) channel);
+    }
+
+    /**
+     * Writes one complete frame into the output buffer: the header, the encoded
+     * body and as much of the payload as the frame-size limit allows.
+     *
+     * @param channel           channel number stored in the frame header
+     * @param frameBody         body to encode after the header
+     * @param payload           optional payload, may be null; its position is
+     *                          advanced by the number of bytes actually copied
+     * @param onPayloadTooLarge optional callback, may be null; run when body
+     *                          plus payload exceed the limit, after which the
+     *                          body is encoded a second time
+     */
+    void writeFrame(int channel, Object frameBody, ByteBuffer payload, Runnable onPayloadTooLarge)
+    {
+        startFrame();
+
+        writePerformative(frameBody);
+
+        if (_maxFrameSize > 0 && payload != null && (payload.remaining() + _performativeSize) > _maxFrameSize)
+        {
+            if (onPayloadTooLarge != null)
+            {
+                onPayloadTooLarge.run();
+            }
+            // Encode again; presumably the callback changed frameBody so the
+            // first encoding is stale -- confirm with callers.
+            writePerformative(frameBody);
+        }
+
+        // Only needed by the disabled logging/tracing block below.
+        ByteBuffer originalPayload = null;
+        if (payload != null)
+        {
+            originalPayload = payload.duplicate();
+        }
+
+        // XXX: this is a bit of a hack but it eliminates duplicate
+        // code, further refactor will fix this
+        /*if (_frameType == AMQP_FRAME_TYPE)
+        {
+            TransportFrame frame = new TransportFrame(channel, (FrameBody) frameBody, Binary.create(originalPayload));
+            _transport.log(TransportImpl.OUTGOING, frame);
+
+            ProtocolTracer tracer = _protocolTracer.get();
+            if (tracer != null)
+            {
+                tracer.sentFrame(frame);
+            }
+        }*/
+
+        // Room left for payload once header and body are accounted for.
+        int capacity;
+        if (_maxFrameSize > 0)
+        {
+            capacity = _maxFrameSize - _performativeSize;
+        }
+        else
+        {
+            capacity = Integer.MAX_VALUE;
+        }
+        int payloadSize = Math.min(payload == null ? 0 : payload.remaining(), capacity);
+
+        if (payloadSize > 0)
+        {
+            while (_buffer.length - _position < payloadSize)
+            {
+                grow();
+            }
+
+            payload.get(_buffer, _position, payloadSize);
+            // Advance by exactly the bytes copied; an extra +1 here would add a
+            // stray byte to the frame and inflate the size written by endFrame.
+            _position += payloadSize;
+        }
+        endFrame(channel);
+    }
+
+    void writeFrame(Object frameBody)
+    {
+        writeFrame(0, frameBody, null, null);
+    }
+
+    boolean isFull()
+    {
+        // XXX: this should probably be tunable
+        return _position > 64 * 1024;
+    }
+
+    /**
+     * Copies pending output into dst, limited by the space dst has left.
+     *
+     * @param dst destination buffer; its position advances by the returned count
+     * @return number of bytes copied, possibly 0
+     */
+    int readBytes(ByteBuffer dst)
+    {
+        int size = Math.min(_position - _read, dst.remaining());
+
+        dst.put(_buffer, _read, size);
+        // _read is an offset into _buffer, so it moves forward by the bytes
+        // just handed out; a partial read resumes from here on the next call.
+        _read += size;
+
+        // Once everything written has been consumed, rewind both cursors so
+        // the buffer is reused from the start.
+        if (_read == _position)
+        {
+            _read = 0;
+            _position = 0;
+        }
+
+        return size;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/de8667c2/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
index 595afd6..7270666 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
@@ -44,6 +44,7 @@ import org.apache.qpid.proton.amqp.transport.Transfer;
 import org.apache.qpid.proton.codec.AMQPDefinedTypes;
 import org.apache.qpid.proton.codec.DecoderImpl;
 import org.apache.qpid.proton.codec.EncoderImpl;
+import org.apache.qpid.proton.codec2.ByteArrayEncoder;
 import org.apache.qpid.proton.engine.Connection;
 import org.apache.qpid.proton.engine.EndpointState;
 import org.apache.qpid.proton.engine.Event;
@@ -96,14 +97,15 @@ public class TransportImpl extends EndpointImpl
     private TransportOutput _outputProcessor;
 
     private DecoderImpl _decoder = new DecoderImpl();
-    private EncoderImpl _encoder = new EncoderImpl(_decoder);
+    //private EncoderImpl _encoder = new EncoderImpl(_decoder);
+    private ByteArrayEncoder _encoder = new ByteArrayEncoder();
 
     private int _maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
     private int _remoteMaxFrameSize = 512;
     private int _channelMax       = CHANNEL_MAX_LIMIT;
     private int _remoteChannelMax = CHANNEL_MAX_LIMIT;
 
-    private final FrameWriter _frameWriter;
+    private final FrameWriter2 _frameWriter;
 
     private boolean _closeReceived;
     private Open _open;
@@ -152,10 +154,10 @@ public class TransportImpl extends EndpointImpl
      */
     TransportImpl(int maxFrameSize)
     {
-        AMQPDefinedTypes.registerAllTypes(_decoder, _encoder);
+        AMQPDefinedTypes.registerAllTypes(_decoder, new EncoderImpl(_decoder));
 
         _maxFrameSize = maxFrameSize;
-        _frameWriter = new FrameWriter(_encoder, _remoteMaxFrameSize,
+        _frameWriter = new FrameWriter2(_encoder, _remoteMaxFrameSize,
                                        FrameWriter.AMQP_FRAME_TYPE,
                                        _protocolTracer,
                                        this);
@@ -791,23 +793,23 @@ public class TransportImpl extends EndpointImpl
              (_connectionEndpoint != null &&
               _connectionEndpoint.getLocalState() != EndpointState.UNINITIALIZED)) &&
             !_isOpenSent) {
-            Open open = new Open();
+            org.apache.qpid.proton.transport.Open open = new org.apache.qpid.proton.transport.Open();
             if (_connectionEndpoint != null) {
                 String cid = _connectionEndpoint.getLocalContainerId();
                 open.setContainerId(cid == null ? "" : cid);
                 open.setHostname(_connectionEndpoint.getHostname());
-                open.setDesiredCapabilities(_connectionEndpoint.getDesiredCapabilities());
-                open.setOfferedCapabilities(_connectionEndpoint.getOfferedCapabilities());
-                open.setProperties(_connectionEndpoint.getProperties());
+                open.setDesiredCapabilities();
+                open.setOfferedCapabilities();
+                //open.setProperties(_connectionEndpoint.getProperties());
             } else {
                 open.setContainerId("");
             }
 
             if (_maxFrameSize > 0) {
-                open.setMaxFrameSize(UnsignedInteger.valueOf(_maxFrameSize));
+                open.setMaxFrameSize(_maxFrameSize); //UnsignedInteger.valueOf(_maxFrameSize));
             }
             if (_channelMax > 0) {
-                open.setChannelMax(UnsignedShort.valueOf((short) _channelMax));
+                open.setChannelMax(_channelMax); //UnsignedShort.valueOf((short) _channelMax));
             }
 
             // as per the recommendation in the spec, advertise half our
@@ -817,7 +819,8 @@ public class TransportImpl extends EndpointImpl
             }
             _isOpenSent = true;
 
-            writeFrame(0, open, null, null);
+            //writeFrame(0, open, null, null);
+            _frameWriter.writeFrame(0, open, null, null);
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org