You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by de...@apache.org on 2013/11/08 09:16:06 UTC

[3/9] Remote Desktop Protocol(RDP) client that is suitable for including in the console VM. The client renders RDP to a window created by Java. It is important for Hyper-V support, because Hyper-V provides access to the consoles of VMs that it is running

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a98c473d/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/ByteBuffer.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/ByteBuffer.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/ByteBuffer.java
new file mode 100644
index 0000000..832c731
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/ByteBuffer.java
@@ -0,0 +1,826 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class represents a slice in a buffer.
+ */
+public class ByteBuffer {
+
+  public static final String SEQUENCE_NUMBER = "seq";
+
+  public byte data[];
+  public int offset = 0;
+  public int length = 0;
+  public int cursor = 0;
+
+  private int refCount = 1;
+  private ByteBuffer parentByteBuffer = null;
+
+  private Order order;
+
+  /**
+   * Create buffer of size no less than length. Buffer can be a bit larger than
+   * length. Offset also can be set to non-zero value to leave some place for
+   * future headers.
+   */
+  public ByteBuffer(int minLength) {
+    // Get buffer of acceptable size from buffer pool
+    this.data = BufferPool.allocateNewBuffer(minLength);
+    this.offset = 0;
+    this.length = minLength;
+  }
+
+  public ByteBuffer(byte data[]) {
+    if (data == null)
+      throw new NullPointerException("Data must be non-null.");
+
+    this.data = data;
+    this.offset = 0;
+    this.length = data.length;
+  }
+
+  public ByteBuffer(byte[] data, int offset, int length) {
+    if (data == null)
+      throw new NullPointerException("Data must be non-null.");
+
+    this.data = data;
+    this.offset = offset;
+    this.length = length;
+  }
+
+  /**
+   * Create buffer of requested size with space reserved before data for future headers.
+   */
+  public ByteBuffer(int minLength, boolean reserveSpaceForHeader) {
+    final int headerSpace = 128; // always reserved; reserveSpaceForHeader is not checked
+    // Get buffer of acceptable size from buffer pool
+    this.data = BufferPool.allocateNewBuffer(headerSpace + minLength);
+    this.offset = headerSpace;
+    this.length = minLength;
+  }
+
+  /**
+   * Create empty buffer with given order only.
+   */
+  public ByteBuffer(Order order) {
+    this.order = order;
+  }
+
+  public void setOrder(Order order) {
+    this.order = order;
+  }
+
+  public Order getOrder() {
+    return order;
+  }
+
+  @Override
+  public String toString() {
+    return toString(100);
+  }
+
+  /**
+   * Return string representation of this byte buffer.
+   * 
+   * @param maxLength
+   *          number of bytes to show in string
+   */
+  public String toString(int maxLength) {
+    return "ByteRange(){offset=" + offset + ", length=" + length + ", cursor=" + cursor + ", data=" + ((data == null) ? "null" : toHexString(maxLength))
+        + ((metadata == null || metadata.size() == 0) ? "" : ", metadata=" + metadata) + "}";
+  }
+
+  /**
+   * Return string representation of this byte buffer as hexadecimal numbers,
+   * e.g. "[0x01, 0x02]".
+   * 
+   * @param maxLength
+   *          number of bytes to show in string; values below 0 show no bytes
+   */
+  public String toHexString(int maxLength) {
+    // Clamp to buffer length, so that negative or too large maxLength cannot produce invalid capacity
+    int count = Math.max(0, Math.min(maxLength, length));
+    StringBuilder builder = new StringBuilder(count * 6 + 2);
+    builder.append('[');
+    for (int i = 0; i < count; i++) {
+      int b = data[offset + i] & 0xff;
+      builder.append((i > 0) ? ", " : "").append((b < 16) ? "0x0" : "0x").append(Integer.toString(b, 16));
+    }
+    builder.append(']');
+    return builder.toString();
+  }
+
+  /**
+   * Return string representation of this byte buffer as hexadecimal numbers,
+   * e.g. "01 02".
+   * 
+   * @param maxLength
+   *          number of bytes to show in string; values below 0 show no bytes
+   */
+  public String toPlainHexString(int maxLength) {
+    // Clamp to buffer length, so that negative or too large maxLength cannot produce invalid capacity
+    int count = Math.max(0, Math.min(maxLength, length));
+    StringBuilder builder = new StringBuilder(count * 3);
+    for (int i = 0; i < count; i++) {
+      int b = data[offset + i] & 0xff;
+      builder.append((i > 0) ? " " : "").append((b < 16) ? "0" : "").append(Integer.toString(b, 16));
+    }
+    return builder.toString();
+  }
+
+  public void dump() {
+    System.out.println(toString(length));
+  }
+
+  /** Grow backing array to at least newLength bytes; offset, length and cursor are not changed. */
+  public void extend(int newLength) {
+    if (data.length < newLength) data = Arrays.copyOf(data, newLength); // copy must be stored, copyOf() does not resize in place
+  }
+
+  public void ref() {
+    refCount++;
+  }
+
+  /**
+   * Drop one reference. When count reaches zero, reference to parent buffer, if any, is
+   * dropped, otherwise array is returned to buffer pool; then data is set to null.
+   */
+  public void unref() {
+    if (--refCount != 0)
+      return;
+
+    if (parentByteBuffer == null) {
+      // Return buffer to buffer pool
+      BufferPool.recycleBuffer(data);
+    } else {
+      parentByteBuffer.unref();
+      parentByteBuffer = null;
+    }
+    data = null;
+  }
+
+  public boolean isSoleOwner() {
+    return refCount == 1 && (parentByteBuffer == null);
+  }
+
+  /**
+   * Create shared lightweight copy of part of this buffer. Reference count of
+   * this buffer is incremented only when the slice is actually created.
+   */
+  public ByteBuffer slice(int offset, int length, boolean copyMetadata) {
+    // Validate first, so that a rejected request does not leak a reference
+    if (this.length < (offset + length))
+      throw new RuntimeException("Length of region is larger that length of this buffer. Buffer length: " + this.length + ", offset: " + offset
+          + ", new region length: " + length + ".");
+
+    ref();
+    ByteBuffer slice = new ByteBuffer(data, this.offset + offset, length);
+
+    if (copyMetadata && this.metadata != null)
+      slice.metadata = new HashMap<String, Object>(metadata);
+    return slice;
+  }
+
+  private Map<String, Object> metadata = null;
+
+  public Object putMetadata(String key, Object value) {
+    if (metadata == null)
+      metadata = new HashMap<String, Object>();
+    return metadata.put(key, value);
+  }
+
+  public Object getMetadata(String key) {
+    return (metadata != null) ? metadata.get(key) : null;
+  }
+
+  /**
+   * Create new buffer, which holds data from both buffers. Expensive operation.
+   * 
+   * @TODO if only one reference to this ByteBuffer exists, then extend this
+   *       buffer instead of creating new buffer
+   * @TODO support list of buffers to avoid expensive joins until absolute
+   *       necessary
+   */
+  public ByteBuffer join(ByteBuffer buf) {
+    // Extend byte array for new data
+    int newLength = length + buf.length;
+    byte newData[] = new byte[newLength];
+
+    // Copy data from our buffer
+    System.arraycopy(data, offset, newData, 0, length);
+
+    // Copy data from other buffer
+    System.arraycopy(buf.data, buf.offset, newData, length, buf.length);
+
+    ByteBuffer newBuf = new ByteBuffer(newData);
+
+    // Copy our (older) metadata to new buffer, because handler might store some
+    // metadata in buffer, which is pushed back.
+    if (metadata != null)
+      newBuf.metadata = new HashMap<String, Object>(metadata);
+
+    return newBuf;
+  }
+
+  /**
+   * Copy used portion of buffer to new byte array. Expensive operation.
+   */
+  public byte[] toByteArray() {
+    return Arrays.copyOfRange(data, offset, offset + length);
+  }
+
+  public short[] toShortArray() {
+    if (length % 2 != 0)
+      throw new ArrayIndexOutOfBoundsException("Length of byte array must be dividable by 2 without remainder. Array length: " + length + ", remainder: "
+          + (length % 2) + ".");
+
+    short[] buf = new short[length / 2];
+
+    for (int i = 0, j = offset; i < buf.length; i++, j += 2) {
+      buf[i] = (short) ((data[j + 0] & 0xFF) | ((data[j + 1] & 0xFF) << 8));
+    }
+    return buf;
+  }
+
+  /**
+   * Return array of int's in little endian order.
+   */
+  public int[] toIntLEArray() {
+    if (length % 4 != 0)
+      throw new ArrayIndexOutOfBoundsException("Length of byte array must be dividable by 4 without remainder. Array length: " + length + ", remainder: "
+          + (length % 4) + ".");
+
+    int[] buf = new int[length / 4];
+
+    for (int i = 0, j = offset; i < buf.length; i++, j += 4) {
+      buf[i] = (data[j + 0] & 0xFF) | ((data[j + 1] & 0xFF) << 8) | ((data[j + 2] & 0xFF) << 16) | ((data[j + 3] & 0xFF) << 24);
+    }
+    return buf;
+  }
+
+  /**
+   * Return array of int's in little endian order, but use only 3 bytes per int (3RGB).
+   */
+  public int[] toInt3LEArray() {
+    if (length % 3 != 0)
+      throw new ArrayIndexOutOfBoundsException("Length of byte array must be dividable by 3 without remainder. Array length: " + length + ", remainder: "
+          + (length % 3) + ".");
+
+    int[] buf = new int[length / 3];
+
+    for (int i = 0, j = offset; i < buf.length; i++, j += 3) {
+      buf[i] = (data[j + 0] & 0xFF) | ((data[j + 1] & 0xFF) << 8) | ((data[j + 2] & 0xFF) << 16);
+    }
+    return buf;
+  }
+
+  /**
+   * Helper method for test cases to convert array of byte arrays to array of
+   * byte buffers.
+   */
+  public static ByteBuffer[] convertByteArraysToByteBuffers(byte[]... bas) {
+    ByteBuffer bufs[] = new ByteBuffer[bas.length];
+
+    int i = 0;
+    for (byte[] ba : bas) {
+      bufs[i++] = new ByteBuffer(ba);
+    }
+    return bufs;
+  }
+
+  /**
+   * Read signed int in network order. Cursor is advanced by 4.
+   */
+  public int readSignedInt() {
+    if (cursor + 4 > length)
+      throw new ArrayIndexOutOfBoundsException("Cannot read 4 bytes from this buffer: " + this + ".");
+
+    int result = (((data[offset + cursor] & 0xff) << 24) + ((data[offset + cursor + 1] & 0xff) << 16) + ((data[offset + cursor + 2] & 0xff) << 8) + (data[offset
+        + cursor + 3] & 0xff));
+    cursor += 4;
+    return result;
+  }
+
+  /**
+   * Read signed int in little endian order. Cursor is advanced by 4.
+   */
+  public int readSignedIntLE() {
+    if (cursor + 4 > length)
+      throw new ArrayIndexOutOfBoundsException("Cannot read 4 bytes from this buffer: " + this + ".");
+
+    int result = (((data[offset + cursor + 3] & 0xff) << 24) + ((data[offset + cursor + 2] & 0xff) << 16) + ((data[offset + cursor + 1] & 0xff) << 8) + (data[offset
+        + cursor] & 0xff));
+    cursor += 4;
+    return result;
+  }
+
+  /**
+   * Read unsigned int in little endian order. Cursor is advanced by 4.
+   */
+  public long readUnsignedIntLE() {
+    if (cursor + 4 > length)
+      throw new ArrayIndexOutOfBoundsException("Cannot read 4 bytes from this buffer: " + this + ".");
+
+    long result = (((long) (data[offset + cursor + 3] & 0xff) << 24) + ((long) (data[offset + cursor + 2] & 0xff) << 16)
+        + ((long) (data[offset + cursor + 1] & 0xff) << 8) + (long) (data[offset + cursor] & 0xff));
+    cursor += 4;
+    return result;
+  }
+
+  /**
+   * Read signed int in variable length format, least significant bits first.
+   * Topmost bit of each byte indicates that next byte contains additional
+   * bits. Cursor is advanced by 1-5 bytes.
+   */
+  public int readVariableSignedIntLE() {
+    int value = 0;
+    int shift = 0;
+    int b;
+
+    do {
+      b = readUnsignedByte();
+      value |= (b & 0x7f) << shift;
+      shift += 7;
+    } while ((b & 0x80) != 0 && shift < 32);
+    return value;
+  }
+
+  /**
+   * Read unsigned int in network order in variable length format. Cursor is
+   * advanced by 1 to 4 bytes.
+   * 
+   * Two most significant bits of first byte indicates length of field: 0x00 - 1
+   * byte, 0x40 - 2 bytes, 0x80 - 3 bytes, 0xc0 - 4 bytes. Remaining six bits
+   * of first byte are the most significant bits of the value.
+   * 
+   * @return decoded value, in range 0 to 0x3fffffff
+   * @throws ArrayIndexOutOfBoundsException
+   *           when buffer ends before the field is fully read
+   * @see http://msdn.microsoft.com/en-us/library/cc241614.aspx
+   */
+  public int readEncodedUnsignedInt() {
+    int firstByte = readUnsignedByte();
+
+    // Two most significant bits of first byte hold number of bytes which
+    // follow the first byte (0-3)
+    int extraBytes = (firstByte & 0xc0) >> 6;
+
+    // Six lower bits of first byte are the most significant bits of value
+    int result = firstByte & 0x3f;
+
+    // Append remaining bytes in network order: shift accumulated bits left
+    // and put next byte below them
+    for (int i = 0; i < extraBytes; i++)
+      result = (result << 8) | readUnsignedByte();
+
+    return result;
+  }
+
+  /**
+   * Read unsigned byte. Cursor is advanced by 1.
+   */
+  public int readUnsignedByte() {
+    if (cursor + 1 > length)
+      throw new ArrayIndexOutOfBoundsException("Cannot read 1 byte from this buffer: " + this + ".");
+
+    int b = data[offset + cursor] & 0xff;
+    cursor += 1;
+    return b;
+  }
+
+  /**
+   * Read signed byte. Cursor is advanced by 1.
+   */
+  public byte readSignedByte() {
+    if (cursor + 1 > length)
+      throw new ArrayIndexOutOfBoundsException("Cannot read 1 byte from this buffer: " + this + ".");
+
+    byte b = data[offset + cursor];
+    cursor += 1;
+    return b;
+  }
+
+  /**
+   * Read unsigned short in network order. Cursor is advanced by 2.
+   */
+  public int readUnsignedShort() {
+    if (cursor + 2 > length)
+      throw new ArrayIndexOutOfBoundsException("Cannot read 2 bytes from this buffer: " + this + ".");
+
+    int result = (((data[offset + cursor] & 0xff) << 8) | (data[offset + cursor + 1] & 0xff));
+    cursor += 2;
+    return result;
+  }
+
+  /**
+   * Read signed short in little endian order. Cursor is advanced by 2.
+   */
+  public short readSignedShortLE() {
+    if (cursor + 2 > length)
+      throw new ArrayIndexOutOfBoundsException("Cannot read 2 bytes from this buffer: " + this + ".");
+
+    short result = (short) (((data[offset + cursor + 1] & 0xff) << 8) | (data[offset + cursor] & 0xff));
+    cursor += 2;
+    return result;
+  }
+
+  /**
+   * Read unsigned short in network order in variable length format. Cursor is
+   * advanced by 1 or 2 bytes.
+   * 
+   * Most significant bit of first byte indicates length of field: 0 - 1 byte, 1
+   * - 2 bytes.
+   */
+  public int readVariableUnsignedShort() {
+    int firstByte = readUnsignedByte();
+
+    int result;
+    if ((firstByte & 0x80) == 0)
+      result = firstByte & 0x7f;
+    else {
+      int secondByte = readUnsignedByte();
+      result = (((firstByte & 0x7f) << 8) | secondByte);
+    }
+
+    return result;
+  }
+
+  /**
+   * Read unsigned short in little endian order. Cursor is advanced by 2.
+   */
+  public int readUnsignedShortLE() {
+    if (cursor + 2 > length)
+      throw new ArrayIndexOutOfBoundsException("Cannot read 2 bytes from this buffer: " + this + ".");
+
+    int result = (((data[offset + cursor + 1] & 0xff) << 8) | (data[offset + cursor] & 0xff));
+    cursor += 2;
+    return result;
+  }
+
+  /**
+   * Read unsigned short in network order in variable length format. Cursor is
+   * advanced by 1 or 2 bytes.
+   * 
+   * Most significant bit of first byte indicates length of field: 0x00 - 1
+   * byte, 0x80 - 2 bytes.
+   * 
+   * @see http://msdn.microsoft.com/en-us/library/cc241612.aspx
+   */
+  public int readEncodedUnsignedShort() {
+    int firstByte = readUnsignedByte();
+
+    int result;
+    if ((firstByte & 0x80) == 0)
+      result = firstByte & 0x7f;
+    else {
+      int secondByte = readUnsignedByte();
+      result = (((firstByte & 0x7f) << 8) | secondByte);
+    }
+
+    return result;
+  }
+
+  /**
+   * Read signed short in network order in variable length format. Cursor is
+   * advanced by 1 or 2 bytes.
+   * 
+   * Most significant bit of first byte indicates length of field: 0x00 - 1
+   * byte, 0x80 - 2 bytes. Second most significant bit indicates whether value
+   * is positive or negative.
+   * 
+   * @see http://msdn.microsoft.com/en-us/library/cc241613.aspx
+   */
+  public int readEncodedSignedShort() {
+    int firstByte = readUnsignedByte();
+
+    // Six lower bits of first byte are the only (short form) or the most
+    // significant (long form) bits of magnitude
+    int magnitude = firstByte & 0x3f;
+
+    // Long form: one more byte with less significant bits follows
+    if ((firstByte & 0x80) != 0)
+      magnitude = (magnitude << 8) | readUnsignedByte();
+
+    // Second most significant bit of first byte is the sign
+    if ((firstByte & 0x40) != 0)
+      return -magnitude;
+    return magnitude;
+  }
+
+  /**
+   * Read signed long in little endian order. Cursor is advanced by 8 bytes.
+   */
+  public long readSignedLongLE() {
+    return (((long) readSignedIntLE()) & 0xffFFffFFL) | (((long) readSignedIntLE()) << 32);
+  }
+
+  /**
+   * Read string from buffer. Cursor is advanced by string length.
+   */
+  public String readString(int length, Charset charset) {
+    if (cursor + length > this.length)
+      throw new ArrayIndexOutOfBoundsException("Cannot read " + length + " bytes from this buffer: " + this + ".");
+
+    String string = new String(data, offset + cursor, length, charset);
+    cursor += length;
+    return string;
+  }
+
+  /**
+   * Get bytes as lightweight slice. Cursor is advanced by data length.
+   */
+  public ByteBuffer readBytes(int dataLength) {
+    if (cursor + dataLength > length)
+      throw new ArrayIndexOutOfBoundsException("Cannot read " + dataLength + " bytes from this buffer: " + this + ".");
+
+    ByteBuffer slice = slice(cursor, dataLength, false);
+    cursor += dataLength;
+    return slice;
+  }
+
+  /**
+   * Cursor is advanced by given number of bytes.
+   */
+  public void skipBytes(int numOfBytes) {
+    if (cursor + numOfBytes > length)
+      throw new ArrayIndexOutOfBoundsException("Cannot read " + numOfBytes + " bytes from this buffer: " + this + ".");
+
+    cursor += numOfBytes;
+  }
+
+  /**
+   * Write byte. Cursor is advanced by 1.
+   */
+  public void writeByte(int b) {
+    if (cursor + 1 > length)
+      throw new ArrayIndexOutOfBoundsException("Cannot write 1 byte to this buffer: " + this + ".");
+
+    data[offset + cursor] = (byte) b;
+    cursor += 1;
+  }
+
+  /**
+   * Write short in network order. Cursor is advanced by 2.
+   */
+  public void writeShort(int x) {
+    if (cursor + 2 > length)
+      throw new ArrayIndexOutOfBoundsException("Cannot write 2 bytes to this buffer: " + this + ".");
+
+    data[offset + cursor] = (byte) (x >> 8);
+    data[offset + cursor + 1] = (byte) x;
+    cursor += 2;
+  }
+
+  /**
+   * Write short in little endian order. Cursor is advanced by 2.
+   */
+  public void writeShortLE(int x) {
+    if (cursor + 2 > length)
+      throw new ArrayIndexOutOfBoundsException("Cannot write 2 bytes to this buffer: " + this + ".");
+
+    data[offset + cursor + 1] = (byte) (x >> 8);
+    data[offset + cursor] = (byte) x;
+    cursor += 2;
+  }
+
+  /**
+   * Write int in network order. Cursor is advanced by 4.
+   */
+  public void writeInt(int i) {
+    if (cursor + 4 > length)
+      throw new ArrayIndexOutOfBoundsException("Cannot write 4 bytes to this buffer: " + this + ".");
+
+    data[offset + cursor] = (byte) (i >> 24);
+    data[offset + cursor + 1] = (byte) (i >> 16);
+    data[offset + cursor + 2] = (byte) (i >> 8);
+    data[offset + cursor + 3] = (byte) i;
+    cursor += 4;
+  }
+
+  public void writeIntLE(int i) {
+    if (cursor + 4 > length)
+      throw new ArrayIndexOutOfBoundsException("Cannot write 4 bytes to this buffer: " + this + ".");
+
+    data[offset + cursor] = (byte) i;
+    data[offset + cursor + 1] = (byte) (i >> 8);
+    data[offset + cursor + 2] = (byte) (i >> 16);
+    data[offset + cursor + 3] = (byte) (i >> 24);
+    cursor += 4;
+  }
+
+  /**
+   * Write int in variable length format. Cursor is advanced by number of bytes
+   * written (1-5). Zero is written as single 0x00 byte, negative values are
+   * written as 5 bytes (all 32 bits are stored).
+   * 
+   * Topmost bit of each byte is set to 1 to indicate that next byte has data.
+   */
+  public void writeVariableIntLE(int i) {
+    do {
+      // Get lower bits of number
+      int b = i & 0x7f;
+      // Unsigned shift, so that negative numbers run out of bits too
+      i >>>= 7;
+      // Set topmost bit of byte to indicate that next byte(s) contains
+      // remainder bits
+      if (i != 0)
+        b |= 0x80;
+      writeByte(b);
+    } while (i != 0);
+  }
+
+  /**
+   * Write short in variable length format. Cursor is advanced by number of
+   * bytes written (1-2): one byte for values 0-0x7f, two bytes otherwise.
+   * 
+   * Topmost bit of first byte is set to 1 to indicate that next byte has data.
+   */
+  public void writeVariableShort(int length) {
+    if (length >= 0 && length <= 0x7f)
+      writeByte(length);
+    else
+      writeShort(length | 0x8000);
+  }
+
+  /**
+   * Prepend given data to this byte buffer.
+   */
+  public void prepend(ByteBuffer buf) {
+    prepend(buf.data, buf.offset, buf.length);
+  }
+
+  /**
+   * Prepend given data to this byte buffer.
+   */
+  public void prepend(byte[] data) {
+    prepend(data, 0, data.length);
+  }
+
+  /**
+   * Prepend given data to this byte buffer.
+   */
+  public void prepend(byte[] data, int offset, int length) {
+    if (!isSoleOwner()) {
+      throw new RuntimeException("Create full copy of this byte buffer data for modification. refCount: " + refCount + ", parentByteBuffer: "
+          + parentByteBuffer + ".");
+    }
+
+    // If there is no enough space for header to prepend
+    if (!(this.offset >= length)) {
+      throw new RuntimeException("Reserve data to have enough space for header.");
+    }
+
+    // Copy header
+    System.arraycopy(data, offset, this.data, this.offset - length, length);
+
+    // Extend byte range to include header
+    this.offset -= length;
+    this.length += length;
+    this.cursor += length;
+  }
+
+  public void writeString(String str, Charset charset) {
+    writeBytes(str.getBytes(charset));
+  }
+
+  /**
+   * Write string of fixed size. When string is shorted, empty space is filled
+   * with zeros. When string is larger, it is truncated.
+   */
+  public void writeFixedString(int length, String str, Charset charset) {
+    byte[] bytes = str.getBytes(charset);
+    writeBytes(bytes, 0, Math.min(bytes.length, length));
+
+    for (int i = bytes.length; i < length; i++)
+      writeByte(0);
+  }
+
+  public void writeBytes(ByteBuffer buf) {
+    writeBytes(buf.data, buf.offset, buf.length);
+  }
+
+  public void writeBytes(byte[] bytes) {
+    writeBytes(bytes, 0, bytes.length);
+  }
+
+  public void writeBytes(byte[] bytes, int offset, int length) {
+    System.arraycopy(bytes, offset, this.data, this.offset + this.cursor, length);
+    cursor += length;
+  }
+
+  // /**
+  // * Write BER encoded definite long variant of the ASN.1 length field.
+  // */
+  // public void writeBerLength(int value) {
+  // int fieldLength;
+  // if (value > 0xFFffFF)
+  // fieldLength = 4;
+  // else if (value > 0xFFff)
+  // fieldLength = 3;
+  // else if (value > 0xFF)
+  // fieldLength = 2;
+  // else
+  // fieldLength = 1;
+  //
+  // if (cursor + fieldLength + 1 > length)
+  // throw new ArrayIndexOutOfBoundsException("Cannot write " + (fieldLength +
+  // 1) + " byte(s) to this buffer: " + this + ".");
+  //
+  // // Write length of length field itself
+  // writeByte(0x80 | fieldLength);
+  //
+  // switch (fieldLength) {
+  // case 4:
+  // data[offset + cursor++] = (byte) (value >> 24);
+  // case 3:
+  // data[offset + cursor++] = (byte) (value >> 16);
+  // case 2:
+  // data[offset + cursor++] = (byte) (value >> 8);
+  // case 1:
+  // data[offset + cursor++] = (byte) value;
+  // }
+  //
+  // }
+
+  /**
+   * Reduce length of buffer to cursor position.
+   */
+  public void trimAtCursor() {
+    length = cursor;
+  }
+
+  /**
+   * Rewind cursor to beginning of buffer.
+   */
+  public void rewindCursor() {
+    cursor = 0;
+  }
+
+  /**
+   * Read RGB color in LE order. Cursor is advanced by 3.
+   * 
+   * @return color as int, with red in lowest octet.
+   */
+  public int readRGBColor() {
+    return readUnsignedByte() | (readUnsignedByte() << 8) | (readUnsignedByte() << 16);
+  }
+
+  public void assertThatBufferIsFullyRead() {
+    if (cursor != length)
+      throw new RuntimeException("Data in buffer is not read fully. Buf: " + this + ".");
+  }
+
+  /**
+   * Hash of bytes of the slice combined with its length, consistent with equals().
+   */
+  @Override
+  public int hashCode() {
+    int hash = 1;
+    for (int i = 0; i < length; i++)
+      hash = 31 * hash + data[offset + i];
+
+    // Mix in the length as the last step
+    return 31 * hash + length;
+  }
+
+  /**
+   * Buffers are equal when their slices have the same length and hold the same
+   * bytes. Offset, cursor, order and metadata are not compared.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null || getClass() != obj.getClass())
+      return false;
+
+    ByteBuffer that = (ByteBuffer) obj;
+    if (that.length != length)
+      return false;
+    for (int mine = offset, theirs = that.offset, end = offset + length; mine < end; mine++, theirs++)
+      if (data[mine] != that.data[theirs])
+        return false;
+    return true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a98c473d/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/DataSink.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/DataSink.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/DataSink.java
new file mode 100644
index 0000000..e3de289
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/DataSink.java
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+public interface DataSink {
+
+  void sendData(ByteBuffer buf);
+
+  void sendEvent(Event event, Direction direction);
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a98c473d/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/DataSource.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/DataSource.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/DataSource.java
new file mode 100644
index 0000000..152be2e
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/DataSource.java
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+public interface DataSource {
+
+  /**
+   * Get data from source.
+   * 
+   * @param block
+   *          if false, then return immediately when no data is available,
+   *          otherwise wait for data
+   * @return new data or null, when no data is available
+   */
+  ByteBuffer pull(boolean block);
+
+  /**
+   * Hold data temporary to use at next pull or push.
+   * 
+   * @param buf
+   *          a data
+   */
+  void pushBack(ByteBuffer buf);
+
+  /**
+   * Hold data temporarily to use at next pull. Don't return anything until the
+   * given amount of data has been read from source, because data would be
+   * pushed back anyway.
+   * 
+   * @param buf
+   *          a data
+   * @param lengthOfFullPacket
+   *          length of full block of data to read from source
+   */
+  void pushBack(ByteBuffer buf, int lengthOfFullPacket);
+
+  /**
+   * Send event to pads.
+   * 
+   * @param event
+   *          a event
+   * @param direction
+   *          pad direction
+   */
+  void sendEvent(Event event, Direction direction);
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a98c473d/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Direction.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Direction.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Direction.java
new file mode 100644
index 0000000..c9dede8
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Direction.java
@@ -0,0 +1,21 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+public enum Direction {
+  IN, OUT
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a98c473d/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Element.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Element.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Element.java
new file mode 100644
index 0000000..c927dea
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Element.java
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+import java.util.Set;
+
+/**
+ * Element is for processing of data. It has one or more contact pads, which can
+ * be wired with other elements using links.
+ */
+public interface Element {
+
+  /**
+   * Name of pad for standard input. Should be set in all elements except pure
+   * sources (a pure source has no input pads).
+   */
+  public static final String STDIN = "stdin";
+
+  /**
+   * Name of pad for standard output. Should be set in all elements except pure
+   * sinks (a pure sink has no output pads).
+   */
+  public static final String STDOUT = "stdout";
+
+  /**
+   * Get link connected to given pad.
+   * 
+   * @param padName
+   *          a pad name. Standard pads are "stdin" and "stdout".
+   * @return link connected to the pad
+   */
+  Link getLink(String padName);
+
+  /**
+   * Get names of pads of this element.
+   * 
+   * @param direction
+   *          IN to get input pads, OUT to get output pads
+   * @return set of pad names
+   */
+  Set<String> getPads(Direction direction);
+
+  /**
+   * Connect link to given pad.
+   * 
+   * @param padName
+   *          a pad name. Standard pads are "stdin" and "stdout".
+   * @param link
+   *          a link to connect
+   * @param direction
+   *          IN when pad is an input pad of this element, OUT when it is an
+   *          output pad
+   */
+  void setLink(String padName, Link link, Direction direction);
+
+  /**
+   * Disconnect link from given pad.
+   * 
+   * @param padName
+   *          a pad name. Standard pads are "stdin" and "stdout".
+   */
+  void dropLink(String padName);
+
+  /**
+   * Pull data from element and handle it. Element should ask one of its input
+   * pads for data, handle data and push result to its sink(s), if any.
+   * 
+   * @param block
+   *          block until data will be available, or do a slight delay at least,
+   *          when data is not available
+   */
+  void poll(boolean block);
+
+  /**
+   * Handle incoming data.
+   * 
+   * @param buf
+   *          a buffer with data. Implementations in this package check it for
+   *          null before use.
+   * @param link
+   *          presumably the link which delivered the data (TODO: confirm
+   *          against link implementations)
+   */
+  void handleData(ByteBuffer buf, Link link);
+
+  /**
+   * Handle event.
+   * 
+   * @param event
+   *          an event
+   * @param direction
+   *          if IN, then send event to input pads, when OUT, then send to
+   *          output pads
+   */
+  void handleEvent(Event event, Direction direction);
+
+  /**
+   * Get element ID.
+   */
+  String getId();
+
+  /**
+   * Validate element: check that all required pads are connected.
+   */
+  void validate();
+
+  /**
+   * Drop link.
+   * 
+   * @param link
+   *          a link to drop
+   */
+  void dropLink(Link link);
+
+  /**
+   * Drop existing link and replace it by new link.
+   * 
+   * @param existingLink
+   *          a link which is currently connected to this element
+   * @param newLink
+   *          a link to connect in place of the existing link
+   */
+  void replaceLink(Link existingLink, Link newLink);
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a98c473d/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Event.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Event.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Event.java
new file mode 100644
index 0000000..5e1a389
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Event.java
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+/**
+ * Events which elements and links send to each other.
+ */
+public enum Event {
+
+  /**
+   * Stream is started.
+   */
+  STREAM_START,
+
+  /**
+   * Stream is closed, e.g. because a source has reached the end of its input.
+   */
+  STREAM_CLOSE,
+
+  /**
+   * Upgrade socket to SSL.
+   */
+  SOCKET_UPGRADE_TO_SSL,
+
+  /**
+   * Switch links to pull mode.
+   */
+  LINK_SWITCH_TO_PULL_MODE
+
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a98c473d/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/FakeSink.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/FakeSink.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/FakeSink.java
new file mode 100644
index 0000000..65fb29e
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/FakeSink.java
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+/**
+ * Sink element which counts incoming buffers and releases them. When verbose,
+ * every received buffer and event is logged to standard output.
+ */
+public class FakeSink extends BaseElement {
+
+  public FakeSink(String id) {
+    super(id);
+  }
+
+  /**
+   * Log the buffer (when verbose), then count and release it. A null buffer
+   * is logged but neither counted nor released.
+   */
+  @Override
+  public void handleData(ByteBuffer buf, Link link) {
+    if (verbose) {
+      System.out.println("[" + this + "] INFO: Received buf #" + packetNumber + " " + buf + ".");
+    }
+
+    if (buf != null) {
+      // packetNumber doubles as the counter of received packets
+      packetNumber++;
+      buf.unref();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "FakeSink(" + id + ")";
+  }
+
+  /**
+   * Log the event when verbose; the event is not processed any further here.
+   */
+  @Override
+  public void handleEvent(Event event, Direction direction) {
+    if (!verbose) {
+      return;
+    }
+
+    System.out.println("[" + this + "] INFO: Event received: " + event + ".");
+  }
+
+  /**
+   * Example: send a single three-byte buffer to a verbose sink.
+   */
+  public static void main(String[] args) {
+    Element sink = new FakeSink("sink") {
+      {
+        verbose = true;
+      }
+    };
+
+    ByteBuffer packet = new ByteBuffer(new byte[] { 1, 2, 3 });
+    Link input = new SyncLink();
+
+    sink.setLink(STDIN, input, Direction.IN);
+    sink.getLink(STDIN).sendData(packet);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a98c473d/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/FakeSource.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/FakeSource.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/FakeSource.java
new file mode 100644
index 0000000..4cf6503
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/FakeSource.java
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+/**
+ * Source element which generates buffers filled with predictable test data.
+ */
+public class FakeSource extends BaseElement {
+
+  /**
+   * Delay for null packets in poll method when blocking is requested, in
+   * milliseconds.
+   */
+  protected long delay = SyncLink.STANDARD_DELAY_FOR_EMPTY_PACKET;
+
+  public FakeSource(String id) {
+    super(id);
+  }
+
+  /**
+   * Generate one buffer and push it to all outputs. When a limit of buffers
+   * is configured (numBuffers &gt; 0) and reached, STREAM_CLOSE is sent to the
+   * output pads instead.
+   * 
+   * @param block
+   *          when true, sleep for {@link #delay} milliseconds after pushing
+   *          the buffer
+   */
+  @Override
+  public void poll(boolean block) {
+    if (numBuffers > 0 && packetNumber >= numBuffers) {
+      // Close stream when limit of packets is reached
+      sendEventToAllPads(Event.STREAM_CLOSE, Direction.OUT);
+      return;
+    }
+
+    // Prepare new packet
+    ByteBuffer buf = initializeData();
+
+    // Push it to output(s)
+    pushDataToAllOuts(buf);
+
+    // Make slight delay when blocking input was requested (to avoid
+    // consuming of 100% in parent loop)
+    if (block)
+      delay();
+
+  }
+
+  /**
+   * Make slight delay. Should be used when blocking input is requested in pull
+   * mode, but null packet was returned by input.
+   */
+  protected void delay() {
+    try {
+      Thread.sleep(delay);
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag, so that code which controls this thread
+      // can still see that interruption was requested
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Initialize data: create a buffer whose first byte is the packet sequence
+   * number (modulo 128) and whose other bytes are equal to their position in
+   * the buffer (modulo 128).
+   * 
+   * @return buffer with generated data and with sequence number and source
+   *         ID stored in its metadata
+   */
+  public ByteBuffer initializeData() {
+    ByteBuffer buf = new ByteBuffer(incommingBufLength);
+
+    // Set first byte of packet to its sequence number
+    buf.data[buf.offset] = (byte) (packetNumber % 128);
+
+    // Initialize rest of bytes with sequential values, which are
+    // corresponding with their position in byte buffer. Positions are
+    // relative to buf.offset, so the whole slice is filled even when the
+    // slice does not start at the beginning of the backing array.
+    for (int pos = 1; pos < buf.length; pos++)
+      buf.data[buf.offset + pos] = (byte) (pos % 128);
+
+    buf.putMetadata(ByteBuffer.SEQUENCE_NUMBER, packetNumber);
+    buf.putMetadata("src", id);
+
+    return buf;
+  }
+
+  @Override
+  public String toString() {
+    return "FakeSource(" + id + ")";
+  }
+
+  /**
+   * Example: one source of ten three-byte buffers wired to two sinks.
+   */
+  public static void main(String[] args) {
+
+    Element fakeSource = new FakeSource("source 3/10/100") {
+      {
+        verbose = true;
+        this.incommingBufLength = 3;
+        this.numBuffers = 10;
+        this.delay = 100;
+      }
+    };
+
+    Element fakeSink = new FakeSink("sink") {
+      {
+        this.verbose = true;
+      }
+    };
+
+    Element fakeSink2 = new FakeSink("sink2") {
+      {
+        this.verbose = true;
+      }
+    };
+
+    Link link = new SyncLink();
+
+    fakeSource.setLink(STDOUT, link, Direction.OUT);
+    fakeSink.setLink(STDIN, link, Direction.IN);
+
+    Link link2 = new SyncLink();
+
+    fakeSource.setLink("out2", link2, Direction.OUT);
+    fakeSink2.setLink(STDIN, link2, Direction.IN);
+
+    link.run();
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a98c473d/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/InputStreamSource.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/InputStreamSource.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/InputStreamSource.java
new file mode 100644
index 0000000..b05637f
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/InputStreamSource.java
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Source element, which reads data from InputStream.
+ */
+public class InputStreamSource extends BaseElement {
+
+  protected InputStream is;
+  protected SocketWrapper socketWrapper;
+
+  public InputStreamSource(String id) {
+    super(id);
+  }
+
+  public InputStreamSource(String id, InputStream is) {
+    super(id);
+    this.is = is;
+  }
+
+  public InputStreamSource(String id, SocketWrapper socketWrapper) {
+    super(id);
+    this.socketWrapper = socketWrapper;
+  }
+
+  @Override
+  public void handleEvent(Event event, Direction direction) {
+    switch (event) {
+    case SOCKET_UPGRADE_TO_SSL:
+      socketWrapper.upgradeToSsl();
+      break;
+    default:
+      super.handleEvent(event, direction);
+    }
+  }
+
+  @Override
+  public void setLink(String padName, Link link, Direction direction) {
+    switch (direction) {
+    case OUT:
+      super.setLink(padName, link, direction);
+
+      if (is == null) {
+        // Pause links until data stream will be ready
+        link.pause();
+      }
+      break;
+    case IN:
+      throw new RuntimeException("Cannot assign link to input pad in source element. Element: " + this + ", pad: " + padName + ", link: " + link + ".");
+    }
+  }
+
+  public void setInputStream(InputStream is) {
+    this.is = is;
+
+    // Resume links
+    resumeLinks();
+  }
+
+  private void resumeLinks() {
+    for (DataSink sink : outputPads.values())
+      ((Link) sink).resume();
+  }
+
+  /**
+   * Read data from input stream.
+   */
+  @Override
+  public void poll(boolean block) {
+    try {
+      if (!block && is.available() == 0) {
+
+        if (verbose)
+          System.out.println("[" + this + "] INFO: No data in stream is available now, returning.");
+
+        return;
+      }
+
+      // Create buffer of recommended size and with default offset
+      ByteBuffer buf = new ByteBuffer(incommingBufLength);
+
+      if (verbose)
+        System.out.println("[" + this + "] INFO: Reading data from stream.");
+
+      int actualLength = is.read(buf.data, buf.offset, buf.data.length - buf.offset);
+
+      if (actualLength < 0) {
+        if (verbose)
+          System.out.println("[" + this + "] INFO: End of stream.");
+
+        buf.unref();
+        closeStream();
+        sendEventToAllPads(Event.STREAM_CLOSE, Direction.OUT);
+        return;
+      }
+
+      if (actualLength == 0) {
+        if (verbose)
+          System.out.println("[" + this + "] INFO: Empty buffer is read from stream.");
+
+        buf.unref();
+        return;
+      }
+
+      buf.length = actualLength;
+
+      if (verbose)
+        System.out.println("[" + this + "] INFO: Data read from stream: " + buf + ".");
+
+      pushDataToAllOuts(buf);
+
+    } catch (IOException e) {
+      System.err.println("[" + this + "] ERROR: " + e.getMessage());
+      closeStream();
+    }
+  }
+
+  @Override
+  protected void onClose() {
+    closeStream();
+  }
+
+  private void closeStream() {
+    if (verbose)
+      System.out.println("[" + this + "] INFO: Closing stream.");
+
+    try {
+      is.close();
+    } catch (IOException e) {
+    }
+    try {
+      sendEventToAllPads(Event.STREAM_CLOSE, Direction.OUT);
+    } catch (Exception e) {
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "InputStreamSource(" + id + ")";
+  }
+
+  /**
+   * Example.
+   */
+  public static void main(String args[]) {
+    InputStream is = new ByteArrayInputStream(new byte[] { 1, 2, 3 });
+
+    InputStreamSource source = new InputStreamSource("source") {
+      {
+        verbose = true;
+      }
+    };
+    Element fakeSink = new FakeSink("sink") {
+      {
+        verbose = true;
+      }
+    };
+
+    Link link = new SyncLink() {
+      {
+        verbose = true;
+      }
+    };
+
+    source.setLink(STDOUT, link, Direction.OUT);
+    fakeSink.setLink(STDIN, link, Direction.IN);
+
+    source.setInputStream(is);
+
+    link.sendEvent(Event.STREAM_START, Direction.OUT);
+    link.run();
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a98c473d/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Link.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Link.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Link.java
new file mode 100644
index 0000000..bd970f0
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Link.java
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+/**
+ * A link is a wire between two elements. It must always have both a source
+ * element and a sink element.
+ */
+public interface Link extends DataSource, DataSink, Runnable {
+
+  /**
+   * Wire this link with given sink.
+   * 
+   * @param sink
+   *          an element which receives data from this link
+   * @return same sink element, for chaining
+   */
+  Element setSink(Element sink);
+
+  /**
+   * Wire this link with given source.
+   * 
+   * @param source
+   *          an element which sends data to this link
+   * @return same source element, for chaining
+   */
+  Element setSource(Element source);
+
+  Element getSource();
+
+  Element getSink();
+
+  /**
+   * Hold all data in cache, don't pass data to sink until resumed.
+   * 
+   * @see #resume()
+   */
+  void pause();
+
+  /**
+   * Resume transfer of data after {@link #pause()}.
+   */
+  void resume();
+  
+  /**
+   * Change mode of operation of this link from push mode to pull mode.
+   */
+  void setPullMode();
+  
+  /**
+   * Drop this link.
+   */
+  void drop();
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a98c473d/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSink.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSink.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSink.java
new file mode 100644
index 0000000..ce9fdf9
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSink.java
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+import java.util.Arrays;
+
+/**
+ * Compare incoming packets with expected packets.
+ */
+public class MockSink extends BaseElement {
+
+  protected ByteBuffer bufs[] = null;
+
+  public MockSink(String id) {
+    super(id);
+  }
+
+  public MockSink(String id, ByteBuffer bufs[]) {
+    super(id);
+    this.bufs = bufs;
+  }
+
+  @Override
+  public void handleData(ByteBuffer buf, Link link) {
+    if (verbose)
+      System.out.println("[" + this + "] INFO: Received buf #" + (packetNumber) + " " + buf + ".");
+
+    if (buf == null)
+      return;
+
+    if (packetNumber >= bufs.length)
+      throw new AssertionError("[" + this + "] Incoming buffer #" + packetNumber + " is not expected. Number of expected buffers: " + bufs.length
+          + ", unexpected buffer: " + buf + ".");
+
+    // Compare incoming buffer with expected buffer
+    if (!Arrays.equals(bufs[packetNumber].toByteArray(), buf.toByteArray()))
+      throw new AssertionError("[" + this + "] Incoming buffer #" + packetNumber + " is not equal to expected buffer.\n  Actual bufer: " + buf
+          + ",\n  expected buffer: " + bufs[packetNumber] + ".");
+
+    if (verbose)
+      System.out.println("[" + this + "] INFO: buffers are equal.");
+
+    // Use packetNumber variable to count incoming packets
+    packetNumber++;
+
+    buf.unref();
+  }
+
+  @Override
+  protected void onClose() {
+    super.onClose();
+
+    if (packetNumber != bufs.length)
+      throw new AssertionError("[" + this + "] Number of expected buffers: " + bufs.length + ", number of actual buffers: " + packetNumber + ".");
+  }
+
+  @Override
+  public String toString() {
+    return "MockSink(" + id + ")";
+  }
+
+  /**
+   * Example.
+   */
+  public static void main(String args[]) {
+
+    Element mockSource = new MockSource("source") {
+      {
+        this.bufs = new ByteBuffer[] { new ByteBuffer(new byte[] { 1, 1, 2, 3, 4, 5 }), new ByteBuffer(new byte[] { 2, 1, 2, 3, 4 }),
+            new ByteBuffer(new byte[] { 3, 1, 2, 3 }), new ByteBuffer(new byte[] { 4, 1, 2 }), new ByteBuffer(new byte[] { 5, 1 }) };
+        this.verbose = true;
+        this.delay = 100;
+        this.numBuffers = this.bufs.length;
+      }
+    };
+
+    Element mockSink = new MockSink("sink") {
+      {
+        this.bufs = new ByteBuffer[] { new ByteBuffer(new byte[] { 1, 1, 2, 3, 4, 5 }), new ByteBuffer(new byte[] { 2, 1, 2, 3, 4 }),
+            new ByteBuffer(new byte[] { 3, 1, 2, 3 }), new ByteBuffer(new byte[] { 4, 1, 2 }), new ByteBuffer(new byte[] { 5, 1 }) };
+        this.verbose = true;
+      }
+    };
+
+    Link link = new SyncLink() {
+      {
+        this.verbose = true;
+      }
+    };
+
+    mockSource.setLink(STDOUT, link, Direction.OUT);
+    mockSink.setLink(STDIN, link, Direction.IN);
+
+    link.run();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a98c473d/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSource.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSource.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSource.java
new file mode 100644
index 0000000..db47db2
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSource.java
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+/**
+ * Source for tests: sends predefined buffers, one buffer per packet, in the
+ * order given.
+ */
+public class MockSource extends FakeSource {
+
+  protected ByteBuffer bufs[] = null;
+
+  public MockSource(String id) {
+    super(id);
+  }
+
+  public MockSource(String id, ByteBuffer bufs[]) {
+    super(id);
+    this.bufs = bufs;
+  }
+
+  /**
+   * Return the predefined buffer for the current packet number, with the
+   * sequence number stored in its metadata.
+   * 
+   * @return next buffer, or null when all buffers are already used; in that
+   *         case STREAM_CLOSE event is sent to output pads first
+   */
+  @Override
+  public ByteBuffer initializeData() {
+    if (packetNumber < bufs.length) {
+      ByteBuffer next = bufs[packetNumber];
+      next.putMetadata(ByteBuffer.SEQUENCE_NUMBER, packetNumber);
+      return next;
+    }
+
+    // No more buffers to send
+    sendEventToAllPads(Event.STREAM_CLOSE, Direction.OUT);
+    return null;
+  }
+
+  /**
+   * Log the event when verbose; the event is not processed any further here.
+   */
+  @Override
+  public void handleEvent(Event event, Direction direction) {
+    if (!verbose) {
+      return;
+    }
+
+    System.out.println("[" + this + "] INFO: Event received: " + event + ".");
+  }
+
+  @Override
+  public String toString() {
+    return "MockSource(" + id + ")";
+  }
+
+  /**
+   * Example: mock source with five buffers wired to a fake sink.
+   */
+  public static void main(String[] args) {
+
+    Element mockSource = new MockSource("source") {
+      {
+        this.bufs = new ByteBuffer[] {
+            new ByteBuffer(new byte[] { 1, 1, 2, 3, 4, 5 }),
+            new ByteBuffer(new byte[] { 2, 1, 2, 3, 4 }),
+            new ByteBuffer(new byte[] { 3, 1, 2, 3 }),
+            new ByteBuffer(new byte[] { 4, 1, 2 }),
+            new ByteBuffer(new byte[] { 5, 1 }) };
+        this.verbose = true;
+        this.delay = 100;
+        // numBuffers is not set here: initializeData() sends STREAM_CLOSE
+        // once all buffers are used
+      }
+    };
+
+    Element fakeSink = new FakeSink("sink") {
+      {
+        this.verbose = true;
+      }
+    };
+
+    Link link = new SyncLink();
+
+    mockSource.setLink(STDOUT, link, Direction.OUT);
+    fakeSink.setLink(STDIN, link, Direction.IN);
+
+    link.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a98c473d/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OneTimeSwitch.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OneTimeSwitch.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OneTimeSwitch.java
new file mode 100644
index 0000000..a7d4848
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OneTimeSwitch.java
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+/**
+ * One time switch for handshake and initialization stages.
+ * 
+ * At beginning, element handles data internally, sending output to "otout" pad.
+ * After switchOff() method is called, element drops its links, so packets from
+ * "stdin" pad are forwarded directly to "stdout" pad, without processing.
+ * 
+ * Event STREAM_START is captured by this element and not propagated further.
+ * When switchOff() method is called, event STREAM_START is generated and sent
+ * to output pads of this element. The "otout" link is dropped just before
+ * that, so presumably only "stdout" receives the event (TODO: confirm that
+ * dropping a link also removes it from output pads).
+ */
+public abstract class OneTimeSwitch extends BaseElement {
+
+  /**
+   * One-time out - name of output pad for one time logic. By default, output
+   * directly to socket.
+   */
+  public static final String OTOUT = "otout";
+
+  private boolean switched = false;
+
+  public OneTimeSwitch(String id) {
+    super(id);
+    declarePads();
+  }
+
+  protected void declarePads() {
+    inputPads.put(STDIN, null);
+    outputPads.put(OTOUT, null);
+    outputPads.put(STDOUT, null);
+  }
+
+  @Override
+  public void handleData(ByteBuffer buf, Link link) {
+    if (switched)
+      throw new RuntimeException(this + " element is switched off and must not receive any data or events anymore.");
+
+    if (buf == null)
+      return;
+
+    handleOneTimeData(buf, link);
+  }
+
+  public void pushDataToOTOut(ByteBuffer buf) {
+    if (verbose)
+      System.out.println("[" + this + "] INFO: Sending data:  " + buf + ".");
+
+    outputPads.get(OTOUT).sendData(buf);
+  }
+
+  /**
+   * Switch this element off. Pass data directly to main output(s).
+   * 
+   * Steps, in order: mark element as switched and turn verbose logging off;
+   * drop "otout" link; send STREAM_START to output pads; detach "stdin" link
+   * from this element; make sink of "stdout" link use "stdin" link instead of
+   * "stdout" link; drop "stdout" link and all links which are still
+   * registered in this element.
+   * 
+   * NOTE(review): links of "stdin", "stdout" and "otout" pads are used without
+   * null checks, so all three pads are expected to be connected when this
+   * method is called.
+   */
+  public void switchOff() {
+    if (verbose)
+      System.out.println("[" + this + "] INFO: Switching OFF.");
+
+    switched = true;
+    verbose = false;
+
+    // Rewire links: drop otout link, then give stdin link to the sink of
+    // stdout link, so data from stdin goes directly to that sink
+    Link stdout = (Link) outputPads.get(STDOUT);
+    Link stdin = (Link) inputPads.get(STDIN);
+    Link otout = (Link) outputPads.get(OTOUT);
+
+    otout.drop();
+
+    // Wake up next peer(s)
+    sendEventToAllPads(Event.STREAM_START, Direction.OUT);
+
+    stdin.setSink(null);
+    inputPads.remove(STDIN);
+
+    Element nextPeer = stdout.getSink();
+    nextPeer.replaceLink(stdout, stdin);
+    stdout.drop();
+
+    for (Object link : inputPads.values().toArray())
+      ((Link) link).drop();
+    for (Object link : outputPads.values().toArray())
+      ((Link) link).drop();
+
+  }
+
+  public void switchOn() {
+    if (verbose)
+      System.out.println("[" + this + "] INFO: Switching ON.");
+
+    switched = false;
+  }
+
+  /**
+   * Override this method to handle one-time packet(s) at handshake or
+   * initialization stages. Call {@link #switchOff()} when this element is no
+   * longer necessary.
+   * 
+   * @param buf
+   *          incoming data, never null
+   * @param link
+   *          link which was passed to {@link #handleData(ByteBuffer, Link)}
+   */
+  protected abstract void handleOneTimeData(ByteBuffer buf, Link link);
+
+  @Override
+  public void handleEvent(Event event, Direction direction) {
+    if (event == Event.STREAM_START) {
+      if (verbose)
+        System.out.println("[" + this + "] INFO: Event " + event + " is received.");
+
+      switchOn();
+
+      // Execute this element onStart(), but do not propagate event further,
+      // to not wake up next elements too early
+      onStart();
+    } else
+      super.handleEvent(event, direction);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a98c473d/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Order.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Order.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Order.java
new file mode 100644
index 0000000..1d63850
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Order.java
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+/**
+ * Minimal holder object with a single, untyped {@link #type} field.
+ */
+public class Order {
+
+  // Discriminator of the order. Declared as Object, so no particular type is
+  // enforced here; presumably assigned by the code which creates the order
+  // (TODO confirm against callers).
+  public Object type;
+
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a98c473d/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OutputStreamSink.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OutputStreamSink.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OutputStreamSink.java
new file mode 100644
index 0000000..d1aa5ce
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OutputStreamSink.java
@@ -0,0 +1,153 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class OutputStreamSink extends BaseElement {
+
+  protected OutputStream os;
+  protected SocketWrapper socketWrapper;
+
+  public OutputStreamSink(String id) {
+    super(id);
+  }
+
+  public OutputStreamSink(String id, OutputStream os) {
+    super(id);
+    this.os = os;
+  }
+
+  public OutputStreamSink(String id, SocketWrapper socketWrapper) {
+    super(id);
+    this.socketWrapper = socketWrapper;
+  }
+
+  public void setOutputStream(OutputStream os) {
+    this.os = os;
+    // Resume links
+    resumeLinks();
+  }
+
+  /**
+   * Send incoming data to stream.
+   */
+  @Override
+  public void handleData(ByteBuffer buf, Link link) {
+    if (buf == null)
+      return;
+
+    try {
+      if (verbose)
+        System.out.println("[" + this + "] INFO: Writing data to stream: " + buf + ".");
+
+      os.write(buf.data, buf.offset, buf.length);
+      os.flush();
+    } catch (IOException e) {
+      System.err.println("[" + this + "] ERROR: " + e.getMessage());
+      closeStream();
+    }
+  }
+
+  @Override
+  public void handleEvent(Event event, Direction direction) {
+    switch (event) {
+    case SOCKET_UPGRADE_TO_SSL:
+      socketWrapper.upgradeToSsl();
+      break;
+    default:
+      super.handleEvent(event, direction);
+    }
+  }
+
+  @Override
+  public void setLink(String padName, Link link, Direction direction) {
+    switch (direction) {
+    case IN:
+      super.setLink(padName, link, direction);
+
+      if (os == null)
+        // Pause links until data stream will be ready
+        link.pause();
+      break;
+    case OUT:
+      throw new RuntimeException("Cannot assign link to output pad in sink element. Element: " + this + ", pad: " + padName + ", link: " + link + ".");
+    }
+  }
+
+  private void resumeLinks() {
+    for (DataSource source : inputPads.values())
+      ((Link) source).resume();
+  }
+
+  @Override
+  protected void onClose() {
+    closeStream();
+  }
+
+  private void closeStream() {
+    if (verbose)
+      System.out.println("[" + this + "] INFO: Closing stream.");
+
+    try {
+      os.close();
+    } catch (IOException e) {
+    }
+    try {
+      sendEventToAllPads(Event.STREAM_CLOSE, Direction.IN);
+    } catch (Exception e) {
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "OutputStreamSink(" + id + ")";
+  }
+
+  /**
+   * Example.
+   */
+  public static void main(String args[]) {
+    Element source = new FakeSource("source") {
+      {
+        this.verbose = true;
+        this.numBuffers = 3;
+        this.incommingBufLength = 5;
+        this.delay = 100;
+      }
+    };
+
+    OutputStreamSink sink = new OutputStreamSink("sink") {
+      {
+        verbose = true;
+      }
+    };
+
+    Link link = new SyncLink();
+
+    source.setLink(STDOUT, link, Direction.OUT);
+    sink.setLink(STDIN, link, Direction.IN);
+
+    sink.setOutputStream(new ByteArrayOutputStream());
+
+    link.sendEvent(Event.STREAM_START, Direction.IN);
+    link.run();
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a98c473d/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Pipeline.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Pipeline.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Pipeline.java
new file mode 100644
index 0000000..c369350
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Pipeline.java
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+/**
+ * Pipeline groups multiple elements.
+ */
+public interface Pipeline extends Element {
+
+  // Names of the special elements which represent outer interfaces of the
+  // pipeline; derived from the names of the Direction constants.
+  static final String IN = Direction.IN.toString();
+  static final String OUT = Direction.OUT.toString();
+
+  /**
+   * Add elements to pipeline.
+   * 
+   * @param elements
+   *          elements to add
+   */
+  void add(Element... elements);
+
+  /**
+   * Add elements to pipeline and link them in given order.
+   * 
+   * @param elements
+   *          elements to add and link
+   */
+  void addAndLink(Element... elements);
+
+  /**
+   * Link elements in given order using SyncLink. Element name can have prefix
+   * "PADNAME< " or/and suffix " >PADNAME" to use given named pads instead of
+   * "stdin" and "stdout". Prefix names the input pad of the element, suffix
+   * names its output pad. I.e. <code>link("foo", "bar", "baz");</code> is equal
+   * to <code>link("foo >stdout", "stdin< bar >stdout", "stdin< baz");</code> .
+   * 
+   * Special elements "IN" and "OUT" are pointing to pipeline outer interfaces,
+   * so when pipeline will be connected with other elements, outside of this
+   * pipeline, they will be connected to IN and OUT elements.
+   * 
+   * Example:
+   * 
+   * <pre>
+   * pipeline.link(&quot;IN&quot;, &quot;foo&quot;, &quot;bar&quot;, &quot;OUT&quot;);
+   * // Make additional branch from foo to baz, and then to OUT
+   * pipeline.link(&quot;foo &gt;baz_out&quot;, &quot;baz&quot;, &quot;baz_in&lt; OUT&quot;);
+   * </pre>
+   * 
+   * @param elements
+   *          elements to link
+   */
+  void link(String... elements);
+
+  /**
+   * Get element by name.
+   * 
+   * @param elementName
+   *          name (ID) of the element
+   * @return an element
+   */
+  Element get(String elementName);
+
+  /**
+   * Get link by element name and pad name.
+   * 
+   * @param elementName
+   *          name (ID) of the element
+   * @param padName
+   *          name of the pad in that element
+   */
+  Link getLink(String elementName, String padName);
+
+  /**
+   * Set link by element name and pad name. Allows to link external elements
+   * into internal elements of pipeline. Special elements "IN" and "OUT" are
+   * pointing to pipeline outer interfaces.
+   */
+  void setLink(String elementName, String padName, Link link, Direction direction);
+
+  /**
+   * Get link connected to given pad in given element and run its main loop.
+   * 
+   * @param element
+   *          name of the element which owns the pad
+   * @param padName
+   *          name of the pad to take the link from
+   * @param separateThread
+   *          set to true to start main loop in separate thread.
+   * @param waitForStartEvent
+   *          in PipelineImpl: when false, STREAM_START event is sent through
+   *          the link before main loop is started; when true, no start event
+   *          is sent by this method
+   */
+  void runMainLoop(String element, String padName, boolean separateThread, boolean waitForStartEvent);
+
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a98c473d/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/PipelineImpl.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/PipelineImpl.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/PipelineImpl.java
new file mode 100644
index 0000000..abf132f
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/PipelineImpl.java
@@ -0,0 +1,309 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Default implementation of pipeline. Elements are stored by ID together with
+ * two boundary elements, IN and OUT, which represent outer interfaces of the
+ * pipeline: links attached to the pipeline itself are wired to these boundary
+ * elements.
+ */
+public class PipelineImpl implements Pipeline {
+
+  protected String id;
+  protected boolean verbose = System.getProperty("streamer.Pipeline.debug", "false").equals("true");
+
+  /** Elements of this pipeline by ID, including boundary elements IN and OUT. */
+  protected Map<String, Element> elements;
+
+  public PipelineImpl(String id) {
+    this.id = id;
+    elements = initElementMap(id);
+  }
+
+  /**
+   * Create element map which contains boundary elements IN and OUT only.
+   */
+  protected HashMap<String, Element> initElementMap(String id) {
+    HashMap<String, Element> map = new HashMap<String, Element>();
+
+    map.put(IN, new BaseElement(id + "." + IN));
+    map.put(OUT, new BaseElement(id + "." + OUT));
+    return map;
+  }
+
+  /**
+   * Get link by pad name from IN element or, when IN element has no such
+   * link, from OUT element.
+   */
+  @Override
+  public Link getLink(String padName) {
+    Link link = elements.get(IN).getLink(padName);
+    if (link == null)
+      link = elements.get(OUT).getLink(padName);
+    return link;
+  }
+
+  /**
+   * Get input pads of IN element or output pads of OUT element.
+   */
+  @Override
+  public Set<String> getPads(Direction direction) {
+    switch (direction) {
+    case IN:
+      return elements.get(IN).getPads(direction);
+
+    case OUT:
+      return elements.get(OUT).getPads(direction);
+    }
+    return null;
+  }
+
+  /**
+   * Validate all elements of pipeline, then check that each boundary element
+   * is either not linked at all or linked on both sides.
+   * 
+   * @throws RuntimeException
+   *           when boundary element has pads in one direction only
+   */
+  @Override
+  public void validate() {
+    for (Element element : elements.values())
+      element.validate();
+
+    checkBoundaryPads(get(IN), "input");
+    checkBoundaryPads(get(OUT), "output");
+  }
+
+  /**
+   * Check that boundary element has pads in both directions or has no pads at
+   * all.
+   */
+  private void checkBoundaryPads(Element element, String role) {
+    Set<String> outPads = element.getPads(Direction.OUT);
+    Set<String> inPads = element.getPads(Direction.IN);
+
+    // Exactly one side is empty: data can enter boundary element but cannot
+    // leave it, or vice versa
+    if (outPads.isEmpty() != inPads.isEmpty())
+      throw new RuntimeException("[ " + this + "] Pads of " + role + " element of pipeline are not balanced. Element: " + element + ", output pads: "
+          + outPads.toString() + ", input pads: " + inPads.toString() + ".");
+  }
+
+  @Override
+  public void dropLink(String padName) {
+    if (elements.get(IN).getLink(padName) != null)
+      elements.get(IN).dropLink(padName);
+
+    if (elements.get(OUT).getLink(padName) != null)
+      elements.get(OUT).dropLink(padName);
+  }
+
+  @Override
+  public void dropLink(Link link) {
+    elements.get(IN).dropLink(link);
+    elements.get(OUT).dropLink(link);
+  }
+
+  @Override
+  public void replaceLink(Link existingLink, Link newLink) {
+    elements.get(IN).replaceLink(existingLink, newLink);
+    elements.get(OUT).replaceLink(existingLink, newLink);
+  }
+
+  @Override
+  public void setLink(String padName, Link link, Direction direction) {
+    // Wire links to internal elements instead
+    elements.get(direction.toString()).setLink(padName, link, direction);
+  }
+
+  /**
+   * Not supported by pipeline.
+   * 
+   * @throws UnsupportedOperationException
+   *           always
+   */
+  @Override
+  public void poll(boolean block) {
+    throw new UnsupportedOperationException("Not implemented.");
+  }
+
+  @Override
+  public void handleData(ByteBuffer buf, Link link) {
+    get(IN).handleData(buf, link);
+  }
+
+  @Override
+  public void handleEvent(Event event, Direction direction) {
+    switch (direction) {
+    case IN:
+      get(IN).handleEvent(event, direction);
+      break;
+    case OUT:
+      get(OUT).handleEvent(event, direction);
+      break;
+    }
+  }
+
+  /**
+   * Add elements to pipeline.
+   * 
+   * @throws RuntimeException
+   *           when pipeline already contains element with same ID. Elements
+   *           which precede the duplicate in the argument list stay added.
+   */
+  @Override
+  public void add(Element... newElements) {
+    for (Element element : newElements) {
+      String elementId = element.getId();
+
+      if (elements.containsKey(elementId))
+        throw new RuntimeException("This pipeline already contains element with same ID. New element: " + element + ", existing element: "
+            + elements.get(elementId) + ".");
+
+      elements.put(elementId, element);
+    }
+  }
+
+  /**
+   * Link elements in given order using SyncLink. See
+   * {@link Pipeline#link(String...)} for syntax of element names.
+   * 
+   * @throws RuntimeException
+   *           when less than two names are given or when element cannot be
+   *           found by name
+   */
+  @Override
+  public void link(String... elementNames) {
+
+    if (elementNames.length < 2)
+      throw new RuntimeException("At least two elements are necessary to create link between them.");
+
+    // Parse array of element and pad names
+
+    Element[] linkedElements = new Element[elementNames.length];
+    String[] inputPads = new String[elementNames.length];
+    String[] outputPads = new String[elementNames.length];
+
+    for (int i = 0; i < elementNames.length; i++) {
+      String elementName = elementNames[i];
+
+      // Optional prefix "PADNAME< " selects input pad of element
+      int inputMark = elementName.indexOf("< ");
+      if (inputMark >= 0) {
+        inputPads[i] = elementName.substring(0, inputMark);
+        elementName = elementName.substring(inputMark + 2);
+      } else {
+        inputPads[i] = STDIN;
+      }
+
+      // Optional suffix " >PADNAME" selects output pad of element
+      int outputMark = elementName.indexOf(" >");
+      if (outputMark >= 0) {
+        outputPads[i] = elementName.substring(outputMark + 2);
+        elementName = elementName.substring(0, outputMark);
+      } else {
+        outputPads[i] = STDOUT;
+      }
+
+      linkedElements[i] = get(elementName);
+
+      if (linkedElements[i] == null)
+        throw new RuntimeException("Cannot find element by name in this pipeline. Element name: \"" + elementName + "\" (" + elementNames[i] + "), pipeline: "
+            + this + ".");
+    }
+
+    // Link elements
+    for (int i = 0; i < linkedElements.length - 1; i++) {
+      Element leftElement = linkedElements[i];
+      Element rightElement = linkedElements[i + 1];
+      String leftPad = outputPads[i];
+      String rightPad = inputPads[i + 1];
+
+      String linkId = leftElement.getId() + " >" + leftPad + " | " + rightPad + "< " + rightElement.getId();
+
+      if (verbose)
+        System.out.println("[" + this + "] INFO: Linking: " + linkId + ".");
+
+      Link link = new SyncLink(linkId);
+      leftElement.setLink(leftPad, link, Direction.OUT);
+      rightElement.setLink(rightPad, link, Direction.IN);
+    }
+  }
+
+  @Override
+  public void addAndLink(Element... elements) {
+    add(elements);
+    link(elements);
+  }
+
+  /**
+   * Link given elements in given order using their IDs and default pads.
+   */
+  private void link(Element... linkedElements) {
+    String[] elementNames = new String[linkedElements.length];
+
+    for (int i = 0; i < linkedElements.length; i++)
+      elementNames[i] = linkedElements[i].getId();
+
+    link(elementNames);
+  }
+
+  @Override
+  public Element get(String elementName) {
+    return elements.get(elementName);
+  }
+
+  /**
+   * Get link by element name and pad name.
+   * 
+   * @return link, or null when pipeline has no element with given name
+   */
+  @Override
+  public Link getLink(String elementName, String padName) {
+    Element element = elements.get(elementName);
+
+    // Unknown element has no links. Returning null lets callers, e.g.
+    // runMainLoop(), report the problem with full context.
+    if (element == null)
+      return null;
+
+    return element.getLink(padName);
+  }
+
+  /**
+   * Set link by element name and pad name.
+   * 
+   * @throws RuntimeException
+   *           when pipeline has no element with given name
+   */
+  @Override
+  public void setLink(String elementName, String padName, Link link, Direction direction) {
+    Element element = elements.get(elementName);
+
+    if (element == null)
+      throw new RuntimeException("Cannot find element by name in this pipeline. Element name: \"" + elementName + "\", pipeline: " + this + ".");
+
+    element.setLink(padName, link, direction);
+  }
+
+  @Override
+  public String getId() {
+    return id;
+  }
+
+  /**
+   * Validate pipeline, then run main loop of link connected to given pad of
+   * given element, either in calling thread or in separate daemon thread.
+   * Unless waitForStartEvent is set, STREAM_START event is sent through the
+   * link in OUT direction first.
+   * 
+   * @throws NullPointerException
+   *           when link cannot be found
+   */
+  @Override
+  public void runMainLoop(String elementName, String padName, boolean separateThread, boolean waitForStartEvent) {
+    validate();
+
+    Link link = getLink(elementName, padName);
+
+    if (link == null)
+      throw new NullPointerException("Cannot find link. Element name: " + elementName + ", element: " + get(elementName) + ", pad: " + padName + ".");
+
+    if (!waitForStartEvent)
+      link.sendEvent(Event.STREAM_START, Direction.OUT);
+
+    if (separateThread) {
+      Thread thread = new Thread(link);
+      thread.setDaemon(true);
+      thread.start();
+    } else {
+      link.run();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "Pipeline(" + id + ")";
+  }
+
+  /**
+   * Example.
+   */
+  public static void main(String[] args) {
+    // System.setProperty("streamer.Link.debug", "true");
+    // System.setProperty("streamer.Element.debug", "true");
+    // System.setProperty("streamer.Pipeline.debug", "true");
+
+    Pipeline pipeline = new PipelineImpl("main");
+
+    // Create elements
+    pipeline.add(new FakeSource("source") {
+      {
+        this.incommingBufLength = 3;
+        this.numBuffers = 10;
+        this.delay = 100;
+      }
+    });
+    pipeline.add(new BaseElement("tee"));
+    pipeline.add(new FakeSink("sink") {
+      {
+        this.verbose = true;
+      }
+    });
+    pipeline.add(new FakeSink("sink2") {
+      {
+        this.verbose = true;
+      }
+    });
+
+    // Link elements
+    pipeline.link("source", "tee", "sink");
+    pipeline.link("tee >out2", "sink2");
+
+    // Run main loop
+    pipeline.runMainLoop("source", STDOUT, false, false);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a98c473d/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Queue.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Queue.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Queue.java
new file mode 100644
index 0000000..7a17340
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Queue.java
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Message queue for safe transfer of packets between threads.
+ * 
+ * Producer side puts incoming buffers into the queue in handleData(), consumer
+ * side takes them out in poll(). Once the queue is closed, its content is
+ * discarded and both sides are answered with STREAM_CLOSE event.
+ */
+public class Queue extends BaseElement {
+
+  /** Time slice, in milliseconds, of a single wait for data in poll(). */
+  private static final long POLL_INTERVAL_MS = 100;
+
+  protected LinkedBlockingQueue<ByteBuffer> queue = new LinkedBlockingQueue<ByteBuffer>();
+
+  /**
+   * Set when queue is closed. Volatile, because the flag is written by the
+   * thread which closes the queue and read by the thread which polls it.
+   */
+  private volatile boolean closed = false;
+
+  public Queue(String id) {
+    super(id);
+  }
+
+  /**
+   * Take next buffer from queue, if any, and push it to all output pads.
+   * 
+   * @param block
+   *          when true, wait until data is available or queue is closed;
+   *          otherwise wait for one time slice only
+   */
+  @Override
+  public void poll(boolean block) {
+    try {
+      ByteBuffer buf = queue.poll(POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
+
+      // Blocking mode waits in short slices instead of take(), so that close
+      // of the queue is noticed even when no more data arrives
+      while (block && buf == null && !closed)
+        buf = queue.poll(POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
+
+      if (closed) {
+        // Upstream is closed: notify downstream instead of delivering data
+        sendEventToAllPads(Event.STREAM_CLOSE, Direction.OUT);
+        return;
+      }
+
+      if (buf != null)
+        pushDataToAllOuts(buf);
+
+    } catch (Exception e) {
+      try {
+        sendEventToAllPads(Event.STREAM_CLOSE, Direction.OUT);
+        closeQueue();
+      } finally {
+        // Keep interruption visible to the code which runs this thread
+        if (e instanceof InterruptedException)
+          Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  /**
+   * Put incoming data into queue. When queue is closed already or data cannot
+   * be stored, STREAM_CLOSE event is sent to input pads.
+   */
+  @Override
+  public void handleData(ByteBuffer buf, Link link) {
+    if (verbose)
+      System.out.println("[" + this + "] INFO: Data received: " + buf + ".");
+
+    if (closed) {
+      // Nobody will read this data, so tell the sender to stop
+      sendEventToAllPads(Event.STREAM_CLOSE, Direction.IN);
+      return;
+    }
+
+    // Put incoming data into queue
+    try {
+      queue.put(buf);
+    } catch (Exception e) {
+      try {
+        sendEventToAllPads(Event.STREAM_CLOSE, Direction.IN);
+        closeQueue();
+      } finally {
+        // Keep interruption visible to the code which runs this thread
+        if (e instanceof InterruptedException)
+          Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  @Override
+  public void handleEvent(Event event, Direction direction) {
+    switch (event) {
+    case LINK_SWITCH_TO_PULL_MODE:
+      // Do not propagate this event, because this element is boundary between
+      // threads
+      break;
+    default:
+      super.handleEvent(event, direction);
+    }
+  }
+
+  @Override
+  protected void onClose() {
+    super.onClose();
+    closeQueue();
+  }
+
+  /**
+   * Mark queue as closed and discard its content.
+   */
+  private void closeQueue() {
+    // Raise the flag before dropping data, so that poll() which wakes up in
+    // between reports close instead of waiting again. Null cannot be used as
+    // end marker, because LinkedBlockingQueue rejects null elements with
+    // NullPointerException.
+    closed = true;
+    queue.clear();
+  }
+
+  @Override
+  public String toString() {
+    return "Queue(" + id + ")";
+  }
+
+  /**
+   * Example.
+   */
+  public static void main(String[] args) {
+    // System.setProperty("streamer.Link.debug", "true");
+    System.setProperty("streamer.Element.debug", "true");
+
+    Element source1 = new FakeSource("source1") {
+      {
+        this.delay = 100;
+        this.numBuffers = 10;
+        this.incommingBufLength = 10;
+      }
+    };
+
+    Element source2 = new FakeSource("source2") {
+      {
+        this.delay = 100;
+        this.numBuffers = 10;
+        this.incommingBufLength = 10;
+      }
+    };
+
+    Pipeline pipeline = new PipelineImpl("test");
+    pipeline.add(source1);
+    pipeline.add(source2);
+    pipeline.add(new Queue("queue"));
+    pipeline.add(new FakeSink("sink"));
+
+    // Main flow
+    pipeline.link("source1", "in1< queue");
+    pipeline.link("source2", "in2< queue");
+    pipeline.link("queue", "sink");
+
+    new Thread(pipeline.getLink("source1", STDOUT)).start();
+    new Thread(pipeline.getLink("source2", STDOUT)).start();
+    pipeline.getLink("sink", STDIN).run();
+  }
+}