You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by de...@apache.org on 2013/12/23 10:12:59 UTC
[05/22] CLOUDSTACK-5344: Update to allow rdp console to access
hyper-v vm virtual framebuffer.
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSink.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSink.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSink.java
deleted file mode 100644
index ce9fdf9..0000000
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSink.java
+++ /dev/null
@@ -1,111 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package streamer;
-
-import java.util.Arrays;
-
-/**
- * Compare incoming packets with expected packets.
- */
-public class MockSink extends BaseElement {
-
- protected ByteBuffer bufs[] = null;
-
- public MockSink(String id) {
- super(id);
- }
-
- public MockSink(String id, ByteBuffer bufs[]) {
- super(id);
- this.bufs = bufs;
- }
-
- @Override
- public void handleData(ByteBuffer buf, Link link) {
- if (verbose)
- System.out.println("[" + this + "] INFO: Received buf #" + (packetNumber) + " " + buf + ".");
-
- if (buf == null)
- return;
-
- if (packetNumber >= bufs.length)
- throw new AssertionError("[" + this + "] Incoming buffer #" + packetNumber + " is not expected. Number of expected buffers: " + bufs.length
- + ", unexpected buffer: " + buf + ".");
-
- // Compare incoming buffer with expected buffer
- if (!Arrays.equals(bufs[packetNumber].toByteArray(), buf.toByteArray()))
- throw new AssertionError("[" + this + "] Incoming buffer #" + packetNumber + " is not equal to expected buffer.\n Actual bufer: " + buf
- + ",\n expected buffer: " + bufs[packetNumber] + ".");
-
- if (verbose)
- System.out.println("[" + this + "] INFO: buffers are equal.");
-
- // Use packetNumber variable to count incoming packets
- packetNumber++;
-
- buf.unref();
- }
-
- @Override
- protected void onClose() {
- super.onClose();
-
- if (packetNumber != bufs.length)
- throw new AssertionError("[" + this + "] Number of expected buffers: " + bufs.length + ", number of actual buffers: " + packetNumber + ".");
- }
-
- @Override
- public String toString() {
- return "MockSink(" + id + ")";
- }
-
- /**
- * Example.
- */
- public static void main(String args[]) {
-
- Element mockSource = new MockSource("source") {
- {
- this.bufs = new ByteBuffer[] { new ByteBuffer(new byte[] { 1, 1, 2, 3, 4, 5 }), new ByteBuffer(new byte[] { 2, 1, 2, 3, 4 }),
- new ByteBuffer(new byte[] { 3, 1, 2, 3 }), new ByteBuffer(new byte[] { 4, 1, 2 }), new ByteBuffer(new byte[] { 5, 1 }) };
- this.verbose = true;
- this.delay = 100;
- this.numBuffers = this.bufs.length;
- }
- };
-
- Element mockSink = new MockSink("sink") {
- {
- this.bufs = new ByteBuffer[] { new ByteBuffer(new byte[] { 1, 1, 2, 3, 4, 5 }), new ByteBuffer(new byte[] { 2, 1, 2, 3, 4 }),
- new ByteBuffer(new byte[] { 3, 1, 2, 3 }), new ByteBuffer(new byte[] { 4, 1, 2 }), new ByteBuffer(new byte[] { 5, 1 }) };
- this.verbose = true;
- }
- };
-
- Link link = new SyncLink() {
- {
- this.verbose = true;
- }
- };
-
- mockSource.setLink(STDOUT, link, Direction.OUT);
- mockSink.setLink(STDIN, link, Direction.IN);
-
- link.run();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSource.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSource.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSource.java
deleted file mode 100644
index db47db2..0000000
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSource.java
+++ /dev/null
@@ -1,88 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package streamer;
-
-public class MockSource extends FakeSource {
-
- protected ByteBuffer bufs[] = null;
-
- public MockSource(String id) {
- super(id);
- }
-
- public MockSource(String id, ByteBuffer bufs[]) {
- super(id);
- this.bufs = bufs;
- }
-
- /**
- * Initialize data.
- */
- @Override
- public ByteBuffer initializeData() {
- if (packetNumber >= bufs.length) {
- sendEventToAllPads(Event.STREAM_CLOSE, Direction.OUT);
- return null;
- }
-
- ByteBuffer buf = bufs[packetNumber];
-
- buf.putMetadata(ByteBuffer.SEQUENCE_NUMBER, packetNumber);
- return buf;
- }
-
- @Override
- public void handleEvent(Event event, Direction direction) {
- if (verbose)
- System.out.println("[" + this + "] INFO: Event received: " + event + ".");
-
- }
-
- @Override
- public String toString() {
- return "MockSource(" + id + ")";
- }
-
- /**
- * Example.
- */
- public static void main(String args[]) {
-
- Element mockSource = new MockSource("source") {
- {
- this.bufs = new ByteBuffer[] { new ByteBuffer(new byte[] { 1, 1, 2, 3, 4, 5 }), new ByteBuffer(new byte[] { 2, 1, 2, 3, 4 }),
- new ByteBuffer(new byte[] { 3, 1, 2, 3 }), new ByteBuffer(new byte[] { 4, 1, 2 }), new ByteBuffer(new byte[] { 5, 1 }) };
- this.verbose = true;
- this.delay = 100;
- // this.numBuffers = this.bufs.length;
- }
- };
-
- Element fakeSink = new FakeSink("sink") {
- {
- this.verbose = true;
- }
- };
-
- Link link = new SyncLink();
-
- mockSource.setLink(STDOUT, link, Direction.OUT);
- fakeSink.setLink(STDIN, link, Direction.IN);
-
- link.run();
- }
-}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OneTimeSwitch.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OneTimeSwitch.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OneTimeSwitch.java
old mode 100644
new mode 100755
index a7d4848..2e5d891
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OneTimeSwitch.java
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OneTimeSwitch.java
@@ -29,105 +29,108 @@ package streamer;
*/
public abstract class OneTimeSwitch extends BaseElement {
- /**
- * One-time out - name of output pad for one time logic. By default, output
- * directly to socket.
- */
- public static final String OTOUT = "otout";
-
- private boolean switched = false;
-
- public OneTimeSwitch(String id) {
- super(id);
- declarePads();
- }
-
- protected void declarePads() {
- inputPads.put(STDIN, null);
- outputPads.put(OTOUT, null);
- outputPads.put(STDOUT, null);
- }
-
- @Override
- public void handleData(ByteBuffer buf, Link link) {
- if (switched)
- throw new RuntimeException(this + " element is switched off and must not receive any data or events anymore.");
-
- if (buf == null)
- return;
-
- handleOneTimeData(buf, link);
- }
-
- public void pushDataToOTOut(ByteBuffer buf) {
- if (verbose)
- System.out.println("[" + this + "] INFO: Sending data: " + buf + ".");
-
- outputPads.get(OTOUT).sendData(buf);
- }
-
- /**
- * Switch this element off. Pass data directly to main output(s).
- */
- public void switchOff() {
- if (verbose)
- System.out.println("[" + this + "] INFO: Switching OFF.");
-
- switched = true;
- verbose = false;
-
- // Rewire links: drop otout link, replace stdout link by stdin to send data
- // directly to stdout
- Link stdout = (Link) outputPads.get(STDOUT);
- Link stdin = (Link) inputPads.get(STDIN);
- Link otout = (Link) outputPads.get(OTOUT);
-
- otout.drop();
-
- // Wake up next peer(s)
- sendEventToAllPads(Event.STREAM_START, Direction.OUT);
-
- stdin.setSink(null);
- inputPads.remove(STDIN);
-
- Element nextPeer = stdout.getSink();
- nextPeer.replaceLink(stdout, stdin);
- stdout.drop();
-
- for (Object link : inputPads.values().toArray())
- ((Link) link).drop();
- for (Object link : outputPads.values().toArray())
- ((Link) link).drop();
-
- }
-
- public void switchOn() {
- if (verbose)
- System.out.println("[" + this + "] INFO: Switching ON.");
-
- switched = false;
- }
-
- /**
- * Override this method to handle one-time packet(s) at handshake or
- * initialization stages. Execute method @see switchRoute() when this method
- * is no longer necessary.
- */
- protected abstract void handleOneTimeData(ByteBuffer buf, Link link);
-
- @Override
- public void handleEvent(Event event, Direction direction) {
- if (event == Event.STREAM_START) {
- if (verbose)
- System.out.println("[" + this + "] INFO: Event " + event + " is received.");
-
- switchOn();
-
- // Execute this element onStart(), but do not propagate event further,
- // to not wake up next elements too early
- onStart();
- } else
- super.handleEvent(event, direction);
- }
+ /**
+ * One-time out - name of output pad for one time logic. By default, output
+ * directly to socket.
+ */
+ public static final String OTOUT = "otout";
+
+ private boolean switched = false;
+
+ public OneTimeSwitch(String id) {
+ super(id);
+ declarePads();
+ }
+
+ protected void declarePads() {
+ inputPads.put(STDIN, null);
+ outputPads.put(OTOUT, null);
+ outputPads.put(STDOUT, null);
+ }
+
+ @Override
+ public void handleData(ByteBuffer buf, Link link) {
+ if (switched)
+ throw new RuntimeException(this + " element is switched off and must not receive any data or events anymore.");
+
+ if (buf == null)
+ return;
+
+ handleOneTimeData(buf, link);
+ }
+
+ public void pushDataToOTOut(ByteBuffer buf) {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Sending data: " + buf + ".");
+
+ outputPads.get(OTOUT).sendData(buf);
+ }
+
+ /**
+ * Switch this element off. Pass data directly to main output(s).
+ */
+ public void switchOff() {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Switching OFF.");
+
+ switched = true;
+ verbose = false;
+
+ // Rewire links: drop otout link, replace stdout link by stdin to send data
+ // directly to stdout
+ Link stdout = (Link)outputPads.get(STDOUT);
+ Link stdin = (Link)inputPads.get(STDIN);
+ Link otout = (Link)outputPads.get(OTOUT);
+
+ otout.drop();
+
+ // Wake up next peer(s)
+ sendEventToAllPads(Event.STREAM_START, Direction.OUT);
+
+ // Disconnect our stdin from this element
+ stdin.setSink(null);
+ inputPads.remove(STDIN);
+
+ // Replace next peer stdin (our stdout) by our stdin
+ Element nextPeer = stdout.getSink();
+ nextPeer.replaceLink(stdout, stdin);
+ stdout.drop();
+
+ // Drop all other links
+ for (Object link : inputPads.values().toArray())
+ ((Link)link).drop();
+ for (Object link : outputPads.values().toArray())
+ ((Link)link).drop();
+
+ }
+
+ public void switchOn() {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Switching ON.");
+
+ switched = false;
+ }
+
+ /**
+ * Override this method to handle one-time packet(s) at handshake or
+ * initialization stages. Execute method @see switchRoute() when this method
+ * is no longer necessary.
+ */
+ protected abstract void handleOneTimeData(ByteBuffer buf, Link link);
+
+ @Override
+ public void handleEvent(Event event, Direction direction) {
+ if (event == Event.STREAM_START) {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Event " + event + " is received.");
+
+ switchOn();
+
+ // Execute this element onStart(), but do not propagate event further,
+ // to not wake up next elements too early
+ onStart();
+ } else
+ super.handleEvent(event, direction);
+ }
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Order.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Order.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Order.java
old mode 100644
new mode 100755
index 1d63850..8f5654e
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Order.java
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Order.java
@@ -18,6 +18,6 @@ package streamer;
public class Order {
- public Object type;
+ public Object type;
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OutputStreamSink.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OutputStreamSink.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OutputStreamSink.java
old mode 100644
new mode 100755
index d1aa5ce..104b8c3
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OutputStreamSink.java
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OutputStreamSink.java
@@ -20,134 +20,136 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
+import streamer.debug.FakeSource;
+
public class OutputStreamSink extends BaseElement {
- protected OutputStream os;
- protected SocketWrapper socketWrapper;
-
- public OutputStreamSink(String id) {
- super(id);
- }
-
- public OutputStreamSink(String id, OutputStream os) {
- super(id);
- this.os = os;
- }
-
- public OutputStreamSink(String id, SocketWrapper socketWrapper) {
- super(id);
- this.socketWrapper = socketWrapper;
- }
-
- public void setOutputStream(OutputStream os) {
- this.os = os;
- // Resume links
- resumeLinks();
- }
-
- /**
- * Send incoming data to stream.
- */
- @Override
- public void handleData(ByteBuffer buf, Link link) {
- if (buf == null)
- return;
-
- try {
- if (verbose)
- System.out.println("[" + this + "] INFO: Writing data to stream: " + buf + ".");
-
- os.write(buf.data, buf.offset, buf.length);
- os.flush();
- } catch (IOException e) {
- System.err.println("[" + this + "] ERROR: " + e.getMessage());
- closeStream();
+ protected OutputStream os;
+ protected SocketWrapperImpl socketWrapper;
+
+ public OutputStreamSink(String id) {
+ super(id);
+ }
+
+ public OutputStreamSink(String id, OutputStream os) {
+ super(id);
+ this.os = os;
+ }
+
+ public OutputStreamSink(String id, SocketWrapperImpl socketWrapper) {
+ super(id);
+ this.socketWrapper = socketWrapper;
}
- }
-
- @Override
- public void handleEvent(Event event, Direction direction) {
- switch (event) {
- case SOCKET_UPGRADE_TO_SSL:
- socketWrapper.upgradeToSsl();
- break;
- default:
- super.handleEvent(event, direction);
+
+ public void setOutputStream(OutputStream os) {
+ this.os = os;
+ // Resume links
+ resumeLinks();
+ }
+
+ /**
+ * Send incoming data to stream.
+ */
+ @Override
+ public void handleData(ByteBuffer buf, Link link) {
+ if (buf == null)
+ return;
+
+ try {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Writing data to stream: " + buf + ".");
+
+ os.write(buf.data, buf.offset, buf.length);
+ os.flush();
+ } catch (IOException e) {
+ System.err.println("[" + this + "] ERROR: " + e.getMessage());
+ closeStream();
+ }
+ }
+
+ @Override
+ public void handleEvent(Event event, Direction direction) {
+ switch (event) {
+ case SOCKET_UPGRADE_TO_SSL:
+ socketWrapper.upgradeToSsl();
+ break;
+ default:
+ super.handleEvent(event, direction);
+ }
}
- }
-
- @Override
- public void setLink(String padName, Link link, Direction direction) {
- switch (direction) {
- case IN:
- super.setLink(padName, link, direction);
-
- if (os == null)
- // Pause links until data stream will be ready
- link.pause();
- break;
- case OUT:
- throw new RuntimeException("Cannot assign link to output pad in sink element. Element: " + this + ", pad: " + padName + ", link: " + link + ".");
+
+ @Override
+ public void setLink(String padName, Link link, Direction direction) {
+ switch (direction) {
+ case IN:
+ super.setLink(padName, link, direction);
+
+ if (os == null)
+ // Pause links until data stream will be ready
+ link.pause();
+ break;
+ case OUT:
+ throw new RuntimeException("Cannot assign link to output pad in sink element. Element: " + this + ", pad: " + padName + ", link: " + link + ".");
+ }
}
- }
- private void resumeLinks() {
- for (DataSource source : inputPads.values())
- ((Link) source).resume();
- }
+ private void resumeLinks() {
+ for (DataSource source : inputPads.values())
+ ((Link)source).resume();
+ }
- @Override
- protected void onClose() {
- closeStream();
- }
+ @Override
+ protected void onClose() {
+ closeStream();
+ }
- private void closeStream() {
- if (verbose)
- System.out.println("[" + this + "] INFO: Closing stream.");
+ private void closeStream() {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Closing stream.");
+
+ try {
+ os.close();
+ } catch (IOException e) {
+ }
+ try {
+ sendEventToAllPads(Event.STREAM_CLOSE, Direction.IN);
+ } catch (Exception e) {
+ }
+ }
- try {
- os.close();
- } catch (IOException e) {
+ @Override
+ public String toString() {
+ return "OutputStreamSink(" + id + ")";
}
- try {
- sendEventToAllPads(Event.STREAM_CLOSE, Direction.IN);
- } catch (Exception e) {
+
+ /**
+ * Example.
+ */
+ public static void main(String args[]) {
+ Element source = new FakeSource("source") {
+ {
+ this.verbose = true;
+ this.numBuffers = 3;
+ this.incommingBufLength = 5;
+ this.delay = 100;
+ }
+ };
+
+ OutputStreamSink sink = new OutputStreamSink("sink") {
+ {
+ verbose = true;
+ }
+ };
+
+ Link link = new SyncLink();
+
+ source.setLink(STDOUT, link, Direction.OUT);
+ sink.setLink(STDIN, link, Direction.IN);
+
+ sink.setOutputStream(new ByteArrayOutputStream());
+
+ link.sendEvent(Event.STREAM_START, Direction.IN);
+ link.run();
+
}
- }
-
- @Override
- public String toString() {
- return "OutputStreamSink(" + id + ")";
- }
-
- /**
- * Example.
- */
- public static void main(String args[]) {
- Element source = new FakeSource("source") {
- {
- this.verbose = true;
- this.numBuffers = 3;
- this.incommingBufLength = 5;
- this.delay = 100;
- }
- };
-
- OutputStreamSink sink = new OutputStreamSink("sink") {
- {
- verbose = true;
- }
- };
-
- Link link = new SyncLink();
-
- source.setLink(STDOUT, link, Direction.OUT);
- sink.setLink(STDIN, link, Direction.IN);
-
- sink.setOutputStream(new ByteArrayOutputStream());
-
- link.sendEvent(Event.STREAM_START, Direction.IN);
- link.run();
-
- }
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Pipeline.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Pipeline.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Pipeline.java
old mode 100644
new mode 100755
index c369350..ad9d8d6
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Pipeline.java
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Pipeline.java
@@ -21,71 +21,73 @@ package streamer;
*/
public interface Pipeline extends Element {
- static final String IN = Direction.IN.toString();
- static final String OUT = Direction.OUT.toString();
+ static final String IN = Direction.IN.toString();
+ static final String OUT = Direction.OUT.toString();
- /**
- * Add elements to pipeline.
- *
- * @param elements
- */
- void add(Element... elements);
+ /**
+ * Add elements to pipeline.
+ *
+ * @param elements
+ */
+ void add(Element... elements);
- /**
- * Add elements to pipeline and link them in given order.
- *
- * @param elements
- */
- void addAndLink(Element... elements);
+ /**
+ * Add elements to pipeline and link them in given order.
+ *
+ * @param elements
+ */
+ void addAndLink(Element... elements);
- /**
- * Link elements in given order using SyncLink. Element name can have prefix
- * "PADNAME< " or/and suffix " >PADNAME" to use given named pads instead of
- * "stdin" and "stdout". I.e. <code>link("foo", "bar", "baz");</code> is equal
- * to <code>link("foo >stdin", "stdout< bar >stdin", "stdout< baz");</code> .
- *
- * Special elements "IN" and "OUT" are pointing to pipeline outer interfaces,
- * so when pipeline will be connected with other elements, outside of this
- * pipeline, they will be connected to IN and OUT elements.
- *
- * Example:
- *
- * <pre>
- * pipeline.link("IN", "foo", "bar", "OUT");
- * // Make additional branch from foo to baz, and then to OUT
- * pipeline.link("foo >baz_out", "baz", "baz_in< OUT");
- * </pre>
- *
- * @param elements
- * elements to link
- */
- void link(String... elements);
+ /**
+ * Link elements in given order using SyncLink. Element name can have prefix
+ * "PADNAME< " or/and suffix " >PADNAME" to use given named pads instead of
+ * "stdin" and "stdout". I.e. <code>link("foo", "bar", "baz");</code> is equal
+ * to <code>link("foo >stdin", "stdout< bar >stdin", "stdout< baz");</code> .
+ *
+ * Special elements "IN" and "OUT" are pointing to pipeline outer interfaces,
+ * so when pipeline will be connected with other elements, outside of this
+ * pipeline, they will be connected to IN and OUT elements.
+ *
+ * Empty names are skipped.
+ *
+ * Example:
+ *
+ * <pre>
+ * pipeline.link("IN", "foo", "bar", "OUT");
+ * // Make additional branch from foo to baz, and then to OUT
+ * pipeline.link("foo >baz_out", "baz", "baz_in< OUT");
+ * </pre>
+ *
+ * @param elements
+ * elements to link
+ */
+ void link(String... elements);
- /**
- * Get element by name.
- *
- * @return an element
- */
- Element get(String elementName);
+ /**
+ * Get element by name.
+ *
+ * @return an element
+ */
+ Element get(String elementName);
- /**
- * Get link by element name and pad name.
- */
- Link getLink(String elementName, String padName);
+ /**
+ * Get link by element name and pad name.
+ */
+ Link getLink(String elementName, String padName);
- /**
- * Set link by element name and pad name. Allows to link external elements
- * into internal elements of pipeline. Special elements "IN" and "OUT" are
- * pointing to pipeline outer interfaces.
- */
- void setLink(String elementName, String padName, Link link, Direction direction);
+ /**
+ * Set link by element name and pad name. Allows to link external elements
+ * into internal elements of pipeline. Special elements "IN" and "OUT" are
+ * pointing to pipeline outer interfaces.
+ */
+ void setLink(String elementName, String padName, Link link, Direction direction);
- /**
- * Get link connected to given pad in given element and run it main loop.
- * @param separateThread
- * set to true to start main loop in separate thread.
- * @param waitForStartEvent TODO
- */
- void runMainLoop(String element, String padName, boolean separateThread, boolean waitForStartEvent);
+ /**
+ * Get link connected to given pad in given element and run it main loop.
+ * @param separateThread
+ * set to true to start main loop in separate thread.
+ * @param waitForStartEvent TODO
+ */
+ void runMainLoop(String element, String padName, boolean separateThread, boolean waitForStartEvent);
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/PipelineImpl.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/PipelineImpl.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/PipelineImpl.java
old mode 100644
new mode 100755
index abf132f..299d0a4
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/PipelineImpl.java
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/PipelineImpl.java
@@ -16,294 +16,325 @@
// under the License.
package streamer;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
+import streamer.debug.FakeSink;
+import streamer.debug.FakeSource;
+
public class PipelineImpl implements Pipeline {
- protected String id;
- protected boolean verbose = System.getProperty("streamer.Pipeline.debug", "false").equals("true");
+ protected String id;
+ protected boolean verbose = System.getProperty("streamer.Pipeline.debug", "false").equals("true");
- public PipelineImpl(String id) {
- this.id = id;
- elements = initElementMap(id);
- }
+ public PipelineImpl(String id) {
+ this.id = id;
+ elements = initElementMap(id);
+ }
- protected Map<String, Element> elements;
+ protected Map<String, Element> elements;
- protected HashMap<String, Element> initElementMap(String id) {
- HashMap<String, Element> map = new HashMap<String, Element>();
+ protected HashMap<String, Element> initElementMap(String id) {
+ HashMap<String, Element> map = new HashMap<String, Element>();
- map.put(IN, new BaseElement(id + "." + IN));
- map.put(OUT, new BaseElement(id + "." + OUT));
- return map;
- }
+ map.put(IN, new BaseElement(id + "." + IN));
+ map.put(OUT, new BaseElement(id + "." + OUT));
+ return map;
+ }
- @Override
- public Link getLink(String padName) {
- Link link = elements.get(IN).getLink(padName);
- if (link == null)
- link = elements.get(OUT).getLink(padName);
- return link;
- }
+ @Override
+ public Link getLink(String padName) {
+ Link link = elements.get(IN).getLink(padName);
+ if (link == null)
+ link = elements.get(OUT).getLink(padName);
+ return link;
+ }
- @Override
- public Set<String> getPads(Direction direction) {
- switch (direction) {
- case IN:
- return elements.get(IN).getPads(direction);
+ @Override
+ public Set<String> getPads(Direction direction) {
+ switch (direction) {
+ case IN:
+ return elements.get(IN).getPads(direction);
- case OUT:
- return elements.get(OUT).getPads(direction);
+ case OUT:
+ return elements.get(OUT).getPads(direction);
+ }
+ return null;
}
- return null;
- }
-
- @Override
- public void validate() {
- for (Element element : elements.values())
- element.validate();
-
- // Check IN element
- {
- Element element = get(IN);
- int outPadsNumber = element.getPads(Direction.OUT).size();
- int inPadsNumber = element.getPads(Direction.IN).size();
- if ((outPadsNumber | inPadsNumber) > 0 && (outPadsNumber == 0 || inPadsNumber == 0))
- throw new RuntimeException("[ " + this + "] Pads of input element of pipeline are not balanced. Element: " + element + ", output pads: "
- + element.getPads(Direction.OUT).toString() + ", input pads: " + element.getPads(Direction.IN).toString() + ".");
+
+ @Override
+ public void validate() {
+ for (Element element : elements.values())
+ element.validate();
+
+ // Check IN element
+ {
+ Element element = get(IN);
+ int outPadsNumber = element.getPads(Direction.OUT).size();
+ int inPadsNumber = element.getPads(Direction.IN).size();
+ if ((outPadsNumber | inPadsNumber) > 0 && (outPadsNumber == 0 || inPadsNumber == 0))
+ throw new RuntimeException("[ " + this + "] Pads of input element of pipeline are not balanced. Element: " + element + ", output pads: "
+ + element.getPads(Direction.OUT).toString() + ", input pads: " + element.getPads(Direction.IN).toString() + ".");
+ }
+
+ // Check OUT element
+ {
+ Element element = get(OUT);
+ int outPadsNumber = element.getPads(Direction.OUT).size();
+ int inPadsNumber = element.getPads(Direction.IN).size();
+ if ((outPadsNumber | inPadsNumber) > 0 && (outPadsNumber == 0 || inPadsNumber == 0))
+ throw new RuntimeException("[ " + this + "] Pads of output element of pipeline are not balanced. Element: " + element + ", output pads: "
+ + element.getPads(Direction.OUT).toString() + ", input pads: " + element.getPads(Direction.IN).toString() + ".");
+ }
+
}
- // Check OUT element
- {
- Element element = get(OUT);
- int outPadsNumber = element.getPads(Direction.OUT).size();
- int inPadsNumber = element.getPads(Direction.IN).size();
- if ((outPadsNumber | inPadsNumber) > 0 && (outPadsNumber == 0 || inPadsNumber == 0))
- throw new RuntimeException("[ " + this + "] Pads of output element of pipeline are not balanced. Element: " + element + ", output pads: "
- + element.getPads(Direction.OUT).toString() + ", input pads: " + element.getPads(Direction.IN).toString() + ".");
+ @Override
+ public void dropLink(String padName) {
+ if (elements.get(IN).getLink(padName) != null)
+ elements.get(IN).dropLink(padName);
+
+ if (elements.get(OUT).getLink(padName) != null)
+ elements.get(OUT).dropLink(padName);
}
- }
-
- @Override
- public void dropLink(String padName) {
- if (elements.get(IN).getLink(padName) != null)
- elements.get(IN).dropLink(padName);
-
- if (elements.get(OUT).getLink(padName) != null)
- elements.get(OUT).dropLink(padName);
- }
-
- @Override
- public void dropLink(Link link) {
- elements.get(IN).dropLink(link);
- elements.get(OUT).dropLink(link);
- }
-
- @Override
- public void replaceLink(Link existingLink, Link newLink) {
- elements.get(IN).replaceLink(existingLink, newLink);
- elements.get(OUT).replaceLink(existingLink, newLink);
- }
-
- @Override
- public void setLink(String padName, Link link, Direction direction) {
- // Wire links to internal elements instead
- elements.get(direction.toString()).setLink(padName, link, direction);
- }
-
- @Override
- public void poll(boolean block) {
- throw new RuntimeException("Not implemented.");
- }
-
- @Override
- public void handleData(ByteBuffer buf, Link link) {
- get(IN).handleData(buf, link);
- }
-
- @Override
- public void handleEvent(Event event, Direction direction) {
- switch (direction) {
- case IN:
- get(IN).handleEvent(event, direction);
- break;
- case OUT:
- get(OUT).handleEvent(event, direction);
- break;
+ @Override
+ public void dropLink(Link link) {
+ elements.get(IN).dropLink(link);
+ elements.get(OUT).dropLink(link);
}
- }
- @Override
- public void add(Element... elements) {
- for (Element element : elements) {
- String id = element.getId();
+ @Override
+ public void replaceLink(Link existingLink, Link newLink) {
+ elements.get(IN).replaceLink(existingLink, newLink);
+ elements.get(OUT).replaceLink(existingLink, newLink);
+ }
- if (this.elements.containsKey(id))
- throw new RuntimeException("This pipeline already contains element with same ID. New element: " + element + ", existing element: "
- + this.elements.get(id) + ".");
+ @Override
+ public void setLink(String padName, Link link, Direction direction) {
+ // Wire links to internal elements instead
+ elements.get(direction.toString()).setLink(padName, link, direction);
+ }
- this.elements.put(id, element);
+ @Override
+ public void poll(boolean block) {
+ throw new RuntimeException("Not implemented.");
}
- }
-
- @Override
- public void link(String... elementNames) {
-
- if (elementNames.length < 2)
- throw new RuntimeException("At least two elements are necessary to create link between them.");
-
- // Parse array of element and pad names
-
- Element elements[] = new Element[elementNames.length];
- String inputPads[] = new String[elementNames.length];
- String outputPads[] = new String[elementNames.length];
-
- int i = 0;
- for (String elementName : elementNames) {
- if (elementName.contains("< ")) {
- inputPads[i] = elementName.substring(0, elementName.indexOf("< "));
- elementName = elementName.substring(elementName.indexOf("< ") + 2);
- } else {
- inputPads[i] = STDIN;
- }
-
- if (elementName.contains(" >")) {
- outputPads[i] = elementName.substring(elementName.indexOf(" >") + 2);
- elementName = elementName.substring(0, elementName.indexOf(" >"));
- } else {
- outputPads[i] = STDOUT;
- }
-
- elements[i] = get(elementName);
-
- if (elements[i] == null)
- throw new RuntimeException("Cannot find element by name in this pipeline. Element name: \"" + elementName + "\" (" + elementNames[i] + "), pipeline: "
- + this + ".");
-
- i++;
+
+ @Override
+ public void handleData(ByteBuffer buf, Link link) {
+ get(IN).handleData(buf, link);
}
- // Link elements
- for (i = 0; i < elements.length - 1; i++) {
- Element leftElement = elements[i];
- Element rightElement = elements[i + 1];
- String leftPad = outputPads[i];
- String rightPad = inputPads[i + 1];
+ @Override
+ public void handleEvent(Event event, Direction direction) {
+ switch (direction) {
+ case IN:
+ get(IN).handleEvent(event, direction);
+ break;
+ case OUT:
+ get(OUT).handleEvent(event, direction);
+ break;
+ }
+ }
- String linkId = leftElement.getId() + " >" + leftPad + " | " + rightPad + "< " + rightElement.getId();
+ @Override
+ public void add(Element... elements) {
+ for (Element element : elements) {
+ String id = element.getId();
- if (verbose)
- System.out.println("[" + this + "] INFO: Linking: " + linkId + ".");
+ if (this.elements.containsKey(id))
+ throw new RuntimeException("This pipeline already contains element with same ID. New element: " + element + ", existing element: "
+ + this.elements.get(id) + ".");
- Link link = new SyncLink(linkId);
- leftElement.setLink(leftPad, link, Direction.OUT);
- rightElement.setLink(rightPad, link, Direction.IN);
+ this.elements.put(id, element);
+ }
}
- }
- @Override
- public void addAndLink(Element... elements) {
- add(elements);
- link(elements);
- }
+ @Override
+ public void link(String... elementNames) {
+
+ elementNames = filterOutEmptyStrings(elementNames);
+
+ if (elementNames.length < 2)
+ throw new RuntimeException("At least two elements are necessary to create link between them.");
+
+ // Parse array of element and pad names
+
+ Element elements[] = new Element[elementNames.length];
+ String inputPads[] = new String[elementNames.length];
+ String outputPads[] = new String[elementNames.length];
+
+ int i = 0;
+ for (String elementName : elementNames) {
+ if (elementName.contains("< ")) {
+ inputPads[i] = elementName.substring(0, elementName.indexOf("< "));
+ elementName = elementName.substring(elementName.indexOf("< ") + 2);
+ } else {
+ inputPads[i] = STDIN;
+ }
+
+ if (elementName.contains(" >")) {
+ outputPads[i] = elementName.substring(elementName.indexOf(" >") + 2);
+ elementName = elementName.substring(0, elementName.indexOf(" >"));
+ } else {
+ outputPads[i] = STDOUT;
+ }
+
+ elements[i] = get(elementName);
+
+ if (elements[i] == null)
+ throw new RuntimeException("Cannot find element by name in this pipeline. Element name: \"" + elementName + "\" (" + elementNames[i] + "), pipeline: "
+ + this + ".");
- private void link(Element... elements) {
- String elementNames[] = new String[elements.length];
+ i++;
+ }
- int i = 0;
- for (Element element : elements) {
- elementNames[i++] = element.getId();
+ // Link elements
+ for (i = 0; i < elements.length - 1; i++) {
+ Element leftElement = elements[i];
+ Element rightElement = elements[i + 1];
+ String leftPad = outputPads[i];
+ String rightPad = inputPads[i + 1];
+
+ String linkId = leftElement.getId() + " >" + leftPad + " | " + rightPad + "< " + rightElement.getId();
+
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Linking: " + linkId + ".");
+
+ Link link = new SyncLink(linkId);
+ leftElement.setLink(leftPad, link, Direction.OUT);
+ rightElement.setLink(rightPad, link, Direction.IN);
+ }
+ }
+
+ /**
+ * Filter out empty strings from array and return new array with non-empty
+ * elements only. If array contains no empty string, returns same array.
+ */
+ private String[] filterOutEmptyStrings(String[] strings) {
+
+ boolean found = false;
+ for (String string : strings) {
+ if (string == null || string.isEmpty()) {
+ found = true;
+ break;
+ }
+ }
+
+ if (!found)
+ return strings;
+
+ List<String> filteredStrings = new ArrayList<String>(strings.length);
+ for (String string : strings)
+ if (string != null && !string.isEmpty())
+ filteredStrings.add(string);
+ return filteredStrings.toArray(new String[filteredStrings.size()]);
+ }
+
+ @Override
+ public void addAndLink(Element... elements) {
+ add(elements);
+ link(elements);
+ }
+
+ private void link(Element... elements) {
+ String elementNames[] = new String[elements.length];
+
+ int i = 0;
+ for (Element element : elements) {
+ elementNames[i++] = element.getId();
+ }
+
+ link(elementNames);
}
- link(elementNames);
- }
+ @Override
+ public Element get(String elementName) {
+ return elements.get(elementName);
+ }
- @Override
- public Element get(String elementName) {
- return elements.get(elementName);
- }
+ @Override
+ public Link getLink(String elementName, String padName) {
+ return elements.get(elementName).getLink(padName);
- @Override
- public Link getLink(String elementName, String padName) {
- return elements.get(elementName).getLink(padName);
+ }
- }
+ @Override
+ public void setLink(String elementName, String padName, Link link, Direction direction) {
+ elements.get(elementName).setLink(padName, link, direction);
+ }
- @Override
- public void setLink(String elementName, String padName, Link link, Direction direction) {
- elements.get(elementName).setLink(padName, link, direction);
- }
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public void runMainLoop(String elementName, String padName, boolean separateThread, boolean waitForStartEvent) {
+ validate();
- @Override
- public String getId() {
- return id;
- }
+ Link link = getLink(elementName, padName);
- @Override
- public void runMainLoop(String elementName, String padName, boolean separateThread, boolean waitForStartEvent) {
- validate();
+ if (link == null)
+ throw new NullPointerException("Cannot find link. Element name: " + elementName + ", element: " + get(elementName) + ", pad: " + padName + ".");
- Link link = getLink(elementName, padName);
+ if (!waitForStartEvent)
+ link.sendEvent(Event.STREAM_START, Direction.OUT);
- if (link == null)
- throw new NullPointerException("Cannot find link. Element name: " + elementName + ", element: " + get(elementName) + ", pad: " + padName + ".");
+ if (separateThread) {
+ Thread thread = new Thread(link);
+ thread.setDaemon(true);
+ thread.start();
+ } else {
+ link.run();
+ }
+ }
- if (!waitForStartEvent)
- link.sendEvent(Event.STREAM_START, Direction.OUT);
+ @Override
+ public String toString() {
+ return "Pipeline(" + id + ")";
+ }
- if (separateThread) {
- Thread thread = new Thread(link);
- thread.setDaemon(true);
- thread.start();
- } else {
- link.run();
+ /**
+ * Example.
+ */
+ public static void main(String args[]) {
+ // System.setProperty("streamer.Link.debug", "true");
+ // System.setProperty("streamer.Element.debug", "true");
+ // System.setProperty("streamer.Pipeline.debug", "true");
+
+ Pipeline pipeline = new PipelineImpl("main");
+
+ // Create elements
+ pipeline.add(new FakeSource("source") {
+ {
+ incommingBufLength = 3;
+ numBuffers = 10;
+ delay = 100;
+ }
+ });
+ pipeline.add(new BaseElement("tee"));
+ pipeline.add(new FakeSink("sink") {
+ {
+ verbose = true;
+ }
+ });
+ pipeline.add(new FakeSink("sink2") {
+ {
+ verbose = true;
+ }
+ });
+
+ // Link elements
+ pipeline.link("source", "tee", "sink");
+ pipeline.link("tee >out2", "sink2");
+
+ // Run main loop
+ pipeline.runMainLoop("source", STDOUT, false, false);
}
- }
-
- @Override
- public String toString() {
- return "Pipeline(" + id + ")";
- }
-
- /**
- * Example.
- */
- public static void main(String args[]) {
- // System.setProperty("streamer.Link.debug", "true");
- // System.setProperty("streamer.Element.debug", "true");
- // System.setProperty("streamer.Pipeline.debug", "true");
-
- Pipeline pipeline = new PipelineImpl("main");
-
- // Create elements
- pipeline.add(new FakeSource("source") {
- {
- this.incommingBufLength = 3;
- this.numBuffers = 10;
- this.delay = 100;
- }
- });
- pipeline.add(new BaseElement("tee"));
- pipeline.add(new FakeSink("sink") {
- {
- this.verbose = true;
- }
- });
- pipeline.add(new FakeSink("sink2") {
- {
- this.verbose = true;
- }
- });
-
- // Link elements
- pipeline.link("source", "tee", "sink");
- pipeline.link("tee >out2", "sink2");
-
- // Run main loop
- pipeline.runMainLoop("source", STDOUT, false, false);
- }
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Queue.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Queue.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Queue.java
old mode 100644
new mode 100755
index 7a17340..ed952ce
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Queue.java
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Queue.java
@@ -19,118 +19,120 @@ package streamer;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import streamer.debug.FakeSink;
+import streamer.debug.FakeSource;
+
/**
* Message queue for safe transfer of packets between threads.
*/
public class Queue extends BaseElement {
- protected LinkedBlockingQueue<ByteBuffer> queue = new LinkedBlockingQueue<ByteBuffer>();
-
- public Queue(String id) {
- super(id);
- }
-
- @SuppressWarnings("incomplete-switch")
- @Override
- public void poll(boolean block) {
- try {
- ByteBuffer buf = null;
- if (block) {
- buf = queue.take();
- } else {
- buf = queue.poll(100, TimeUnit.MILLISECONDS);
- }
-
- if (buf != null)
- pushDataToAllOuts(buf);
-
- } catch (Exception e) {
- sendEventToAllPads(Event.STREAM_CLOSE, Direction.OUT);
- closeQueue();
+ protected LinkedBlockingQueue<ByteBuffer> queue = new LinkedBlockingQueue<ByteBuffer>();
+
+ public Queue(String id) {
+ super(id);
+ }
+
+ @Override
+ public void poll(boolean block) {
+ try {
+ ByteBuffer buf = null;
+ if (block) {
+ buf = queue.take();
+ } else {
+ buf = queue.poll(100, TimeUnit.MILLISECONDS);
+ }
+
+ if (buf != null)
+ pushDataToAllOuts(buf);
+
+ } catch (Exception e) {
+ sendEventToAllPads(Event.STREAM_CLOSE, Direction.OUT);
+ closeQueue();
+ }
+ }
+
+ @Override
+ public void handleData(ByteBuffer buf, Link link) {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Data received: " + buf + ".");
+
+ // Put incoming data into queue
+ try {
+ queue.put(buf);
+ } catch (Exception e) {
+ sendEventToAllPads(Event.STREAM_CLOSE, Direction.IN);
+ closeQueue();
+ }
+ }
+
+ @Override
+ public void handleEvent(Event event, Direction direction) {
+ switch (event) {
+ case LINK_SWITCH_TO_PULL_MODE:
+ // Do not propagate this event, because this element is boundary between
+ // threads
+ break;
+ default:
+ super.handleEvent(event, direction);
+ }
+ }
+
+ @Override
+ protected void onClose() {
+ super.onClose();
+ closeQueue();
}
- }
-
- @Override
- public void handleData(ByteBuffer buf, Link link) {
- if (verbose)
- System.out.println("[" + this + "] INFO: Data received: " + buf + ".");
-
- // Put incoming data into queue
- try {
- queue.put(buf);
- } catch (Exception e) {
- sendEventToAllPads(Event.STREAM_CLOSE, Direction.IN);
- closeQueue();
+
+ private void closeQueue() {
+ queue.clear();
+ queue.add(null);
+ // Drop queue to indicate that upstream is closed.
+ // May produce NPE in poll().
+ queue = null;
}
- }
-
- @Override
- public void handleEvent(Event event, Direction direction) {
- switch (event) {
- case LINK_SWITCH_TO_PULL_MODE:
- // Do not propagate this event, because this element is boundary between
- // threads
- break;
- default:
- super.handleEvent(event, direction);
+
+ @Override
+ public String toString() {
+ return "Queue(" + id + ")";
+ }
+
+ /**
+ * Example.
+ */
+ public static void main(String args[]) {
+ // System.setProperty("streamer.Link.debug", "true");
+ System.setProperty("streamer.Element.debug", "true");
+
+ Element source1 = new FakeSource("source1") {
+ {
+ this.delay = 100;
+ this.numBuffers = 10;
+ this.incommingBufLength = 10;
+ }
+ };
+
+ Element source2 = new FakeSource("source2") {
+ {
+ this.delay = 100;
+ this.numBuffers = 10;
+ this.incommingBufLength = 10;
+ }
+ };
+
+ Pipeline pipeline = new PipelineImpl("test");
+ pipeline.add(source1);
+ pipeline.add(source2);
+ pipeline.add(new Queue("queue"));
+ pipeline.add(new FakeSink("sink"));
+
+ // Main flow
+ pipeline.link("source1", "in1< queue");
+ pipeline.link("source2", "in2< queue");
+ pipeline.link("queue", "sink");
+
+ new Thread(pipeline.getLink("source1", STDOUT)).start();
+ new Thread(pipeline.getLink("source2", STDOUT)).start();
+ pipeline.getLink("sink", STDIN).run();
}
- }
-
- @Override
- protected void onClose() {
- super.onClose();
- closeQueue();
- }
-
- private void closeQueue() {
- queue.clear();
- queue.add(null);
- // Drop queue to indicate that upstream is closed.
- // May produce NPE in poll().
- queue = null;
- }
-
- @Override
- public String toString() {
- return "Queue(" + id + ")";
- }
-
- /**
- * Example.
- */
- public static void main(String args[]) {
- // System.setProperty("streamer.Link.debug", "true");
- System.setProperty("streamer.Element.debug", "true");
-
- Element source1 = new FakeSource("source1") {
- {
- this.delay = 100;
- this.numBuffers = 10;
- this.incommingBufLength = 10;
- }
- };
-
- Element source2 = new FakeSource("source2") {
- {
- this.delay = 100;
- this.numBuffers = 10;
- this.incommingBufLength = 10;
- }
- };
-
- Pipeline pipeline = new PipelineImpl("test");
- pipeline.add(source1);
- pipeline.add(source2);
- pipeline.add(new Queue("queue"));
- pipeline.add(new FakeSink("sink"));
-
- // Main flow
- pipeline.link("source1", "in1< queue");
- pipeline.link("source2", "in2< queue");
- pipeline.link("queue", "sink");
-
- new Thread(pipeline.getLink("source1", STDOUT)).start();
- new Thread(pipeline.getLink("source2", STDOUT)).start();
- pipeline.getLink("sink", STDIN).run();
- }
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SocketWrapper.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SocketWrapper.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SocketWrapper.java
old mode 100644
new mode 100755
index c23edd8..22d436e
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SocketWrapper.java
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SocketWrapper.java
@@ -16,224 +16,20 @@
// under the License.
package streamer;
-import static rdpclient.MockServer.Packet.PacketType.CLIENT;
-import static rdpclient.MockServer.Packet.PacketType.SERVER;
-
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.util.HashMap;
-
-import javax.net.SocketFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLSocket;
-import javax.net.ssl.SSLSocketFactory;
-import javax.net.ssl.TrustManager;
-
-import rdpclient.MockServer;
-import rdpclient.MockServer.Packet;
-import rdpclient.TrustAllX509TrustManager;
-
-public class SocketWrapper extends PipelineImpl {
-
- protected InputStreamSource source;
- protected OutputStreamSink sink;
- protected Socket socket;
- protected InetSocketAddress address;
-
- protected SSLSocket sslSocket;
-
- //protected String SSL_VERSION_TO_USE = "TLSv1.2";
- /*DEBUG*/protected String SSL_VERSION_TO_USE = "TLSv1";
-
- public SocketWrapper(String id) {
- super(id);
- }
-
- @Override
- protected HashMap<String, Element> initElementMap(String id) {
- HashMap<String, Element> map = new HashMap<String, Element>();
-
- source = new InputStreamSource(id + "." + OUT, this);
- sink = new OutputStreamSink(id + "." + IN, this);
-
- // Pass requests to read data to socket input stream
- map.put(OUT, source);
-
- // All incoming data, which is sent to this socket wrapper, will be sent
- // to socket remote
- map.put(IN, sink);
-
- return map;
- }
-
- /**
- * Connect this socket wrapper to remote server and start main loop on
- * IputStreamSource stdout link, to watch for incoming data, and
- * OutputStreamSink stdin link, to pull for outgoing data.
- *
- * @param address
- * @throws IOException
- */
- public void connect(InetSocketAddress address) throws IOException {
- this.address = address;
-
- // Connect socket to server
- socket = SocketFactory.getDefault().createSocket();
- try {
- socket.connect(address);
-
- InputStream is = socket.getInputStream();
- source.setInputStream(is);
-
- OutputStream os = socket.getOutputStream();
- sink.setOutputStream(os);
-
- // Start polling for data to send to remote sever
- runMainLoop(IN, STDIN, true, true);
-
- // Push incoming data from server to handlers
- runMainLoop(OUT, STDOUT, false, false);
-
- } finally {
- socket.close();
- }
- }
-
- @Override
- public void handleEvent(Event event, Direction direction) {
- switch (event) {
- case SOCKET_UPGRADE_TO_SSL:
- upgradeToSsl();
- break;
- default:
- super.handleEvent(event, direction);
- break;
- }
- }
-
- public void upgradeToSsl() {
-
- if (sslSocket != null)
- // Already upgraded
- return;
-
- if (verbose)
- System.out.println("[" + this + "] INFO: Upgrading socket to SSL.");
-
- try {
- // Use most secure implementation of SSL available now.
- // JVM will try to negotiate TLS1.2, then will fallback to TLS1.0, if
- // TLS1.2 is not supported.
- SSLContext sslContext = SSLContext.getInstance(SSL_VERSION_TO_USE);
-
- // Trust all certificates (FIXME: insecure)
- sslContext.init(null, new TrustManager[] { new TrustAllX509TrustManager() }, null);
-
- SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();
- sslSocket = (SSLSocket) sslSocketFactory.createSocket(socket, address.getHostName(), address.getPort(), true);
- sslSocket.startHandshake();
-
- InputStream sis = sslSocket.getInputStream();
- source.setInputStream(sis);
-
- OutputStream sos = sslSocket.getOutputStream();
- sink.setOutputStream(sos);
-
- } catch (Exception e) {
- throw new RuntimeException("Cannot upgrade socket to SSL: " + e.getMessage(), e);
- }
-
- }
-
- @Override
- public void validate() {
- for (Element element : elements.values())
- element.validate();
-
- if (get(IN).getPads(Direction.IN).size() == 0)
- throw new RuntimeException("[ " + this + "] Input of socket is not connected.");
-
- if (get(OUT).getPads(Direction.OUT).size() == 0)
- throw new RuntimeException("[ " + this + "] Output of socket is not connected.");
-
- }
-
- public void shutdown() {
- try {
- handleEvent(Event.STREAM_CLOSE, Direction.IN);
- } catch (Exception e) {
- }
- try {
- handleEvent(Event.STREAM_CLOSE, Direction.OUT);
- } catch (Exception e) {
- }
- try {
- if (sslSocket != null)
- sslSocket.close();
- } catch (Exception e) {
- }
- try {
- socket.close();
- } catch (Exception e) {
- }
- }
-
- @Override
- public String toString() {
- return "SocketWrapper(" + id + ")";
- }
-
- /**
- * Example.
- */
- public static void main(String args[]) {
- try {
- System.setProperty("streamer.Link.debug", "true");
- System.setProperty("streamer.Element.debug", "true");
- System.setProperty("rdpclient.MockServer.debug", "true");
-
- Pipeline pipeline = new PipelineImpl("echo client");
-
- SocketWrapper socketWrapper = new SocketWrapper("socket");
-
- pipeline.add(socketWrapper);
- pipeline.add(new BaseElement("echo"));
- pipeline.link("socket", "echo", "socket");
+public interface SocketWrapper extends Element {
- final byte[] mockData = new byte[] { 0x01, 0x02, 0x03 };
- MockServer server = new MockServer(new Packet[] { new Packet("Server hello") {
- {
- type = SERVER;
- data = mockData;
- }
- }, new Packet("Client hello") {
- {
- type = CLIENT;
- data = mockData;
- }
- }, new Packet("Server hello") {
- {
- type = SERVER;
- data = mockData;
- }
- }, new Packet("Client hello") {
- {
- type = CLIENT;
- data = mockData;
- }
- } });
- server.start();
- InetSocketAddress address = server.getAddress();
+ /**
+ * Connect this socket wrapper to remote server and start main loop on
+ * Source stdout link, to watch for incoming data, and
+ * Sink stdin link, to pull for outgoing data.
+ */
+ public void connect(InetSocketAddress address) throws IOException;
- socketWrapper.connect(address);
+ public void shutdown();
- } catch (IOException e) {
- e.printStackTrace(System.err);
- }
+ void upgradeToSsl();
- }
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/4f3611f9/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SocketWrapperImpl.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SocketWrapperImpl.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SocketWrapperImpl.java
new file mode 100755
index 0000000..07b3dc9
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SocketWrapperImpl.java
@@ -0,0 +1,250 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+import static streamer.debug.MockServer.Packet.PacketType.CLIENT;
+import static streamer.debug.MockServer.Packet.PacketType.SERVER;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.HashMap;
+
+import javax.net.SocketFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSocket;
+import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.TrustManager;
+
+import streamer.debug.MockServer;
+import streamer.debug.MockServer.Packet;
+import streamer.ssl.SSLState;
+import streamer.ssl.TrustAllX509TrustManager;
+
+public class SocketWrapperImpl extends PipelineImpl implements SocketWrapper {
+
+ protected InputStreamSource source;
+ protected OutputStreamSink sink;
+ protected Socket socket;
+ protected InetSocketAddress address;
+
+ protected SSLSocket sslSocket;
+
+ protected String SSL_VERSION_TO_USE = "TLSv1.2";
+ //protected String SSL_VERSION_TO_USE = "SSLv3";
+
+ protected SSLState sslState;
+
+ public SocketWrapperImpl(String id, SSLState sslState) {
+ super(id);
+ this.sslState = sslState;
+ }
+
+ @Override
+ protected HashMap<String, Element> initElementMap(String id) {
+ HashMap<String, Element> map = new HashMap<String, Element>();
+
+ source = new InputStreamSource(id + "." + OUT, this);
+ sink = new OutputStreamSink(id + "." + IN, this);
+
+ // Pass requests to read data to socket input stream
+ map.put(OUT, source);
+
+ // All incoming data, which is sent to this socket wrapper, will be sent
+ // to socket remote
+ map.put(IN, sink);
+
+ return map;
+ }
+
+ /**
+ * Connect this socket wrapper to remote server and start main loop on
+ * IputStreamSource stdout link, to watch for incoming data, and
+ * OutputStreamSink stdin link, to pull for outgoing data.
+ *
+ * @param address
+ * @throws IOException
+ */
+ @Override
+ public void connect(InetSocketAddress address) throws IOException {
+ this.address = address;
+
+ // Connect socket to server
+ socket = SocketFactory.getDefault().createSocket();
+ try {
+ socket.connect(address);
+
+ InputStream is = socket.getInputStream();
+ source.setInputStream(is);
+
+ OutputStream os = socket.getOutputStream();
+ sink.setOutputStream(os);
+
+ // Start polling for data to send to remote sever
+ runMainLoop(IN, STDIN, true, true);
+
+ // Push incoming data from server to handlers
+ runMainLoop(OUT, STDOUT, false, false);
+
+ } finally {
+ socket.close();
+ }
+ }
+
+ @Override
+ public void handleEvent(Event event, Direction direction) {
+ switch (event) {
+ case SOCKET_UPGRADE_TO_SSL:
+ upgradeToSsl();
+ break;
+ default:
+ super.handleEvent(event, direction);
+ break;
+ }
+ }
+
+ @Override
+ public void upgradeToSsl() {
+
+ if (sslSocket != null)
+ // Already upgraded
+ return;
+
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Upgrading socket to SSL.");
+
+ try {
+ // Use most secure implementation of SSL available now.
+ // JVM will try to negotiate TLS1.2, then will fallback to TLS1.0, if
+ // TLS1.2 is not supported.
+ SSLContext sslContext = SSLContext.getInstance(SSL_VERSION_TO_USE);
+
+ // Trust all certificates (FIXME: insecure)
+ sslContext.init(null, new TrustManager[] {new TrustAllX509TrustManager(sslState)}, null);
+
+ SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();
+ sslSocket = (SSLSocket)sslSocketFactory.createSocket(socket, address.getHostName(), address.getPort(), true);
+
+ sslSocket.startHandshake();
+
+ InputStream sis = sslSocket.getInputStream();
+ source.setInputStream(sis);
+
+ OutputStream sos = sslSocket.getOutputStream();
+ sink.setOutputStream(sos);
+
+ } catch (Exception e) {
+ throw new RuntimeException("Cannot upgrade socket to SSL: " + e.getMessage(), e);
+ }
+
+ }
+
+ @Override
+ public void validate() {
+ for (Element element : elements.values())
+ element.validate();
+
+ if (get(IN).getPads(Direction.IN).size() == 0)
+ throw new RuntimeException("[ " + this + "] Input of socket is not connected.");
+
+ if (get(OUT).getPads(Direction.OUT).size() == 0)
+ throw new RuntimeException("[ " + this + "] Output of socket is not connected.");
+
+ }
+
+ @Override
+ public void shutdown() {
+ try {
+ handleEvent(Event.STREAM_CLOSE, Direction.IN);
+ } catch (Exception e) {
+ }
+ try {
+ handleEvent(Event.STREAM_CLOSE, Direction.OUT);
+ } catch (Exception e) {
+ }
+ try {
+ if (sslSocket != null)
+ sslSocket.close();
+ } catch (Exception e) {
+ }
+ try {
+ socket.close();
+ } catch (Exception e) {
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "SocketWrapper(" + id + ")";
+ }
+
+ /**
+ * Example.
+ */
+ public static void main(String args[]) {
+
+ try {
+ System.setProperty("streamer.Link.debug", "true");
+ System.setProperty("streamer.Element.debug", "true");
+ System.setProperty("rdpclient.MockServer.debug", "true");
+
+ Pipeline pipeline = new PipelineImpl("echo client");
+
+ SocketWrapperImpl socketWrapper = new SocketWrapperImpl("socket", null);
+
+ pipeline.add(socketWrapper);
+ pipeline.add(new BaseElement("echo"));
+ pipeline.add(new Queue("queue")); // To decouple input and output
+
+ pipeline.link("socket", "echo", "queue", "socket");
+
+ final byte[] mockData = new byte[] {0x01, 0x02, 0x03};
+ MockServer server = new MockServer(new Packet[] {new Packet("Server hello") {
+ {
+ type = SERVER;
+ data = mockData;
+ }
+ }, new Packet("Client hello") {
+ {
+ type = CLIENT;
+ data = mockData;
+ }
+ }, new Packet("Server hello") {
+ {
+ type = SERVER;
+ data = mockData;
+ }
+ }, new Packet("Client hello") {
+ {
+ type = CLIENT;
+ data = mockData;
+ }
+ }});
+ server.start();
+ InetSocketAddress address = server.getAddress();
+
+ /*DEBUG*/System.out.println("Address: " + address);
+ socketWrapper.connect(address);
+
+ } catch (Exception e) {
+ e.printStackTrace(System.err);
+ }
+
+ }
+}