You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by de...@apache.org on 2013/12/23 10:12:58 UTC
[04/22] CLOUDSTACK-5344: Update to allow rdp console to access
hyper-v vm virtual framebuffer.
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SyncLink.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SyncLink.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SyncLink.java
old mode 100644
new mode 100755
index 32c14bb..696402a
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SyncLink.java
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SyncLink.java
@@ -22,381 +22,393 @@ package streamer;
*/
public class SyncLink implements Link {
- /**
- * When null packet is pulled from source element, then make slight delay to
- * avoid consuming of 100% of CPU in main loop in cases when link is pauses or
- * source element cannot produce data right now.
- */
- protected static final long STANDARD_DELAY_FOR_EMPTY_PACKET = 10; // Milliseconds
-
- /**
- * Delay for null packets in poll method when blocking is requested, in
- * milliseconds.
- */
- protected long delay = STANDARD_DELAY_FOR_EMPTY_PACKET;
-
- /**
- * Set to true to print debugging messages.
- */
- protected boolean verbose = System.getProperty("streamer.Link.debug", "false").equals("true");;
-
- /**
- * ID of this link.
- */
- protected String id = null;
-
- /**
- * Buffer with data to hold because link is paused, or data is pushed back.
- */
- protected ByteBuffer cacheBuffer = null;
-
- /**
- * Size of expected packet. Data must be hold in link until full packet will
- * be read.
- */
- protected int expectedPacketSize = 0;
-
- /**
- * Number of packets and packet header transferred to element.
- */
- protected int packetNumber = 0;
-
- /**
- * Set to true to hold all data in link until it will be set to false again.
- */
- protected boolean paused = false;
-
- /**
- * Element to pull data from, when in pull mode.
- */
- protected Element source = null;
-
- /**
- * Element to send data to in both pull and push modes.
- */
- protected Element sink = null;
-
- /**
- * When in loop, indicates that loop must be stopped.
- *
- * @see run()
- */
- private boolean shutdown = false;
-
- /**
- * Indicates that event STREAM_START is passed over this link, so main loop
- * can be started to pull data from source element.
- */
- protected boolean start;
-
- /**
- * Operate in pull mode.
- */
- protected boolean pullMode;
-
- public SyncLink() {
- }
-
- public SyncLink(String id) {
- this.id = id;
- }
-
- @Override
- public void pushBack(ByteBuffer buf) {
- if (verbose)
- System.out.println("[" + this + "] INFO: Buffer pushed back: " + buf + ".");
-
- if (cacheBuffer != null) {
- ByteBuffer tmp = cacheBuffer.join(buf);
- cacheBuffer.unref();
- cacheBuffer = tmp;
- } else {
- cacheBuffer = buf;
- cacheBuffer.ref();
+ /**
+ * When null packet is pulled from source element, then make slight delay to
+ * avoid consuming 100% of CPU in main loop in cases when link is paused or
+ * source element cannot produce data right now.
+ */
+ public static final long STANDARD_DELAY_FOR_EMPTY_PACKET = 10; // Milliseconds
+
+ /**
+ * Delay for null packets in poll method when blocking is requested, in
+ * milliseconds.
+ */
+ protected long delay = STANDARD_DELAY_FOR_EMPTY_PACKET;
+
+ /**
+ * Set to true to print debugging messages.
+ */
+ protected boolean verbose = System.getProperty("streamer.Link.debug", "false").equals("true");;
+
+ /**
+ * ID of this link.
+ */
+ protected String id = null;
+
+ /**
+ * Buffer with data to hold because link is paused, on hold, or data is pushed
+ * back from output element.
+ */
+ protected ByteBuffer cacheBuffer = null;
+
+ /**
+ * Size of expected packet. Data must be held in link until the full packet
+ * has been read.
+ */
+ protected int expectedPacketSize = 0;
+
+ /**
+ * Number of packets and packet header transferred to element.
+ */
+ protected int packetNumber = 0;
+
+ /**
+ * Element to pull data from, when in pull mode.
+ */
+ protected Element source = null;
+
+ /**
+ * Element to send data to in both pull and push modes.
+ */
+ protected Element sink = null;
+
+ /**
+ * When in loop, indicates that loop must be stopped.
+ *
+ * @see #run()
+ */
+ private boolean shutdown = false;
+
+ /**
+ * Indicates that event STREAM_START is passed over this link, so main loop
+ * can be started to pull data from source element.
+ */
+ protected boolean started = false;
+
+ /**
+ * Set to true to hold all data in link until it will be set to false again.
+ */
+ protected boolean paused = false;
+
+ /**
+ * Used by pull() method to hold all data in this link to avoid recursion when
+ * source element is asked to push new data to its outputs.
+ */
+ protected boolean hold = false;
+
+ /**
+ * Operate in pull mode instead of default push mode. In pull mode, link will
+ * ask its source element for new data.
+ */
+ protected boolean pullMode = false;
+
+ public SyncLink() {
}
- resetCursor();
- }
-
- private void resetCursor() {
- // Reset cursor
- cacheBuffer.cursor = 0;
- }
-
- @Override
- public void pushBack(ByteBuffer buf, int lengthOfFullPacket) {
- pushBack(buf);
- expectedPacketSize = lengthOfFullPacket;
- }
-
- @Override
- public String toString() {
- return "SyncLink(" + ((id != null) ? id + ", " : "") + source + ":" + sink + ")";
- }
-
- /**
- * Push data to sink. Call with null to push cached data.
- */
- @Override
- public void sendData(ByteBuffer buf) {
- if (!paused && pullMode)
- throw new RuntimeException("[" + this + "] ERROR: link is not in push mode.");
-
- if (verbose)
- System.out.println("[" + this + "] INFO: Incoming buffer: " + buf + ".");
-
- if (buf == null && cacheBuffer == null)
- return;
-
- if (cacheBuffer != null && buf != null) {
- // Join old data with fresh data
- buf = cacheBuffer.join(buf);
- cacheBuffer.unref();
- cacheBuffer = buf;
+ public SyncLink(String id) {
+ this.id = id;
}
- // Store buffer in cache field to simplify following loop
- if (buf != null)
- cacheBuffer = buf;
-
- // When data pushed back and length of data is less than length of full
- // packet, then feed data to sink element immediately
- while (cacheBuffer != null) {
- if (paused) {
+ @Override
+ public void pushBack(ByteBuffer buf) {
if (verbose)
- System.out.println("[" + this + "] INFO: Transfer is paused. Data in cache buffer: " + cacheBuffer + ".");
+ System.out.println("[" + this + "] INFO: Buffer pushed back: " + buf + ".");
+
+ if (cacheBuffer != null) {
+ ByteBuffer tmp = cacheBuffer.join(buf);
+ cacheBuffer.unref();
+ cacheBuffer = tmp;
+ } else {
+ cacheBuffer = buf;
+ cacheBuffer.ref();
+ }
+
+ resetCursor();
+ }
+
+ private void resetCursor() {
+ // Reset cursor
+ cacheBuffer.cursor = 0;
+ }
- // Wait until rest of packet will be read
- return;
- }
+ @Override
+ public void pushBack(ByteBuffer buf, int lengthOfFullPacket) {
+ pushBack(buf);
+ expectedPacketSize = lengthOfFullPacket;
+ }
+
+ @Override
+ public String toString() {
+ return "SyncLink(" + ((id != null) ? id + ", " : "") + source + ":" + sink + ")";
+ }
+
+ /**
+ * Push data to sink. Call with null to push cached data.
+ */
+ @Override
+ public void sendData(ByteBuffer buf) {
+ if (!hold && pullMode)
+ throw new RuntimeException("[" + this + "] ERROR: link is not in push mode.");
- if (expectedPacketSize > 0 && cacheBuffer.length < expectedPacketSize) {
if (verbose)
- System.out.println("[" + this + "] INFO: Transfer is suspended because available data is less than expected packet size. Expected packet size: "
- + expectedPacketSize + ", data in cache buffer: " + cacheBuffer + ".");
+ System.out.println("[" + this + "] INFO: Incoming buffer: " + buf + ".");
- // Wait until rest of packet will be read
- return;
- }
+ if (buf == null && cacheBuffer == null)
+ return;
- // Full packet or packet header is read, feed it to element
- buf = cacheBuffer;
- cacheBuffer = null;
- expectedPacketSize = 0;
- packetNumber++;
+ if (cacheBuffer != null && buf != null) {
+ // Join old data with fresh data
+ buf = cacheBuffer.join(buf);
+ cacheBuffer.unref();
+ cacheBuffer = buf;
+ }
- if (sink == null)
- throw new NullPointerException("[" + this + "] ERROR: Cannot send data to sink: sink is null. Data: " + buf + ".");
+ // Store buffer in cache field to simplify following loop
+ if (buf != null)
+ cacheBuffer = buf;
+
+ // When data pushed back and length of data is less than length of full
+ // packet, then feed data to sink element immediately
+ while (cacheBuffer != null) {
+ if (paused || hold) {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Transfer is paused. Data in cache buffer: " + cacheBuffer + ".");
+
+ // Keep data in cache until link is resumed or released from hold
+ return;
+ }
+
+ if (expectedPacketSize > 0 && cacheBuffer.length < expectedPacketSize) {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Transfer is suspended because available data is less than expected packet size. Expected packet size: "
+ + expectedPacketSize + ", data in cache buffer: " + cacheBuffer + ".");
+
+ // Wait until rest of packet will be read
+ return;
+ }
+
+ // Full packet or packet header is read, feed it to element
+ buf = cacheBuffer;
+ cacheBuffer = null;
+ expectedPacketSize = 0;
+ packetNumber++;
+
+ if (sink == null)
+ throw new NullPointerException("[" + this + "] ERROR: Cannot send data to sink: sink is null. Data: " + buf + ".");
+
+ sink.handleData(buf, this);
+ // cacheBuffer and expectedPacketSize can be changed at this time
+ }
- sink.handleData(buf, this);
- // cacheBuffer and expectedPacketSize can be changed at this time
}
- }
+ @SuppressWarnings("incomplete-switch")
+ @Override
+ public void sendEvent(Event event, Direction direction) {
- @SuppressWarnings("incomplete-switch")
- @Override
- public void sendEvent(Event event, Direction direction) {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Event " + event + " is received.");
+
+ // Shutdown main loop (if any) when STREAM_CLOSE event is received.
+ switch (event) {
+ case STREAM_START: {
+ if (!started)
+ started = true;
+ else
+ // Event already sent through this link
+ return;
+ break;
+ }
+ case STREAM_CLOSE: {
+ if (!shutdown)
+ shutdown = true;
+ else
+ // Event already sent through this link
+ return;
+ break;
+ }
+ case LINK_SWITCH_TO_PULL_MODE: {
+ setPullMode();
+ break;
+ }
- if (verbose)
- System.out.println("[" + this + "] INFO: Event " + event + " is received.");
+ }
- // Shutdown main loop (if any) when STREAM_CLOSE event is received.
- switch (event) {
- case STREAM_START: {
- if (!start)
- start = true;
- else
- // Event already sent trough this link
- return;
- break;
- }
- case STREAM_CLOSE: {
- if (!shutdown)
- shutdown = true;
- else
- // Event already sent trough this link
- return;
- break;
- }
- case LINK_SWITCH_TO_PULL_MODE: {
- setPullMode();
- break;
+ switch (direction) {
+ case IN:
+ source.handleEvent(event, direction);
+ break;
+ case OUT:
+ sink.handleEvent(event, direction);
+ break;
+ }
}
- }
+ @Override
+ public ByteBuffer pull(boolean block) {
+ if (!pullMode)
+ throw new RuntimeException("[" + this + "] ERROR: This link is not in pull mode.");
+
+ if (hold)
+ throw new RuntimeException("[" + this + "] ERROR: This link is already on hold, waiting for data to be pulled in. Circular reference?");
+
+ if (paused) {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Cannot pull, link is paused.");
+
+ // Make slight delay in such case, to avoid consuming 100% of CPU
+ if (block) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ }
+ }
+ return null;
+ }
+ // If data in cache can be sent immediately,
+ // then return it instead of asking for more data from source
+ if (cacheBuffer != null && (expectedPacketSize == 0 || (expectedPacketSize > 0 && cacheBuffer.length >= expectedPacketSize))) {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Data pulled from cache buffer: " + cacheBuffer + ".");
+
+ ByteBuffer tmp = cacheBuffer;
+ cacheBuffer = null;
+ return tmp;
+ }
- switch (direction) {
- case IN:
- source.handleEvent(event, direction);
- break;
- case OUT:
- sink.handleEvent(event, direction);
- break;
+ // Put this link on hold, so incoming data will not be sent to sink
+ // immediately, then ask source element for more data
+ try {
+ hold = true;
+ source.poll(block);
+ } finally {
+ hold = false;
+ }
+
+ // Can return something only when data was stored in buffer
+ if (cacheBuffer != null && (expectedPacketSize == 0 || (expectedPacketSize > 0 && cacheBuffer.length >= expectedPacketSize))) {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Data pulled from source: " + cacheBuffer + ".");
+
+ ByteBuffer tmp = cacheBuffer;
+ cacheBuffer = null;
+ return tmp;
+ } else {
+ return null;
+ }
}
- }
- @Override
- public ByteBuffer pull(boolean block) {
- if (!pullMode)
- throw new RuntimeException("This link is not in pull mode.");
+ @Override
+ public Element setSink(Element sink) {
+ if (sink != null && this.sink != null)
+ throw new RuntimeException("[" + this + "] ERROR: This link sink element is already set. Link: " + this + ", new sink: " + sink + ", existing sink: "
+ + this.sink + ".");
- if (paused) {
- if (verbose)
- System.out.println("[" + this + "] INFO: Cannot pull, link is paused.");
+ if (sink == null && cacheBuffer != null)
+ throw new RuntimeException("[" + this + "] ERROR: Cannot drop link: cache is not empty. Link: " + this + ", cache: " + cacheBuffer);
- // Make slight delay in such case, to avoid consuming 100% of CPU
- if (block) {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- }
- }
+ this.sink = sink;
- return null;
+ return sink;
}
- // If data in cache can be sent immediately,
- // then return it instead of asking for more data from source
- if (cacheBuffer != null && (expectedPacketSize == 0 || (expectedPacketSize > 0 && cacheBuffer.length >= expectedPacketSize))) {
- if (verbose)
- System.out.println("[" + this + "] INFO: Data pulled from cache buffer: " + cacheBuffer + ".");
+ @Override
+ public Element setSource(Element source) {
+ if (this.source != null && source != null)
+ throw new RuntimeException("[" + this + "] ERROR: This link source element is already set. Link: " + this + ", new source: " + source
+ + ", existing source: " + this.source + ".");
- ByteBuffer tmp = cacheBuffer;
- cacheBuffer = null;
- return tmp;
+ this.source = source;
+ return source;
}
- // Pause this link, so incoming data will not be sent to sink
- // immediately, then ask source element for more data
- pause();
- source.poll(block);
- resume();
-
- // Can return something only when data was stored in buffer
- if (cacheBuffer != null && (expectedPacketSize == 0 || (expectedPacketSize > 0 && cacheBuffer.length >= expectedPacketSize))) {
- if (verbose)
- System.out.println("[" + this + "] INFO: Data pulled from source: " + cacheBuffer + ".");
-
- ByteBuffer tmp = cacheBuffer;
- cacheBuffer = null;
- return tmp;
- } else {
- return null;
+ @Override
+ public Element getSource() {
+ return source;
}
- }
-
- @Override
- public Element setSink(Element sink) {
- if (sink != null && this.sink != null)
- throw new RuntimeException("This link sink element is already set. Link: " + this + ", new sink: " + sink + ", existing sink: " + this.sink + ".");
-
- if (sink == null && cacheBuffer != null)
- throw new RuntimeException("Cannot drop link: cache is not empty. Link: " + this + ", cache: " + cacheBuffer);
-
- this.sink = sink;
-
- return sink;
- }
-
- @Override
- public Element setSource(Element source) {
- if (this.source != null && source != null)
- throw new RuntimeException("This link source element is already set. Link: " + this + ", new source: " + source + ", existing source: " + this.source
- + ".");
-
- this.source = source;
- return source;
- }
-
- @Override
- public Element getSource() {
- return source;
- }
-
- @Override
- public Element getSink() {
- return sink;
- }
-
- @Override
- public void pause() {
- if (paused)
- throw new RuntimeException("Link is already paused.");
-
- paused = true;
-
- }
-
- @Override
- public void resume() {
- paused = false;
- }
-
- /**
- * Run pull loop to actively pull data from source and push it to sink. It
- * must be only one pull loop per thread.
- *
- * Pull loop will start after event STREAM_START. This link and source element
- * incomming links will be switched to pull mode before pull loop will be
- * started using event LINK_SWITCH_TO_PULL_MODE.
- */
- @Override
- public void run() {
- // Wait until even STREAM_START will arrive
- while (!start) {
- delay();
+
+ @Override
+ public Element getSink() {
+ return sink;
}
- sendEvent(Event.LINK_SWITCH_TO_PULL_MODE, Direction.IN);
+ @Override
+ public void pause() {
+ if (paused)
+ throw new RuntimeException("[" + this + "] ERROR: Link is already paused.");
- if (verbose)
- System.out.println("[" + this + "] INFO: Starting pull loop.");
+ paused = true;
- // Pull source in loop
- while (!shutdown) {
- // Pull data from source element and send it to sink element
- ByteBuffer data = pull(true);
- if (data != null)
- sink.handleData(data, this);
+ }
- if (!shutdown && data == null) {
- // Make slight delay to avoid consuming of 100% of CPU
- delay();
- }
+ @Override
+ public void resume() {
+ paused = false;
}
- if (verbose)
- System.out.println("[" + this + "] INFO: Pull loop finished.");
+ /**
+ * Run pull loop to actively pull data from source and push it to sink. There
+ * must be only one pull loop per thread.
+ *
+ * Pull loop will start after event STREAM_START. This link and source element
+ * incoming links will be switched to pull mode before pull loop is
+ * started, using event LINK_SWITCH_TO_PULL_MODE.
+ */
+ @Override
+ public void run() {
+ // Wait until event STREAM_START arrives
+ while (!started) {
+ delay();
+ }
- }
+ sendEvent(Event.LINK_SWITCH_TO_PULL_MODE, Direction.IN);
+
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Starting pull loop.");
+
+ // Pull source in loop
+ while (!shutdown) {
+ // Pull data from source element and send it to sink element
+ ByteBuffer data = pull(true);
+ if (data != null)
+ sink.handleData(data, this);
+
+ if (!shutdown && data == null) {
+ // Make slight delay to avoid consuming of 100% of CPU
+ delay();
+ }
+ }
+
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Pull loop finished.");
- protected void delay() {
- try {
- Thread.sleep(delay);
- } catch (InterruptedException e) {
- e.printStackTrace(System.err);
- throw new RuntimeException("Interrupted in main loop.", e);
}
- }
- @Override
- public void setPullMode() {
- if (verbose)
- System.out.println("[" + this + "] INFO: Switching to PULL mode.");
+ protected void delay() {
+ try {
+ Thread.sleep(delay);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("[" + this + "] ERROR: Interrupted in main loop.", e);
+ }
+ }
- this.pullMode = true;
- }
+ @Override
+ public void setPullMode() {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Switching to PULL mode.");
- @Override
- public void drop() {
- if (pullMode)
- throw new RuntimeException("Cannot drop link in pull mode.");
+ pullMode = true;
+ }
- if (cacheBuffer != null)
- throw new RuntimeException("Cannot drop link when cache conatains data: " + cacheBuffer + ".");
+ @Override
+ public void drop() {
+ if (pullMode)
+ throw new RuntimeException("[" + this + "] ERROR: Cannot drop link in pull mode.");
- source.dropLink(this);
- sink.dropLink(this);
- }
+ if (cacheBuffer != null)
+ throw new RuntimeException("[" + this + "] ERROR: Cannot drop link when cache conatains data: " + cacheBuffer + ".");
+
+ source.dropLink(this);
+ sink.dropLink(this);
+ }
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketSink.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketSink.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketSink.java
new file mode 100755
index 0000000..edfe8db
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketSink.java
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer.apr;
+
+import org.apache.tomcat.jni.Socket;
+
+import streamer.BaseElement;
+import streamer.ByteBuffer;
+import streamer.DataSource;
+import streamer.Direction;
+import streamer.Event;
+import streamer.Link;
+
+public class AprSocketSink extends BaseElement {
+
+ protected AprSocketWrapperImpl socketWrapper;
+ protected Long socket;
+
+ public AprSocketSink(String id) {
+ super(id);
+ }
+
+ public AprSocketSink(String id, AprSocketWrapperImpl socketWrapper) {
+ super(id);
+ this.socketWrapper = socketWrapper;
+ }
+
+ public void setSocket(long socket) {
+ this.socket = socket;
+
+ // Resume links
+ resumeLinks();
+ }
+
+ /**
+ * Send incoming data to the APR socket.
+ */
+ @Override
+ public void handleData(ByteBuffer buf, Link link) {
+ if (buf == null)
+ return;
+
+ if (socketWrapper.shutdown)
+ return;
+
+ try {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Writing data to stream: " + buf + ".");
+
+ // FIXME: If pull is destroyed or socket is closed, segfault will happen here
+ Socket.send(socket, buf.data, buf.offset, buf.length);
+ } catch (Exception e) {
+ System.err.println("[" + this + "] ERROR: " + e.getMessage());
+ closeStream();
+ }
+ }
+
+ @Override
+ public void handleEvent(Event event, Direction direction) {
+ switch (event) {
+ case SOCKET_UPGRADE_TO_SSL:
+ socketWrapper.upgradeToSsl();
+ break;
+ case LINK_SWITCH_TO_PULL_MODE:
+ throw new RuntimeException("[" + this + "] ERROR: Unexpected event: sink recived LINK_SWITCH_TO_PULL_MODE event.");
+ default:
+ super.handleEvent(event, direction);
+ }
+ }
+
+ @Override
+ public void setLink(String padName, Link link, Direction direction) {
+ switch (direction) {
+ case IN:
+ super.setLink(padName, link, direction);
+
+ if (socket == null)
+ // Pause links until data stream will be ready
+ link.pause();
+ break;
+ case OUT:
+ throw new RuntimeException("Cannot assign link to output pad in sink element. Element: " + this + ", pad: " + padName + ", link: " + link + ".");
+ }
+ }
+
+ private void resumeLinks() {
+ for (DataSource source : inputPads.values())
+ ((Link)source).resume();
+ }
+
+ @Override
+ protected void onClose() {
+ closeStream();
+ }
+
+ private void closeStream() {
+ if (socketWrapper.shutdown)
+ return;
+
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Closing stream.");
+
+ try {
+ sendEventToAllPads(Event.STREAM_CLOSE, Direction.IN);
+ } catch (Exception e) {
+ }
+ socketWrapper.shutdown();
+ }
+
+ @Override
+ public String toString() {
+ return "AprSocketSink(" + id + ")";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketSource.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketSource.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketSource.java
new file mode 100755
index 0000000..178034e
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketSource.java
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer.apr;
+
+import org.apache.tomcat.jni.Socket;
+
+import streamer.BaseElement;
+import streamer.ByteBuffer;
+import streamer.DataSink;
+import streamer.Direction;
+import streamer.Event;
+import streamer.Link;
+
+/**
+ * Source element, which reads data from an APR socket.
+ */
+public class AprSocketSource extends BaseElement {
+
+ protected AprSocketWrapperImpl socketWrapper;
+ protected Long socket;
+
+ public AprSocketSource(String id) {
+ super(id);
+ }
+
+ public AprSocketSource(String id, AprSocketWrapperImpl socketWrapper) {
+ super(id);
+ this.socketWrapper = socketWrapper;
+ }
+
+ @Override
+ public void handleEvent(Event event, Direction direction) {
+ switch (event) {
+ case SOCKET_UPGRADE_TO_SSL:
+ socketWrapper.upgradeToSsl();
+ break;
+ case LINK_SWITCH_TO_PULL_MODE:
+ // Do nothing - this is the source
+ break;
+ default:
+ super.handleEvent(event, direction);
+ }
+ }
+
+ @Override
+ public void setLink(String padName, Link link, Direction direction) {
+ switch (direction) {
+ case OUT:
+ super.setLink(padName, link, direction);
+
+ if (socket == null) {
+ // Pause links until data stream will be ready
+ link.pause();
+ }
+ break;
+ case IN:
+ throw new RuntimeException("Cannot assign link to input pad in source element. Element: " + this + ", pad: " + padName + ", link: " + link + ".");
+ }
+ }
+
+ public void setSocket(long socket) {
+ this.socket = socket;
+
+ // Resume links
+ resumeLinks();
+ }
+
+ private void resumeLinks() {
+ for (DataSink sink : outputPads.values())
+ ((Link)sink).resume();
+ }
+
+ /**
+ * Read data from the APR socket.
+ */
+ @Override
+ public void poll(boolean block) {
+ if (socketWrapper.shutdown) {
+ socketWrapper.destroyPull();
+ return;
+ }
+
+ try {
+ // Create buffer of recommended size and with default offset
+ ByteBuffer buf = new ByteBuffer(incommingBufLength);
+
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Reading data from stream.");
+
+ // FIXME: If pull is destroyed or socket is closed, segfault will happen here
+ int actualLength = (block) ? // Blocking read
+ Socket.recv(socket, buf.data, buf.offset, buf.data.length - buf.offset)
+ : // Non-blocking read
+ Socket.recvt(socket, buf.data, buf.offset, buf.data.length - buf.offset, 1);
+
+ if (socketWrapper.shutdown) {
+ socketWrapper.destroyPull();
+ return;
+ }
+
+ if (actualLength < 0) {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: End of stream.");
+
+ buf.unref();
+ closeStream();
+ sendEventToAllPads(Event.STREAM_CLOSE, Direction.OUT);
+ return;
+ }
+
+ if (actualLength == 0) {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Empty buffer is read from stream.");
+
+ buf.unref();
+ return;
+ }
+
+ buf.length = actualLength;
+
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Data read from stream: " + buf + ".");
+
+ pushDataToAllOuts(buf);
+
+ } catch (Exception e) {
+ System.err.println("[" + this + "] ERROR: " + e.getMessage());
+ closeStream();
+ }
+ }
+
+ @Override
+ protected void onClose() {
+ closeStream();
+ }
+
+ private void closeStream() {
+
+ if (socketWrapper.shutdown)
+ return;
+
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Closing stream.");
+
+ try {
+ sendEventToAllPads(Event.STREAM_CLOSE, Direction.OUT);
+ } catch (Exception e) {
+ }
+ socketWrapper.shutdown();
+ }
+
+ @Override
+ public String toString() {
+ return "AprSocketSource(" + id + ")";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketWrapperImpl.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketWrapperImpl.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketWrapperImpl.java
new file mode 100755
index 0000000..2ee426b
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketWrapperImpl.java
@@ -0,0 +1,281 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer.apr;
+
+import static streamer.debug.MockServer.Packet.PacketType.CLIENT;
+import static streamer.debug.MockServer.Packet.PacketType.SERVER;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+
+import org.apache.tomcat.jni.Address;
+import org.apache.tomcat.jni.Error;
+import org.apache.tomcat.jni.Library;
+import org.apache.tomcat.jni.Pool;
+import org.apache.tomcat.jni.SSL;
+import org.apache.tomcat.jni.SSLContext;
+import org.apache.tomcat.jni.SSLSocket;
+import org.apache.tomcat.jni.Socket;
+
+import streamer.BaseElement;
+import streamer.Direction;
+import streamer.Element;
+import streamer.Event;
+import streamer.Pipeline;
+import streamer.PipelineImpl;
+import streamer.Queue;
+import streamer.SocketWrapper;
+import streamer.debug.MockServer;
+import streamer.debug.MockServer.Packet;
+import streamer.ssl.SSLState;
+import sun.security.x509.X509CertImpl;
+
+/**
+ * Socket wrapper built on the Tomcat Native (APR) socket API.
+ *
+ * The wrapper is a pipeline with two elements: an {@link AprSocketSource}
+ * registered under {@code OUT}, which reads data from the socket, and an
+ * {@link AprSocketSink} registered under {@code IN}, which writes data to the
+ * socket. The connection can be upgraded to SSL in place via
+ * {@link #upgradeToSsl()}.
+ */
+public class AprSocketWrapperImpl extends PipelineImpl implements SocketWrapper {
+
+    static {
+        try {
+            Library.initialize(null);
+            SSL.initialize(null);
+        } catch (Exception e) {
+            throw new RuntimeException("Cannot load Tomcat Native Library (Apache Portable Runtime).", e);
+        }
+    }
+
+    /**
+     * Holder for the server public key captured during SSL upgrade. May be
+     * null (see {@link #main(String[])}).
+     */
+    private final SSLState sslState;
+
+    /**
+     * Native memory pool used for the address, the socket and the SSL context.
+     */
+    final long pool = Pool.create(0);
+
+    /**
+     * Native handle of the resolved remote address.
+     */
+    long inetAddress;
+
+    /**
+     * Native handle of the socket.
+     */
+    long socket;
+
+    // NOTE(review): source and sink are assigned in initElementMap(), which is
+    // presumably invoked from the superclass constructor; do not add field
+    // initializers here, because they would run afterwards and reset the values.
+    private AprSocketSource source;
+    private AprSocketSink sink;
+
+    /**
+     * Set to true when shutdown is requested. Read by AprSocketSource.
+     */
+    boolean shutdown = false;
+
+    /**
+     * Set to true once native socket and pool are released, so that they are
+     * never released twice.
+     */
+    private boolean shutdowned = false;
+
+    /**
+     * @param id
+     *          ID of this pipeline, also used as prefix for IDs of source and
+     *          sink elements
+     * @param sslState
+     *          holder for server public key, may be null
+     */
+    public AprSocketWrapperImpl(String id, SSLState sslState) {
+        super(id);
+        this.sslState = sslState;
+    }
+
+    /**
+     * Create socket source and socket sink elements and register them under
+     * the OUT and IN names.
+     */
+    @Override
+    protected HashMap<String, Element> initElementMap(String id) {
+        HashMap<String, Element> map = new HashMap<String, Element>();
+
+        source = new AprSocketSource(id + "." + OUT, this);
+        sink = new AprSocketSink(id + "." + IN, this);
+
+        // Pass requests to read data to socket input stream
+        map.put(OUT, source);
+
+        // All incoming data, which is sent to this socket wrapper, will be sent
+        // to socket remote
+        map.put(IN, sink);
+
+        return map;
+    }
+
+    /**
+     * Connect this socket wrapper to remote server and start main loop on
+     * AprSocketSource stdout link, to watch for incoming data, and
+     * AprSocketSink stdin link, to pull for outgoing data.
+     *
+     * @param address
+     *          address of remote server
+     * @throws IOException
+     *           when socket cannot be created or connected
+     */
+    @Override
+    public void connect(InetSocketAddress address) throws IOException {
+        try {
+            inetAddress = Address.info(address.getHostName(), Socket.APR_UNSPEC, address.getPort(), 0, pool);
+            socket = Socket.create(Address.getInfo(inetAddress).family, Socket.SOCK_STREAM, Socket.APR_PROTO_TCP, pool);
+        } catch (Exception e) {
+            throw new IOException("[" + this + "] ERROR: Cannot create socket for \"" + address + "\".", e);
+        }
+
+        int ret = Socket.connect(socket, inetAddress);
+        if (ret != 0)
+            throw new IOException("[" + this + "] ERROR: Cannot connect to remote host \"" + address + "\": " + Error.strerror(ret));
+
+        source.setSocket(socket);
+        sink.setSocket(socket);
+
+        // Start polling for data to send to remote sever
+        runMainLoop(IN, STDIN, true, true);
+
+        // Push incoming data from server to handlers
+        runMainLoop(OUT, STDOUT, false, false);
+
+    }
+
+    /**
+     * Handle SOCKET_UPGRADE_TO_SSL event locally, pass all other events to
+     * pipeline.
+     */
+    @Override
+    public void handleEvent(Event event, Direction direction) {
+        switch (event) {
+        case SOCKET_UPGRADE_TO_SSL:
+            upgradeToSsl();
+            break;
+        default:
+            super.handleEvent(event, direction);
+            break;
+        }
+    }
+
+    /**
+     * Check that both input and output of this socket wrapper are connected.
+     *
+     * @throws RuntimeException
+     *           when input or output has no pads
+     */
+    @Override
+    public void validate() {
+
+        if (getPads(Direction.IN).size() == 0)
+            throw new RuntimeException("[ " + this + "] BUG: Input of socket is not connected.");
+
+        if (getPads(Direction.OUT).size() == 0)
+            throw new RuntimeException("[ " + this + "] BUG: Output of socket is not connected.");
+
+    }
+
+    /**
+     * Attach client-mode TLSv1 context to already connected socket, make
+     * handshake, and, when SSL state holder is available, store encoded public
+     * key of server certificate in it. In case of any failure, this socket
+     * wrapper is shut down and the exception is rethrown.
+     *
+     * @throws RuntimeException
+     *           when SSL context cannot be created or attached, when handshake
+     *           fails, or when server public key cannot be read
+     */
+    @Override
+    public void upgradeToSsl() {
+
+        try {
+            long sslContext;
+            try {
+                sslContext = SSLContext.make(pool, SSL.SSL_PROTOCOL_TLSV1, SSL.SSL_MODE_CLIENT);
+            } catch (Exception e) {
+                throw new RuntimeException("Cannot create SSL context using Tomcat native library.", e);
+            }
+
+            SSLContext.setOptions(sslContext, SSL.SSL_OP_DONT_INSERT_EMPTY_FRAGMENTS | SSL.SSL_OP_TLS_BLOCK_PADDING_BUG | SSL.SSL_OP_MICROSOFT_BIG_SSLV3_BUFFER
+                    | SSL.SSL_OP_MSIE_SSLV2_RSA_PADDING);
+            // FIXME: verify certificate by default
+            SSLContext.setVerify(sslContext, SSL.SSL_CVERIFY_NONE, 0);
+            int ret;
+            try {
+                ret = SSLSocket.attach(sslContext, socket);
+            } catch (Exception e) {
+                throw new RuntimeException("[" + this + "] ERROR: Cannot attach SSL context to socket: ", e);
+            }
+            if (ret != 0)
+                throw new RuntimeException("[" + this + "] ERROR: Cannot attach SSL context to socket(" + ret + "): " + SSL.getLastError());
+
+            try {
+                ret = SSLSocket.handshake(socket);
+            } catch (Exception e) {
+                throw new RuntimeException("[" + this + "] ERROR: Cannot make SSL handshake with server: ", e);
+            }
+            if (ret != 0 && ret != 20014) // 20014: bad certificate signature FIXME: show prompt for self signed certificate
+                throw new RuntimeException("[" + this + "] ERROR: Cannot make SSL handshake with server(" + ret + "): " + SSL.getLastError());
+
+            // SSL state holder is optional (same guard as in BcoSocketWrapperImpl)
+            if (sslState != null) {
+                try {
+                    // NOTE(review): certificate is requested using SSL_INFO_CLIENT_CERT
+                    // constant, but it is stored as server certificate key; confirm
+                    // that this is the intended constant for client-mode socket.
+                    byte[] key = SSLSocket.getInfoB(socket, SSL.SSL_INFO_CLIENT_CERT);
+                    //*DEBUG*/System.out.println("DEBUG: Server cert:\n"+new ByteBuffer(key).dump());
+                    sslState.serverCertificateSubjectPublicKeyInfo = new X509CertImpl(key).getPublicKey().getEncoded();
+                } catch (Exception e) {
+                    throw new RuntimeException("[" + this + "] ERROR: Cannot get server public key: ", e);
+                }
+            }
+
+        } catch (RuntimeException e) {
+            shutdown();
+            throw e;
+        }
+    }
+
+    /**
+     * Mark this socket wrapper as shut down and send STREAM_CLOSE event in
+     * both directions. Native resources are not released here, see
+     * {@link #destroyPull()}. Repeated calls are ignored.
+     */
+    @Override
+    public void shutdown() {
+        if (shutdown)
+            return;
+
+        shutdown = true;
+
+        try {
+            handleEvent(Event.STREAM_CLOSE, Direction.IN);
+        } catch (Exception ignored) {
+            // Best effort: failure of a handler must not prevent notification of other direction
+        }
+        try {
+            handleEvent(Event.STREAM_CLOSE, Direction.OUT);
+        } catch (Exception ignored) {
+            // Best effort: shutdown must not fail because of a handler
+        }
+    }
+
+    /**
+     * Close native socket and destroy native memory pool. Resources are
+     * released at most once: repeated calls are ignored.
+     */
+    void destroyPull() {
+        if (shutdowned)
+            return;
+
+        // Mark first, so that native handles are never touched again by a repeated call
+        shutdowned = true;
+
+        // Causes segfault in AprSocketSource.poll() method, so this function must be called from it
+        try {
+            Socket.close(socket);
+            // or
+            // Socket.shutdown(socket, Socket.APR_SHUTDOWN_READWRITE);
+            Pool.destroy(pool);
+        } catch (Exception ignored) {
+            // Best effort: nothing can be done when release of native resources fails
+        }
+
+    }
+
+    @Override
+    public String toString() {
+        return "AprSocketWrapper(" + id + ")";
+    }
+
+    /**
+     * Example: echo client, which is connected to mock server.
+     */
+    public static void main(String args[]) {
+
+        try {
+            System.setProperty("streamer.Link.debug", "true");
+            System.setProperty("streamer.Element.debug", "true");
+            System.setProperty("rdpclient.MockServer.debug", "true");
+
+            Pipeline pipeline = new PipelineImpl("echo client");
+
+            AprSocketWrapperImpl socketWrapper = new AprSocketWrapperImpl("socket", null);
+
+            pipeline.add(socketWrapper);
+            pipeline.add(new BaseElement("echo"));
+            pipeline.add(new Queue("queue")); // To decouple input and output
+
+            pipeline.link("socket", "echo", "queue", "socket");
+
+            final byte[] mockData = new byte[] {0x01, 0x02, 0x03};
+            MockServer server = new MockServer(new Packet[] {new Packet("Server hello") {
+                {
+                    type = SERVER;
+                    data = mockData;
+                }
+            }, new Packet("Client hello") {
+                {
+                    type = CLIENT;
+                    data = mockData;
+                }
+            }, new Packet("Server hello") {
+                {
+                    type = SERVER;
+                    data = mockData;
+                }
+            }, new Packet("Client hello") {
+                {
+                    type = CLIENT;
+                    data = mockData;
+                }
+            }});
+            server.start();
+            InetSocketAddress address = server.getAddress();
+
+            socketWrapper.connect(address);
+
+        } catch (Exception e) {
+            e.printStackTrace(System.err);
+        }
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/bco/BcoSocketWrapperImpl.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/bco/BcoSocketWrapperImpl.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/bco/BcoSocketWrapperImpl.java
new file mode 100755
index 0000000..67e2dbd
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/bco/BcoSocketWrapperImpl.java
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer.bco;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.security.SecureRandom;
+import java.security.Security;
+
+import org.bouncycastle.asn1.x509.X509CertificateStructure;
+import org.bouncycastle.crypto.tls.CertificateVerifyer;
+import org.bouncycastle.crypto.tls.TlsProtocolHandler;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+
+import streamer.Direction;
+import streamer.Event;
+import streamer.SocketWrapperImpl;
+import streamer.ssl.SSLState;
+
+/**
+ * Socket wrapper which upgrades the plain socket to SSL using the
+ * BouncyCastle {@link TlsProtocolHandler} and then redirects the source and
+ * sink elements to the streams of that handler.
+ */
+@SuppressWarnings("deprecation")
+public class BcoSocketWrapperImpl extends SocketWrapperImpl {
+
+    static {
+        Security.addProvider(new BouncyCastleProvider());
+    }
+
+    /**
+     * TLS handler on top of plain socket streams. Null until upgrade to SSL is
+     * requested.
+     */
+    private TlsProtocolHandler bcoSslSocket;
+
+    /**
+     * @param id
+     *          ID of this socket wrapper
+     * @param sslState
+     *          holder for server public key, may be null
+     */
+    public BcoSocketWrapperImpl(String id, SSLState sslState) {
+        super(id, sslState);
+    }
+
+    /**
+     * Make TLS handshake over already connected socket, store encoded public
+     * key of server certificate in SSL state (when available), and switch
+     * source and sink to encrypted streams. Repeated calls are ignored.
+     *
+     * @throws RuntimeException
+     *           when upgrade fails
+     */
+    @Override
+    public void upgradeToSsl() {
+
+        // This implementation stores its TLS handler in bcoSslSocket, so that
+        // field must be checked too, otherwise streams will be wrapped again on
+        // repeated request
+        if (sslSocket != null || bcoSslSocket != null)
+            // Already upgraded
+            return;
+
+        if (verbose)
+            System.out.println("[" + this + "] INFO: Upgrading socket to SSL.");
+
+        try {
+
+            SecureRandom secureRandom = new SecureRandom();
+            bcoSslSocket = new TlsProtocolHandler(socket.getInputStream(), socket.getOutputStream(), secureRandom);
+
+            // NOTE(review): this verifier accepts any certificate chain (it always
+            // returns true) and only records public key of the first certificate;
+            // server identity is not verified here.
+            CertificateVerifyer client = new CertificateVerifyer() {
+
+                @Override
+                public boolean isValid(X509CertificateStructure[] chain) {
+
+                    try {
+                        if (sslState != null) {
+                            sslState.serverCertificateSubjectPublicKeyInfo = chain[0].getSubjectPublicKeyInfo().getEncoded();
+                        }
+                    } catch (IOException e) {
+                        throw new RuntimeException("Cannot get server public key.", e);
+                    }
+
+                    return true;
+                }
+            };
+            bcoSslSocket.connect(client);
+
+            InputStream sis = bcoSslSocket.getInputStream();
+            source.setInputStream(sis);
+
+            OutputStream sos = bcoSslSocket.getOutputStream();
+            sink.setOutputStream(sos);
+
+        } catch (Exception e) {
+            throw new RuntimeException("Cannot upgrade socket to SSL: " + e.getMessage(), e);
+        }
+
+    }
+
+    /**
+     * Send STREAM_CLOSE event in both directions, then close TLS handler (if
+     * any) and socket. Every step is best effort: failure of one step does not
+     * prevent the following steps.
+     */
+    @Override
+    public void shutdown() {
+        try {
+            handleEvent(Event.STREAM_CLOSE, Direction.IN);
+        } catch (Exception ignored) {
+            // Best effort: continue with shutdown
+        }
+        try {
+            handleEvent(Event.STREAM_CLOSE, Direction.OUT);
+        } catch (Exception ignored) {
+            // Best effort: continue with shutdown
+        }
+        try {
+            if (bcoSslSocket != null)
+                bcoSslSocket.close();
+        } catch (Exception ignored) {
+            // Best effort: socket is closed below anyway
+        }
+        try {
+            socket.close();
+        } catch (Exception ignored) {
+            // Best effort: nothing can be done when socket cannot be closed
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "BcoSocketWrapper(" + id + ")";
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/AssertingByteBuffer.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/AssertingByteBuffer.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/AssertingByteBuffer.java
new file mode 100755
index 0000000..15449c3
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/AssertingByteBuffer.java
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer.debug;
+
+import java.nio.charset.Charset;
+
+import streamer.ByteBuffer;
+
+/**
+ * Assert that writes to this buffer are matching expected data.
+ *
+ * The buffer is created over expected data. Every write method reads the
+ * corresponding value from the expected data instead of writing, and throws
+ * RuntimeException when the value to write differs from the expected value.
+ */
+public class AssertingByteBuffer extends ByteBuffer {
+
+    /**
+     * @param expectedData
+     *          data which is expected to be written to this buffer
+     */
+    public AssertingByteBuffer(byte[] expectedData) {
+        super(expectedData);
+    }
+
+    /**
+     * Throw RuntimeException when expected and actual values differ.
+     */
+    private void assertEquals(int expected, int actual) {
+        if (expected != actual)
+            throw new RuntimeException("Expected value does not match actual value. Expected value: " + expected + ", actual value: " + actual + ", buf: " + this + ".");
+    }
+
+    /**
+     * Read as many bytes of expected data as the actual buffer holds and throw
+     * RuntimeException when they differ from the actual buffer. Shared by all
+     * writeBytes() variants.
+     */
+    private void assertBytesEqual(ByteBuffer actual) {
+        ByteBuffer expected = readBytes(actual.length);
+        if (!actual.equals(expected))
+            throw new RuntimeException("Expected value does not match actual value. Expected value: " + expected + ", actual value: " + actual + ".");
+    }
+
+    @Override
+    public void writeByte(int b) {
+        if (b < 0)
+            throw new RuntimeException("Byte value must not be negative. Actual value: " + b + ".");
+        //*DEBUG*/System.out.println("WriteByte: "+b+", cursor:"+cursor+".");
+        assertEquals(readUnsignedByte(), b & 0xff);
+    }
+
+    @Override
+    public void writeShort(int x) {
+        //*DEBUG*/System.out.println("WriteShort: "+x+", cursor:"+cursor+".");
+        assertEquals(readUnsignedShort(), x & 0xFFff);
+    }
+
+    @Override
+    public void writeShortLE(int x) {
+        //*DEBUG*/System.out.println("WriteShortLE: "+x+", cursor:"+cursor+".");
+        assertEquals(readUnsignedShortLE(), x & 0xFFff);
+    }
+
+    @Override
+    public void writeInt(int i) {
+        //*DEBUG*/System.out.println("WriteInt: "+i+", cursor:"+cursor+".");
+        assertEquals(readSignedInt(), i);
+    }
+
+    @Override
+    public void writeIntLE(int i) {
+        //*DEBUG*/System.out.println("WriteIntLE: "+i+", cursor:"+cursor+".");
+        assertEquals(readSignedIntLE(), i);
+    }
+
+    @Override
+    public void writeVariableIntLE(int i) {
+        //*DEBUG*/System.out.println("WriteVariableIntLE: "+i+", cursor:"+cursor+".");
+        assertEquals(readVariableSignedIntLE(), i);
+    }
+
+    @Override
+    public void writeString(String actual, Charset charset) {
+        //*DEBUG*/System.out.println("WriteString: "+actual+", cursor:"+cursor+".");
+        // NOTE(review): number of characters is passed to readString(); confirm
+        // that it does not expect length in bytes for multi-byte charsets.
+        String expected = readString(actual.length(), charset);
+        if (!actual.equals(expected))
+            throw new RuntimeException("Expected value does not match actual value. Expected value: " + expected + ", actual value: " + actual + ".");
+    }
+
+    @Override
+    public void writeBytes(ByteBuffer actual) {
+        //*DEBUG*/System.out.println("WriteBytes: "+actual+", cursor:"+cursor+".");
+        assertBytesEqual(actual);
+    }
+
+    @Override
+    public void writeBytes(byte[] actualData) {
+        ByteBuffer actual = new ByteBuffer(actualData);
+        //*DEBUG*/System.out.println("WriteBytes: "+actual+", cursor:"+cursor+".");
+        assertBytesEqual(actual);
+    }
+
+    @Override
+    public void writeBytes(byte[] actualData, int offset, int length) {
+        ByteBuffer actual = new ByteBuffer(actualData, offset, length);
+        //*DEBUG*/System.out.println("WriteBytes: "+actual+", cursor:"+cursor+".");
+        assertBytesEqual(actual);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/Dumper.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/Dumper.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/Dumper.java
new file mode 100755
index 0000000..c884cfb
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/Dumper.java
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer.debug;
+
+import streamer.ByteBuffer;
+
+public interface Dumper {
+
+ /**
+ * Parse and dump content of buffer.
+ *
+ * Implementations of this interface provide the debug representation for
+ * one particular packet format.
+ *
+ * @param buf
+ *          buffer with packet to parse and dump
+ */
+ void dump(ByteBuffer buf);
+
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/FakeSink.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/FakeSink.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/FakeSink.java
new file mode 100755
index 0000000..9315488
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/FakeSink.java
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer.debug;
+
+import streamer.BaseElement;
+import streamer.ByteBuffer;
+import streamer.Direction;
+import streamer.Element;
+import streamer.Event;
+import streamer.Link;
+import streamer.SyncLink;
+
+/**
+ * Sink for debugging purposes: counts incoming buffers and releases them.
+ */
+public class FakeSink extends BaseElement {
+
+    public FakeSink(String id) {
+        super(id);
+    }
+
+    /**
+     * Log incoming buffer (in verbose mode), then count and release it. Null
+     * buffers are logged, but not counted.
+     */
+    @Override
+    public void handleData(ByteBuffer buf, Link link) {
+        if (verbose) {
+            System.out.println("[" + this + "] INFO: Received buf #" + (packetNumber) + " " + buf + ".");
+        }
+
+        if (buf != null) {
+            // Use packetNumber variable to count incoming packets
+            packetNumber++;
+
+            buf.unref();
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "FakeSink(" + id + ")";
+    }
+
+    /**
+     * Log incoming event (in verbose mode). Event is not handled otherwise.
+     */
+    @Override
+    public void handleEvent(Event event, Direction direction) {
+        if (verbose) {
+            System.out.println("[" + this + "] INFO: Event received: " + event + ".");
+        }
+    }
+
+    /**
+     * Example: send one buffer with three bytes to verbose sink.
+     */
+    public static void main(String[] args) {
+
+        Element verboseSink = new FakeSink("sink") {
+            {
+                verbose = true;
+            }
+        };
+
+        ByteBuffer packet = new ByteBuffer(new byte[] {1, 2, 3});
+
+        verboseSink.setLink(STDIN, new SyncLink(), Direction.IN);
+        Link input = verboseSink.getLink(STDIN);
+        input.sendData(packet);
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/FakeSource.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/FakeSource.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/FakeSource.java
new file mode 100755
index 0000000..adfa594
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/FakeSource.java
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer.debug;
+
+import streamer.BaseElement;
+import streamer.ByteBuffer;
+import streamer.Direction;
+import streamer.Element;
+import streamer.Event;
+import streamer.Link;
+import streamer.SyncLink;
+
+/**
+ * Source for debugging purposes: on every poll it generates a buffer filled
+ * with predictable data and pushes it to all outputs, until the configured
+ * number of buffers is reached.
+ */
+public class FakeSource extends BaseElement {
+
+    /**
+     * Delay for null packets in poll method when blocking is requested, in
+     * milliseconds.
+     */
+    protected long delay = SyncLink.STANDARD_DELAY_FOR_EMPTY_PACKET;
+
+    public FakeSource(String id) {
+        super(id);
+    }
+
+    /**
+     * Generate next buffer and push it to all outputs. When limit of buffers
+     * is set (numBuffers > 0) and reached, STREAM_CLOSE event is sent to
+     * output pads instead.
+     *
+     * @param block
+     *          when true, make slight delay after buffer is pushed
+     */
+    @Override
+    public void poll(boolean block) {
+        if (numBuffers > 0 && packetNumber >= numBuffers) {
+            // Close stream when limit of packets is reached
+            sendEventToAllPads(Event.STREAM_CLOSE, Direction.OUT);
+            return;
+        }
+
+        // Prepare new packet
+        ByteBuffer buf = initializeData();
+
+        // Push it to output(s)
+        if (buf != null)
+            pushDataToAllOuts(buf);
+
+        // Make slight delay when blocking input was requested (to avoid
+        // consuming of 100% in parent loop)
+        if (block)
+            delay();
+
+    }
+
+    /**
+     * Make slight delay. Should be used when blocking input is requested in pull
+     * mode, but null packed was returned by input.
+     */
+    protected void delay() {
+        try {
+            Thread.sleep(delay);
+        } catch (InterruptedException e) {
+            // Restore interrupt status, so that owner of the thread is able to
+            // notice the interruption
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Initialize data.
+     *
+     * @return new buffer of incommingBufLength bytes: first byte is the packet
+     *         sequence number modulo 128, every other byte is its index in the
+     *         data array modulo 128. Sequence number and ID of this element are
+     *         stored in buffer metadata.
+     */
+    public ByteBuffer initializeData() {
+        ByteBuffer buf = new ByteBuffer(incommingBufLength);
+
+        // Set first byte of package to it sequance number
+        buf.data[buf.offset] = (byte)(packetNumber % 128);
+
+        // Initialize rest of bytes with sequential values, which are
+        // corresponding with their position in byte buffer. Upper bound takes
+        // buffer offset into account, because "i" is an index in the data array.
+        for (int i = buf.offset + 1; i < buf.offset + buf.length; i++)
+            buf.data[i] = (byte)(i % 128);
+
+        buf.putMetadata(ByteBuffer.SEQUENCE_NUMBER, packetNumber);
+        buf.putMetadata("src", id);
+
+        return buf;
+    }
+
+    @Override
+    public String toString() {
+        return "FakeSource(" + id + ")";
+    }
+
+    /**
+     * Example: source, which produces 10 buffers of 3 bytes with delay of 100
+     * milliseconds, linked to two verbose sinks.
+     */
+    public static void main(String args[]) {
+
+        Element fakeSource = new FakeSource("source 3/10/100") {
+            {
+                verbose = true;
+                this.incommingBufLength = 3;
+                this.numBuffers = 10;
+                this.delay = 100;
+            }
+        };
+
+        Element fakeSink = new FakeSink("sink") {
+            {
+                this.verbose = true;
+            }
+        };
+
+        Element fakeSink2 = new FakeSink("sink2") {
+            {
+                this.verbose = true;
+            }
+        };
+
+        Link link = new SyncLink();
+
+        fakeSource.setLink(STDOUT, link, Direction.OUT);
+        fakeSink.setLink(STDIN, link, Direction.IN);
+
+        Link link2 = new SyncLink();
+
+        fakeSource.setLink("out2", link2, Direction.OUT);
+        fakeSink2.setLink(STDIN, link2, Direction.IN);
+
+        link.sendEvent(Event.STREAM_START, Direction.IN);
+        link.run();
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/MockServer.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/MockServer.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/MockServer.java
new file mode 100755
index 0000000..3f313b8
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/MockServer.java
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer.debug;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.Arrays;
+
+import javax.net.ssl.SSLSocket;
+import javax.net.ssl.SSLSocketFactory;
+
+/**
+ * Single-connection mock server for tests. It accepts one client on an
+ * ephemeral port and then plays the given script of packets: SERVER packets
+ * are written to the client, CLIENT packets are read from the client and
+ * compared with expected data, UPGRADE_TO_SSL switches the connection to SSL.
+ * The script is executed in a daemon thread; the failure, if any, is
+ * available via {@link #getException()}.
+ */
+public class MockServer implements Runnable {
+
+    // Fields "shutdown", "exception", and "shutdowned" are shared between
+    // server thread and thread of caller, so they must be volatile to make
+    // updates visible
+    private volatile boolean shutdown = false;
+    private ServerSocket serverSocket;
+    private final Packet[] packets;
+    private volatile Throwable exception;
+    private volatile boolean shutdowned;
+
+    /**
+     * Set to true to enable debugging messages.
+     */
+    protected boolean verbose = System.getProperty("rdpclient.MockServer.debug", "false").equals("true");
+
+    /**
+     * @param packets
+     *          script to play, in order
+     */
+    public MockServer(Packet packets[]) {
+        this.packets = packets;
+    }
+
+    /**
+     * Open server socket on an ephemeral port and start daemon thread, which
+     * serves one client.
+     *
+     * @throws IOException
+     *           when server socket cannot be opened
+     */
+    public void start() throws IOException {
+        serverSocket = new ServerSocket(0);
+
+        shutdown = false;
+        exception = null;
+        shutdowned = false;
+
+        Thread thread = new Thread(this);
+        thread.setDaemon(true);
+        thread.start();
+    }
+
+    /**
+     * Accept one client and play the script. Any failure is printed to
+     * standard error stream and stored for {@link #getException()}.
+     */
+    @Override
+    public void run() {
+
+        try {
+            Socket socket = serverSocket.accept();
+
+            if (verbose)
+                System.out.println("[" + this + "] INFO: Clien connected: " + socket.getRemoteSocketAddress() + ".");
+
+            InputStream is = socket.getInputStream();
+            OutputStream os = socket.getOutputStream();
+
+            try {
+                for (int i = 0; i < packets.length && !shutdown; i++) {
+
+                    Packet packet = packets[i];
+                    switch (packet.type) {
+                    case CLIENT: {
+                        // Read client data and compare it with mock data
+                        // (unless "ignore" option is set)
+                        byte actualData[] = new byte[packet.data.length];
+                        int actualDataLength = is.read(actualData);
+
+                        // read() returns -1 at end of stream, so length must be
+                        // clamped before copying, otherwise copyOf() will throw
+                        // NegativeArraySizeException and hide the assertion below
+                        if (verbose)
+                            System.out.println("[" + this + "] INFO: Data is read: {" + Arrays.toString(Arrays.copyOf(actualData, Math.max(actualDataLength, 0))) + "}.");
+
+                        if (!packet.ignore) {
+                            // Compare actual data with expected data
+                            if (actualDataLength != packet.data.length) {
+                                throw new AssertionError("Actual length of client request for packet #" + (i + 1) + " (\"" + packet.id + "\")"
+                                        + " does not match length of expected client request. Actual length: " + actualDataLength + ", expected legnth: " + packet.data.length
+                                        + ".");
+                            }
+
+                            for (int j = 0; j < packet.data.length; j++) {
+
+                                if (packet.data[j] != actualData[j]) {
+                                    throw new AssertionError("Actual byte #" + (j + 1) + " of client request for packet #" + (i + 1) + " (\"" + packet.id + "\")"
+                                            + " does not match corresponding byte of expected client request. Actual byte: " + actualData[j] + ", expected byte: " + packet.data[j]
+                                            + ".");
+                                }
+                            }
+                        }
+                        break;
+                    }
+                    case SERVER: {
+                        // Send mock data to client
+                        os.write(packet.data);
+
+                        if (verbose)
+                            System.out.println("[" + this + "] INFO: Data is written: {" + Arrays.toString(packet.data) + "}.");
+
+                        break;
+                    }
+                    case UPGRADE_TO_SSL: {
+                        // Attach SSL context to socket
+
+                        final SSLSocketFactory sslSocketFactory = (SSLSocketFactory)SSLSocketFactory.getDefault();
+                        SSLSocket sslSocket = (SSLSocket)sslSocketFactory.createSocket(socket, null, serverSocket.getLocalPort(), true);
+                        sslSocket.setEnabledCipherSuites(sslSocket.getSupportedCipherSuites());
+                        sslSocket.setUseClientMode(false);
+                        sslSocket.startHandshake();
+
+                        // Following packets are exchanged over encrypted streams
+                        is = sslSocket.getInputStream();
+                        os = sslSocket.getOutputStream();
+
+                        break;
+                    }
+                    default:
+                        throw new RuntimeException("Unknown packet type: " + packet.type);
+                    }
+
+                }
+            } finally {
+                // Release all resources; every step is best effort
+                try {
+                    is.close();
+                } catch (Throwable ignored) {
+                }
+                try {
+                    os.close();
+                } catch (Throwable ignored) {
+                }
+                try {
+                    socket.close();
+                } catch (Throwable ignored) {
+                }
+                try {
+                    serverSocket.close();
+                } catch (Throwable ignored) {
+                }
+            }
+        } catch (Throwable e) {
+            System.err.println("Error in mock server: " + e.getMessage());
+            e.printStackTrace(System.err);
+            exception = e;
+        }
+        shutdowned = true;
+        if (verbose)
+            System.out.println("[" + this + "] INFO: Mock server shutdowned.");
+
+    }
+
+    /**
+     * Request server thread to stop before the next packet of the script. A
+     * read or accept, which is already in progress, is not interrupted.
+     */
+    public void shutdown() {
+        shutdown = true;
+    }
+
+    /**
+     * @return local address of server socket; server must be started first
+     */
+    public InetSocketAddress getAddress() {
+        return (InetSocketAddress)serverSocket.getLocalSocketAddress();
+    }
+
+    /**
+     * @return failure of server thread, or null when there was no failure
+     */
+    public Throwable getException() {
+        return exception;
+    }
+
+    /**
+     * One step of the script.
+     */
+    public static class Packet {
+        public static enum PacketType {
+            SERVER, CLIENT, UPGRADE_TO_SSL;
+        }
+
+        /**
+         * Name of packet, used in assertion messages.
+         */
+        public String id = "";
+
+        public Packet() {
+        }
+
+        public Packet(String id) {
+            this.id = id;
+        }
+
+        public PacketType type;
+
+        /**
+         * When true, data of CLIENT packet is read, but not compared.
+         */
+        public boolean ignore = false;
+
+        /**
+         * Data to send (SERVER) or data to expect (CLIENT).
+         */
+        public byte data[];
+    }
+
+    /**
+     * @return true when server thread has finished
+     */
+    public boolean isShutdowned() {
+        return shutdowned;
+    }
+
+    /**
+     * Wait until server thread has finished, or until timeout is reached,
+     * whichever comes first.
+     *
+     * @param timeToWaitMiliseconds
+     *          maximum time to wait, in milliseconds
+     * @throws InterruptedException
+     *           when waiting thread is interrupted
+     */
+    public void waitUntilShutdowned(long timeToWaitMiliseconds) throws InterruptedException {
+        long deadline = System.currentTimeMillis() + timeToWaitMiliseconds;
+        while (!shutdowned && System.currentTimeMillis() < deadline) {
+            Thread.sleep(10);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/MockSink.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/MockSink.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/MockSink.java
new file mode 100755
index 0000000..3c1ce7c
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/MockSink.java
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer.debug;
+
+import java.util.Arrays;
+import java.util.Set;
+
+import streamer.BaseElement;
+import streamer.ByteBuffer;
+import streamer.Direction;
+import streamer.Element;
+import streamer.Link;
+import streamer.SyncLink;
+
+/**
+ * Compare incoming packets with expected packets.
+ */
+public class MockSink extends BaseElement {
+
+ protected ByteBuffer bufs[];
+ protected Dumper dumper;
+
+ public MockSink(String id) {
+ super(id);
+ }
+
+ public MockSink(String id, ByteBuffer bufs[]) {
+ super(id);
+ this.bufs = bufs;
+ }
+
+ public MockSink(String id, ByteBuffer bufs[], Dumper dumper) {
+ super(id);
+ this.bufs = bufs;
+ this.dumper = dumper;
+ }
+
+ @Override
+ public void handleData(ByteBuffer buf, Link link) {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Received buf #" + (packetNumber) + " " + buf + ".");
+
+ if (buf == null)
+ return;
+
+ if (packetNumber >= bufs.length)
+ throw new AssertionError("[" + this + "] Incoming buffer #" + packetNumber + " is not expected. Number of expected buffers: " + bufs.length
+ + ", unexpected buffer: " + buf + ".");
+
+ // Compare incoming buffer with expected buffer
+ ByteBuffer expectedBuf = bufs[packetNumber];
+ if (!Arrays.equals(expectedBuf.toByteArray(), buf.toByteArray())) {
+ dump(buf, expectedBuf);
+ throw new AssertionError("[" + this + "] Incoming buffer #" + packetNumber + " is not equal to expected buffer.\n Actual bufer: " + buf
+ + ",\n expected buffer: " + expectedBuf + ".");
+ }
+
+ // If expected buffer has metadata, then compare it too
+ Set<String> metadataKeys = expectedBuf.getMetadataKeys();
+ if (metadataKeys.size() > 0) {
+ for (String key : metadataKeys) {
+ Object expectedValue = expectedBuf.getMetadata(key);
+ Object actualValue = buf.getMetadata(key);
+ if (actualValue == null)
+ throw new AssertionError("[" + this + "] Incoming buffer #" + packetNumber + " is not equal to expected buffer in metadata for key \"" + key
+ + "\".\n Actual metadata value: " + actualValue + ",\n expected value: \"" + expectedValue + "\".");
+
+ if (!expectedValue.equals(actualValue))
+ throw new AssertionError("[" + this + "] Incoming buffer #" + packetNumber + " is not equal to expected buffer in metadata for key \"" + key
+ + "\".\n Actual metadata value: \"" + actualValue + "\",\n expected value: \"" + expectedValue + "\".");
+ }
+ }
+
+ if (verbose)
+ System.out.println("[" + this + "] INFO: buffers are equal.");
+
+ // Use packetNumber variable to count incoming packets
+ packetNumber++;
+
+ buf.unref();
+ }
+
+ private void dump(ByteBuffer actualData, ByteBuffer expectedData) {
+ if (dumper != null) {
+ System.out.println("[" + this + "] INFO: Actual data:");
+ dumper.dump(actualData);
+ System.out.println("[" + this + "] INFO: Expected data:");
+ dumper.dump(expectedData);
+ }
+ }
+
+ @Override
+ protected void onClose() {
+ super.onClose();
+
+ if (packetNumber != bufs.length)
+ throw new AssertionError("[" + this + "] Number of expected buffers: " + bufs.length + ", number of actual buffers: " + packetNumber + ".");
+ }
+
+ @Override
+ public String toString() {
+ return "MockSink(" + id + ")";
+ }
+
+ /**
+ * Example.
+ */
+ public static void main(String args[]) {
+
+ Element mockSource = new MockSource("source") {
+ {
+ this.bufs = new ByteBuffer[] {new ByteBuffer(new byte[] {1, 1, 2, 3, 4, 5}), new ByteBuffer(new byte[] {2, 1, 2, 3, 4}),
+ new ByteBuffer(new byte[] {3, 1, 2, 3}), new ByteBuffer(new byte[] {4, 1, 2}), new ByteBuffer(new byte[] {5, 1})};
+ this.verbose = true;
+ this.delay = 100;
+ this.numBuffers = this.bufs.length;
+ }
+ };
+
+ Element mockSink = new MockSink("sink") {
+ {
+ this.bufs = new ByteBuffer[] {new ByteBuffer(new byte[] {1, 1, 2, 3, 4, 5}), new ByteBuffer(new byte[] {2, 1, 2, 3, 4}),
+ new ByteBuffer(new byte[] {3, 1, 2, 3}), new ByteBuffer(new byte[] {4, 1, 2}), new ByteBuffer(new byte[] {5, 1})};
+ this.verbose = true;
+ }
+ };
+
+ Link link = new SyncLink() {
+ {
+ this.verbose = true;
+ }
+ };
+
+ mockSource.setLink(STDOUT, link, Direction.OUT);
+ mockSink.setLink(STDIN, link, Direction.IN);
+
+ link.run();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/MockSource.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/MockSource.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/MockSource.java
new file mode 100755
index 0000000..2a70829
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/debug/MockSource.java
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer.debug;
+
+import streamer.ByteBuffer;
+import streamer.Direction;
+import streamer.Element;
+import streamer.Event;
+import streamer.Link;
+import streamer.SyncLink;
+
+/**
+ * Source element for tests: produces the configured buffers one by one and
+ * sends STREAM_CLOSE to all output pads when no buffers are left.
+ */
+public class MockSource extends FakeSource {
+
+    /**
+     * Buffers to produce, in order. May be null, which is treated as an empty
+     * list of buffers.
+     */
+    protected ByteBuffer bufs[] = null;
+
+    public MockSource(String id) {
+        super(id);
+    }
+
+    public MockSource(String id, ByteBuffer bufs[]) {
+        super(id);
+        this.bufs = bufs;
+    }
+
+    /**
+     * Initialize data.
+     *
+     * @return buffer number packetNumber, tagged with its sequence number, or
+     *         null when all buffers were already produced (STREAM_CLOSE is
+     *         sent to all output pads in that case)
+     */
+    @Override
+    public ByteBuffer initializeData() {
+        // A source without configured buffers is exhausted from the start,
+        // instead of failing with NullPointerException
+        if (bufs == null || packetNumber >= bufs.length) {
+            sendEventToAllPads(Event.STREAM_CLOSE, Direction.OUT);
+            return null;
+        }
+
+        ByteBuffer buf = bufs[packetNumber];
+
+        buf.putMetadata(ByteBuffer.SEQUENCE_NUMBER, packetNumber);
+        return buf;
+    }
+
+    /**
+     * Log the received event when verbose mode is on; events are otherwise ignored.
+     */
+    @Override
+    public void handleEvent(Event event, Direction direction) {
+        if (verbose)
+            System.out.println("[" + this + "] INFO: Event received: " + event + ".");
+
+    }
+
+    @Override
+    public String toString() {
+        return "MockSource(" + id + ")";
+    }
+
+    /**
+     * Example.
+     */
+    public static void main(String args[]) {
+
+        Element mockSource = new MockSource("source") {
+            {
+                this.bufs = new ByteBuffer[] {new ByteBuffer(new byte[] {1, 1, 2, 3, 4, 5}), new ByteBuffer(new byte[] {2, 1, 2, 3, 4}),
+                    new ByteBuffer(new byte[] {3, 1, 2, 3}), new ByteBuffer(new byte[] {4, 1, 2}), new ByteBuffer(new byte[] {5, 1})};
+                this.verbose = true;
+                this.delay = 100;
+                // this.numBuffers = this.bufs.length;
+            }
+        };
+
+        Element fakeSink = new FakeSink("sink") {
+            {
+                this.verbose = true;
+            }
+        };
+
+        Link link = new SyncLink();
+
+        mockSource.setLink(STDOUT, link, Direction.OUT);
+        fakeSink.setLink(STDIN, link, Direction.IN);
+
+        link.run();
+    }
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/ssl/SSLState.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/ssl/SSLState.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/ssl/SSLState.java
new file mode 100755
index 0000000..f405088
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/ssl/SSLState.java
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer.ssl;
+
+/**
+ * Mutable holder for data captured while the SSL connection is established.
+ */
+public class SSLState {
+
+    /**
+     * Encoded public key of the server certificate, as returned by
+     * getPublicKey().getEncoded() of the first certificate in the server
+     * chain. Stays null until a trust manager stores the key.
+     */
+    public byte[] serverCertificateSubjectPublicKeyInfo = null;
+
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/ssl/TrustAllX509TrustManager.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/ssl/TrustAllX509TrustManager.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/ssl/TrustAllX509TrustManager.java
new file mode 100755
index 0000000..4d9eac2
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/ssl/TrustAllX509TrustManager.java
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer.ssl;
+
+import java.security.cert.X509Certificate;
+
+import javax.net.ssl.X509TrustManager;
+
+/**
+ * Trust manager which accepts every certificate chain without any validation
+ * (insecure) and stores the public key of the server certificate into the
+ * given SSLState.
+ */
+public class TrustAllX509TrustManager implements X509TrustManager {
+    private final SSLState sslState;
+
+    /**
+     * @param sslState
+     *          holder which receives the server public key; may be null, in
+     *          which case nothing is stored
+     */
+    public TrustAllX509TrustManager(SSLState sslState) {
+        this.sslState = sslState;
+    }
+
+    @Override
+    public void checkClientTrusted(final X509Certificate[] chain, final String authType) {
+        // TODO: ask user to confirm self-signed certificates
+        // Trust all (insecure)
+    }
+
+    /**
+     * Accept any server chain and remember the encoded public key of its
+     * first certificate. A null or empty chain is accepted too, but nothing
+     * is stored for it.
+     */
+    @Override
+    public void checkServerTrusted(final X509Certificate[] chain, final String authType) {
+        // TODO: ask user to confirm self-signed certificates
+        // Trust all (insecure)
+
+        // Store public certificates to use for NTLMSSP negotiation
+        if (sslState != null && chain != null && chain.length > 0)
+            sslState.serverCertificateSubjectPublicKeyInfo = chain[0].getPublicKey().getEncoded();
+    }
+
+    /**
+     * @return empty array: the X509TrustManager contract requires a non-null
+     *         (possibly empty) result
+     */
+    @Override
+    public X509Certificate[] getAcceptedIssuers() {
+        // TODO: use system CA certificates here
+        return new X509Certificate[0];
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/ssl/UpgradeSocketToSSL.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/ssl/UpgradeSocketToSSL.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/ssl/UpgradeSocketToSSL.java
new file mode 100755
index 0000000..9d7c708
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/ssl/UpgradeSocketToSSL.java
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer.ssl;
+
+import streamer.ByteBuffer;
+import streamer.Direction;
+import streamer.Event;
+import streamer.Link;
+import streamer.OneTimeSwitch;
+
+/**
+ * One-time element which, when started, sends the SOCKET_UPGRADE_TO_SSL event
+ * to all input pads and then switches itself off. It is not supposed to
+ * receive any data.
+ */
+public class UpgradeSocketToSSL extends OneTimeSwitch {
+
+    public UpgradeSocketToSSL(String id) {
+        super(id);
+    }
+
+    /**
+     * Request the SSL upgrade, then switch this element off.
+     */
+    @Override
+    protected void onStart() {
+        sendEventToAllPads(Event.SOCKET_UPGRADE_TO_SSL, Direction.IN);
+        switchOff();
+    }
+
+    /**
+     * Reject any data: this element only emits an event on start.
+     *
+     * @throws RuntimeException
+     *           always
+     */
+    @Override
+    protected void handleOneTimeData(ByteBuffer buf, Link link) {
+        final String message = "Unexpected data: " + buf + ".";
+        throw new RuntimeException(message);
+    }
+
+}