You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by de...@apache.org on 2013/12/23 10:12:59 UTC

[05/22] CLOUDSTACK-5344: Update to allow rdp console to access hyper-v vm virtual framebuffer.

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSink.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSink.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSink.java
deleted file mode 100644
index ce9fdf9..0000000
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSink.java
+++ /dev/null
@@ -1,111 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package streamer;
-
-import java.util.Arrays;
-
-/**
- * Compare incoming packets with expected packets.
- */
-public class MockSink extends BaseElement {
-
-  protected ByteBuffer bufs[] = null;
-
-  public MockSink(String id) {
-    super(id);
-  }
-
-  public MockSink(String id, ByteBuffer bufs[]) {
-    super(id);
-    this.bufs = bufs;
-  }
-
-  @Override
-  public void handleData(ByteBuffer buf, Link link) {
-    if (verbose)
-      System.out.println("[" + this + "] INFO: Received buf #" + (packetNumber) + " " + buf + ".");
-
-    if (buf == null)
-      return;
-
-    if (packetNumber >= bufs.length)
-      throw new AssertionError("[" + this + "] Incoming buffer #" + packetNumber + " is not expected. Number of expected buffers: " + bufs.length
-          + ", unexpected buffer: " + buf + ".");
-
-    // Compare incoming buffer with expected buffer
-    if (!Arrays.equals(bufs[packetNumber].toByteArray(), buf.toByteArray()))
-      throw new AssertionError("[" + this + "] Incoming buffer #" + packetNumber + " is not equal to expected buffer.\n  Actual bufer: " + buf
-          + ",\n  expected buffer: " + bufs[packetNumber] + ".");
-
-    if (verbose)
-      System.out.println("[" + this + "] INFO: buffers are equal.");
-
-    // Use packetNumber variable to count incoming packets
-    packetNumber++;
-
-    buf.unref();
-  }
-
-  @Override
-  protected void onClose() {
-    super.onClose();
-
-    if (packetNumber != bufs.length)
-      throw new AssertionError("[" + this + "] Number of expected buffers: " + bufs.length + ", number of actual buffers: " + packetNumber + ".");
-  }
-
-  @Override
-  public String toString() {
-    return "MockSink(" + id + ")";
-  }
-
-  /**
-   * Example.
-   */
-  public static void main(String args[]) {
-
-    Element mockSource = new MockSource("source") {
-      {
-        this.bufs = new ByteBuffer[] { new ByteBuffer(new byte[] { 1, 1, 2, 3, 4, 5 }), new ByteBuffer(new byte[] { 2, 1, 2, 3, 4 }),
-            new ByteBuffer(new byte[] { 3, 1, 2, 3 }), new ByteBuffer(new byte[] { 4, 1, 2 }), new ByteBuffer(new byte[] { 5, 1 }) };
-        this.verbose = true;
-        this.delay = 100;
-        this.numBuffers = this.bufs.length;
-      }
-    };
-
-    Element mockSink = new MockSink("sink") {
-      {
-        this.bufs = new ByteBuffer[] { new ByteBuffer(new byte[] { 1, 1, 2, 3, 4, 5 }), new ByteBuffer(new byte[] { 2, 1, 2, 3, 4 }),
-            new ByteBuffer(new byte[] { 3, 1, 2, 3 }), new ByteBuffer(new byte[] { 4, 1, 2 }), new ByteBuffer(new byte[] { 5, 1 }) };
-        this.verbose = true;
-      }
-    };
-
-    Link link = new SyncLink() {
-      {
-        this.verbose = true;
-      }
-    };
-
-    mockSource.setLink(STDOUT, link, Direction.OUT);
-    mockSink.setLink(STDIN, link, Direction.IN);
-
-    link.run();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSource.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSource.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSource.java
deleted file mode 100644
index db47db2..0000000
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSource.java
+++ /dev/null
@@ -1,88 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package streamer;
-
-public class MockSource extends FakeSource {
-
-  protected ByteBuffer bufs[] = null;
-
-  public MockSource(String id) {
-    super(id);
-  }
-
-  public MockSource(String id, ByteBuffer bufs[]) {
-    super(id);
-    this.bufs = bufs;
-  }
-
-  /**
-   * Initialize data.
-   */
-  @Override
-  public ByteBuffer initializeData() {
-    if (packetNumber >= bufs.length) {
-      sendEventToAllPads(Event.STREAM_CLOSE, Direction.OUT);
-      return null;
-    }
-
-    ByteBuffer buf = bufs[packetNumber];
-
-    buf.putMetadata(ByteBuffer.SEQUENCE_NUMBER, packetNumber);
-    return buf;
-  }
-
-  @Override
-  public void handleEvent(Event event, Direction direction) {
-    if (verbose)
-      System.out.println("[" + this + "] INFO: Event received: " + event + ".");
-
-  }
-
-  @Override
-  public String toString() {
-    return "MockSource(" + id + ")";
-  }
-
-  /**
-   * Example.
-   */
-  public static void main(String args[]) {
-
-    Element mockSource = new MockSource("source") {
-      {
-        this.bufs = new ByteBuffer[] { new ByteBuffer(new byte[] { 1, 1, 2, 3, 4, 5 }), new ByteBuffer(new byte[] { 2, 1, 2, 3, 4 }),
-            new ByteBuffer(new byte[] { 3, 1, 2, 3 }), new ByteBuffer(new byte[] { 4, 1, 2 }), new ByteBuffer(new byte[] { 5, 1 }) };
-        this.verbose = true;
-        this.delay = 100;
-        // this.numBuffers = this.bufs.length;
-      }
-    };
-
-    Element fakeSink = new FakeSink("sink") {
-      {
-        this.verbose = true;
-      }
-    };
-
-    Link link = new SyncLink();
-
-    mockSource.setLink(STDOUT, link, Direction.OUT);
-    fakeSink.setLink(STDIN, link, Direction.IN);
-
-    link.run();
-  }
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OneTimeSwitch.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OneTimeSwitch.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OneTimeSwitch.java
old mode 100644
new mode 100755
index a7d4848..2e5d891
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OneTimeSwitch.java
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OneTimeSwitch.java
@@ -29,105 +29,108 @@ package streamer;
  */
 public abstract class OneTimeSwitch extends BaseElement {
 
-  /**
-   * One-time out - name of output pad for one time logic. By default, output
-   * directly to socket.
-   */
-  public static final String OTOUT = "otout";
-
-  private boolean switched = false;
-
-  public OneTimeSwitch(String id) {
-    super(id);
-    declarePads();
-  }
-
-  protected void declarePads() {
-    inputPads.put(STDIN, null);
-    outputPads.put(OTOUT, null);
-    outputPads.put(STDOUT, null);
-  }
-
-  @Override
-  public void handleData(ByteBuffer buf, Link link) {
-    if (switched)
-      throw new RuntimeException(this + " element is switched off and must not receive any data or events anymore.");
-
-    if (buf == null)
-      return;
-
-    handleOneTimeData(buf, link);
-  }
-
-  public void pushDataToOTOut(ByteBuffer buf) {
-    if (verbose)
-      System.out.println("[" + this + "] INFO: Sending data:  " + buf + ".");
-
-    outputPads.get(OTOUT).sendData(buf);
-  }
-
-  /**
-   * Switch this element off. Pass data directly to main output(s).
-   */
-  public void switchOff() {
-    if (verbose)
-      System.out.println("[" + this + "] INFO: Switching OFF.");
-
-    switched = true;
-    verbose = false;
-
-    // Rewire links: drop otout link, replace stdout link by stdin to send data
-    // directly to stdout
-    Link stdout = (Link) outputPads.get(STDOUT);
-    Link stdin = (Link) inputPads.get(STDIN);
-    Link otout = (Link) outputPads.get(OTOUT);
-
-    otout.drop();
-
-    // Wake up next peer(s)
-    sendEventToAllPads(Event.STREAM_START, Direction.OUT);
-
-    stdin.setSink(null);
-    inputPads.remove(STDIN);
-
-    Element nextPeer = stdout.getSink();
-    nextPeer.replaceLink(stdout, stdin);
-    stdout.drop();
-
-    for (Object link : inputPads.values().toArray())
-      ((Link) link).drop();
-    for (Object link : outputPads.values().toArray())
-      ((Link) link).drop();
-
-  }
-
-  public void switchOn() {
-    if (verbose)
-      System.out.println("[" + this + "] INFO: Switching ON.");
-
-    switched = false;
-  }
-
-  /**
-   * Override this method to handle one-time packet(s) at handshake or
-   * initialization stages. Execute method @see switchRoute() when this method
-   * is no longer necessary.
-   */
-  protected abstract void handleOneTimeData(ByteBuffer buf, Link link);
-
-  @Override
-  public void handleEvent(Event event, Direction direction) {
-    if (event == Event.STREAM_START) {
-      if (verbose)
-        System.out.println("[" + this + "] INFO: Event " + event + " is received.");
-
-      switchOn();
-
-      // Execute this element onStart(), but do not propagate event further,
-      // to not wake up next elements too early
-      onStart();
-    } else
-      super.handleEvent(event, direction);
-  }
+    /**
+     * One-time out - name of output pad for one time logic. By default, output
+     * directly to socket.
+     */
+    public static final String OTOUT = "otout";
+
+    private boolean switched = false;
+
+    public OneTimeSwitch(String id) {
+        super(id);
+        declarePads();
+    }
+
+    protected void declarePads() {
+        inputPads.put(STDIN, null);
+        outputPads.put(OTOUT, null);
+        outputPads.put(STDOUT, null);
+    }
+
+    @Override
+    public void handleData(ByteBuffer buf, Link link) {
+        if (switched)
+            throw new RuntimeException(this + " element is switched off and must not receive any data or events anymore.");
+
+        if (buf == null)
+            return;
+
+        handleOneTimeData(buf, link);
+    }
+
+    public void pushDataToOTOut(ByteBuffer buf) {
+        if (verbose)
+            System.out.println("[" + this + "] INFO: Sending data:  " + buf + ".");
+
+        outputPads.get(OTOUT).sendData(buf);
+    }
+
+    /**
+     * Switch this element off. Pass data directly to main output(s).
+     */
+    public void switchOff() {
+        if (verbose)
+            System.out.println("[" + this + "] INFO: Switching OFF.");
+
+        switched = true;
+        verbose = false;
+
+        // Rewire links: drop otout link, replace stdout link by stdin to send data
+        // directly to stdout
+        Link stdout = (Link)outputPads.get(STDOUT);
+        Link stdin = (Link)inputPads.get(STDIN);
+        Link otout = (Link)outputPads.get(OTOUT);
+
+        otout.drop();
+
+        // Wake up next peer(s)
+        sendEventToAllPads(Event.STREAM_START, Direction.OUT);
+
+        // Disconnect our stdin from this element
+        stdin.setSink(null);
+        inputPads.remove(STDIN);
+
+        // Replace next peer stdin (our stdout) by our stdin
+        Element nextPeer = stdout.getSink();
+        nextPeer.replaceLink(stdout, stdin);
+        stdout.drop();
+
+        // Drop all other links
+        for (Object link : inputPads.values().toArray())
+            ((Link)link).drop();
+        for (Object link : outputPads.values().toArray())
+            ((Link)link).drop();
+
+    }
+
+    public void switchOn() {
+        if (verbose)
+            System.out.println("[" + this + "] INFO: Switching ON.");
+
+        switched = false;
+    }
+
+    /**
+     * Override this method to handle one-time packet(s) at handshake or
+     * initialization stages. Execute method @see switchRoute() when this method
+     * is no longer necessary.
+     */
+    protected abstract void handleOneTimeData(ByteBuffer buf, Link link);
+
+    @Override
+    public void handleEvent(Event event, Direction direction) {
+        if (event == Event.STREAM_START) {
+            if (verbose)
+                System.out.println("[" + this + "] INFO: Event " + event + " is received.");
+
+            switchOn();
+
+            // Execute this element onStart(), but do not propagate event further,
+            // to not wake up next elements too early
+            onStart();
+        } else
+            super.handleEvent(event, direction);
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Order.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Order.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Order.java
old mode 100644
new mode 100755
index 1d63850..8f5654e
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Order.java
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Order.java
@@ -18,6 +18,6 @@ package streamer;
 
 public class Order {
 
-  public Object type;
+    public Object type;
 
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OutputStreamSink.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OutputStreamSink.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OutputStreamSink.java
old mode 100644
new mode 100755
index d1aa5ce..104b8c3
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OutputStreamSink.java
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OutputStreamSink.java
@@ -20,134 +20,136 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 
+import streamer.debug.FakeSource;
+
 public class OutputStreamSink extends BaseElement {
 
-  protected OutputStream os;
-  protected SocketWrapper socketWrapper;
-
-  public OutputStreamSink(String id) {
-    super(id);
-  }
-
-  public OutputStreamSink(String id, OutputStream os) {
-    super(id);
-    this.os = os;
-  }
-
-  public OutputStreamSink(String id, SocketWrapper socketWrapper) {
-    super(id);
-    this.socketWrapper = socketWrapper;
-  }
-
-  public void setOutputStream(OutputStream os) {
-    this.os = os;
-    // Resume links
-    resumeLinks();
-  }
-
-  /**
-   * Send incoming data to stream.
-   */
-  @Override
-  public void handleData(ByteBuffer buf, Link link) {
-    if (buf == null)
-      return;
-
-    try {
-      if (verbose)
-        System.out.println("[" + this + "] INFO: Writing data to stream: " + buf + ".");
-
-      os.write(buf.data, buf.offset, buf.length);
-      os.flush();
-    } catch (IOException e) {
-      System.err.println("[" + this + "] ERROR: " + e.getMessage());
-      closeStream();
+    protected OutputStream os;
+    protected SocketWrapperImpl socketWrapper;
+
+    public OutputStreamSink(String id) {
+        super(id);
+    }
+
+    public OutputStreamSink(String id, OutputStream os) {
+        super(id);
+        this.os = os;
+    }
+
+    public OutputStreamSink(String id, SocketWrapperImpl socketWrapper) {
+        super(id);
+        this.socketWrapper = socketWrapper;
     }
-  }
-
-  @Override
-  public void handleEvent(Event event, Direction direction) {
-    switch (event) {
-    case SOCKET_UPGRADE_TO_SSL:
-      socketWrapper.upgradeToSsl();
-      break;
-    default:
-      super.handleEvent(event, direction);
+
+    public void setOutputStream(OutputStream os) {
+        this.os = os;
+        // Resume links
+        resumeLinks();
+    }
+
+    /**
+     * Send incoming data to stream.
+     */
+    @Override
+    public void handleData(ByteBuffer buf, Link link) {
+        if (buf == null)
+            return;
+
+        try {
+            if (verbose)
+                System.out.println("[" + this + "] INFO: Writing data to stream: " + buf + ".");
+
+            os.write(buf.data, buf.offset, buf.length);
+            os.flush();
+        } catch (IOException e) {
+            System.err.println("[" + this + "] ERROR: " + e.getMessage());
+            closeStream();
+        }
+    }
+
+    @Override
+    public void handleEvent(Event event, Direction direction) {
+        switch (event) {
+        case SOCKET_UPGRADE_TO_SSL:
+            socketWrapper.upgradeToSsl();
+            break;
+        default:
+            super.handleEvent(event, direction);
+        }
     }
-  }
-
-  @Override
-  public void setLink(String padName, Link link, Direction direction) {
-    switch (direction) {
-    case IN:
-      super.setLink(padName, link, direction);
-
-      if (os == null)
-        // Pause links until data stream will be ready
-        link.pause();
-      break;
-    case OUT:
-      throw new RuntimeException("Cannot assign link to output pad in sink element. Element: " + this + ", pad: " + padName + ", link: " + link + ".");
+
+    @Override
+    public void setLink(String padName, Link link, Direction direction) {
+        switch (direction) {
+        case IN:
+            super.setLink(padName, link, direction);
+
+            if (os == null)
+                // Pause links until data stream will be ready
+                link.pause();
+            break;
+        case OUT:
+            throw new RuntimeException("Cannot assign link to output pad in sink element. Element: " + this + ", pad: " + padName + ", link: " + link + ".");
+        }
     }
-  }
 
-  private void resumeLinks() {
-    for (DataSource source : inputPads.values())
-      ((Link) source).resume();
-  }
+    private void resumeLinks() {
+        for (DataSource source : inputPads.values())
+            ((Link)source).resume();
+    }
 
-  @Override
-  protected void onClose() {
-    closeStream();
-  }
+    @Override
+    protected void onClose() {
+        closeStream();
+    }
 
-  private void closeStream() {
-    if (verbose)
-      System.out.println("[" + this + "] INFO: Closing stream.");
+    private void closeStream() {
+        if (verbose)
+            System.out.println("[" + this + "] INFO: Closing stream.");
+
+        try {
+            os.close();
+        } catch (IOException e) {
+        }
+        try {
+            sendEventToAllPads(Event.STREAM_CLOSE, Direction.IN);
+        } catch (Exception e) {
+        }
+    }
 
-    try {
-      os.close();
-    } catch (IOException e) {
+    @Override
+    public String toString() {
+        return "OutputStreamSink(" + id + ")";
     }
-    try {
-      sendEventToAllPads(Event.STREAM_CLOSE, Direction.IN);
-    } catch (Exception e) {
+
+    /**
+     * Example.
+     */
+    public static void main(String args[]) {
+        Element source = new FakeSource("source") {
+            {
+                this.verbose = true;
+                this.numBuffers = 3;
+                this.incommingBufLength = 5;
+                this.delay = 100;
+            }
+        };
+
+        OutputStreamSink sink = new OutputStreamSink("sink") {
+            {
+                verbose = true;
+            }
+        };
+
+        Link link = new SyncLink();
+
+        source.setLink(STDOUT, link, Direction.OUT);
+        sink.setLink(STDIN, link, Direction.IN);
+
+        sink.setOutputStream(new ByteArrayOutputStream());
+
+        link.sendEvent(Event.STREAM_START, Direction.IN);
+        link.run();
+
     }
-  }
-
-  @Override
-  public String toString() {
-    return "OutputStreamSink(" + id + ")";
-  }
-
-  /**
-   * Example.
-   */
-  public static void main(String args[]) {
-    Element source = new FakeSource("source") {
-      {
-        this.verbose = true;
-        this.numBuffers = 3;
-        this.incommingBufLength = 5;
-        this.delay = 100;
-      }
-    };
-
-    OutputStreamSink sink = new OutputStreamSink("sink") {
-      {
-        verbose = true;
-      }
-    };
-
-    Link link = new SyncLink();
-
-    source.setLink(STDOUT, link, Direction.OUT);
-    sink.setLink(STDIN, link, Direction.IN);
-
-    sink.setOutputStream(new ByteArrayOutputStream());
-
-    link.sendEvent(Event.STREAM_START, Direction.IN);
-    link.run();
-
-  }
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Pipeline.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Pipeline.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Pipeline.java
old mode 100644
new mode 100755
index c369350..ad9d8d6
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Pipeline.java
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Pipeline.java
@@ -21,71 +21,73 @@ package streamer;
  */
 public interface Pipeline extends Element {
 
-  static final String IN = Direction.IN.toString();
-  static final String OUT = Direction.OUT.toString();
+    static final String IN = Direction.IN.toString();
+    static final String OUT = Direction.OUT.toString();
 
-  /**
-   * Add elements to pipeline.
-   * 
-   * @param elements
-   */
-  void add(Element... elements);
+    /**
+     * Add elements to pipeline.
+     * 
+     * @param elements
+     */
+    void add(Element... elements);
 
-  /**
-   * Add elements to pipeline and link them in given order.
-   * 
-   * @param elements
-   */
-  void addAndLink(Element... elements);
+    /**
+     * Add elements to pipeline and link them in given order.
+     * 
+     * @param elements
+     */
+    void addAndLink(Element... elements);
 
-  /**
-   * Link elements in given order using SyncLink. Element name can have prefix
-   * "PADNAME< " or/and suffix " >PADNAME" to use given named pads instead of
-   * "stdin" and "stdout". I.e. <code>link("foo", "bar", "baz");</code> is equal
-   * to <code>link("foo >stdin", "stdout< bar >stdin", "stdout< baz");</code> .
-   * 
-   * Special elements "IN" and "OUT" are pointing to pipeline outer interfaces,
-   * so when pipeline will be connected with other elements, outside of this
-   * pipeline, they will be connected to IN and OUT elements.
-   * 
-   * Example:
-   * 
-   * <pre>
-   * pipeline.link(&quot;IN&quot;, &quot;foo&quot;, &quot;bar&quot;, &quot;OUT&quot;);
-   * // Make additional branch from foo to baz, and then to OUT
-   * pipeline.link(&quot;foo &gt;baz_out&quot;, &quot;baz&quot;, &quot;baz_in&lt; OUT&quot;);
-   * </pre>
-   * 
-   * @param elements
-   *          elements to link
-   */
-  void link(String... elements);
+    /**
+     * Link elements in given order using SyncLink. Element name can have prefix
+     * "PADNAME< " or/and suffix " >PADNAME" to use given named pads instead of
+     * "stdin" and "stdout". I.e. <code>link("foo", "bar", "baz");</code> is equal
+     * to <code>link("foo >stdin", "stdout< bar >stdin", "stdout< baz");</code> .
+     * 
+     * Special elements "IN" and "OUT" are pointing to pipeline outer interfaces,
+     * so when pipeline will be connected with other elements, outside of this
+     * pipeline, they will be connected to IN and OUT elements.
+     * 
+     * Empty names are skipped.
+     * 
+     * Example:
+     * 
+     * <pre>
+     * pipeline.link(&quot;IN&quot;, &quot;foo&quot;, &quot;bar&quot;, &quot;OUT&quot;);
+     * // Make additional branch from foo to baz, and then to OUT
+     * pipeline.link(&quot;foo &gt;baz_out&quot;, &quot;baz&quot;, &quot;baz_in&lt; OUT&quot;);
+     * </pre>
+     * 
+     * @param elements
+     *          elements to link
+     */
+    void link(String... elements);
 
-  /**
-   * Get element by name.
-   * 
-   * @return an element
-   */
-  Element get(String elementName);
+    /**
+     * Get element by name.
+     * 
+     * @return an element
+     */
+    Element get(String elementName);
 
-  /**
-   * Get link by element name and pad name.
-   */
-  Link getLink(String elementName, String padName);
+    /**
+     * Get link by element name and pad name.
+     */
+    Link getLink(String elementName, String padName);
 
-  /**
-   * Set link by element name and pad name. Allows to link external elements
-   * into internal elements of pipeline. Special elements "IN" and "OUT" are
-   * pointing to pipeline outer interfaces.
-   */
-  void setLink(String elementName, String padName, Link link, Direction direction);
+    /**
+     * Set link by element name and pad name. Allows to link external elements
+     * into internal elements of pipeline. Special elements "IN" and "OUT" are
+     * pointing to pipeline outer interfaces.
+     */
+    void setLink(String elementName, String padName, Link link, Direction direction);
 
-  /**
-   * Get link connected to given pad in given element and run it main loop.
-   * @param separateThread
-   *          set to true to start main loop in separate thread.
-   * @param waitForStartEvent TODO
-   */
-  void runMainLoop(String element, String padName, boolean separateThread, boolean waitForStartEvent);
+    /**
+     * Get link connected to given pad in given element and run it main loop.
+     * @param separateThread
+     *          set to true to start main loop in separate thread.
+     * @param waitForStartEvent TODO
+     */
+    void runMainLoop(String element, String padName, boolean separateThread, boolean waitForStartEvent);
 
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/PipelineImpl.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/PipelineImpl.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/PipelineImpl.java
old mode 100644
new mode 100755
index abf132f..299d0a4
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/PipelineImpl.java
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/PipelineImpl.java
@@ -16,294 +16,325 @@
 // under the License.
 package streamer;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import streamer.debug.FakeSink;
+import streamer.debug.FakeSource;
+
 public class PipelineImpl implements Pipeline {
 
-  protected String id;
-  protected boolean verbose = System.getProperty("streamer.Pipeline.debug", "false").equals("true");
+    protected String id;
+    protected boolean verbose = System.getProperty("streamer.Pipeline.debug", "false").equals("true");
 
-  public PipelineImpl(String id) {
-    this.id = id;
-    elements = initElementMap(id);
-  }
+    public PipelineImpl(String id) {
+        this.id = id;
+        elements = initElementMap(id);
+    }
 
-  protected Map<String, Element> elements;
+    protected Map<String, Element> elements;
 
-  protected HashMap<String, Element> initElementMap(String id) {
-    HashMap<String, Element> map = new HashMap<String, Element>();
+    protected HashMap<String, Element> initElementMap(String id) {
+        HashMap<String, Element> map = new HashMap<String, Element>();
 
-    map.put(IN, new BaseElement(id + "." + IN));
-    map.put(OUT, new BaseElement(id + "." + OUT));
-    return map;
-  }
+        map.put(IN, new BaseElement(id + "." + IN));
+        map.put(OUT, new BaseElement(id + "." + OUT));
+        return map;
+    }
 
-  @Override
-  public Link getLink(String padName) {
-    Link link = elements.get(IN).getLink(padName);
-    if (link == null)
-      link = elements.get(OUT).getLink(padName);
-    return link;
-  }
+    @Override
+    public Link getLink(String padName) {
+        Link link = elements.get(IN).getLink(padName);
+        if (link == null)
+            link = elements.get(OUT).getLink(padName);
+        return link;
+    }
 
-  @Override
-  public Set<String> getPads(Direction direction) {
-    switch (direction) {
-    case IN:
-      return elements.get(IN).getPads(direction);
+    @Override
+    public Set<String> getPads(Direction direction) {
+        switch (direction) {
+        case IN:
+            return elements.get(IN).getPads(direction);
 
-    case OUT:
-      return elements.get(OUT).getPads(direction);
+        case OUT:
+            return elements.get(OUT).getPads(direction);
+        }
+        return null;
     }
-    return null;
-  }
-
-  @Override
-  public void validate() {
-    for (Element element : elements.values())
-      element.validate();
-
-    // Check IN element
-    {
-      Element element = get(IN);
-      int outPadsNumber = element.getPads(Direction.OUT).size();
-      int inPadsNumber = element.getPads(Direction.IN).size();
-      if ((outPadsNumber | inPadsNumber) > 0 && (outPadsNumber == 0 || inPadsNumber == 0))
-        throw new RuntimeException("[ " + this + "] Pads of input element of pipeline are not balanced. Element: " + element + ", output pads: "
-            + element.getPads(Direction.OUT).toString() + ", input pads: " + element.getPads(Direction.IN).toString() + ".");
+
+    @Override
+    public void validate() {
+        for (Element element : elements.values())
+            element.validate();
+
+        // Check IN element
+        {
+            Element element = get(IN);
+            int outPadsNumber = element.getPads(Direction.OUT).size();
+            int inPadsNumber = element.getPads(Direction.IN).size();
+            if ((outPadsNumber | inPadsNumber) > 0 && (outPadsNumber == 0 || inPadsNumber == 0))
+                throw new RuntimeException("[ " + this + "] Pads of input element of pipeline are not balanced. Element: " + element + ", output pads: "
+                        + element.getPads(Direction.OUT).toString() + ", input pads: " + element.getPads(Direction.IN).toString() + ".");
+        }
+
+        // Check OUT element
+        {
+            Element element = get(OUT);
+            int outPadsNumber = element.getPads(Direction.OUT).size();
+            int inPadsNumber = element.getPads(Direction.IN).size();
+            if ((outPadsNumber | inPadsNumber) > 0 && (outPadsNumber == 0 || inPadsNumber == 0))
+                throw new RuntimeException("[ " + this + "] Pads of output element of pipeline are not balanced. Element: " + element + ", output pads: "
+                        + element.getPads(Direction.OUT).toString() + ", input pads: " + element.getPads(Direction.IN).toString() + ".");
+        }
+
     }
 
-    // Check OUT element
-    {
-      Element element = get(OUT);
-      int outPadsNumber = element.getPads(Direction.OUT).size();
-      int inPadsNumber = element.getPads(Direction.IN).size();
-      if ((outPadsNumber | inPadsNumber) > 0 && (outPadsNumber == 0 || inPadsNumber == 0))
-        throw new RuntimeException("[ " + this + "] Pads of output element of pipeline are not balanced. Element: " + element + ", output pads: "
-            + element.getPads(Direction.OUT).toString() + ", input pads: " + element.getPads(Direction.IN).toString() + ".");
+    @Override
+    public void dropLink(String padName) {
+        if (elements.get(IN).getLink(padName) != null)
+            elements.get(IN).dropLink(padName);
+
+        if (elements.get(OUT).getLink(padName) != null)
+            elements.get(OUT).dropLink(padName);
     }
 
-  }
-
-  @Override
-  public void dropLink(String padName) {
-    if (elements.get(IN).getLink(padName) != null)
-      elements.get(IN).dropLink(padName);
-
-    if (elements.get(OUT).getLink(padName) != null)
-      elements.get(OUT).dropLink(padName);
-  }
-
-  @Override
-  public void dropLink(Link link) {
-    elements.get(IN).dropLink(link);
-    elements.get(OUT).dropLink(link);
-  }
-
-  @Override
-  public void replaceLink(Link existingLink, Link newLink) {
-    elements.get(IN).replaceLink(existingLink, newLink);
-    elements.get(OUT).replaceLink(existingLink, newLink);
-  }
-
-  @Override
-  public void setLink(String padName, Link link, Direction direction) {
-    // Wire links to internal elements instead
-    elements.get(direction.toString()).setLink(padName, link, direction);
-  }
-
-  @Override
-  public void poll(boolean block) {
-    throw new RuntimeException("Not implemented.");
-  }
-
-  @Override
-  public void handleData(ByteBuffer buf, Link link) {
-    get(IN).handleData(buf, link);
-  }
-
-  @Override
-  public void handleEvent(Event event, Direction direction) {
-    switch (direction) {
-    case IN:
-      get(IN).handleEvent(event, direction);
-      break;
-    case OUT:
-      get(OUT).handleEvent(event, direction);
-      break;
+    @Override
+    public void dropLink(Link link) {
+        elements.get(IN).dropLink(link);
+        elements.get(OUT).dropLink(link);
     }
-  }
 
-  @Override
-  public void add(Element... elements) {
-    for (Element element : elements) {
-      String id = element.getId();
+    @Override
+    public void replaceLink(Link existingLink, Link newLink) {
+        elements.get(IN).replaceLink(existingLink, newLink);
+        elements.get(OUT).replaceLink(existingLink, newLink);
+    }
 
-      if (this.elements.containsKey(id))
-        throw new RuntimeException("This pipeline already contains element with same ID. New element: " + element + ", existing element: "
-            + this.elements.get(id) + ".");
+    @Override
+    public void setLink(String padName, Link link, Direction direction) {
+        // Wire links to internal elements instead
+        elements.get(direction.toString()).setLink(padName, link, direction);
+    }
 
-      this.elements.put(id, element);
+    @Override
+    public void poll(boolean block) {
+        throw new RuntimeException("Not implemented.");
     }
-  }
-  
-  @Override
-  public void link(String... elementNames) {
-
-    if (elementNames.length < 2)
-      throw new RuntimeException("At least two elements are necessary to create link between them.");
-
-    // Parse array of element and pad names
-
-    Element elements[] = new Element[elementNames.length];
-    String inputPads[] = new String[elementNames.length];
-    String outputPads[] = new String[elementNames.length];
-
-    int i = 0;
-    for (String elementName : elementNames) {
-      if (elementName.contains("< ")) {
-        inputPads[i] = elementName.substring(0, elementName.indexOf("< "));
-        elementName = elementName.substring(elementName.indexOf("< ") + 2);
-      } else {
-        inputPads[i] = STDIN;
-      }
-
-      if (elementName.contains(" >")) {
-        outputPads[i] = elementName.substring(elementName.indexOf(" >") + 2);
-        elementName = elementName.substring(0, elementName.indexOf(" >"));
-      } else {
-        outputPads[i] = STDOUT;
-      }
-
-      elements[i] = get(elementName);
-
-      if (elements[i] == null)
-        throw new RuntimeException("Cannot find element by name in this pipeline. Element name: \"" + elementName + "\" (" + elementNames[i] + "), pipeline: "
-            + this + ".");
-
-      i++;
+
+    @Override
+    public void handleData(ByteBuffer buf, Link link) {
+        get(IN).handleData(buf, link);
     }
 
-    // Link elements
-    for (i = 0; i < elements.length - 1; i++) {
-      Element leftElement = elements[i];
-      Element rightElement = elements[i + 1];
-      String leftPad = outputPads[i];
-      String rightPad = inputPads[i + 1];
+    @Override
+    public void handleEvent(Event event, Direction direction) {
+        switch (direction) {
+        case IN:
+            get(IN).handleEvent(event, direction);
+            break;
+        case OUT:
+            get(OUT).handleEvent(event, direction);
+            break;
+        }
+    }
 
-      String linkId = leftElement.getId() + " >" + leftPad + " | " + rightPad + "< " + rightElement.getId();
+    @Override
+    public void add(Element... elements) {
+        for (Element element : elements) {
+            String id = element.getId();
 
-      if (verbose)
-        System.out.println("[" + this + "] INFO: Linking: " + linkId + ".");
+            if (this.elements.containsKey(id))
+                throw new RuntimeException("This pipeline already contains element with same ID. New element: " + element + ", existing element: "
+                        + this.elements.get(id) + ".");
 
-      Link link = new SyncLink(linkId);
-      leftElement.setLink(leftPad, link, Direction.OUT);
-      rightElement.setLink(rightPad, link, Direction.IN);
+            this.elements.put(id, element);
+        }
     }
-  }
 
-  @Override
-  public void addAndLink(Element... elements) {
-    add(elements);
-    link(elements);
-  }
+    @Override
+    public void link(String... elementNames) {
+
+        elementNames = filterOutEmptyStrings(elementNames);
+
+        if (elementNames.length < 2)
+            throw new RuntimeException("At least two elements are necessary to create link between them.");
+
+        // Parse array of element and pad names
+
+        Element elements[] = new Element[elementNames.length];
+        String inputPads[] = new String[elementNames.length];
+        String outputPads[] = new String[elementNames.length];
+
+        int i = 0;
+        for (String elementName : elementNames) {
+            if (elementName.contains("< ")) {
+                inputPads[i] = elementName.substring(0, elementName.indexOf("< "));
+                elementName = elementName.substring(elementName.indexOf("< ") + 2);
+            } else {
+                inputPads[i] = STDIN;
+            }
+
+            if (elementName.contains(" >")) {
+                outputPads[i] = elementName.substring(elementName.indexOf(" >") + 2);
+                elementName = elementName.substring(0, elementName.indexOf(" >"));
+            } else {
+                outputPads[i] = STDOUT;
+            }
+
+            elements[i] = get(elementName);
+
+            if (elements[i] == null)
+                throw new RuntimeException("Cannot find element by name in this pipeline. Element name: \"" + elementName + "\" (" + elementNames[i] + "), pipeline: "
+                        + this + ".");
 
-  private void link(Element... elements) {
-    String elementNames[] = new String[elements.length];
+            i++;
+        }
 
-    int i = 0;
-    for (Element element : elements) {
-      elementNames[i++] = element.getId();
+        // Link elements
+        for (i = 0; i < elements.length - 1; i++) {
+            Element leftElement = elements[i];
+            Element rightElement = elements[i + 1];
+            String leftPad = outputPads[i];
+            String rightPad = inputPads[i + 1];
+
+            String linkId = leftElement.getId() + " >" + leftPad + " | " + rightPad + "< " + rightElement.getId();
+
+            if (verbose)
+                System.out.println("[" + this + "] INFO: Linking: " + linkId + ".");
+
+            Link link = new SyncLink(linkId);
+            leftElement.setLink(leftPad, link, Direction.OUT);
+            rightElement.setLink(rightPad, link, Direction.IN);
+        }
+    }
+
+    /**
+     * Filter out empty strings from array and return new array with non-empty
+     * elements only. If array contains no empty string, returns same array.
+     */
+    private String[] filterOutEmptyStrings(String[] strings) {
+
+        boolean found = false;
+        for (String string : strings) {
+            if (string == null || string.isEmpty()) {
+                found = true;
+                break;
+            }
+        }
+
+        if (!found)
+            return strings;
+
+        List<String> filteredStrings = new ArrayList<String>(strings.length);
+        for (String string : strings)
+            if (string != null && !string.isEmpty())
+                filteredStrings.add(string);
+        return filteredStrings.toArray(new String[filteredStrings.size()]);
+    }
+
+    @Override
+    public void addAndLink(Element... elements) {
+        add(elements);
+        link(elements);
+    }
+
+    private void link(Element... elements) {
+        String elementNames[] = new String[elements.length];
+
+        int i = 0;
+        for (Element element : elements) {
+            elementNames[i++] = element.getId();
+        }
+
+        link(elementNames);
     }
 
-    link(elementNames);
-  }
+    @Override
+    public Element get(String elementName) {
+        return elements.get(elementName);
+    }
 
-  @Override
-  public Element get(String elementName) {
-    return elements.get(elementName);
-  }
+    @Override
+    public Link getLink(String elementName, String padName) {
+        return elements.get(elementName).getLink(padName);
 
-  @Override
-  public Link getLink(String elementName, String padName) {
-    return elements.get(elementName).getLink(padName);
+    }
 
-  }
+    @Override
+    public void setLink(String elementName, String padName, Link link, Direction direction) {
+        elements.get(elementName).setLink(padName, link, direction);
+    }
 
-  @Override
-  public void setLink(String elementName, String padName, Link link, Direction direction) {
-    elements.get(elementName).setLink(padName, link, direction);
-  }
+    @Override
+    public String getId() {
+        return id;
+    }
+
+    @Override
+    public void runMainLoop(String elementName, String padName, boolean separateThread, boolean waitForStartEvent) {
+        validate();
 
-  @Override
-  public String getId() {
-    return id;
-  }
+        Link link = getLink(elementName, padName);
 
-  @Override
-  public void runMainLoop(String elementName, String padName, boolean separateThread, boolean waitForStartEvent) {
-    validate();
+        if (link == null)
+            throw new NullPointerException("Cannot find link. Element name: " + elementName + ", element: " + get(elementName) + ", pad: " + padName + ".");
 
-    Link link = getLink(elementName, padName);
+        if (!waitForStartEvent)
+            link.sendEvent(Event.STREAM_START, Direction.OUT);
 
-    if (link == null)
-      throw new NullPointerException("Cannot find link. Element name: " + elementName + ", element: " + get(elementName) + ", pad: " + padName + ".");
+        if (separateThread) {
+            Thread thread = new Thread(link);
+            thread.setDaemon(true);
+            thread.start();
+        } else {
+            link.run();
+        }
+    }
 
-    if (!waitForStartEvent)
-      link.sendEvent(Event.STREAM_START, Direction.OUT);
+    @Override
+    public String toString() {
+        return "Pipeline(" + id + ")";
+    }
 
-    if (separateThread) {
-      Thread thread = new Thread(link);
-      thread.setDaemon(true);
-      thread.start();
-    } else {
-      link.run();
+    /**
+     * Example.
+     */
+    public static void main(String args[]) {
+        // System.setProperty("streamer.Link.debug", "true");
+        // System.setProperty("streamer.Element.debug", "true");
+        // System.setProperty("streamer.Pipeline.debug", "true");
+
+        Pipeline pipeline = new PipelineImpl("main");
+
+        // Create elements
+        pipeline.add(new FakeSource("source") {
+            {
+                incommingBufLength = 3;
+                numBuffers = 10;
+                delay = 100;
+            }
+        });
+        pipeline.add(new BaseElement("tee"));
+        pipeline.add(new FakeSink("sink") {
+            {
+                verbose = true;
+            }
+        });
+        pipeline.add(new FakeSink("sink2") {
+            {
+                verbose = true;
+            }
+        });
+
+        // Link elements
+        pipeline.link("source", "tee", "sink");
+        pipeline.link("tee >out2", "sink2");
+
+        // Run main loop
+        pipeline.runMainLoop("source", STDOUT, false, false);
     }
-  }
-
-  @Override
-  public String toString() {
-    return "Pipeline(" + id + ")";
-  }
-
-  /**
-   * Example.
-   */
-  public static void main(String args[]) {
-    // System.setProperty("streamer.Link.debug", "true");
-    // System.setProperty("streamer.Element.debug", "true");
-    // System.setProperty("streamer.Pipeline.debug", "true");
-
-    Pipeline pipeline = new PipelineImpl("main");
-
-    // Create elements
-    pipeline.add(new FakeSource("source") {
-      {
-        this.incommingBufLength = 3;
-        this.numBuffers = 10;
-        this.delay = 100;
-      }
-    });
-    pipeline.add(new BaseElement("tee"));
-    pipeline.add(new FakeSink("sink") {
-      {
-        this.verbose = true;
-      }
-    });
-    pipeline.add(new FakeSink("sink2") {
-      {
-        this.verbose = true;
-      }
-    });
-
-    // Link elements
-    pipeline.link("source", "tee", "sink");
-    pipeline.link("tee >out2", "sink2");
-
-    // Run main loop
-    pipeline.runMainLoop("source", STDOUT, false, false);
-  }
 
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Queue.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Queue.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Queue.java
old mode 100644
new mode 100755
index 7a17340..ed952ce
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Queue.java
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Queue.java
@@ -19,118 +19,120 @@ package streamer;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import streamer.debug.FakeSink;
+import streamer.debug.FakeSource;
+
 /**
  * Message queue for safe transfer of packets between threads.
  */
 public class Queue extends BaseElement {
 
-  protected LinkedBlockingQueue<ByteBuffer> queue = new LinkedBlockingQueue<ByteBuffer>();
-
-  public Queue(String id) {
-    super(id);
-  }
-
-  @SuppressWarnings("incomplete-switch")
-  @Override
-  public void poll(boolean block) {
-    try {
-      ByteBuffer buf = null;
-      if (block) {
-        buf = queue.take();
-      } else {
-        buf = queue.poll(100, TimeUnit.MILLISECONDS);
-      }
-
-      if (buf != null)
-        pushDataToAllOuts(buf);
-
-    } catch (Exception e) {
-      sendEventToAllPads(Event.STREAM_CLOSE, Direction.OUT);
-      closeQueue();
+    protected LinkedBlockingQueue<ByteBuffer> queue = new LinkedBlockingQueue<ByteBuffer>();
+
+    public Queue(String id) {
+        super(id);
+    }
+
+    @Override
+    public void poll(boolean block) {
+        try {
+            ByteBuffer buf = null;
+            if (block) {
+                buf = queue.take();
+            } else {
+                buf = queue.poll(100, TimeUnit.MILLISECONDS);
+            }
+
+            if (buf != null)
+                pushDataToAllOuts(buf);
+
+        } catch (Exception e) {
+            sendEventToAllPads(Event.STREAM_CLOSE, Direction.OUT);
+            closeQueue();
+        }
+    }
+
+    @Override
+    public void handleData(ByteBuffer buf, Link link) {
+        if (verbose)
+            System.out.println("[" + this + "] INFO: Data received: " + buf + ".");
+
+        // Put incoming data into queue
+        try {
+            queue.put(buf);
+        } catch (Exception e) {
+            sendEventToAllPads(Event.STREAM_CLOSE, Direction.IN);
+            closeQueue();
+        }
+    }
+
+    @Override
+    public void handleEvent(Event event, Direction direction) {
+        switch (event) {
+        case LINK_SWITCH_TO_PULL_MODE:
+            // Do not propagate this event, because this element is boundary between
+            // threads
+            break;
+        default:
+            super.handleEvent(event, direction);
+        }
+    }
+
+    @Override
+    protected void onClose() {
+        super.onClose();
+        closeQueue();
     }
-  }
-
-  @Override
-  public void handleData(ByteBuffer buf, Link link) {
-    if (verbose)
-      System.out.println("[" + this + "] INFO: Data received: " + buf + ".");
-
-    // Put incoming data into queue
-    try {
-      queue.put(buf);
-    } catch (Exception e) {
-      sendEventToAllPads(Event.STREAM_CLOSE, Direction.IN);
-      closeQueue();
+
+    private void closeQueue() {
+        queue.clear();
+        queue.add(null);
+        // Drop queue to indicate that upstream is closed.
+        // May produce NPE in poll().
+        queue = null;
     }
-  }
-
-  @Override
-  public void handleEvent(Event event, Direction direction) {
-    switch (event) {
-    case LINK_SWITCH_TO_PULL_MODE:
-      // Do not propagate this event, because this element is boundary between
-      // threads
-      break;
-    default:
-      super.handleEvent(event, direction);
+
+    @Override
+    public String toString() {
+        return "Queue(" + id + ")";
+    }
+
+    /**
+     * Example.
+     */
+    public static void main(String args[]) {
+        // System.setProperty("streamer.Link.debug", "true");
+        System.setProperty("streamer.Element.debug", "true");
+
+        Element source1 = new FakeSource("source1") {
+            {
+                this.delay = 100;
+                this.numBuffers = 10;
+                this.incommingBufLength = 10;
+            }
+        };
+
+        Element source2 = new FakeSource("source2") {
+            {
+                this.delay = 100;
+                this.numBuffers = 10;
+                this.incommingBufLength = 10;
+            }
+        };
+
+        Pipeline pipeline = new PipelineImpl("test");
+        pipeline.add(source1);
+        pipeline.add(source2);
+        pipeline.add(new Queue("queue"));
+        pipeline.add(new FakeSink("sink"));
+
+        // Main flow
+        pipeline.link("source1", "in1< queue");
+        pipeline.link("source2", "in2< queue");
+        pipeline.link("queue", "sink");
+
+        new Thread(pipeline.getLink("source1", STDOUT)).start();
+        new Thread(pipeline.getLink("source2", STDOUT)).start();
+        pipeline.getLink("sink", STDIN).run();
     }
-  }
-
-  @Override
-  protected void onClose() {
-    super.onClose();
-    closeQueue();
-  }
-
-  private void closeQueue() {
-    queue.clear();
-    queue.add(null);
-    // Drop queue to indicate that upstream is closed.
-    // May produce NPE in poll().
-    queue = null;
-  }
-
-  @Override
-  public String toString() {
-    return "Queue(" + id + ")";
-  }
-
-  /**
-   * Example.
-   */
-  public static void main(String args[]) {
-    // System.setProperty("streamer.Link.debug", "true");
-    System.setProperty("streamer.Element.debug", "true");
-
-    Element source1 = new FakeSource("source1") {
-      {
-        this.delay = 100;
-        this.numBuffers = 10;
-        this.incommingBufLength = 10;
-      }
-    };
-
-    Element source2 = new FakeSource("source2") {
-      {
-        this.delay = 100;
-        this.numBuffers = 10;
-        this.incommingBufLength = 10;
-      }
-    };
-
-    Pipeline pipeline = new PipelineImpl("test");
-    pipeline.add(source1);
-    pipeline.add(source2);
-    pipeline.add(new Queue("queue"));
-    pipeline.add(new FakeSink("sink"));
-
-    // Main flow
-    pipeline.link("source1", "in1< queue");
-    pipeline.link("source2", "in2< queue");
-    pipeline.link("queue", "sink");
-
-    new Thread(pipeline.getLink("source1", STDOUT)).start();
-    new Thread(pipeline.getLink("source2", STDOUT)).start();
-    pipeline.getLink("sink", STDIN).run();
-  }
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SocketWrapper.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SocketWrapper.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SocketWrapper.java
old mode 100644
new mode 100755
index c23edd8..22d436e
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SocketWrapper.java
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SocketWrapper.java
@@ -16,224 +16,20 @@
 // under the License.
 package streamer;
 
-import static rdpclient.MockServer.Packet.PacketType.CLIENT;
-import static rdpclient.MockServer.Packet.PacketType.SERVER;
-
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.util.HashMap;
-
-import javax.net.SocketFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLSocket;
-import javax.net.ssl.SSLSocketFactory;
-import javax.net.ssl.TrustManager;
-
-import rdpclient.MockServer;
-import rdpclient.MockServer.Packet;
-import rdpclient.TrustAllX509TrustManager;
-
-public class SocketWrapper extends PipelineImpl {
-
-  protected InputStreamSource source;
-  protected OutputStreamSink sink;
-  protected Socket socket;
-  protected InetSocketAddress address;
-
-  protected SSLSocket sslSocket;
-
-  //protected String SSL_VERSION_TO_USE = "TLSv1.2";
-  /*DEBUG*/protected String SSL_VERSION_TO_USE = "TLSv1";
-
-  public SocketWrapper(String id) {
-    super(id);
-  }
-
-  @Override
-  protected HashMap<String, Element> initElementMap(String id) {
-    HashMap<String, Element> map = new HashMap<String, Element>();
-
-    source = new InputStreamSource(id + "." + OUT, this);
-    sink = new OutputStreamSink(id + "." + IN, this);
-
-    // Pass requests to read data to socket input stream
-    map.put(OUT, source);
-
-    // All incoming data, which is sent to this socket wrapper, will be sent
-    // to socket remote
-    map.put(IN, sink);
-
-    return map;
-  }
-
-  /**
-   * Connect this socket wrapper to remote server and start main loop on
-   * IputStreamSource stdout link, to watch for incoming data, and
-   * OutputStreamSink stdin link, to pull for outgoing data.
-   * 
-   * @param address
-   * @throws IOException
-   */
-  public void connect(InetSocketAddress address) throws IOException {
-    this.address = address;
-
-    // Connect socket to server
-    socket = SocketFactory.getDefault().createSocket();
-    try {
-      socket.connect(address);
-
-      InputStream is = socket.getInputStream();
-      source.setInputStream(is);
-
-      OutputStream os = socket.getOutputStream();
-      sink.setOutputStream(os);
-
-      // Start polling for data to send to remote sever
-      runMainLoop(IN, STDIN, true, true);
-
-      // Push incoming data from server to handlers
-      runMainLoop(OUT, STDOUT, false, false);
-
-    } finally {
-      socket.close();
-    }
-  }
-
-  @Override
-  public void handleEvent(Event event, Direction direction) {
-    switch (event) {
-    case SOCKET_UPGRADE_TO_SSL:
-      upgradeToSsl();
-      break;
-    default:
-      super.handleEvent(event, direction);
-      break;
-    }
-  }
-
-  public void upgradeToSsl() {
-
-    if (sslSocket != null)
-      // Already upgraded
-      return;
-
-    if (verbose)
-      System.out.println("[" + this + "] INFO: Upgrading socket to SSL.");
-
-    try {
-      // Use most secure implementation of SSL available now.
-      // JVM will try to negotiate TLS1.2, then will fallback to TLS1.0, if
-      // TLS1.2 is not supported.
-      SSLContext sslContext = SSLContext.getInstance(SSL_VERSION_TO_USE);
-
-      // Trust all certificates (FIXME: insecure)
-      sslContext.init(null, new TrustManager[] { new TrustAllX509TrustManager() }, null);
-
-      SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();
-      sslSocket = (SSLSocket) sslSocketFactory.createSocket(socket, address.getHostName(), address.getPort(), true);
-      sslSocket.startHandshake();
-
-      InputStream sis = sslSocket.getInputStream();
-      source.setInputStream(sis);
-
-      OutputStream sos = sslSocket.getOutputStream();
-      sink.setOutputStream(sos);
-
-    } catch (Exception e) {
-      throw new RuntimeException("Cannot upgrade socket to SSL: " + e.getMessage(), e);
-    }
-
-  }
-
-  @Override
-  public void validate() {
-    for (Element element : elements.values())
-      element.validate();
-
-    if (get(IN).getPads(Direction.IN).size() == 0)
-      throw new RuntimeException("[ " + this + "] Input of socket is not connected.");
-
-    if (get(OUT).getPads(Direction.OUT).size() == 0)
-      throw new RuntimeException("[ " + this + "] Output of socket is not connected.");
-
-  }
-
-  public void shutdown() {
-    try {
-      handleEvent(Event.STREAM_CLOSE, Direction.IN);
-    } catch (Exception e) {
-    }
-    try {
-      handleEvent(Event.STREAM_CLOSE, Direction.OUT);
-    } catch (Exception e) {
-    }
-    try {
-      if (sslSocket != null)
-        sslSocket.close();
-    } catch (Exception e) {
-    }
-    try {
-      socket.close();
-    } catch (Exception e) {
-    }
-  }
-
-  @Override
-  public String toString() {
-    return "SocketWrapper(" + id + ")";
-  }
-
-  /**
-   * Example.
-   */
-  public static void main(String args[]) {
-    try {
-      System.setProperty("streamer.Link.debug", "true");
-      System.setProperty("streamer.Element.debug", "true");
-      System.setProperty("rdpclient.MockServer.debug", "true");
-
-      Pipeline pipeline = new PipelineImpl("echo client");
-
-      SocketWrapper socketWrapper = new SocketWrapper("socket");
-
-      pipeline.add(socketWrapper);
-      pipeline.add(new BaseElement("echo"));
 
-      pipeline.link("socket", "echo", "socket");
+public interface SocketWrapper extends Element {
 
-      final byte[] mockData = new byte[] { 0x01, 0x02, 0x03 };
-      MockServer server = new MockServer(new Packet[] { new Packet("Server hello") {
-        {
-          type = SERVER;
-          data = mockData;
-        }
-      }, new Packet("Client hello") {
-        {
-          type = CLIENT;
-          data = mockData;
-        }
-      }, new Packet("Server hello") {
-        {
-          type = SERVER;
-          data = mockData;
-        }
-      }, new Packet("Client hello") {
-        {
-          type = CLIENT;
-          data = mockData;
-        }
-      } });
-      server.start();
-      InetSocketAddress address = server.getAddress();
+    /**
+     * Connect this socket wrapper to remote server and start main loop on
+     * Source stdout link, to watch for incoming data, and
+     * Sink stdin link, to pull for outgoing data.
+     */
+    public void connect(InetSocketAddress address) throws IOException;
 
-      socketWrapper.connect(address);
+    public void shutdown();
 
-    } catch (IOException e) {
-      e.printStackTrace(System.err);
-    }
+    void upgradeToSsl();
 
-  }
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SocketWrapperImpl.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SocketWrapperImpl.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SocketWrapperImpl.java
new file mode 100755
index 0000000..07b3dc9
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SocketWrapperImpl.java
@@ -0,0 +1,250 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+import static streamer.debug.MockServer.Packet.PacketType.CLIENT;
+import static streamer.debug.MockServer.Packet.PacketType.SERVER;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.HashMap;
+
+import javax.net.SocketFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSocket;
+import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.TrustManager;
+
+import streamer.debug.MockServer;
+import streamer.debug.MockServer.Packet;
+import streamer.ssl.SSLState;
+import streamer.ssl.TrustAllX509TrustManager;
+
+public class SocketWrapperImpl extends PipelineImpl implements SocketWrapper {
+
+    protected InputStreamSource source;
+    protected OutputStreamSink sink;
+    protected Socket socket;
+    protected InetSocketAddress address;
+
+    protected SSLSocket sslSocket;
+
+    protected String SSL_VERSION_TO_USE = "TLSv1.2";
+    //protected String SSL_VERSION_TO_USE = "SSLv3";
+
+    protected SSLState sslState;
+
+    public SocketWrapperImpl(String id, SSLState sslState) {
+        super(id);
+        this.sslState = sslState;
+    }
+
+    @Override
+    protected HashMap<String, Element> initElementMap(String id) {
+        HashMap<String, Element> map = new HashMap<String, Element>();
+
+        source = new InputStreamSource(id + "." + OUT, this);
+        sink = new OutputStreamSink(id + "." + IN, this);
+
+        // Pass requests to read data to socket input stream
+        map.put(OUT, source);
+
+        // All incoming data, which is sent to this socket wrapper, will be sent
+        // to socket remote
+        map.put(IN, sink);
+
+        return map;
+    }
+
+    /**
+     * Connect this socket wrapper to remote server and start main loop on
+     * IputStreamSource stdout link, to watch for incoming data, and
+     * OutputStreamSink stdin link, to pull for outgoing data.
+     * 
+     * @param address
+     * @throws IOException
+     */
+    @Override
+    public void connect(InetSocketAddress address) throws IOException {
+        this.address = address;
+
+        // Connect socket to server
+        socket = SocketFactory.getDefault().createSocket();
+        try {
+            socket.connect(address);
+
+            InputStream is = socket.getInputStream();
+            source.setInputStream(is);
+
+            OutputStream os = socket.getOutputStream();
+            sink.setOutputStream(os);
+
+            // Start polling for data to send to remote sever
+            runMainLoop(IN, STDIN, true, true);
+
+            // Push incoming data from server to handlers
+            runMainLoop(OUT, STDOUT, false, false);
+
+        } finally {
+            socket.close();
+        }
+    }
+
+    @Override
+    public void handleEvent(Event event, Direction direction) {
+        switch (event) {
+        case SOCKET_UPGRADE_TO_SSL:
+            upgradeToSsl();
+            break;
+        default:
+            super.handleEvent(event, direction);
+            break;
+        }
+    }
+
+    @Override
+    public void upgradeToSsl() {
+
+        if (sslSocket != null)
+            // Already upgraded
+            return;
+
+        if (verbose)
+            System.out.println("[" + this + "] INFO: Upgrading socket to SSL.");
+
+        try {
+            // Use most secure implementation of SSL available now.
+            // JVM will try to negotiate TLS1.2, then will fallback to TLS1.0, if
+            // TLS1.2 is not supported.
+            SSLContext sslContext = SSLContext.getInstance(SSL_VERSION_TO_USE);
+
+            // Trust all certificates (FIXME: insecure)
+            sslContext.init(null, new TrustManager[] {new TrustAllX509TrustManager(sslState)}, null);
+
+            SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();
+            sslSocket = (SSLSocket)sslSocketFactory.createSocket(socket, address.getHostName(), address.getPort(), true);
+
+            sslSocket.startHandshake();
+
+            InputStream sis = sslSocket.getInputStream();
+            source.setInputStream(sis);
+
+            OutputStream sos = sslSocket.getOutputStream();
+            sink.setOutputStream(sos);
+
+        } catch (Exception e) {
+            throw new RuntimeException("Cannot upgrade socket to SSL: " + e.getMessage(), e);
+        }
+
+    }
+
+    @Override
+    public void validate() {
+        for (Element element : elements.values())
+            element.validate();
+
+        if (get(IN).getPads(Direction.IN).size() == 0)
+            throw new RuntimeException("[ " + this + "] Input of socket is not connected.");
+
+        if (get(OUT).getPads(Direction.OUT).size() == 0)
+            throw new RuntimeException("[ " + this + "] Output of socket is not connected.");
+
+    }
+
+    @Override
+    public void shutdown() {
+        try {
+            handleEvent(Event.STREAM_CLOSE, Direction.IN);
+        } catch (Exception e) {
+        }
+        try {
+            handleEvent(Event.STREAM_CLOSE, Direction.OUT);
+        } catch (Exception e) {
+        }
+        try {
+            if (sslSocket != null)
+                sslSocket.close();
+        } catch (Exception e) {
+        }
+        try {
+            socket.close();
+        } catch (Exception e) {
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "SocketWrapper(" + id + ")";
+    }
+
+    /**
+     * Example.
+     */
+    public static void main(String args[]) {
+
+        try {
+            System.setProperty("streamer.Link.debug", "true");
+            System.setProperty("streamer.Element.debug", "true");
+            System.setProperty("rdpclient.MockServer.debug", "true");
+
+            Pipeline pipeline = new PipelineImpl("echo client");
+
+            SocketWrapperImpl socketWrapper = new SocketWrapperImpl("socket", null);
+
+            pipeline.add(socketWrapper);
+            pipeline.add(new BaseElement("echo"));
+            pipeline.add(new Queue("queue")); // To decouple input and output
+
+            pipeline.link("socket", "echo", "queue", "socket");
+
+            final byte[] mockData = new byte[] {0x01, 0x02, 0x03};
+            MockServer server = new MockServer(new Packet[] {new Packet("Server hello") {
+                {
+                    type = SERVER;
+                    data = mockData;
+                }
+            }, new Packet("Client hello") {
+                {
+                    type = CLIENT;
+                    data = mockData;
+                }
+            }, new Packet("Server hello") {
+                {
+                    type = SERVER;
+                    data = mockData;
+                }
+            }, new Packet("Client hello") {
+                {
+                    type = CLIENT;
+                    data = mockData;
+                }
+            }});
+            server.start();
+            InetSocketAddress address = server.getAddress();
+
+            /*DEBUG*/System.out.println("Address: " + address);
+            socketWrapper.connect(address);
+
+        } catch (Exception e) {
+            e.printStackTrace(System.err);
+        }
+
+    }
+}