You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by de...@apache.org on 2013/11/08 09:16:06 UTC
[3/9] Remote Desktop Protocol(RDP) client that is suitable for
including in the console VM. The client renders RDP to a window created by
Java. It is important for Hyper-V support,
because Hyper-V provides access to the consoles of VMs that it is running
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a98c473d/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/ByteBuffer.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/ByteBuffer.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/ByteBuffer.java
new file mode 100644
index 0000000..832c731
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/ByteBuffer.java
@@ -0,0 +1,826 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class represents a slice in a buffer.
+ */
+public class ByteBuffer {
+
+ public static final String SEQUENCE_NUMBER = "seq";
+
+ public byte data[];
+ public int offset = 0;
+ public int length = 0;
+ public int cursor = 0;
+
+ private int refCount = 1;
+ private ByteBuffer parentByteBuffer = null;
+
+ private Order order;
+
+ /**
+ * Create buffer of size no less than length. Buffer can be a bit larger than
+ * length. Offset also can be set to non-zero value to leave some place for
+ * future headers.
+ */
+ public ByteBuffer(int minLength) {
+ // Get buffer of acceptable size from buffer pool
+ this.data = BufferPool.allocateNewBuffer(minLength);
+ this.offset = 0;
+ this.length = minLength;
+ }
+
+ public ByteBuffer(byte data[]) {
+ if (data == null)
+ throw new NullPointerException("Data must be non-null.");
+
+ this.data = data;
+ this.offset = 0;
+ this.length = data.length;
+ }
+
+ public ByteBuffer(byte[] data, int offset, int length) {
+ if (data == null)
+ throw new NullPointerException("Data must be non-null.");
+
+ this.data = data;
+ this.offset = offset;
+ this.length = length;
+ }
+
+ /**
+ * Create byte buffer of requested size with some space reserved for future
+ * headers.
+ */
+ public ByteBuffer(int minLength, boolean reserveSpaceForHeader) {
+ // Get buffer of acceptable size from buffer pool
+ this.data = BufferPool.allocateNewBuffer(128 + minLength);
+ this.offset = 128; // 100 bytes should be enough for headers
+ this.length = minLength;
+ }
+
+ /**
+ * Create empty buffer with given order only.
+ */
+ public ByteBuffer(Order order) {
+ this.order = order;
+ }
+
+ public void setOrder(Order order) {
+ this.order = order;
+ }
+
+ public Order getOrder() {
+ return order;
+ }
+
+ @Override
+ public String toString() {
+ return toString(100);
+ }
+
+ /**
+ * Return string representation of this byte buffer.
+ *
+ * @param maxLength
+ * number of bytes to show in string
+ */
+ public String toString(int maxLength) {
+ return "ByteRange(){offset=" + offset + ", length=" + length + ", cursor=" + cursor + ", data=" + ((data == null) ? "null" : toHexString(maxLength))
+ + ((metadata == null || metadata.size() == 0) ? "" : ", metadata=" + metadata) + "}";
+ }
+
+ /**
+ * Return string representation of this byte buffer as hexadecimal numbers,
+ * e.g. "[0x01, 0x02]".
+ *
+ * @param maxLength
+ * number of bytes to show in string
+ */
+ public String toHexString(int maxLength) {
+ StringBuilder builder = new StringBuilder(maxLength * 6);
+ builder.append('[');
+ for (int i = 0; i < maxLength && i < length; i++) {
+ if (i > 0)
+ builder.append(", ");
+ int b = data[offset + i] & 0xff;
+ builder.append("0x" + ((b < 16) ? "0" : "") + Integer.toString(b, 16));
+ }
+ builder.append(']');
+ return builder.toString();
+ }
+
+ /**
+ * Return string representation of this byte buffer as hexadecimal numbers,
+ * e.g. "01 02".
+ *
+ * @param maxLength
+ * number of bytes to show in string
+ */
+ public String toPlainHexString(int maxLength) {
+ StringBuilder builder = new StringBuilder(maxLength * 3);
+ for (int i = 0; i < maxLength && i < length; i++) {
+ if (i > 0)
+ builder.append(" ");
+ int b = data[offset + i] & 0xff;
+ builder.append(((b < 16) ? "0" : "") + Integer.toString(b, 16));
+ }
+ return builder.toString();
+ }
+
+ public void dump() {
+ System.out.println(toString(length));
+ }
+
+ public void extend(int newLength) {
+ if (data.length < newLength)
+ Arrays.copyOf(data, newLength);
+ }
+
+ public void ref() {
+ refCount++;
+ }
+
+ public void unref() {
+ refCount--;
+
+ if (refCount == 0) {
+
+ if (parentByteBuffer != null) {
+ parentByteBuffer.unref();
+ parentByteBuffer = null;
+ } else {
+ // Return buffer to buffer pool
+ BufferPool.recycleBuffer(data);
+ }
+
+ data = null;
+ }
+
+ }
+
+ public boolean isSoleOwner() {
+ return refCount == 1 && (parentByteBuffer == null);
+ }
+
+ /**
+ * Create shared lightweight copy of part of this buffer.
+ */
+ public ByteBuffer slice(int offset, int length, boolean copyMetadata) {
+ ref();
+
+ if (this.length < (offset + length))
+ throw new RuntimeException("Length of region is larger that length of this buffer. Buffer length: " + this.length + ", offset: " + offset
+ + ", new region length: " + length + ".");
+
+ ByteBuffer slice = new ByteBuffer(data, this.offset + offset, length);
+
+ if (copyMetadata && this.metadata != null)
+ slice.metadata = new HashMap<String, Object>(metadata);
+
+ return slice;
+ }
+
+ private Map<String, Object> metadata = null;
+
+ public Object putMetadata(String key, Object value) {
+ if (metadata == null)
+ metadata = new HashMap<String, Object>();
+ return metadata.put(key, value);
+ }
+
+ public Object getMetadata(String key) {
+ return (metadata != null) ? metadata.get(key) : null;
+ }
+
+ /**
+ * Create new buffer, which holds data from both buffers. Expensive operation.
+ *
+ * @TODO if only one reference to this ByteBuffer exists, then extend this
+ * buffer instead of creating new buffer
+ * @TODO support list of buffers to avoid expensive joins until absolute
+ * necessary
+ */
+ public ByteBuffer join(ByteBuffer buf) {
+ // Extend byte array for new data
+ int newLength = length + buf.length;
+ byte newData[] = new byte[newLength];
+
+ // Copy data from our buffer
+ System.arraycopy(data, offset, newData, 0, length);
+
+ // Copy data from other buffer
+ System.arraycopy(buf.data, buf.offset, newData, length, buf.length);
+
+ ByteBuffer newBuf = new ByteBuffer(newData);
+
+ // Copy our (older) metadata to new buffer, because handler might store some
+ // metadata in buffer, which is pushed back.
+ if (metadata != null)
+ newBuf.metadata = new HashMap<String, Object>(metadata);
+
+ return newBuf;
+ }
+
+ /**
+ * Copy used portion of buffer to new byte array. Expensive operation.
+ */
+ public byte[] toByteArray() {
+ return Arrays.copyOfRange(data, offset, offset + length);
+ }
+
+ public short[] toShortArray() {
+ if (length % 2 != 0)
+ throw new ArrayIndexOutOfBoundsException("Length of byte array must be dividable by 2 without remainder. Array length: " + length + ", remainder: "
+ + (length % 2) + ".");
+
+ short[] buf = new short[length / 2];
+
+ for (int i = 0, j = offset; i < buf.length; i++, j += 2) {
+ buf[i] = (short) ((data[j + 0] & 0xFF) | ((data[j + 1] & 0xFF) << 8));
+ }
+ return buf;
+ }
+
+ /**
+ * Return array of int's in little endian order.
+ */
+ public int[] toIntLEArray() {
+ if (length % 4 != 0)
+ throw new ArrayIndexOutOfBoundsException("Length of byte array must be dividable by 4 without remainder. Array length: " + length + ", remainder: "
+ + (length % 4) + ".");
+
+ int[] buf = new int[length / 4];
+
+ for (int i = 0, j = offset; i < buf.length; i++, j += 4) {
+ buf[i] = (data[j + 0] & 0xFF) | ((data[j + 1] & 0xFF) << 8) | ((data[j + 2] & 0xFF) << 16) | ((data[j + 3] & 0xFF) << 24);
+ }
+ return buf;
+ }
+
+ /**
+ * Return array of int's in little endian order, but use only 3 bytes per int (3RGB).
+ */
+ public int[] toInt3LEArray() {
+ if (length % 3 != 0)
+ throw new ArrayIndexOutOfBoundsException("Length of byte array must be dividable by 3 without remainder. Array length: " + length + ", remainder: "
+ + (length % 3) + ".");
+
+ int[] buf = new int[length / 3];
+
+ for (int i = 0, j = offset; i < buf.length; i++, j += 3) {
+ buf[i] = (data[j + 0] & 0xFF) | ((data[j + 1] & 0xFF) << 8) | ((data[j + 2] & 0xFF) << 16);
+ }
+ return buf;
+ }
+
+ /**
+ * Helper method for test cases to convert array of byte arrays to array of
+ * byte buffers.
+ */
+ public static ByteBuffer[] convertByteArraysToByteBuffers(byte[]... bas) {
+ ByteBuffer bufs[] = new ByteBuffer[bas.length];
+
+ int i = 0;
+ for (byte[] ba : bas) {
+ bufs[i++] = new ByteBuffer(ba);
+ }
+ return bufs;
+ }
+
+ /**
+ * Read signed int in network order. Cursor is advanced by 4.
+ */
+ public int readSignedInt() {
+ if (cursor + 4 > length)
+ throw new ArrayIndexOutOfBoundsException("Cannot read 4 bytes from this buffer: " + this + ".");
+
+ int result = (((data[offset + cursor] & 0xff) << 24) + ((data[offset + cursor + 1] & 0xff) << 16) + ((data[offset + cursor + 2] & 0xff) << 8) + (data[offset
+ + cursor + 3] & 0xff));
+ cursor += 4;
+ return result;
+ }
+
+ /**
+ * Read signed int in little endian order. Cursor is advanced by 4.
+ */
+ public int readSignedIntLE() {
+ if (cursor + 4 > length)
+ throw new ArrayIndexOutOfBoundsException("Cannot read 4 bytes from this buffer: " + this + ".");
+
+ int result = (((data[offset + cursor + 3] & 0xff) << 24) + ((data[offset + cursor + 2] & 0xff) << 16) + ((data[offset + cursor + 1] & 0xff) << 8) + (data[offset
+ + cursor] & 0xff));
+ cursor += 4;
+ return result;
+ }
+
+ /**
+ * Read unsigned int in little endian order. Cursor is advanced by 4.
+ */
+ public long readUnsignedIntLE() {
+ if (cursor + 4 > length)
+ throw new ArrayIndexOutOfBoundsException("Cannot read 4 bytes from this buffer: " + this + ".");
+
+ long result = (((long) (data[offset + cursor + 3] & 0xff) << 24) + ((long) (data[offset + cursor + 2] & 0xff) << 16)
+ + ((long) (data[offset + cursor + 1] & 0xff) << 8) + (long) (data[offset + cursor] & 0xff));
+ cursor += 4;
+ return result;
+ }
+
+ /**
+ * Read signed int in variable length format. Top most bit of each byte
+ * indicates that next byte contains additional bits. Cursor is advanced by
+ * 1-5 bytes.
+ */
+ public int readVariableSignedIntLE() {
+ int result = 0;
+
+ for (int shift = 0; shift < 32; shift += 7) {
+ int b = readUnsignedByte();
+ result |= (b & 0x7f) << shift;
+ if ((b & 0x80) == 0)
+ break;
+ }
+
+ return result;
+ }
+
+ /**
+ * Read unsigned int in network order in variable length format. Cursor is
+ * advanced by 1 to 4 bytes.
+ *
+ * Two most significant bits of first byte indicates length of field: 0x00 - 1
+ * byte, 0x40 - 2 bytes, 0x80 - 3 bytes, 0xc0 - 4 bytes.
+ *
+ * @see http://msdn.microsoft.com/en-us/library/cc241614.aspx
+ */
+ public int readEncodedUnsignedInt() {
+ int firstByte = readUnsignedByte();
+ int result;
+ switch (firstByte & 0xc0) {
+ default:
+ case 0x00:
+ result = firstByte & 0x3f;
+ break;
+ case 0x40:
+ result = (firstByte & 0x3f << 8) | readUnsignedByte();
+ break;
+ case 0x80:
+ result = (((firstByte & 0x3f << 8) | readUnsignedByte()) << 8) | readUnsignedByte();
+ break;
+ case 0xc0:
+ result = ((((firstByte & 0x3f << 8) | readUnsignedByte()) << 8) | readUnsignedByte() << 8) | readUnsignedByte();
+ break;
+ }
+
+ return result;
+ }
+
+ /**
+ * Read unsigned byte. Cursor is advanced by 1.
+ */
+ public int readUnsignedByte() {
+ if (cursor + 1 > length)
+ throw new ArrayIndexOutOfBoundsException("Cannot read 1 byte from this buffer: " + this + ".");
+
+ int b = data[offset + cursor] & 0xff;
+ cursor += 1;
+ return b;
+ }
+
+ /**
+ * Read signed byte. Cursor is advanced by 1.
+ */
+ public byte readSignedByte() {
+ if (cursor + 1 > length)
+ throw new ArrayIndexOutOfBoundsException("Cannot read 1 byte from this buffer: " + this + ".");
+
+ byte b = data[offset + cursor];
+ cursor += 1;
+ return b;
+ }
+
+ /**
+ * Read unsigned short in network order. Cursor is advanced by 2.
+ */
+ public int readUnsignedShort() {
+ if (cursor + 2 > length)
+ throw new ArrayIndexOutOfBoundsException("Cannot read 2 bytes from this buffer: " + this + ".");
+
+ int result = (((data[offset + cursor] & 0xff) << 8) | (data[offset + cursor + 1] & 0xff));
+ cursor += 2;
+ return result;
+ }
+
+ /**
+ * Read signed short in little endian order. Cursor is advanced by 2.
+ */
+ public short readSignedShortLE() {
+ if (cursor + 2 > length)
+ throw new ArrayIndexOutOfBoundsException("Cannot read 2 bytes from this buffer: " + this + ".");
+
+ short result = (short) (((data[offset + cursor + 1] & 0xff) << 8) | (data[offset + cursor] & 0xff));
+ cursor += 2;
+ return result;
+ }
+
+ /**
+ * Read unsigned short in network order in variable length format. Cursor is
+ * advanced by 1 or 2 bytes.
+ *
+ * Most significant bit of first byte indicates length of field: 0 - 1 byte, 1
+ * - 2 bytes.
+ */
+ public int readVariableUnsignedShort() {
+ int firstByte = readUnsignedByte();
+
+ int result;
+ if ((firstByte & 0x80) == 0)
+ result = firstByte & 0x7f;
+ else {
+ int secondByte = readUnsignedByte();
+ result = (((firstByte & 0x7f) << 8) | secondByte);
+ }
+
+ return result;
+ }
+
+ /**
+ * Read unsigned short in little endian order. Cursor is advanced by 2.
+ */
+ public int readUnsignedShortLE() {
+ if (cursor + 2 > length)
+ throw new ArrayIndexOutOfBoundsException("Cannot read 2 bytes from this buffer: " + this + ".");
+
+ int result = (((data[offset + cursor + 1] & 0xff) << 8) | (data[offset + cursor] & 0xff));
+ cursor += 2;
+ return result;
+ }
+
+ /**
+ * Read unsigned short in network order in variable length format. Cursor is
+ * advanced by 1 or 2 bytes.
+ *
+ * Most significant bit of first byte indicates length of field: 0x00 - 1
+ * byte, 0x80 - 2 bytes.
+ *
+ * @see http://msdn.microsoft.com/en-us/library/cc241612.aspx
+ */
+ public int readEncodedUnsignedShort() {
+ int firstByte = readUnsignedByte();
+
+ int result;
+ if ((firstByte & 0x80) == 0)
+ result = firstByte & 0x7f;
+ else {
+ int secondByte = readUnsignedByte();
+ result = (((firstByte & 0x7f) << 8) | secondByte);
+ }
+
+ return result;
+ }
+
+ /**
+ * Read signed short in network order in variable length format. Cursor is
+ * advanced by 1 or 2 bytes.
+ *
+ * Most significant bit of first byte indicates length of field: 0x00 - 1
+ * byte, 0x80 - 2 bytes. Second most significant bit indicates is value
+ * positive or negative.
+ *
+ * @see http://msdn.microsoft.com/en-us/library/cc241613.aspx
+ */
+ public int readEncodedSignedShort() {
+ int firstByte = readUnsignedByte();
+
+ int result;
+ if ((firstByte & 0x80) == 0)
+ result = firstByte & 0x3f;
+ else {
+ int secondByte = readUnsignedByte();
+ result = (((firstByte & 0x3f) << 8) | secondByte);
+ }
+
+ if ((firstByte & 0x40) > 0)
+ return -result;
+ else
+ return result;
+ }
+
+ /**
+ * Read signed long in little endian order. Cursor is advanced by 8 bytes.
+ */
+ public long readSignedLongLE() {
+ return (((long) readSignedIntLE()) & 0xffFFffFFL) | (((long) readSignedIntLE()) << 32);
+ }
+
+ /**
+ * Read string from buffer. Cursor is advanced by string length.
+ */
+ public String readString(int length, Charset charset) {
+ if (cursor + length > this.length)
+ throw new ArrayIndexOutOfBoundsException("Cannot read " + length + " bytes from this buffer: " + this + ".");
+
+ String string = new String(data, offset + cursor, length, charset);
+ cursor += length;
+ return string;
+ }
+
+ /**
+ * Get bytes as lightweight slice. Cursor is advanced by data length.
+ */
+ public ByteBuffer readBytes(int dataLength) {
+ if (cursor + dataLength > length)
+ throw new ArrayIndexOutOfBoundsException("Cannot read " + dataLength + " bytes from this buffer: " + this + ".");
+
+ ByteBuffer slice = slice(cursor, dataLength, false);
+ cursor += dataLength;
+ return slice;
+ }
+
+ /**
+ * Cursor is advanced by given number of bytes.
+ */
+ public void skipBytes(int numOfBytes) {
+ if (cursor + numOfBytes > length)
+ throw new ArrayIndexOutOfBoundsException("Cannot read " + numOfBytes + " bytes from this buffer: " + this + ".");
+
+ cursor += numOfBytes;
+ }
+
+ /**
+ * Write byte. Cursor is advanced by 1.
+ */
+ public void writeByte(int b) {
+ if (cursor + 1 > length)
+ throw new ArrayIndexOutOfBoundsException("Cannot write 1 byte to this buffer: " + this + ".");
+
+ data[offset + cursor] = (byte) b;
+ cursor += 1;
+ }
+
+ /**
+ * Write short in network order. Cursor is advanced by 2.
+ */
+ public void writeShort(int x) {
+ if (cursor + 2 > length)
+ throw new ArrayIndexOutOfBoundsException("Cannot write 2 bytes to this buffer: " + this + ".");
+
+ data[offset + cursor] = (byte) (x >> 8);
+ data[offset + cursor + 1] = (byte) x;
+ cursor += 2;
+ }
+
+ /**
+ * Write short in little endian order. Cursor is advanced by 2.
+ */
+ public void writeShortLE(int x) {
+ if (cursor + 2 > length)
+ throw new ArrayIndexOutOfBoundsException("Cannot write 2 bytes to this buffer: " + this + ".");
+
+ data[offset + cursor + 1] = (byte) (x >> 8);
+ data[offset + cursor] = (byte) x;
+ cursor += 2;
+ }
+
+ /**
+ * Write int in network order. Cursor is advanced by 4.
+ */
+ public void writeInt(int i) {
+ if (cursor + 4 > length)
+ throw new ArrayIndexOutOfBoundsException("Cannot write 4 bytes to this buffer: " + this + ".");
+
+ data[offset + cursor] = (byte) (i >> 24);
+ data[offset + cursor + 1] = (byte) (i >> 16);
+ data[offset + cursor + 2] = (byte) (i >> 8);
+ data[offset + cursor + 3] = (byte) i;
+ cursor += 4;
+ }
+
+ public void writeIntLE(int i) {
+ if (cursor + 4 > length)
+ throw new ArrayIndexOutOfBoundsException("Cannot write 4 bytes to this buffer: " + this + ".");
+
+ data[offset + cursor] = (byte) i;
+ data[offset + cursor + 1] = (byte) (i >> 8);
+ data[offset + cursor + 2] = (byte) (i >> 16);
+ data[offset + cursor + 3] = (byte) (i >> 24);
+ cursor += 4;
+ }
+
+ /**
+ * Write int in variable length format. Cursor is advanced by number of bytes
+ * written (1-5).
+ *
+ * Topmost bit of each byte is set to 1 to indicate that next byte has data.
+ */
+ public void writeVariableIntLE(int i) {
+ while (i != 0) {
+ // Get lower bits of number
+ int b = i & 0x7f;
+ i >>= 7;
+
+ if (i > 0)
+ // Set topmost bit of byte to indicate that next byte(s) contains
+ // remainder bits
+ b |= 0x80;
+
+ writeByte(b);
+ }
+ }
+
+ /**
+ * Write short in variable length format. Cursor is advanced by number of
+ * bytes written (1-2).
+ *
+ * Topmost bit of first byte is set to 1 to indicate that next byte has data.
+ */
+ public void writeVariableShort(int length) {
+ if (length > 0x7f | length < 0)
+ writeShort(length | 0x8000);
+ else
+ writeByte(length);
+ }
+
+ /**
+ * Prepend given data to this byte buffer.
+ */
+ public void prepend(ByteBuffer buf) {
+ prepend(buf.data, buf.offset, buf.length);
+ }
+
+ /**
+ * Prepend given data to this byte buffer.
+ */
+ public void prepend(byte[] data) {
+ prepend(data, 0, data.length);
+ }
+
+ /**
+ * Prepend given data to this byte buffer.
+ */
+ public void prepend(byte[] data, int offset, int length) {
+ if (!isSoleOwner()) {
+ throw new RuntimeException("Create full copy of this byte buffer data for modification. refCount: " + refCount + ", parentByteBuffer: "
+ + parentByteBuffer + ".");
+ }
+
+ // If there is no enough space for header to prepend
+ if (!(this.offset >= length)) {
+ throw new RuntimeException("Reserve data to have enough space for header.");
+ }
+
+ // Copy header
+ System.arraycopy(data, offset, this.data, this.offset - length, length);
+
+ // Extend byte range to include header
+ this.offset -= length;
+ this.length += length;
+ this.cursor += length;
+ }
+
+ public void writeString(String str, Charset charset) {
+ writeBytes(str.getBytes(charset));
+ }
+
+ /**
+ * Write string of fixed size. When string is shorted, empty space is filled
+ * with zeros. When string is larger, it is truncated.
+ */
+ public void writeFixedString(int length, String str, Charset charset) {
+ byte[] bytes = str.getBytes(charset);
+ writeBytes(bytes, 0, Math.min(bytes.length, length));
+
+ for (int i = bytes.length; i < length; i++)
+ writeByte(0);
+ }
+
+ public void writeBytes(ByteBuffer buf) {
+ writeBytes(buf.data, buf.offset, buf.length);
+ }
+
+ public void writeBytes(byte[] bytes) {
+ writeBytes(bytes, 0, bytes.length);
+ }
+
+ public void writeBytes(byte[] bytes, int offset, int length) {
+ System.arraycopy(bytes, offset, this.data, this.offset + this.cursor, length);
+ cursor += length;
+ }
+
+ // /**
+ // * Write BER encoded definite long variant of the ASN.1 length field.
+ // */
+ // public void writeBerLength(int value) {
+ // int fieldLength;
+ // if (value > 0xFFffFF)
+ // fieldLength = 4;
+ // else if (value > 0xFFff)
+ // fieldLength = 3;
+ // else if (value > 0xFF)
+ // fieldLength = 2;
+ // else
+ // fieldLength = 1;
+ //
+ // if (cursor + fieldLength + 1 > length)
+ // throw new ArrayIndexOutOfBoundsException("Cannot write " + (fieldLength +
+ // 1) + " byte(s) to this buffer: " + this + ".");
+ //
+ // // Write length of length field itself
+ // writeByte(0x80 | fieldLength);
+ //
+ // switch (fieldLength) {
+ // case 4:
+ // data[offset + cursor++] = (byte) (value >> 24);
+ // case 3:
+ // data[offset + cursor++] = (byte) (value >> 16);
+ // case 2:
+ // data[offset + cursor++] = (byte) (value >> 8);
+ // case 1:
+ // data[offset + cursor++] = (byte) value;
+ // }
+ //
+ // }
+
+ /**
+ * Reduce length of buffer to cursor position.
+ */
+ public void trimAtCursor() {
+ length = cursor;
+ }
+
+ /**
+ * Rewind cursor to beginning of buffer.
+ */
+ public void rewindCursor() {
+ cursor = 0;
+ }
+
+ /**
+ * Read RGB color in LE order. Cursor is advanced by 3.
+ *
+ * @return color as int, with red in lowest octet.
+ */
+ public int readRGBColor() {
+ return readUnsignedByte() | (readUnsignedByte() << 8) | (readUnsignedByte() << 16);
+ }
+
+ public void assertThatBufferIsFullyRead() {
+ if (cursor != length)
+ throw new RuntimeException("Data in buffer is not read fully. Buf: " + this + ".");
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+
+ int end = offset + length;
+ for (int i = offset; i < end; i++)
+ result = 31 * result + data[i];
+
+ result = prime * result + length;
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+
+ ByteBuffer other = (ByteBuffer) obj;
+ if (length != other.length)
+ return false;
+
+ for (int i = 0; i < length; i++)
+ if (data[offset + i] != other.data[other.offset + i])
+ return false;
+
+ return true;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a98c473d/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/DataSink.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/DataSink.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/DataSink.java
new file mode 100644
index 0000000..e3de289
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/DataSink.java
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+public interface DataSink {
+
+ void sendData(ByteBuffer buf);
+
+ void sendEvent(Event event, Direction direction);
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a98c473d/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/DataSource.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/DataSource.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/DataSource.java
new file mode 100644
index 0000000..152be2e
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/DataSource.java
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+public interface DataSource {
+
+ /**
+ * Get data from source.
+ *
+ * @param block
+ * if false, then return immediately when no data is available,
+ * otherwise wait for data
+ * @return new data or null, when no data is available
+ */
+ ByteBuffer pull(boolean block);
+
+ /**
+ * Hold data temporary to use at next pull or push.
+ *
+ * @param buf
+ * a data
+ */
+ void pushBack(ByteBuffer buf);
+
+ /**
+ * Hold data temporary to use at next pull. Don't return abything untill given
+ * amount of data will be read from source, because data will be pushed back
+ * anyway.
+ *
+ * @param buf
+ * a data
+ * @param lengthOfFullPacket
+ * length of full block of data to read from source
+ */
+ void pushBack(ByteBuffer buf, int lengthOfFullPacket);
+
+ /**
+ * Send event to pads.
+ *
+ * @param event
+ * a event
+ * @param direction
+ * pad direction
+ */
+ void sendEvent(Event event, Direction direction);
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a98c473d/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Direction.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Direction.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Direction.java
new file mode 100644
index 0000000..c9dede8
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Direction.java
@@ -0,0 +1,21 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+public enum Direction {
+ IN, OUT
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a98c473d/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Element.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Element.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Element.java
new file mode 100644
index 0000000..c927dea
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Element.java
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+import java.util.Set;
+
+/**
+ * Element is for processing of data. It has one or more contact pads, which can
+ * be wired with other elements using links.
+ */
+public interface Element {
+
+ /**
+ * Name of pad for standard input. Should be set in all elements except pure
+ * sinks.
+ */
+ public static final String STDIN = "stdin";
+
+ /**
+ * Name of pad for standard output. Should be set in all elements except pure
+ * sources.
+ */
+ public static final String STDOUT = "stdout";
+
+ /**
+ * Get link connected to given pad.
+ *
+ * @param padName
+ * Standard pads are "stdin" and "stdout".
+ */
+ Link getLink(String padName);
+
+ /**
+ * Get pads of this element.
+ */
+ Set<String> getPads(Direction direction);
+
+ /**
+ * Connect link to given pad.
+ *
+ * @param padName
+ * a pad name. Standard pads are "stdin" and "stdout".
+ */
+ void setLink(String padName, Link link, Direction direction);
+
+ /**
+ * Disconnect link from given pad.
+ *
+ * @param padName
+ * Standard pads are "stdin" and "stdout".
+ */
+ void dropLink(String padName);
+
+ /**
+ * Pull data from element and handle it. Element should ask one of it input
+ * pads for data, handle data and push result to it sink(s), if any.
+ *
+ * @param block
+ * block until data will be available, or do a slight delay at least,
+ * when data is not available
+ */
+ void poll(boolean block);
+
+ /**
+ * Handle incoming data.
+ *
+ * @param buf
+ * a data
+ * @param link
+ * TODO
+ */
+ void handleData(ByteBuffer buf, Link link);
+
+ /**
+ * Handle event.
+ *
+ * @param event
+ * an event
+ * @param direction
+ * if IN, then send event to input pads, when OUT, then send to
+ * output pads
+ */
+ void handleEvent(Event event, Direction direction);
+
+ /**
+ * Get element ID.
+ */
+ String getId();
+
+ /**
+ * Validate element: check is all required pads are connected.
+ */
+ void validate();
+
+ /**
+ * Drop link.
+ *
+ * @param link a link to drop
+ */
+ void dropLink(Link link);
+
+ /**
+ * Drop existing link and replace it by new link.
+ */
+ void replaceLink(Link existingLink, Link newLink);
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a98c473d/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Event.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Event.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Event.java
new file mode 100644
index 0000000..5e1a389
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Event.java
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+public enum Event {
+ STREAM_START,
+ STREAM_CLOSE,
+
+ /**
+ * Upgrade socket to SSL.
+ */
+ SOCKET_UPGRADE_TO_SSL,
+
+ /**
+ * Switch links to input mode.
+ */
+ LINK_SWITCH_TO_PULL_MODE
+
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a98c473d/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/FakeSink.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/FakeSink.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/FakeSink.java
new file mode 100644
index 0000000..65fb29e
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/FakeSink.java
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+public class FakeSink extends BaseElement {
+
+ public FakeSink(String id) {
+ super(id);
+ }
+
+ @Override
+ public void handleData(ByteBuffer buf, Link link) {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Received buf #" + (packetNumber) + " " + buf + ".");
+
+ if (buf == null)
+ return;
+
+ // Use packetNumber variable to count incoming packets
+ packetNumber++;
+
+ buf.unref();
+ }
+
+ @Override
+ public String toString() {
+ return "FakeSink(" + id + ")";
+ }
+
+ @Override
+ public void handleEvent(Event event, Direction direction) {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Event received: " + event + ".");
+
+ }
+
+ /**
+ * Example.
+ */
+ public static void main(String args[]) {
+
+ Element sink = new FakeSink("sink") {
+ {
+ verbose = true;
+ }
+ };
+
+ byte[] data = new byte[] { 1, 2, 3 };
+ ByteBuffer buf = new ByteBuffer(data);
+ sink.setLink(STDIN, new SyncLink(), Direction.IN);
+ sink.getLink(STDIN).sendData(buf);
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a98c473d/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/FakeSource.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/FakeSource.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/FakeSource.java
new file mode 100644
index 0000000..4cf6503
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/FakeSource.java
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+public class FakeSource extends BaseElement {
+
+ /**
+ * Delay for null packets in poll method when blocking is requested, in
+ * milliseconds.
+ */
+ protected long delay = SyncLink.STANDARD_DELAY_FOR_EMPTY_PACKET;
+
+ public FakeSource(String id) {
+ super(id);
+ }
+
+ @Override
+ public void poll(boolean block) {
+ if (numBuffers > 0 && packetNumber >= numBuffers) {
+ // Close stream when limit of packets is reached
+ sendEventToAllPads(Event.STREAM_CLOSE, Direction.OUT);
+ return;
+ }
+
+ // Prepare new packet
+ ByteBuffer buf = initializeData();
+
+ // Push it to output(s)
+ pushDataToAllOuts(buf);
+
+ // Make slight delay when blocking input was requested (to avoid
+ // consuming of 100% in parent loop)
+ if (block)
+ delay();
+
+ }
+
+ /**
+ * Make slight delay. Should be used when blocking input is requested in pull
+ * mode, but null packed was returned by input.
+ */
+ protected void delay() {
+ try {
+ Thread.sleep(delay);
+ } catch (InterruptedException e) {
+ }
+ }
+
+ /**
+ * Initialize data.
+ */
+ public ByteBuffer initializeData() {
+ ByteBuffer buf = new ByteBuffer(incommingBufLength);
+
+ // Set first byte of package to it sequance number
+ buf.data[buf.offset] = (byte) (packetNumber % 128);
+
+ // Initialize rest of bytes with sequential values, which are
+ // corresponding with their position in byte buffer
+ for (int i = buf.offset + 1; i < buf.length; i++)
+ buf.data[i] = (byte) (i % 128);
+
+ buf.putMetadata(ByteBuffer.SEQUENCE_NUMBER, packetNumber);
+ buf.putMetadata("src", id);
+
+ return buf;
+ }
+
+ @Override
+ public String toString() {
+ return "FakeSource(" + id + ")";
+ }
+
+ public static void main(String args[]) {
+
+ Element fakeSource = new FakeSource("source 3/10/100") {
+ {
+ verbose = true;
+ this.incommingBufLength = 3;
+ this.numBuffers = 10;
+ this.delay = 100;
+ }
+ };
+
+ Element fakeSink = new FakeSink("sink") {
+ {
+ this.verbose = true;
+ }
+ };
+
+ Element fakeSink2 = new FakeSink("sink2") {
+ {
+ this.verbose = true;
+ }
+ };
+
+ Link link = new SyncLink();
+
+ fakeSource.setLink(STDOUT, link, Direction.OUT);
+ fakeSink.setLink(STDIN, link, Direction.IN);
+
+ Link link2 = new SyncLink();
+
+ fakeSource.setLink("out2", link2, Direction.OUT);
+ fakeSink2.setLink(STDIN, link2, Direction.IN);
+
+ link.run();
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a98c473d/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/InputStreamSource.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/InputStreamSource.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/InputStreamSource.java
new file mode 100644
index 0000000..b05637f
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/InputStreamSource.java
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Source element, which reads data from InputStream.
+ */
+public class InputStreamSource extends BaseElement {
+
+ protected InputStream is;
+ protected SocketWrapper socketWrapper;
+
+ public InputStreamSource(String id) {
+ super(id);
+ }
+
+ public InputStreamSource(String id, InputStream is) {
+ super(id);
+ this.is = is;
+ }
+
+ public InputStreamSource(String id, SocketWrapper socketWrapper) {
+ super(id);
+ this.socketWrapper = socketWrapper;
+ }
+
+ @Override
+ public void handleEvent(Event event, Direction direction) {
+ switch (event) {
+ case SOCKET_UPGRADE_TO_SSL:
+ socketWrapper.upgradeToSsl();
+ break;
+ default:
+ super.handleEvent(event, direction);
+ }
+ }
+
+ @Override
+ public void setLink(String padName, Link link, Direction direction) {
+ switch (direction) {
+ case OUT:
+ super.setLink(padName, link, direction);
+
+ if (is == null) {
+ // Pause links until data stream will be ready
+ link.pause();
+ }
+ break;
+ case IN:
+ throw new RuntimeException("Cannot assign link to input pad in source element. Element: " + this + ", pad: " + padName + ", link: " + link + ".");
+ }
+ }
+
+ public void setInputStream(InputStream is) {
+ this.is = is;
+
+ // Resume links
+ resumeLinks();
+ }
+
+ private void resumeLinks() {
+ for (DataSink sink : outputPads.values())
+ ((Link) sink).resume();
+ }
+
+ /**
+ * Read data from input stream.
+ */
+ @Override
+ public void poll(boolean block) {
+ try {
+ if (!block && is.available() == 0) {
+
+ if (verbose)
+ System.out.println("[" + this + "] INFO: No data in stream is available now, returning.");
+
+ return;
+ }
+
+ // Create buffer of recommended size and with default offset
+ ByteBuffer buf = new ByteBuffer(incommingBufLength);
+
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Reading data from stream.");
+
+ int actualLength = is.read(buf.data, buf.offset, buf.data.length - buf.offset);
+
+ if (actualLength < 0) {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: End of stream.");
+
+ buf.unref();
+ closeStream();
+ sendEventToAllPads(Event.STREAM_CLOSE, Direction.OUT);
+ return;
+ }
+
+ if (actualLength == 0) {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Empty buffer is read from stream.");
+
+ buf.unref();
+ return;
+ }
+
+ buf.length = actualLength;
+
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Data read from stream: " + buf + ".");
+
+ pushDataToAllOuts(buf);
+
+ } catch (IOException e) {
+ System.err.println("[" + this + "] ERROR: " + e.getMessage());
+ closeStream();
+ }
+ }
+
+ @Override
+ protected void onClose() {
+ closeStream();
+ }
+
+ private void closeStream() {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Closing stream.");
+
+ try {
+ is.close();
+ } catch (IOException e) {
+ }
+ try {
+ sendEventToAllPads(Event.STREAM_CLOSE, Direction.OUT);
+ } catch (Exception e) {
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "InputStreamSource(" + id + ")";
+ }
+
+ /**
+ * Example.
+ */
+ public static void main(String args[]) {
+ InputStream is = new ByteArrayInputStream(new byte[] { 1, 2, 3 });
+
+ InputStreamSource source = new InputStreamSource("source") {
+ {
+ verbose = true;
+ }
+ };
+ Element fakeSink = new FakeSink("sink") {
+ {
+ verbose = true;
+ }
+ };
+
+ Link link = new SyncLink() {
+ {
+ verbose = true;
+ }
+ };
+
+ source.setLink(STDOUT, link, Direction.OUT);
+ fakeSink.setLink(STDIN, link, Direction.IN);
+
+ source.setInputStream(is);
+
+ link.sendEvent(Event.STREAM_START, Direction.OUT);
+ link.run();
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a98c473d/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Link.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Link.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Link.java
new file mode 100644
index 0000000..bd970f0
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Link.java
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+/**
+ * Link is wire between two elements. It always must contain source and sink
+ * elements.
+ */
+public interface Link extends DataSource, DataSink, Runnable {
+
+ /**
+ * Wire this link with given sink.
+ *
+ * @param sink
+ * an Element
+ * @return same sink element, for chaining
+ */
+ Element setSink(Element sink);
+
+ /**
+ * Wire this link with given source.
+ *
+ * @param source
+ * an Element
+ * @return same source element, for chaining
+ */
+ Element setSource(Element source);
+
+ Element getSource();
+
+ Element getSink();
+
+ /**
+ * Hold all data in cache, don't pass data to sink until resumed.
+ */
+ void pause();
+
+ /**
+ * Resume transfer.
+ */
+ void resume();
+
+ /**
+ * Change mode of operation of this link from push mode to pull mode.
+ */
+ void setPullMode();
+
+ /**
+ * Drop this link.
+ */
+ void drop();
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a98c473d/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSink.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSink.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSink.java
new file mode 100644
index 0000000..ce9fdf9
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSink.java
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+import java.util.Arrays;
+
+/**
+ * Compare incoming packets with expected packets.
+ */
+public class MockSink extends BaseElement {
+
+ protected ByteBuffer bufs[] = null;
+
+ public MockSink(String id) {
+ super(id);
+ }
+
+ public MockSink(String id, ByteBuffer bufs[]) {
+ super(id);
+ this.bufs = bufs;
+ }
+
+ @Override
+ public void handleData(ByteBuffer buf, Link link) {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Received buf #" + (packetNumber) + " " + buf + ".");
+
+ if (buf == null)
+ return;
+
+ if (packetNumber >= bufs.length)
+ throw new AssertionError("[" + this + "] Incoming buffer #" + packetNumber + " is not expected. Number of expected buffers: " + bufs.length
+ + ", unexpected buffer: " + buf + ".");
+
+ // Compare incoming buffer with expected buffer
+ if (!Arrays.equals(bufs[packetNumber].toByteArray(), buf.toByteArray()))
+ throw new AssertionError("[" + this + "] Incoming buffer #" + packetNumber + " is not equal to expected buffer.\n Actual bufer: " + buf
+ + ",\n expected buffer: " + bufs[packetNumber] + ".");
+
+ if (verbose)
+ System.out.println("[" + this + "] INFO: buffers are equal.");
+
+ // Use packetNumber variable to count incoming packets
+ packetNumber++;
+
+ buf.unref();
+ }
+
+ @Override
+ protected void onClose() {
+ super.onClose();
+
+ if (packetNumber != bufs.length)
+ throw new AssertionError("[" + this + "] Number of expected buffers: " + bufs.length + ", number of actual buffers: " + packetNumber + ".");
+ }
+
+ @Override
+ public String toString() {
+ return "MockSink(" + id + ")";
+ }
+
+ /**
+ * Example.
+ */
+ public static void main(String args[]) {
+
+ Element mockSource = new MockSource("source") {
+ {
+ this.bufs = new ByteBuffer[] { new ByteBuffer(new byte[] { 1, 1, 2, 3, 4, 5 }), new ByteBuffer(new byte[] { 2, 1, 2, 3, 4 }),
+ new ByteBuffer(new byte[] { 3, 1, 2, 3 }), new ByteBuffer(new byte[] { 4, 1, 2 }), new ByteBuffer(new byte[] { 5, 1 }) };
+ this.verbose = true;
+ this.delay = 100;
+ this.numBuffers = this.bufs.length;
+ }
+ };
+
+ Element mockSink = new MockSink("sink") {
+ {
+ this.bufs = new ByteBuffer[] { new ByteBuffer(new byte[] { 1, 1, 2, 3, 4, 5 }), new ByteBuffer(new byte[] { 2, 1, 2, 3, 4 }),
+ new ByteBuffer(new byte[] { 3, 1, 2, 3 }), new ByteBuffer(new byte[] { 4, 1, 2 }), new ByteBuffer(new byte[] { 5, 1 }) };
+ this.verbose = true;
+ }
+ };
+
+ Link link = new SyncLink() {
+ {
+ this.verbose = true;
+ }
+ };
+
+ mockSource.setLink(STDOUT, link, Direction.OUT);
+ mockSink.setLink(STDIN, link, Direction.IN);
+
+ link.run();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a98c473d/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSource.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSource.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSource.java
new file mode 100644
index 0000000..db47db2
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSource.java
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+public class MockSource extends FakeSource {
+
+ protected ByteBuffer bufs[] = null;
+
+ public MockSource(String id) {
+ super(id);
+ }
+
+ public MockSource(String id, ByteBuffer bufs[]) {
+ super(id);
+ this.bufs = bufs;
+ }
+
+ /**
+ * Initialize data.
+ */
+ @Override
+ public ByteBuffer initializeData() {
+ if (packetNumber >= bufs.length) {
+ sendEventToAllPads(Event.STREAM_CLOSE, Direction.OUT);
+ return null;
+ }
+
+ ByteBuffer buf = bufs[packetNumber];
+
+ buf.putMetadata(ByteBuffer.SEQUENCE_NUMBER, packetNumber);
+ return buf;
+ }
+
+ @Override
+ public void handleEvent(Event event, Direction direction) {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Event received: " + event + ".");
+
+ }
+
+ @Override
+ public String toString() {
+ return "MockSource(" + id + ")";
+ }
+
+ /**
+ * Example.
+ */
+ public static void main(String args[]) {
+
+ Element mockSource = new MockSource("source") {
+ {
+ this.bufs = new ByteBuffer[] { new ByteBuffer(new byte[] { 1, 1, 2, 3, 4, 5 }), new ByteBuffer(new byte[] { 2, 1, 2, 3, 4 }),
+ new ByteBuffer(new byte[] { 3, 1, 2, 3 }), new ByteBuffer(new byte[] { 4, 1, 2 }), new ByteBuffer(new byte[] { 5, 1 }) };
+ this.verbose = true;
+ this.delay = 100;
+ // this.numBuffers = this.bufs.length;
+ }
+ };
+
+ Element fakeSink = new FakeSink("sink") {
+ {
+ this.verbose = true;
+ }
+ };
+
+ Link link = new SyncLink();
+
+ mockSource.setLink(STDOUT, link, Direction.OUT);
+ fakeSink.setLink(STDIN, link, Direction.IN);
+
+ link.run();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a98c473d/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OneTimeSwitch.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OneTimeSwitch.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OneTimeSwitch.java
new file mode 100644
index 0000000..a7d4848
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OneTimeSwitch.java
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+/**
+ * One time switch for handshake and initialization stages.
+ *
+ * At beginning, element handles data internally, sending output to "otout" pad.
+ * After switchOff() method is called, element drops its links, so packets from
+ * "stdin" pad are forwarded directly to "stdout" pad, without processing.
+ *
+ * Event STREAM_START is captured by this element and not propagated further.
+ * When switchOff() method is called, event STREAM_START is generated and sent
+ * to "stdout".
+ */
+public abstract class OneTimeSwitch extends BaseElement {
+
+ /**
+ * One-time out - name of output pad for one time logic. By default, output
+ * directly to socket.
+ */
+ public static final String OTOUT = "otout";
+
+ private boolean switched = false;
+
+ public OneTimeSwitch(String id) {
+ super(id);
+ declarePads();
+ }
+
+ protected void declarePads() {
+ inputPads.put(STDIN, null);
+ outputPads.put(OTOUT, null);
+ outputPads.put(STDOUT, null);
+ }
+
+ @Override
+ public void handleData(ByteBuffer buf, Link link) {
+ if (switched)
+ throw new RuntimeException(this + " element is switched off and must not receive any data or events anymore.");
+
+ if (buf == null)
+ return;
+
+ handleOneTimeData(buf, link);
+ }
+
+ public void pushDataToOTOut(ByteBuffer buf) {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Sending data: " + buf + ".");
+
+ outputPads.get(OTOUT).sendData(buf);
+ }
+
+ /**
+ * Switch this element off. Pass data directly to main output(s).
+ */
+ public void switchOff() {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Switching OFF.");
+
+ switched = true;
+ verbose = false;
+
+ // Rewire links: drop otout link, replace stdout link by stdin to send data
+ // directly to stdout
+ Link stdout = (Link) outputPads.get(STDOUT);
+ Link stdin = (Link) inputPads.get(STDIN);
+ Link otout = (Link) outputPads.get(OTOUT);
+
+ otout.drop();
+
+ // Wake up next peer(s)
+ sendEventToAllPads(Event.STREAM_START, Direction.OUT);
+
+ stdin.setSink(null);
+ inputPads.remove(STDIN);
+
+ Element nextPeer = stdout.getSink();
+ nextPeer.replaceLink(stdout, stdin);
+ stdout.drop();
+
+ for (Object link : inputPads.values().toArray())
+ ((Link) link).drop();
+ for (Object link : outputPads.values().toArray())
+ ((Link) link).drop();
+
+ }
+
+ public void switchOn() {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Switching ON.");
+
+ switched = false;
+ }
+
+ /**
+ * Override this method to handle one-time packet(s) at handshake or
+ * initialization stages. Execute method @see switchRoute() when this method
+ * is no longer necessary.
+ */
+ protected abstract void handleOneTimeData(ByteBuffer buf, Link link);
+
+ @Override
+ public void handleEvent(Event event, Direction direction) {
+ if (event == Event.STREAM_START) {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Event " + event + " is received.");
+
+ switchOn();
+
+ // Execute this element onStart(), but do not propagate event further,
+ // to not wake up next elements too early
+ onStart();
+ } else
+ super.handleEvent(event, direction);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a98c473d/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Order.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Order.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Order.java
new file mode 100644
index 0000000..1d63850
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Order.java
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+public class Order {
+
+ public Object type;
+
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a98c473d/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OutputStreamSink.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OutputStreamSink.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OutputStreamSink.java
new file mode 100644
index 0000000..d1aa5ce
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OutputStreamSink.java
@@ -0,0 +1,153 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class OutputStreamSink extends BaseElement {
+
+ protected OutputStream os;
+ protected SocketWrapper socketWrapper;
+
+ public OutputStreamSink(String id) {
+ super(id);
+ }
+
+ public OutputStreamSink(String id, OutputStream os) {
+ super(id);
+ this.os = os;
+ }
+
+ public OutputStreamSink(String id, SocketWrapper socketWrapper) {
+ super(id);
+ this.socketWrapper = socketWrapper;
+ }
+
+ public void setOutputStream(OutputStream os) {
+ this.os = os;
+ // Resume links
+ resumeLinks();
+ }
+
+ /**
+ * Send incoming data to stream.
+ */
+ @Override
+ public void handleData(ByteBuffer buf, Link link) {
+ if (buf == null)
+ return;
+
+ try {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Writing data to stream: " + buf + ".");
+
+ os.write(buf.data, buf.offset, buf.length);
+ os.flush();
+ } catch (IOException e) {
+ System.err.println("[" + this + "] ERROR: " + e.getMessage());
+ closeStream();
+ }
+ }
+
+ @Override
+ public void handleEvent(Event event, Direction direction) {
+ switch (event) {
+ case SOCKET_UPGRADE_TO_SSL:
+ socketWrapper.upgradeToSsl();
+ break;
+ default:
+ super.handleEvent(event, direction);
+ }
+ }
+
+ @Override
+ public void setLink(String padName, Link link, Direction direction) {
+ switch (direction) {
+ case IN:
+ super.setLink(padName, link, direction);
+
+ if (os == null)
+ // Pause links until data stream will be ready
+ link.pause();
+ break;
+ case OUT:
+ throw new RuntimeException("Cannot assign link to output pad in sink element. Element: " + this + ", pad: " + padName + ", link: " + link + ".");
+ }
+ }
+
+ private void resumeLinks() {
+ for (DataSource source : inputPads.values())
+ ((Link) source).resume();
+ }
+
+ @Override
+ protected void onClose() {
+ closeStream();
+ }
+
+ private void closeStream() {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Closing stream.");
+
+ try {
+ os.close();
+ } catch (IOException e) {
+ }
+ try {
+ sendEventToAllPads(Event.STREAM_CLOSE, Direction.IN);
+ } catch (Exception e) {
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "OutputStreamSink(" + id + ")";
+ }
+
+ /**
+ * Example.
+ */
+ public static void main(String args[]) {
+ Element source = new FakeSource("source") {
+ {
+ this.verbose = true;
+ this.numBuffers = 3;
+ this.incommingBufLength = 5;
+ this.delay = 100;
+ }
+ };
+
+ OutputStreamSink sink = new OutputStreamSink("sink") {
+ {
+ verbose = true;
+ }
+ };
+
+ Link link = new SyncLink();
+
+ source.setLink(STDOUT, link, Direction.OUT);
+ sink.setLink(STDIN, link, Direction.IN);
+
+ sink.setOutputStream(new ByteArrayOutputStream());
+
+ link.sendEvent(Event.STREAM_START, Direction.IN);
+ link.run();
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a98c473d/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Pipeline.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Pipeline.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Pipeline.java
new file mode 100644
index 0000000..c369350
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Pipeline.java
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+/**
+ * Pipeline groups multiple elements.
+ */
+public interface Pipeline extends Element {
+
+ static final String IN = Direction.IN.toString();
+ static final String OUT = Direction.OUT.toString();
+
+ /**
+ * Add elements to pipeline.
+ *
+ * @param elements
+ */
+ void add(Element... elements);
+
+ /**
+ * Add elements to pipeline and link them in given order.
+ *
+ * @param elements
+ */
+ void addAndLink(Element... elements);
+
+ /**
+ * Link elements in given order using SyncLink. Element name can have prefix
+ * "PADNAME< " or/and suffix " >PADNAME" to use given named pads instead of
+ * "stdin" and "stdout". I.e. <code>link("foo", "bar", "baz");</code> is equal
+ * to <code>link("foo >stdin", "stdout< bar >stdin", "stdout< baz");</code> .
+ *
+ * Special elements "IN" and "OUT" are pointing to pipeline outer interfaces,
+ * so when pipeline will be connected with other elements, outside of this
+ * pipeline, they will be connected to IN and OUT elements.
+ *
+ * Example:
+ *
+ * <pre>
+ * pipeline.link("IN", "foo", "bar", "OUT");
+ * // Make additional branch from foo to baz, and then to OUT
+ * pipeline.link("foo >baz_out", "baz", "baz_in< OUT");
+ * </pre>
+ *
+ * @param elements
+ * elements to link
+ */
+ void link(String... elements);
+
+ /**
+ * Get element by name.
+ *
+ * @return an element
+ */
+ Element get(String elementName);
+
+ /**
+ * Get link by element name and pad name.
+ */
+ Link getLink(String elementName, String padName);
+
+ /**
+ * Set link by element name and pad name. Allows to link external elements
+ * into internal elements of pipeline. Special elements "IN" and "OUT" are
+ * pointing to pipeline outer interfaces.
+ */
+ void setLink(String elementName, String padName, Link link, Direction direction);
+
+ /**
+ * Get link connected to given pad in given element and run it main loop.
+ * @param separateThread
+ * set to true to start main loop in separate thread.
+ * @param waitForStartEvent TODO
+ */
+ void runMainLoop(String element, String padName, boolean separateThread, boolean waitForStartEvent);
+
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a98c473d/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/PipelineImpl.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/PipelineImpl.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/PipelineImpl.java
new file mode 100644
index 0000000..abf132f
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/PipelineImpl.java
@@ -0,0 +1,309 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class PipelineImpl implements Pipeline {
+
+ protected String id;
+ protected boolean verbose = System.getProperty("streamer.Pipeline.debug", "false").equals("true");
+
+ public PipelineImpl(String id) {
+ this.id = id;
+ elements = initElementMap(id);
+ }
+
+ protected Map<String, Element> elements;
+
+ protected HashMap<String, Element> initElementMap(String id) {
+ HashMap<String, Element> map = new HashMap<String, Element>();
+
+ map.put(IN, new BaseElement(id + "." + IN));
+ map.put(OUT, new BaseElement(id + "." + OUT));
+ return map;
+ }
+
+ @Override
+ public Link getLink(String padName) {
+ Link link = elements.get(IN).getLink(padName);
+ if (link == null)
+ link = elements.get(OUT).getLink(padName);
+ return link;
+ }
+
+ @Override
+ public Set<String> getPads(Direction direction) {
+ switch (direction) {
+ case IN:
+ return elements.get(IN).getPads(direction);
+
+ case OUT:
+ return elements.get(OUT).getPads(direction);
+ }
+ return null;
+ }
+
+ @Override
+ public void validate() {
+ for (Element element : elements.values())
+ element.validate();
+
+ // Check IN element
+ {
+ Element element = get(IN);
+ int outPadsNumber = element.getPads(Direction.OUT).size();
+ int inPadsNumber = element.getPads(Direction.IN).size();
+ if ((outPadsNumber | inPadsNumber) > 0 && (outPadsNumber == 0 || inPadsNumber == 0))
+ throw new RuntimeException("[ " + this + "] Pads of input element of pipeline are not balanced. Element: " + element + ", output pads: "
+ + element.getPads(Direction.OUT).toString() + ", input pads: " + element.getPads(Direction.IN).toString() + ".");
+ }
+
+ // Check OUT element
+ {
+ Element element = get(OUT);
+ int outPadsNumber = element.getPads(Direction.OUT).size();
+ int inPadsNumber = element.getPads(Direction.IN).size();
+ if ((outPadsNumber | inPadsNumber) > 0 && (outPadsNumber == 0 || inPadsNumber == 0))
+ throw new RuntimeException("[ " + this + "] Pads of output element of pipeline are not balanced. Element: " + element + ", output pads: "
+ + element.getPads(Direction.OUT).toString() + ", input pads: " + element.getPads(Direction.IN).toString() + ".");
+ }
+
+ }
+
+ @Override
+ public void dropLink(String padName) {
+ if (elements.get(IN).getLink(padName) != null)
+ elements.get(IN).dropLink(padName);
+
+ if (elements.get(OUT).getLink(padName) != null)
+ elements.get(OUT).dropLink(padName);
+ }
+
+ @Override
+ public void dropLink(Link link) {
+ elements.get(IN).dropLink(link);
+ elements.get(OUT).dropLink(link);
+ }
+
+ @Override
+ public void replaceLink(Link existingLink, Link newLink) {
+ elements.get(IN).replaceLink(existingLink, newLink);
+ elements.get(OUT).replaceLink(existingLink, newLink);
+ }
+
+ @Override
+ public void setLink(String padName, Link link, Direction direction) {
+ // Wire links to internal elements instead
+ elements.get(direction.toString()).setLink(padName, link, direction);
+ }
+
+ @Override
+ public void poll(boolean block) {
+ throw new RuntimeException("Not implemented.");
+ }
+
+ @Override
+ public void handleData(ByteBuffer buf, Link link) {
+ get(IN).handleData(buf, link);
+ }
+
+ @Override
+ public void handleEvent(Event event, Direction direction) {
+ switch (direction) {
+ case IN:
+ get(IN).handleEvent(event, direction);
+ break;
+ case OUT:
+ get(OUT).handleEvent(event, direction);
+ break;
+ }
+ }
+
+ @Override
+ public void add(Element... elements) {
+ for (Element element : elements) {
+ String id = element.getId();
+
+ if (this.elements.containsKey(id))
+ throw new RuntimeException("This pipeline already contains element with same ID. New element: " + element + ", existing element: "
+ + this.elements.get(id) + ".");
+
+ this.elements.put(id, element);
+ }
+ }
+
+ @Override
+ public void link(String... elementNames) {
+
+ if (elementNames.length < 2)
+ throw new RuntimeException("At least two elements are necessary to create link between them.");
+
+ // Parse array of element and pad names
+
+ Element elements[] = new Element[elementNames.length];
+ String inputPads[] = new String[elementNames.length];
+ String outputPads[] = new String[elementNames.length];
+
+ int i = 0;
+ for (String elementName : elementNames) {
+ if (elementName.contains("< ")) {
+ inputPads[i] = elementName.substring(0, elementName.indexOf("< "));
+ elementName = elementName.substring(elementName.indexOf("< ") + 2);
+ } else {
+ inputPads[i] = STDIN;
+ }
+
+ if (elementName.contains(" >")) {
+ outputPads[i] = elementName.substring(elementName.indexOf(" >") + 2);
+ elementName = elementName.substring(0, elementName.indexOf(" >"));
+ } else {
+ outputPads[i] = STDOUT;
+ }
+
+ elements[i] = get(elementName);
+
+ if (elements[i] == null)
+ throw new RuntimeException("Cannot find element by name in this pipeline. Element name: \"" + elementName + "\" (" + elementNames[i] + "), pipeline: "
+ + this + ".");
+
+ i++;
+ }
+
+ // Link elements
+ for (i = 0; i < elements.length - 1; i++) {
+ Element leftElement = elements[i];
+ Element rightElement = elements[i + 1];
+ String leftPad = outputPads[i];
+ String rightPad = inputPads[i + 1];
+
+ String linkId = leftElement.getId() + " >" + leftPad + " | " + rightPad + "< " + rightElement.getId();
+
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Linking: " + linkId + ".");
+
+ Link link = new SyncLink(linkId);
+ leftElement.setLink(leftPad, link, Direction.OUT);
+ rightElement.setLink(rightPad, link, Direction.IN);
+ }
+ }
+
+ @Override
+ public void addAndLink(Element... elements) {
+ add(elements);
+ link(elements);
+ }
+
+ private void link(Element... elements) {
+ String elementNames[] = new String[elements.length];
+
+ int i = 0;
+ for (Element element : elements) {
+ elementNames[i++] = element.getId();
+ }
+
+ link(elementNames);
+ }
+
+ @Override
+ public Element get(String elementName) {
+ return elements.get(elementName);
+ }
+
+ @Override
+ public Link getLink(String elementName, String padName) {
+ return elements.get(elementName).getLink(padName);
+
+ }
+
+ @Override
+ public void setLink(String elementName, String padName, Link link, Direction direction) {
+ elements.get(elementName).setLink(padName, link, direction);
+ }
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public void runMainLoop(String elementName, String padName, boolean separateThread, boolean waitForStartEvent) {
+ validate();
+
+ Link link = getLink(elementName, padName);
+
+ if (link == null)
+ throw new NullPointerException("Cannot find link. Element name: " + elementName + ", element: " + get(elementName) + ", pad: " + padName + ".");
+
+ if (!waitForStartEvent)
+ link.sendEvent(Event.STREAM_START, Direction.OUT);
+
+ if (separateThread) {
+ Thread thread = new Thread(link);
+ thread.setDaemon(true);
+ thread.start();
+ } else {
+ link.run();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "Pipeline(" + id + ")";
+ }
+
+ /**
+ * Example.
+ */
+ public static void main(String args[]) {
+ // System.setProperty("streamer.Link.debug", "true");
+ // System.setProperty("streamer.Element.debug", "true");
+ // System.setProperty("streamer.Pipeline.debug", "true");
+
+ Pipeline pipeline = new PipelineImpl("main");
+
+ // Create elements
+ pipeline.add(new FakeSource("source") {
+ {
+ this.incommingBufLength = 3;
+ this.numBuffers = 10;
+ this.delay = 100;
+ }
+ });
+ pipeline.add(new BaseElement("tee"));
+ pipeline.add(new FakeSink("sink") {
+ {
+ this.verbose = true;
+ }
+ });
+ pipeline.add(new FakeSink("sink2") {
+ {
+ this.verbose = true;
+ }
+ });
+
+ // Link elements
+ pipeline.link("source", "tee", "sink");
+ pipeline.link("tee >out2", "sink2");
+
+ // Run main loop
+ pipeline.runMainLoop("source", STDOUT, false, false);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a98c473d/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Queue.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Queue.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Queue.java
new file mode 100644
index 0000000..7a17340
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Queue.java
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Message queue for safe transfer of packets between threads.
+ */
+public class Queue extends BaseElement {
+
+ protected LinkedBlockingQueue<ByteBuffer> queue = new LinkedBlockingQueue<ByteBuffer>();
+
+ public Queue(String id) {
+ super(id);
+ }
+
+ @SuppressWarnings("incomplete-switch")
+ @Override
+ public void poll(boolean block) {
+ try {
+ ByteBuffer buf = null;
+ if (block) {
+ buf = queue.take();
+ } else {
+ buf = queue.poll(100, TimeUnit.MILLISECONDS);
+ }
+
+ if (buf != null)
+ pushDataToAllOuts(buf);
+
+ } catch (Exception e) {
+ sendEventToAllPads(Event.STREAM_CLOSE, Direction.OUT);
+ closeQueue();
+ }
+ }
+
+ @Override
+ public void handleData(ByteBuffer buf, Link link) {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Data received: " + buf + ".");
+
+ // Put incoming data into queue
+ try {
+ queue.put(buf);
+ } catch (Exception e) {
+ sendEventToAllPads(Event.STREAM_CLOSE, Direction.IN);
+ closeQueue();
+ }
+ }
+
+ @Override
+ public void handleEvent(Event event, Direction direction) {
+ switch (event) {
+ case LINK_SWITCH_TO_PULL_MODE:
+ // Do not propagate this event, because this element is boundary between
+ // threads
+ break;
+ default:
+ super.handleEvent(event, direction);
+ }
+ }
+
+ @Override
+ protected void onClose() {
+ super.onClose();
+ closeQueue();
+ }
+
+ private void closeQueue() {
+ queue.clear();
+ queue.add(null);
+ // Drop queue to indicate that upstream is closed.
+ // May produce NPE in poll().
+ queue = null;
+ }
+
+ @Override
+ public String toString() {
+ return "Queue(" + id + ")";
+ }
+
+ /**
+ * Example.
+ */
+ public static void main(String args[]) {
+ // System.setProperty("streamer.Link.debug", "true");
+ System.setProperty("streamer.Element.debug", "true");
+
+ Element source1 = new FakeSource("source1") {
+ {
+ this.delay = 100;
+ this.numBuffers = 10;
+ this.incommingBufLength = 10;
+ }
+ };
+
+ Element source2 = new FakeSource("source2") {
+ {
+ this.delay = 100;
+ this.numBuffers = 10;
+ this.incommingBufLength = 10;
+ }
+ };
+
+ Pipeline pipeline = new PipelineImpl("test");
+ pipeline.add(source1);
+ pipeline.add(source2);
+ pipeline.add(new Queue("queue"));
+ pipeline.add(new FakeSink("sink"));
+
+ // Main flow
+ pipeline.link("source1", "in1< queue");
+ pipeline.link("source2", "in2< queue");
+ pipeline.link("queue", "sink");
+
+ new Thread(pipeline.getLink("source1", STDOUT)).start();
+ new Thread(pipeline.getLink("source2", STDOUT)).start();
+ pipeline.getLink("sink", STDIN).run();
+ }
+}