You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2015/08/14 21:07:49 UTC
[16/50] [abbrv] qpid-proton git commit: Adding some experimental
changes.
Adding some experimental changes.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/de8667c2
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/de8667c2
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/de8667c2
Branch: refs/heads/rajith-codec
Commit: de8667c2f1acf937bd775f263d499797da4c64ec
Parents: a7f13b7
Author: Rajith Attapattu <ra...@apache.org>
Authored: Thu Mar 19 19:34:08 2015 -0400
Committer: Rajith Attapattu <ra...@apache.org>
Committed: Thu Jul 9 09:12:38 2015 -0400
----------------------------------------------------------------------
.../qpid/proton/engine/impl/FrameWriter2.java | 235 +++++++++++++++++++
.../qpid/proton/engine/impl/TransportImpl.java | 25 +-
2 files changed, 249 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/de8667c2/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter2.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter2.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter2.java
new file mode 100644
index 0000000..109fecf
--- /dev/null
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter2.java
@@ -0,0 +1,235 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.proton.engine.impl;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.transport.FrameBody;
+import org.apache.qpid.proton.codec2.ByteArrayEncoder;
+import org.apache.qpid.proton.codec2.CodecHelper;
+import org.apache.qpid.proton.codec2.Type;
+import org.apache.qpid.proton.framing.TransportFrame;
+
+/**
+ * FrameWriter2
+ *
+ * Copied from FrameWriter.java and modified just enough to plug in the new codec.
+ *
+ * Outgoing frames are accumulated in a growable byte array. Each frame is laid
+ * out as an 8-byte frame header, then the encoded performative, then an
+ * optional payload. {@link #readBytes(ByteBuffer)} drains the accumulated
+ * bytes into a caller supplied buffer.
+ */
+class FrameWriter2
+{
+    static final byte AMQP_FRAME_TYPE = 0;
+
+    static final byte SASL_FRAME_TYPE = (byte) 1;
+
+    /** Initial buffer size in bytes; can be overridden with the "proton.encoder.buffer_size" system property. */
+    static final int DEFAULT_BUFFER_SIZE = Integer.getInteger("proton.encoder.buffer_size", 1024);
+
+    /** @deprecated misspelled name kept so existing references still compile; use {@link #DEFAULT_BUFFER_SIZE}. */
+    @Deprecated
+    static final int DEFUALT_BUFFER_SIZE = DEFAULT_BUFFER_SIZE;
+
+    /** Number of bytes reserved at the start of every frame for the frame header. */
+    private static final int FRAME_HEADER_SIZE = 8;
+
+    private final ByteArrayEncoder _encoder;
+
+    /** Backing store for all pending output; replaced by a larger array in {@link #grow()}. */
+    private byte[] _buffer = new byte[DEFAULT_BUFFER_SIZE];
+
+    /** Maximum frame size; values &lt;= 0 mean "no limit" (see {@link #writeFrame(int, Object, ByteBuffer, Runnable)}). */
+    private int _maxFrameSize;
+
+    private final byte _frameType;
+
+    // Only needed by the frame tracing that is currently disabled in writeFrame().
+    private final Ref<ProtocolTracer> _protocolTracer;
+
+    // Only needed by the frame logging that is currently disabled in writeFrame().
+    private final TransportImpl _transport;
+
+    /** Index in {@code _buffer} at which the frame currently being written starts. */
+    private int _frameStart = 0;
+
+    /** Index just past the encoded performative of the current frame. */
+    private int _payloadStart;
+
+    /** Size of frame header plus encoded performative of the current frame. */
+    private int _performativeSize;
+
+    /** Index just past the last valid byte in {@code _buffer} (the write offset). */
+    private int _position = 0;
+
+    /** Index of the next byte {@link #readBytes(ByteBuffer)} will hand out (the read offset). */
+    private int _read = 0;
+
+    /**
+     * @param encoder        codec used to encode performatives; re-initialised to write into this writer's buffer
+     * @param maxFrameSize   maximum frame size, or a value &lt;= 0 for no limit
+     * @param frameType      value written into the frame-type byte of every frame header
+     * @param protocolTracer tracer reference (currently unused because tracing is disabled)
+     * @param transport      owning transport (currently unused because logging is disabled)
+     */
+    FrameWriter2(ByteArrayEncoder encoder, int maxFrameSize, byte frameType, Ref<ProtocolTracer> protocolTracer,
+                 TransportImpl transport)
+    {
+        _encoder = encoder;
+        _encoder.init(_buffer, 0, _buffer.length);
+        _maxFrameSize = maxFrameSize;
+        _frameType = frameType;
+        _protocolTracer = protocolTracer;
+        _transport = transport;
+    }
+
+    void setMaxFrameSize(int maxFrameSize)
+    {
+        _maxFrameSize = maxFrameSize;
+    }
+
+    /**
+     * Doubles the buffer, keeping the first {@code _position} bytes, and points
+     * the encoder at the new array.
+     */
+    private void grow()
+    {
+        byte[] old = _buffer;
+        _buffer = new byte[old.length * 2];
+        System.arraycopy(old, 0, _buffer, 0, _position);
+
+        // Pass the real length of the new array, exactly like the constructor
+        // does. The previous code passed "_buffer.length * 2" after _buffer had
+        // already been replaced, i.e. twice the size of the array.
+        // NOTE(review): assumes the third argument of init() is the usable
+        // length of the array -- confirm against ByteArrayEncoder.
+        _encoder.init(_buffer, _position, _buffer.length);
+    }
+
+    /**
+     * Appends a protocol header to the pending output.
+     *
+     * When nothing is pending this is the same as writing at index 0; unlike
+     * the previous version it neither overwrites pending bytes nor overflows
+     * the buffer for a header longer than the buffer.
+     *
+     * @param header raw header bytes, copied verbatim
+     */
+    void writeHeader(byte[] header)
+    {
+        while (_buffer.length - _position < header.length)
+        {
+            grow();
+        }
+
+        System.arraycopy(header, 0, _buffer, _position, header.length);
+        _position += header.length;
+    }
+
+    /** Marks the current write offset as the start of a new frame. */
+    private void startFrame()
+    {
+        _frameStart = _position;
+    }
+
+    /**
+     * Encodes {@code frameBody} right after the space reserved for the frame
+     * header and records where the payload starts.
+     */
+    private void writePerformative(Object frameBody)
+    {
+        // Make sure at least the frame header fits behind the current offset.
+        while (_buffer.length - _position < FRAME_HEADER_SIZE)
+        {
+            grow();
+        }
+
+        // Encode optimistically; when the encoder runs off the end of the
+        // array, enlarge the buffer and encode again from the same offset.
+        while (true)
+        {
+            try
+            {
+                _encoder.setPosition(_frameStart + FRAME_HEADER_SIZE);
+                CodecHelper.encodeObject(_encoder, frameBody);
+                _position = _encoder.getPosition();
+                break;
+            }
+            catch (IndexOutOfBoundsException e)
+            {
+                grow();
+            }
+        }
+
+        _payloadStart = _position;
+        _performativeSize = _payloadStart - _frameStart;
+    }
+
+    /**
+     * Fills in the frame header at {@code _frameStart}: total frame size, the
+     * constant 2 (the data offset of the AMQP frame layout), the frame type
+     * and the channel.
+     */
+    private void endFrame(int channel)
+    {
+        int frameSize = _position - _frameStart;
+        // NOTE(review): assumes putInt/putByte/putShort write raw values
+        // without AMQP format codes -- confirm against ByteArrayEncoder.
+        _encoder.setPosition(_frameStart);
+        _encoder.putInt(frameSize);
+        _encoder.putByte((byte) 2);
+        _encoder.putByte(_frameType);
+        _encoder.putShort((short) channel);
+    }
+
+    /**
+     * Appends one frame to the pending output.
+     *
+     * At most as many payload bytes as fit into the maximum frame size are
+     * consumed from {@code payload}; the rest stays in {@code payload} for the
+     * caller.
+     *
+     * @param channel           channel number written into the frame header
+     * @param frameBody         performative to encode
+     * @param payload           optional payload, may be {@code null}
+     * @param onPayloadTooLarge optional callback, run once when performative plus
+     *                          payload exceed the maximum frame size; the
+     *                          performative is encoded again afterwards
+     *                          (presumably so the callback can adjust
+     *                          {@code frameBody} -- confirm with callers)
+     */
+    void writeFrame(int channel, Object frameBody, ByteBuffer payload, Runnable onPayloadTooLarge)
+    {
+        startFrame();
+
+        writePerformative(frameBody);
+
+        if (_maxFrameSize > 0 && payload != null && (payload.remaining() + _performativeSize) > _maxFrameSize)
+        {
+            if (onPayloadTooLarge != null)
+            {
+                onPayloadTooLarge.run();
+            }
+            writePerformative(frameBody);
+        }
+
+        // Snapshot of the payload for the frame tracing below, which is
+        // currently disabled.
+        ByteBuffer originalPayload = null;
+        if (payload != null)
+        {
+            originalPayload = payload.duplicate();
+        }
+
+        // XXX: this is a bit of a hack but it eliminates duplicate
+        // code, further refactor will fix this
+        // Disabled while the new codec is being plugged in:
+        // if (_frameType == AMQP_FRAME_TYPE)
+        // {
+        //     TransportFrame frame = new TransportFrame(channel, (FrameBody) frameBody, Binary.create(originalPayload));
+        //     _transport.log(TransportImpl.OUTGOING, frame);
+        //
+        //     ProtocolTracer tracer = _protocolTracer.get();
+        //     if (tracer != null)
+        //     {
+        //         tracer.sentFrame(frame);
+        //     }
+        // }
+
+        int capacity;
+        if (_maxFrameSize > 0)
+        {
+            capacity = _maxFrameSize - _performativeSize;
+        }
+        else
+        {
+            capacity = Integer.MAX_VALUE;
+        }
+        int payloadSize = Math.min(payload == null ? 0 : payload.remaining(), capacity);
+
+        if (payloadSize > 0)
+        {
+            while (_buffer.length - _position < payloadSize)
+            {
+                grow();
+            }
+
+            payload.get(_buffer, _position, payloadSize);
+            // Advance by exactly the number of bytes copied. The previous
+            // "+ 1" added one stray byte to every frame carrying a payload.
+            _position += payloadSize;
+        }
+        endFrame(channel);
+    }
+
+    /** Appends a frame without payload on channel 0. */
+    void writeFrame(Object frameBody)
+    {
+        writeFrame(0, frameBody, null, null);
+    }
+
+    /** @return {@code true} once the write offset is beyond 64 KiB */
+    boolean isFull()
+    {
+        // XXX: this should probably be tunable
+        return _position > 64 * 1024;
+    }
+
+    /**
+     * Copies pending output into {@code dst}.
+     *
+     * @param dst destination; receives at most {@code dst.remaining()} bytes
+     * @return number of bytes copied
+     */
+    int readBytes(ByteBuffer dst)
+    {
+        int size = Math.min(_position - _read, dst.remaining());
+
+        dst.put(_buffer, _read, size);
+        // Advance the read offset. The previous code stored the number of
+        // bytes still pending here, so after a partial drain the next call
+        // copied from the wrong offset.
+        _read += size;
+
+        // If we have copied everything we can go back to the beginning of the
+        // buffer; after a partial read the offsets are left alone so the next
+        // call continues where this one stopped.
+        if (_read == _position)
+        {
+            _read = 0;
+            _position = 0;
+        }
+
+        return size;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/de8667c2/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
index 595afd6..7270666 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
@@ -44,6 +44,7 @@ import org.apache.qpid.proton.amqp.transport.Transfer;
import org.apache.qpid.proton.codec.AMQPDefinedTypes;
import org.apache.qpid.proton.codec.DecoderImpl;
import org.apache.qpid.proton.codec.EncoderImpl;
+import org.apache.qpid.proton.codec2.ByteArrayEncoder;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
@@ -96,14 +97,15 @@ public class TransportImpl extends EndpointImpl
private TransportOutput _outputProcessor;
private DecoderImpl _decoder = new DecoderImpl();
- private EncoderImpl _encoder = new EncoderImpl(_decoder);
+ //private EncoderImpl _encoder = new EncoderImpl(_decoder);
+ private ByteArrayEncoder _encoder = new ByteArrayEncoder();
private int _maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
private int _remoteMaxFrameSize = 512;
private int _channelMax = CHANNEL_MAX_LIMIT;
private int _remoteChannelMax = CHANNEL_MAX_LIMIT;
- private final FrameWriter _frameWriter;
+ private final FrameWriter2 _frameWriter;
private boolean _closeReceived;
private Open _open;
@@ -152,10 +154,10 @@ public class TransportImpl extends EndpointImpl
*/
TransportImpl(int maxFrameSize)
{
- AMQPDefinedTypes.registerAllTypes(_decoder, _encoder);
+ AMQPDefinedTypes.registerAllTypes(_decoder, new EncoderImpl(_decoder));
_maxFrameSize = maxFrameSize;
- _frameWriter = new FrameWriter(_encoder, _remoteMaxFrameSize,
+ _frameWriter = new FrameWriter2(_encoder, _remoteMaxFrameSize,
FrameWriter.AMQP_FRAME_TYPE,
_protocolTracer,
this);
@@ -791,23 +793,23 @@ public class TransportImpl extends EndpointImpl
(_connectionEndpoint != null &&
_connectionEndpoint.getLocalState() != EndpointState.UNINITIALIZED)) &&
!_isOpenSent) {
- Open open = new Open();
+ org.apache.qpid.proton.transport.Open open = new org.apache.qpid.proton.transport.Open();
if (_connectionEndpoint != null) {
String cid = _connectionEndpoint.getLocalContainerId();
open.setContainerId(cid == null ? "" : cid);
open.setHostname(_connectionEndpoint.getHostname());
- open.setDesiredCapabilities(_connectionEndpoint.getDesiredCapabilities());
- open.setOfferedCapabilities(_connectionEndpoint.getOfferedCapabilities());
- open.setProperties(_connectionEndpoint.getProperties());
+ open.setDesiredCapabilities();
+ open.setOfferedCapabilities();
+ //open.setProperties(_connectionEndpoint.getProperties());
} else {
open.setContainerId("");
}
if (_maxFrameSize > 0) {
- open.setMaxFrameSize(UnsignedInteger.valueOf(_maxFrameSize));
+ open.setMaxFrameSize(_maxFrameSize); //UnsignedInteger.valueOf(_maxFrameSize));
}
if (_channelMax > 0) {
- open.setChannelMax(UnsignedShort.valueOf((short) _channelMax));
+ open.setChannelMax(_channelMax); //UnsignedShort.valueOf((short) _channelMax));
}
// as per the recommendation in the spec, advertise half our
@@ -817,7 +819,8 @@ public class TransportImpl extends EndpointImpl
}
_isOpenSent = true;
- writeFrame(0, open, null, null);
+ //writeFrame(0, open, null, null);
+ _frameWriter.writeFrame(0, open, null, null);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org