You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by hu...@apache.org on 2014/01/03 11:17:47 UTC
[14/50] [abbrv] CLOUDSTACK-5344: Updated to allow rdp console to
access hyper-v vm virtual framebuffer.
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/48c47101/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Element.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Element.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Element.java
old mode 100644
new mode 100755
index d489b18..e78e301
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Element.java
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Element.java
@@ -19,7 +19,7 @@ package streamer;
import java.util.Set;
/**
- * Element is for processing of data. It has one or more contact pads, which can
+ * Element is basic building block for constructing data processing pipes. It has one or more contact pads, which can
* be wired with other elements using links.
*/
public interface Element {
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/48c47101/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Event.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Event.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Event.java
old mode 100644
new mode 100755
index 9808f62..998274c
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Event.java
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Event.java
@@ -17,7 +17,8 @@
package streamer;
public enum Event {
- STREAM_START, STREAM_CLOSE,
+ STREAM_START,
+ STREAM_CLOSE,
/**
* Upgrade socket to SSL.
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/48c47101/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/FakeSink.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/FakeSink.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/FakeSink.java
deleted file mode 100644
index e373b19..0000000
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/FakeSink.java
+++ /dev/null
@@ -1,69 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package streamer;
-
-public class FakeSink extends BaseElement {
-
- public FakeSink(String id) {
- super(id);
- }
-
- @Override
- public void handleData(ByteBuffer buf, Link link) {
- if (verbose)
- System.out.println("[" + this + "] INFO: Received buf #" + (packetNumber) + " " + buf + ".");
-
- if (buf == null)
- return;
-
- // Use packetNumber variable to count incoming packets
- packetNumber++;
-
- buf.unref();
- }
-
- @Override
- public String toString() {
- return "FakeSink(" + id + ")";
- }
-
- @Override
- public void handleEvent(Event event, Direction direction) {
- if (verbose)
- System.out.println("[" + this + "] INFO: Event received: " + event + ".");
-
- }
-
- /**
- * Example.
- */
- public static void main(String args[]) {
-
- Element sink = new FakeSink("sink") {
- {
- verbose = true;
- }
- };
-
- byte[] data = new byte[] {1, 2, 3};
- ByteBuffer buf = new ByteBuffer(data);
- sink.setLink(STDIN, new SyncLink(), Direction.IN);
- sink.getLink(STDIN).sendData(buf);
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/48c47101/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/FakeSource.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/FakeSource.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/FakeSource.java
deleted file mode 100644
index 9056852..0000000
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/FakeSource.java
+++ /dev/null
@@ -1,125 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package streamer;
-
-public class FakeSource extends BaseElement {
-
- /**
- * Delay for null packets in poll method when blocking is requested, in
- * milliseconds.
- */
- protected long delay = SyncLink.STANDARD_DELAY_FOR_EMPTY_PACKET;
-
- public FakeSource(String id) {
- super(id);
- }
-
- @Override
- public void poll(boolean block) {
- if (numBuffers > 0 && packetNumber >= numBuffers) {
- // Close stream when limit of packets is reached
- sendEventToAllPads(Event.STREAM_CLOSE, Direction.OUT);
- return;
- }
-
- // Prepare new packet
- ByteBuffer buf = initializeData();
-
- // Push it to output(s)
- pushDataToAllOuts(buf);
-
- // Make slight delay when blocking input was requested (to avoid
- // consuming of 100% in parent loop)
- if (block)
- delay();
-
- }
-
- /**
- * Make slight delay. Should be used when blocking input is requested in pull
- * mode, but null packed was returned by input.
- */
- protected void delay() {
- try {
- Thread.sleep(delay);
- } catch (InterruptedException e) {
- }
- }
-
- /**
- * Initialize data.
- */
- public ByteBuffer initializeData() {
- ByteBuffer buf = new ByteBuffer(incommingBufLength);
-
- // Set first byte of package to it sequance number
- buf.data[buf.offset] = (byte)(packetNumber % 128);
-
- // Initialize rest of bytes with sequential values, which are
- // corresponding with their position in byte buffer
- for (int i = buf.offset + 1; i < buf.length; i++)
- buf.data[i] = (byte)(i % 128);
-
- buf.putMetadata(ByteBuffer.SEQUENCE_NUMBER, packetNumber);
- buf.putMetadata("src", id);
-
- return buf;
- }
-
- @Override
- public String toString() {
- return "FakeSource(" + id + ")";
- }
-
- public static void main(String args[]) {
-
- Element fakeSource = new FakeSource("source 3/10/100") {
- {
- verbose = true;
- this.incommingBufLength = 3;
- this.numBuffers = 10;
- this.delay = 100;
- }
- };
-
- Element fakeSink = new FakeSink("sink") {
- {
- this.verbose = true;
- }
- };
-
- Element fakeSink2 = new FakeSink("sink2") {
- {
- this.verbose = true;
- }
- };
-
- Link link = new SyncLink();
-
- fakeSource.setLink(STDOUT, link, Direction.OUT);
- fakeSink.setLink(STDIN, link, Direction.IN);
-
- Link link2 = new SyncLink();
-
- fakeSource.setLink("out2", link2, Direction.OUT);
- fakeSink2.setLink(STDIN, link2, Direction.IN);
-
- link.run();
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/48c47101/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/InputStreamSource.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/InputStreamSource.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/InputStreamSource.java
old mode 100644
new mode 100755
index 45155e1..0c8c97d
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/InputStreamSource.java
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/InputStreamSource.java
@@ -20,13 +20,15 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
+import streamer.debug.FakeSink;
+
/**
* Source element, which reads data from InputStream.
*/
public class InputStreamSource extends BaseElement {
protected InputStream is;
- protected SocketWrapper socketWrapper;
+ protected SocketWrapperImpl socketWrapper;
public InputStreamSource(String id) {
super(id);
@@ -37,7 +39,7 @@ public class InputStreamSource extends BaseElement {
this.is = is;
}
- public InputStreamSource(String id, SocketWrapper socketWrapper) {
+ public InputStreamSource(String id, SocketWrapperImpl socketWrapper) {
super(id);
this.socketWrapper = socketWrapper;
}
@@ -45,27 +47,27 @@ public class InputStreamSource extends BaseElement {
@Override
public void handleEvent(Event event, Direction direction) {
switch (event) {
- case SOCKET_UPGRADE_TO_SSL:
- socketWrapper.upgradeToSsl();
- break;
- default:
- super.handleEvent(event, direction);
+ case SOCKET_UPGRADE_TO_SSL:
+ socketWrapper.upgradeToSsl();
+ break;
+ default:
+ super.handleEvent(event, direction);
}
}
@Override
public void setLink(String padName, Link link, Direction direction) {
switch (direction) {
- case OUT:
- super.setLink(padName, link, direction);
-
- if (is == null) {
- // Pause links until data stream will be ready
- link.pause();
- }
- break;
- case IN:
- throw new RuntimeException("Cannot assign link to input pad in source element. Element: " + this + ", pad: " + padName + ", link: " + link + ".");
+ case OUT:
+ super.setLink(padName, link, direction);
+
+ if (is == null) {
+ // Pause links until data stream will be ready
+ link.pause();
+ }
+ break;
+ case IN:
+ throw new RuntimeException("Cannot assign link to input pad in source element. Element: " + this + ", pad: " + padName + ", link: " + link + ".");
}
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/48c47101/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Link.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Link.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Link.java
old mode 100644
new mode 100755
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/48c47101/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSink.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSink.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSink.java
deleted file mode 100644
index 8094c82..0000000
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSink.java
+++ /dev/null
@@ -1,113 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package streamer;
-
-import java.util.Arrays;
-
-/**
- * Compare incoming packets with expected packets.
- */
-public class MockSink extends BaseElement {
-
- protected ByteBuffer bufs[] = null;
-
- public MockSink(String id) {
- super(id);
- }
-
- public MockSink(String id, ByteBuffer bufs[]) {
- super(id);
- this.bufs = bufs;
- }
-
- @Override
- public void handleData(ByteBuffer buf, Link link) {
- if (verbose)
- System.out.println("[" + this + "] INFO: Received buf #" + (packetNumber) + " " + buf + ".");
-
- if (buf == null)
- return;
-
- if (packetNumber >= bufs.length)
- throw new AssertionError("[" + this + "] Incoming buffer #" + packetNumber + " is not expected. Number of expected buffers: " + bufs.length +
- ", unexpected buffer: " + buf + ".");
-
- // Compare incoming buffer with expected buffer
- if (!Arrays.equals(bufs[packetNumber].toByteArray(), buf.toByteArray()))
- throw new AssertionError("[" + this + "] Incoming buffer #" + packetNumber + " is not equal to expected buffer.\n Actual bufer: " + buf +
- ",\n expected buffer: " + bufs[packetNumber] + ".");
-
- if (verbose)
- System.out.println("[" + this + "] INFO: buffers are equal.");
-
- // Use packetNumber variable to count incoming packets
- packetNumber++;
-
- buf.unref();
- }
-
- @Override
- protected void onClose() {
- super.onClose();
-
- if (packetNumber != bufs.length)
- throw new AssertionError("[" + this + "] Number of expected buffers: " + bufs.length + ", number of actual buffers: " + packetNumber + ".");
- }
-
- @Override
- public String toString() {
- return "MockSink(" + id + ")";
- }
-
- /**
- * Example.
- */
- public static void main(String args[]) {
-
- Element mockSource = new MockSource("source") {
- {
- this.bufs =
- new ByteBuffer[] {new ByteBuffer(new byte[] {1, 1, 2, 3, 4, 5}), new ByteBuffer(new byte[] {2, 1, 2, 3, 4}), new ByteBuffer(new byte[] {3, 1, 2, 3}),
- new ByteBuffer(new byte[] {4, 1, 2}), new ByteBuffer(new byte[] {5, 1})};
- this.verbose = true;
- this.delay = 100;
- this.numBuffers = this.bufs.length;
- }
- };
-
- Element mockSink = new MockSink("sink") {
- {
- this.bufs =
- new ByteBuffer[] {new ByteBuffer(new byte[] {1, 1, 2, 3, 4, 5}), new ByteBuffer(new byte[] {2, 1, 2, 3, 4}), new ByteBuffer(new byte[] {3, 1, 2, 3}),
- new ByteBuffer(new byte[] {4, 1, 2}), new ByteBuffer(new byte[] {5, 1})};
- this.verbose = true;
- }
- };
-
- Link link = new SyncLink() {
- {
- this.verbose = true;
- }
- };
-
- mockSource.setLink(STDOUT, link, Direction.OUT);
- mockSink.setLink(STDIN, link, Direction.IN);
-
- link.run();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/48c47101/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSource.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSource.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSource.java
deleted file mode 100644
index f758bab..0000000
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSource.java
+++ /dev/null
@@ -1,89 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package streamer;
-
-public class MockSource extends FakeSource {
-
- protected ByteBuffer bufs[] = null;
-
- public MockSource(String id) {
- super(id);
- }
-
- public MockSource(String id, ByteBuffer bufs[]) {
- super(id);
- this.bufs = bufs;
- }
-
- /**
- * Initialize data.
- */
- @Override
- public ByteBuffer initializeData() {
- if (packetNumber >= bufs.length) {
- sendEventToAllPads(Event.STREAM_CLOSE, Direction.OUT);
- return null;
- }
-
- ByteBuffer buf = bufs[packetNumber];
-
- buf.putMetadata(ByteBuffer.SEQUENCE_NUMBER, packetNumber);
- return buf;
- }
-
- @Override
- public void handleEvent(Event event, Direction direction) {
- if (verbose)
- System.out.println("[" + this + "] INFO: Event received: " + event + ".");
-
- }
-
- @Override
- public String toString() {
- return "MockSource(" + id + ")";
- }
-
- /**
- * Example.
- */
- public static void main(String args[]) {
-
- Element mockSource = new MockSource("source") {
- {
- this.bufs =
- new ByteBuffer[] {new ByteBuffer(new byte[] {1, 1, 2, 3, 4, 5}), new ByteBuffer(new byte[] {2, 1, 2, 3, 4}), new ByteBuffer(new byte[] {3, 1, 2, 3}),
- new ByteBuffer(new byte[] {4, 1, 2}), new ByteBuffer(new byte[] {5, 1})};
- this.verbose = true;
- this.delay = 100;
- // this.numBuffers = this.bufs.length;
- }
- };
-
- Element fakeSink = new FakeSink("sink") {
- {
- this.verbose = true;
- }
- };
-
- Link link = new SyncLink();
-
- mockSource.setLink(STDOUT, link, Direction.OUT);
- fakeSink.setLink(STDIN, link, Direction.IN);
-
- link.run();
- }
-}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/48c47101/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OneTimeSwitch.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OneTimeSwitch.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OneTimeSwitch.java
old mode 100644
new mode 100755
index d50995e..9cccb5b
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OneTimeSwitch.java
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OneTimeSwitch.java
@@ -87,13 +87,16 @@ public abstract class OneTimeSwitch extends BaseElement {
// Wake up next peer(s)
sendEventToAllPads(Event.STREAM_START, Direction.OUT);
+ // Disconnect our stdin from this element
stdin.setSink(null);
inputPads.remove(STDIN);
+ // Replace next peer stdin (our stdout) by our stdin
Element nextPeer = stdout.getSink();
nextPeer.replaceLink(stdout, stdin);
stdout.drop();
+ // Drop all other links
for (Object link : inputPads.values().toArray())
((Link)link).drop();
for (Object link : outputPads.values().toArray())
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/48c47101/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Order.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Order.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Order.java
old mode 100644
new mode 100755
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/48c47101/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OutputStreamSink.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OutputStreamSink.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OutputStreamSink.java
old mode 100644
new mode 100755
index 7a55ea8..e66899d
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OutputStreamSink.java
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OutputStreamSink.java
@@ -20,10 +20,12 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
+import streamer.debug.FakeSource;
+
public class OutputStreamSink extends BaseElement {
protected OutputStream os;
- protected SocketWrapper socketWrapper;
+ protected SocketWrapperImpl socketWrapper;
public OutputStreamSink(String id) {
super(id);
@@ -34,7 +36,7 @@ public class OutputStreamSink extends BaseElement {
this.os = os;
}
- public OutputStreamSink(String id, SocketWrapper socketWrapper) {
+ public OutputStreamSink(String id, SocketWrapperImpl socketWrapper) {
super(id);
this.socketWrapper = socketWrapper;
}
@@ -68,26 +70,26 @@ public class OutputStreamSink extends BaseElement {
@Override
public void handleEvent(Event event, Direction direction) {
switch (event) {
- case SOCKET_UPGRADE_TO_SSL:
- socketWrapper.upgradeToSsl();
- break;
- default:
- super.handleEvent(event, direction);
+ case SOCKET_UPGRADE_TO_SSL:
+ socketWrapper.upgradeToSsl();
+ break;
+ default:
+ super.handleEvent(event, direction);
}
}
@Override
public void setLink(String padName, Link link, Direction direction) {
switch (direction) {
- case IN:
- super.setLink(padName, link, direction);
-
- if (os == null)
- // Pause links until data stream will be ready
- link.pause();
- break;
- case OUT:
- throw new RuntimeException("Cannot assign link to output pad in sink element. Element: " + this + ", pad: " + padName + ", link: " + link + ".");
+ case IN:
+ super.setLink(padName, link, direction);
+
+ if (os == null)
+ // Pause links until data stream will be ready
+ link.pause();
+ break;
+ case OUT:
+ throw new RuntimeException("Cannot assign link to output pad in sink element. Element: " + this + ", pad: " + padName + ", link: " + link + ".");
}
}
@@ -126,10 +128,10 @@ public class OutputStreamSink extends BaseElement {
public static void main(String args[]) {
Element source = new FakeSource("source") {
{
- this.verbose = true;
- this.numBuffers = 3;
- this.incommingBufLength = 5;
- this.delay = 100;
+ verbose = true;
+ numBuffers = 3;
+ incommingBufLength = 5;
+ delay = 100;
}
};
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/48c47101/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Pipeline.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Pipeline.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Pipeline.java
old mode 100644
new mode 100755
index d2e52dd..0f6089b
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Pipeline.java
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Pipeline.java
@@ -48,6 +48,8 @@ public interface Pipeline extends Element {
* so when pipeline will be connected with other elements, outside of this
* pipeline, they will be connected to IN and OUT elements.
*
+ * Empty names are skipped.
+ *
* Example:
*
* <pre>
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/48c47101/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/PipelineImpl.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/PipelineImpl.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/PipelineImpl.java
old mode 100644
new mode 100755
index 4e6fc0a..299d0a4
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/PipelineImpl.java
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/PipelineImpl.java
@@ -16,10 +16,15 @@
// under the License.
package streamer;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
+import streamer.debug.FakeSink;
+import streamer.debug.FakeSource;
+
public class PipelineImpl implements Pipeline {
protected String id;
@@ -51,11 +56,11 @@ public class PipelineImpl implements Pipeline {
@Override
public Set<String> getPads(Direction direction) {
switch (direction) {
- case IN:
- return elements.get(IN).getPads(direction);
+ case IN:
+ return elements.get(IN).getPads(direction);
- case OUT:
- return elements.get(OUT).getPads(direction);
+ case OUT:
+ return elements.get(OUT).getPads(direction);
}
return null;
}
@@ -71,8 +76,8 @@ public class PipelineImpl implements Pipeline {
int outPadsNumber = element.getPads(Direction.OUT).size();
int inPadsNumber = element.getPads(Direction.IN).size();
if ((outPadsNumber | inPadsNumber) > 0 && (outPadsNumber == 0 || inPadsNumber == 0))
- throw new RuntimeException("[ " + this + "] Pads of input element of pipeline are not balanced. Element: " + element + ", output pads: " +
- element.getPads(Direction.OUT).toString() + ", input pads: " + element.getPads(Direction.IN).toString() + ".");
+ throw new RuntimeException("[ " + this + "] Pads of input element of pipeline are not balanced. Element: " + element + ", output pads: "
+ + element.getPads(Direction.OUT).toString() + ", input pads: " + element.getPads(Direction.IN).toString() + ".");
}
// Check OUT element
@@ -81,8 +86,8 @@ public class PipelineImpl implements Pipeline {
int outPadsNumber = element.getPads(Direction.OUT).size();
int inPadsNumber = element.getPads(Direction.IN).size();
if ((outPadsNumber | inPadsNumber) > 0 && (outPadsNumber == 0 || inPadsNumber == 0))
- throw new RuntimeException("[ " + this + "] Pads of output element of pipeline are not balanced. Element: " + element + ", output pads: " +
- element.getPads(Direction.OUT).toString() + ", input pads: " + element.getPads(Direction.IN).toString() + ".");
+ throw new RuntimeException("[ " + this + "] Pads of output element of pipeline are not balanced. Element: " + element + ", output pads: "
+ + element.getPads(Direction.OUT).toString() + ", input pads: " + element.getPads(Direction.IN).toString() + ".");
}
}
@@ -127,12 +132,12 @@ public class PipelineImpl implements Pipeline {
@Override
public void handleEvent(Event event, Direction direction) {
switch (direction) {
- case IN:
- get(IN).handleEvent(event, direction);
- break;
- case OUT:
- get(OUT).handleEvent(event, direction);
- break;
+ case IN:
+ get(IN).handleEvent(event, direction);
+ break;
+ case OUT:
+ get(OUT).handleEvent(event, direction);
+ break;
}
}
@@ -142,8 +147,8 @@ public class PipelineImpl implements Pipeline {
String id = element.getId();
if (this.elements.containsKey(id))
- throw new RuntimeException("This pipeline already contains element with same ID. New element: " + element + ", existing element: " +
- this.elements.get(id) + ".");
+ throw new RuntimeException("This pipeline already contains element with same ID. New element: " + element + ", existing element: "
+ + this.elements.get(id) + ".");
this.elements.put(id, element);
}
@@ -152,6 +157,8 @@ public class PipelineImpl implements Pipeline {
@Override
public void link(String... elementNames) {
+ elementNames = filterOutEmptyStrings(elementNames);
+
if (elementNames.length < 2)
throw new RuntimeException("At least two elements are necessary to create link between them.");
@@ -180,8 +187,8 @@ public class PipelineImpl implements Pipeline {
elements[i] = get(elementName);
if (elements[i] == null)
- throw new RuntimeException("Cannot find element by name in this pipeline. Element name: \"" + elementName + "\" (" + elementNames[i] + "), pipeline: " +
- this + ".");
+ throw new RuntimeException("Cannot find element by name in this pipeline. Element name: \"" + elementName + "\" (" + elementNames[i] + "), pipeline: "
+ + this + ".");
i++;
}
@@ -204,6 +211,30 @@ public class PipelineImpl implements Pipeline {
}
}
+ /**
+ * Filter out empty strings from array and return new array with non-empty
+ * elements only. If array contains no empty string, returns same array.
+ */
+ private String[] filterOutEmptyStrings(String[] strings) {
+
+ boolean found = false;
+ for (String string : strings) {
+ if (string == null || string.isEmpty()) {
+ found = true;
+ break;
+ }
+ }
+
+ if (!found)
+ return strings;
+
+ List<String> filteredStrings = new ArrayList<String>(strings.length);
+ for (String string : strings)
+ if (string != null && !string.isEmpty())
+ filteredStrings.add(string);
+ return filteredStrings.toArray(new String[filteredStrings.size()]);
+ }
+
@Override
public void addAndLink(Element... elements) {
add(elements);
@@ -281,20 +312,20 @@ public class PipelineImpl implements Pipeline {
// Create elements
pipeline.add(new FakeSource("source") {
{
- this.incommingBufLength = 3;
- this.numBuffers = 10;
- this.delay = 100;
+ incommingBufLength = 3;
+ numBuffers = 10;
+ delay = 100;
}
});
pipeline.add(new BaseElement("tee"));
pipeline.add(new FakeSink("sink") {
{
- this.verbose = true;
+ verbose = true;
}
});
pipeline.add(new FakeSink("sink2") {
{
- this.verbose = true;
+ verbose = true;
}
});
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/48c47101/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Queue.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Queue.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Queue.java
old mode 100644
new mode 100755
index 23c57e0..910e073
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Queue.java
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Queue.java
@@ -19,6 +19,9 @@ package streamer;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import streamer.debug.FakeSink;
+import streamer.debug.FakeSource;
+
/**
* Message queue for safe transfer of packets between threads.
*/
@@ -30,7 +33,6 @@ public class Queue extends BaseElement {
super(id);
}
- @SuppressWarnings("incomplete-switch")
@Override
public void poll(boolean block) {
try {
@@ -67,12 +69,12 @@ public class Queue extends BaseElement {
@Override
public void handleEvent(Event event, Direction direction) {
switch (event) {
- case LINK_SWITCH_TO_PULL_MODE:
- // Do not propagate this event, because this element is boundary between
- // threads
- break;
- default:
- super.handleEvent(event, direction);
+ case LINK_SWITCH_TO_PULL_MODE:
+ // Do not propagate this event, because this element is boundary between
+ // threads
+ break;
+ default:
+ super.handleEvent(event, direction);
}
}
@@ -104,17 +106,17 @@ public class Queue extends BaseElement {
Element source1 = new FakeSource("source1") {
{
- this.delay = 100;
- this.numBuffers = 10;
- this.incommingBufLength = 10;
+ delay = 100;
+ numBuffers = 10;
+ incommingBufLength = 10;
}
};
Element source2 = new FakeSource("source2") {
{
- this.delay = 100;
- this.numBuffers = 10;
- this.incommingBufLength = 10;
+ delay = 100;
+ numBuffers = 10;
+ incommingBufLength = 10;
}
};
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/48c47101/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SocketWrapper.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SocketWrapper.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SocketWrapper.java
old mode 100644
new mode 100755
index dfffd35..22d436e
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SocketWrapper.java
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SocketWrapper.java
@@ -16,224 +16,20 @@
// under the License.
package streamer;
-import static rdpclient.MockServer.Packet.PacketType.CLIENT;
-import static rdpclient.MockServer.Packet.PacketType.SERVER;
-
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.util.HashMap;
-
-import javax.net.SocketFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLSocket;
-import javax.net.ssl.SSLSocketFactory;
-import javax.net.ssl.TrustManager;
-
-import rdpclient.MockServer;
-import rdpclient.MockServer.Packet;
-import rdpclient.TrustAllX509TrustManager;
-
-public class SocketWrapper extends PipelineImpl {
-
- protected InputStreamSource source;
- protected OutputStreamSink sink;
- protected Socket socket;
- protected InetSocketAddress address;
-
- protected SSLSocket sslSocket;
-
- //protected String SSL_VERSION_TO_USE = "TLSv1.2";
- /*DEBUG*/protected static final String SSL_VERSION_TO_USE = "TLSv1";
-
- public SocketWrapper(String id) {
- super(id);
- }
-
- @Override
- protected HashMap<String, Element> initElementMap(String id) {
- HashMap<String, Element> map = new HashMap<String, Element>();
-
- source = new InputStreamSource(id + "." + OUT, this);
- sink = new OutputStreamSink(id + "." + IN, this);
-
- // Pass requests to read data to socket input stream
- map.put(OUT, source);
- // All incoming data, which is sent to this socket wrapper, will be sent
- // to socket remote
- map.put(IN, sink);
-
- return map;
- }
+public interface SocketWrapper extends Element {
/**
* Connect this socket wrapper to remote server and start main loop on
- * IputStreamSource stdout link, to watch for incoming data, and
- * OutputStreamSink stdin link, to pull for outgoing data.
- *
- * @param address
- * @throws IOException
+ * Source stdout link, to watch for incoming data, and
+ * Sink stdin link, to pull for outgoing data.
*/
- public void connect(InetSocketAddress address) throws IOException {
- this.address = address;
-
- // Connect socket to server
- socket = SocketFactory.getDefault().createSocket();
- try {
- socket.connect(address);
-
- InputStream is = socket.getInputStream();
- source.setInputStream(is);
-
- OutputStream os = socket.getOutputStream();
- sink.setOutputStream(os);
-
- // Start polling for data to send to remote sever
- runMainLoop(IN, STDIN, true, true);
-
- // Push incoming data from server to handlers
- runMainLoop(OUT, STDOUT, false, false);
-
- } finally {
- socket.close();
- }
- }
-
- @Override
- public void handleEvent(Event event, Direction direction) {
- switch (event) {
- case SOCKET_UPGRADE_TO_SSL:
- upgradeToSsl();
- break;
- default:
- super.handleEvent(event, direction);
- break;
- }
- }
-
- public void upgradeToSsl() {
-
- if (sslSocket != null)
- // Already upgraded
- return;
-
- if (verbose)
- System.out.println("[" + this + "] INFO: Upgrading socket to SSL.");
-
- try {
- // Use most secure implementation of SSL available now.
- // JVM will try to negotiate TLS1.2, then will fallback to TLS1.0, if
- // TLS1.2 is not supported.
- SSLContext sslContext = SSLContext.getInstance(SSL_VERSION_TO_USE);
-
- // Trust all certificates (FIXME: insecure)
- sslContext.init(null, new TrustManager[] {new TrustAllX509TrustManager()}, null);
-
- SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();
- sslSocket = (SSLSocket)sslSocketFactory.createSocket(socket, address.getHostName(), address.getPort(), true);
- sslSocket.startHandshake();
-
- InputStream sis = sslSocket.getInputStream();
- source.setInputStream(sis);
-
- OutputStream sos = sslSocket.getOutputStream();
- sink.setOutputStream(sos);
-
- } catch (Exception e) {
- throw new RuntimeException("Cannot upgrade socket to SSL: " + e.getMessage(), e);
- }
-
- }
-
- @Override
- public void validate() {
- for (Element element : elements.values())
- element.validate();
-
- if (get(IN).getPads(Direction.IN).size() == 0)
- throw new RuntimeException("[ " + this + "] Input of socket is not connected.");
-
- if (get(OUT).getPads(Direction.OUT).size() == 0)
- throw new RuntimeException("[ " + this + "] Output of socket is not connected.");
-
- }
-
- public void shutdown() {
- try {
- handleEvent(Event.STREAM_CLOSE, Direction.IN);
- } catch (Exception e) {
- }
- try {
- handleEvent(Event.STREAM_CLOSE, Direction.OUT);
- } catch (Exception e) {
- }
- try {
- if (sslSocket != null)
- sslSocket.close();
- } catch (Exception e) {
- }
- try {
- socket.close();
- } catch (Exception e) {
- }
- }
-
- @Override
- public String toString() {
- return "SocketWrapper(" + id + ")";
- }
-
- /**
- * Example.
- */
- public static void main(String args[]) {
- try {
- System.setProperty("streamer.Link.debug", "true");
- System.setProperty("streamer.Element.debug", "true");
- System.setProperty("rdpclient.MockServer.debug", "true");
-
- Pipeline pipeline = new PipelineImpl("echo client");
-
- SocketWrapper socketWrapper = new SocketWrapper("socket");
-
- pipeline.add(socketWrapper);
- pipeline.add(new BaseElement("echo"));
-
- pipeline.link("socket", "echo", "socket");
-
- final byte[] mockData = new byte[] {0x01, 0x02, 0x03};
- MockServer server = new MockServer(new Packet[] {new Packet("Server hello") {
- {
- type = SERVER;
- data = mockData;
- }
- }, new Packet("Client hello") {
- {
- type = CLIENT;
- data = mockData;
- }
- }, new Packet("Server hello") {
- {
- type = SERVER;
- data = mockData;
- }
- }, new Packet("Client hello") {
- {
- type = CLIENT;
- data = mockData;
- }
- }});
- server.start();
- InetSocketAddress address = server.getAddress();
+ public void connect(InetSocketAddress address) throws IOException;
- socketWrapper.connect(address);
+ public void shutdown();
- } catch (IOException e) {
- e.printStackTrace(System.err);
- }
+ void upgradeToSsl();
- }
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/48c47101/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SocketWrapperImpl.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SocketWrapperImpl.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SocketWrapperImpl.java
new file mode 100755
index 0000000..da89a0d
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SocketWrapperImpl.java
@@ -0,0 +1,249 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+import static streamer.debug.MockServer.Packet.PacketType.CLIENT;
+import static streamer.debug.MockServer.Packet.PacketType.SERVER;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.HashMap;
+
+import javax.net.SocketFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSocket;
+import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.TrustManager;
+
+import streamer.debug.MockServer;
+import streamer.debug.MockServer.Packet;
+import streamer.ssl.SSLState;
+import streamer.ssl.TrustAllX509TrustManager;
+
+public class SocketWrapperImpl extends PipelineImpl implements SocketWrapper {
+
+ protected InputStreamSource source;
+ protected OutputStreamSink sink;
+ protected Socket socket;
+ protected InetSocketAddress address;
+
+ protected SSLSocket sslSocket;
+
+ protected String sslVersionToUse = "TLSv1.2";
+
+ protected SSLState sslState;
+
+ public SocketWrapperImpl(String id, SSLState sslState) {
+ super(id);
+ this.sslState = sslState;
+ }
+
+ @Override
+ protected HashMap<String, Element> initElementMap(String id) {
+ HashMap<String, Element> map = new HashMap<String, Element>();
+
+ source = new InputStreamSource(id + "." + OUT, this);
+ sink = new OutputStreamSink(id + "." + IN, this);
+
+ // Pass requests to read data to socket input stream
+ map.put(OUT, source);
+
+ // All incoming data, which is sent to this socket wrapper, will be sent
+ // to socket remote
+ map.put(IN, sink);
+
+ return map;
+ }
+
+ /**
+ * Connect this socket wrapper to remote server and start main loop on
+ * IputStreamSource stdout link, to watch for incoming data, and
+ * OutputStreamSink stdin link, to pull for outgoing data.
+ *
+ * @param address
+ * @throws IOException
+ */
+ @Override
+ public void connect(InetSocketAddress address) throws IOException {
+ this.address = address;
+
+ // Connect socket to server
+ socket = SocketFactory.getDefault().createSocket();
+ try {
+ socket.connect(address);
+
+ InputStream is = socket.getInputStream();
+ source.setInputStream(is);
+
+ OutputStream os = socket.getOutputStream();
+ sink.setOutputStream(os);
+
+ // Start polling for data to send to remote sever
+ runMainLoop(IN, STDIN, true, true);
+
+ // Push incoming data from server to handlers
+ runMainLoop(OUT, STDOUT, false, false);
+
+ } finally {
+ socket.close();
+ }
+ }
+
+ @Override
+ public void handleEvent(Event event, Direction direction) {
+ switch (event) {
+ case SOCKET_UPGRADE_TO_SSL:
+ upgradeToSsl();
+ break;
+ default:
+ super.handleEvent(event, direction);
+ break;
+ }
+ }
+
+ @Override
+ public void upgradeToSsl() {
+
+ if (sslSocket != null)
+ // Already upgraded
+ return;
+
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Upgrading socket to SSL.");
+
+ try {
+ // Use most secure implementation of SSL available now.
+ // JVM will try to negotiate TLS1.2, then will fallback to TLS1.0, if
+ // TLS1.2 is not supported.
+ SSLContext sslContext = SSLContext.getInstance(sslVersionToUse);
+
+ // Trust all certificates (FIXME: insecure)
+ sslContext.init(null, new TrustManager[] {new TrustAllX509TrustManager(sslState)}, null);
+
+ SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();
+ sslSocket = (SSLSocket)sslSocketFactory.createSocket(socket, address.getHostName(), address.getPort(), true);
+
+ sslSocket.startHandshake();
+
+ InputStream sis = sslSocket.getInputStream();
+ source.setInputStream(sis);
+
+ OutputStream sos = sslSocket.getOutputStream();
+ sink.setOutputStream(sos);
+
+ } catch (Exception e) {
+ throw new RuntimeException("Cannot upgrade socket to SSL: " + e.getMessage(), e);
+ }
+
+ }
+
+ @Override
+ public void validate() {
+ for (Element element : elements.values())
+ element.validate();
+
+ if (get(IN).getPads(Direction.IN).size() == 0)
+ throw new RuntimeException("[ " + this + "] Input of socket is not connected.");
+
+ if (get(OUT).getPads(Direction.OUT).size() == 0)
+ throw new RuntimeException("[ " + this + "] Output of socket is not connected.");
+
+ }
+
+ @Override
+ public void shutdown() {
+ try {
+ handleEvent(Event.STREAM_CLOSE, Direction.IN);
+ } catch (Exception e) {
+ }
+ try {
+ handleEvent(Event.STREAM_CLOSE, Direction.OUT);
+ } catch (Exception e) {
+ }
+ try {
+ if (sslSocket != null)
+ sslSocket.close();
+ } catch (Exception e) {
+ }
+ try {
+ socket.close();
+ } catch (Exception e) {
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "SocketWrapper(" + id + ")";
+ }
+
+ /**
+ * Example.
+ */
+ public static void main(String args[]) {
+
+ try {
+ System.setProperty("streamer.Link.debug", "true");
+ System.setProperty("streamer.Element.debug", "true");
+ System.setProperty("rdpclient.MockServer.debug", "true");
+
+ Pipeline pipeline = new PipelineImpl("echo client");
+
+ SocketWrapperImpl socketWrapper = new SocketWrapperImpl("socket", null);
+
+ pipeline.add(socketWrapper);
+ pipeline.add(new BaseElement("echo"));
+ pipeline.add(new Queue("queue")); // To decouple input and output
+
+ pipeline.link("socket", "echo", "queue", "socket");
+
+ final byte[] mockData = new byte[] {0x01, 0x02, 0x03};
+ MockServer server = new MockServer(new Packet[] {new Packet("Server hello") {
+ {
+ type = SERVER;
+ data = mockData;
+ }
+ }, new Packet("Client hello") {
+ {
+ type = CLIENT;
+ data = mockData;
+ }
+ }, new Packet("Server hello") {
+ {
+ type = SERVER;
+ data = mockData;
+ }
+ }, new Packet("Client hello") {
+ {
+ type = CLIENT;
+ data = mockData;
+ }
+ }});
+ server.start();
+ InetSocketAddress address = server.getAddress();
+
+ /*DEBUG*/System.out.println("Address: " + address);
+ socketWrapper.connect(address);
+
+ } catch (Exception e) {
+ e.printStackTrace(System.err);
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/48c47101/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SyncLink.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SyncLink.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SyncLink.java
old mode 100644
new mode 100755
index 2bd4919..94281d2
--- a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SyncLink.java
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SyncLink.java
@@ -27,7 +27,7 @@ public class SyncLink implements Link {
* avoid consuming of 100% of CPU in main loop in cases when link is pauses or
* source element cannot produce data right now.
*/
- protected static final long STANDARD_DELAY_FOR_EMPTY_PACKET = 10; // Milliseconds
+ public static final long STANDARD_DELAY_FOR_EMPTY_PACKET = 10; // Milliseconds
/**
* Delay for null packets in poll method when blocking is requested, in
@@ -46,7 +46,8 @@ public class SyncLink implements Link {
protected String id = null;
/**
- * Buffer with data to hold because link is paused, or data is pushed back.
+ * Buffer with data to hold because link is paused, on hold, or data is pushed
+ * back from output element.
*/
protected ByteBuffer cacheBuffer = null;
@@ -62,11 +63,6 @@ public class SyncLink implements Link {
protected int packetNumber = 0;
/**
- * Set to true to hold all data in link until it will be set to false again.
- */
- protected boolean paused = false;
-
- /**
* Element to pull data from, when in pull mode.
*/
protected Element source = null;
@@ -87,12 +83,24 @@ public class SyncLink implements Link {
* Indicates that event STREAM_START is passed over this link, so main loop
* can be started to pull data from source element.
*/
- protected boolean start;
+ protected boolean started = false;
+
+ /**
+ * Set to true to hold all data in link until it will be set to false again.
+ */
+ protected boolean paused = false;
+
+ /**
+ * Used by pull() method to hold all data in this link to avoid recursion when
+ * source element is asked to push new data to it outputs.
+ */
+ protected boolean hold = false;
/**
- * Operate in pull mode.
+ * Operate in pull mode instead of default push mode. In pull mode, link will
+ * ask it source element for new data.
*/
- protected boolean pullMode;
+ protected boolean pullMode = false;
public SyncLink() {
}
@@ -139,7 +147,7 @@ public class SyncLink implements Link {
*/
@Override
public void sendData(ByteBuffer buf) {
- if (!paused && pullMode)
+ if (!hold && pullMode)
throw new RuntimeException("[" + this + "] ERROR: link is not in push mode.");
if (verbose)
@@ -162,7 +170,7 @@ public class SyncLink implements Link {
// When data pushed back and length of data is less than length of full
// packet, then feed data to sink element immediately
while (cacheBuffer != null) {
- if (paused) {
+ if (paused || hold) {
if (verbose)
System.out.println("[" + this + "] INFO: Transfer is paused. Data in cache buffer: " + cacheBuffer + ".");
@@ -172,8 +180,8 @@ public class SyncLink implements Link {
if (expectedPacketSize > 0 && cacheBuffer.length < expectedPacketSize) {
if (verbose)
- System.out.println("[" + this + "] INFO: Transfer is suspended because available data is less than expected packet size. Expected packet size: " +
- expectedPacketSize + ", data in cache buffer: " + cacheBuffer + ".");
+ System.out.println("[" + this + "] INFO: Transfer is suspended because available data is less than expected packet size. Expected packet size: "
+ + expectedPacketSize + ", data in cache buffer: " + cacheBuffer + ".");
// Wait until rest of packet will be read
return;
@@ -203,43 +211,46 @@ public class SyncLink implements Link {
// Shutdown main loop (if any) when STREAM_CLOSE event is received.
switch (event) {
- case STREAM_START: {
- if (!start)
- start = true;
- else
- // Event already sent trough this link
- return;
- break;
- }
- case STREAM_CLOSE: {
- if (!shutdown)
- shutdown = true;
- else
- // Event already sent trough this link
- return;
- break;
- }
- case LINK_SWITCH_TO_PULL_MODE: {
- setPullMode();
- break;
- }
+ case STREAM_START: {
+ if (!started)
+ started = true;
+ else
+ // Event already sent trough this link
+ return;
+ break;
+ }
+ case STREAM_CLOSE: {
+ if (!shutdown)
+ shutdown = true;
+ else
+ // Event already sent trough this link
+ return;
+ break;
+ }
+ case LINK_SWITCH_TO_PULL_MODE: {
+ setPullMode();
+ break;
+ }
}
switch (direction) {
- case IN:
- source.handleEvent(event, direction);
- break;
- case OUT:
- sink.handleEvent(event, direction);
- break;
+ case IN:
+ source.handleEvent(event, direction);
+ break;
+ case OUT:
+ sink.handleEvent(event, direction);
+ break;
}
}
@Override
public ByteBuffer pull(boolean block) {
if (!pullMode)
- throw new RuntimeException("This link is not in pull mode.");
+ throw new RuntimeException("[" + this + "] ERROR: This link is not in pull mode.");
+
+ if (hold)
+ throw new RuntimeException("[" + this + "] ERROR: This link is already on hold, waiting for data to be pulled in. Circular reference?");
if (paused) {
if (verbose)
@@ -269,9 +280,12 @@ public class SyncLink implements Link {
// Pause this link, so incoming data will not be sent to sink
// immediately, then ask source element for more data
- pause();
- source.poll(block);
- resume();
+ try {
+ hold = true;
+ source.poll(block);
+ } finally {
+ hold = false;
+ }
// Can return something only when data was stored in buffer
if (cacheBuffer != null && (expectedPacketSize == 0 || (expectedPacketSize > 0 && cacheBuffer.length >= expectedPacketSize))) {
@@ -289,10 +303,11 @@ public class SyncLink implements Link {
@Override
public Element setSink(Element sink) {
if (sink != null && this.sink != null)
- throw new RuntimeException("This link sink element is already set. Link: " + this + ", new sink: " + sink + ", existing sink: " + this.sink + ".");
+ throw new RuntimeException("[" + this + "] ERROR: This link sink element is already set. Link: " + this + ", new sink: " + sink + ", existing sink: "
+ + this.sink + ".");
if (sink == null && cacheBuffer != null)
- throw new RuntimeException("Cannot drop link: cache is not empty. Link: " + this + ", cache: " + cacheBuffer);
+ throw new RuntimeException("[" + this + "] ERROR: Cannot drop link: cache is not empty. Link: " + this + ", cache: " + cacheBuffer);
this.sink = sink;
@@ -302,7 +317,8 @@ public class SyncLink implements Link {
@Override
public Element setSource(Element source) {
if (this.source != null && source != null)
- throw new RuntimeException("This link source element is already set. Link: " + this + ", new source: " + source + ", existing source: " + this.source + ".");
+ throw new RuntimeException("[" + this + "] ERROR: This link source element is already set. Link: " + this + ", new source: " + source
+ + ", existing source: " + this.source + ".");
this.source = source;
return source;
@@ -321,7 +337,7 @@ public class SyncLink implements Link {
@Override
public void pause() {
if (paused)
- throw new RuntimeException("Link is already paused.");
+ throw new RuntimeException("[" + this + "] ERROR: Link is already paused.");
paused = true;
@@ -343,7 +359,7 @@ public class SyncLink implements Link {
@Override
public void run() {
// Wait until even STREAM_START will arrive
- while (!start) {
+ while (!started) {
delay();
}
@@ -374,8 +390,7 @@ public class SyncLink implements Link {
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
- e.printStackTrace(System.err);
- throw new RuntimeException("Interrupted in main loop.", e);
+ throw new RuntimeException("[" + this + "] ERROR: Interrupted in main loop.", e);
}
}
@@ -384,16 +399,16 @@ public class SyncLink implements Link {
if (verbose)
System.out.println("[" + this + "] INFO: Switching to PULL mode.");
- this.pullMode = true;
+ pullMode = true;
}
@Override
public void drop() {
if (pullMode)
- throw new RuntimeException("Cannot drop link in pull mode.");
+ throw new RuntimeException("[" + this + "] ERROR: Cannot drop link in pull mode.");
if (cacheBuffer != null)
- throw new RuntimeException("Cannot drop link when cache conatains data: " + cacheBuffer + ".");
+ throw new RuntimeException("[" + this + "] ERROR: Cannot drop link when cache conatains data: " + cacheBuffer + ".");
source.dropLink(this);
sink.dropLink(this);
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/48c47101/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketSink.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketSink.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketSink.java
new file mode 100755
index 0000000..edfe8db
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketSink.java
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer.apr;
+
+import org.apache.tomcat.jni.Socket;
+
+import streamer.BaseElement;
+import streamer.ByteBuffer;
+import streamer.DataSource;
+import streamer.Direction;
+import streamer.Event;
+import streamer.Link;
+
+public class AprSocketSink extends BaseElement {
+
+ protected AprSocketWrapperImpl socketWrapper;
+ protected Long socket;
+
+ public AprSocketSink(String id) {
+ super(id);
+ }
+
+ public AprSocketSink(String id, AprSocketWrapperImpl socketWrapper) {
+ super(id);
+ this.socketWrapper = socketWrapper;
+ }
+
+ public void setSocket(long socket) {
+ this.socket = socket;
+
+ // Resume links
+ resumeLinks();
+ }
+
+ /**
+ * Send incoming data to stream.
+ */
+ @Override
+ public void handleData(ByteBuffer buf, Link link) {
+ if (buf == null)
+ return;
+
+ if (socketWrapper.shutdown)
+ return;
+
+ try {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Writing data to stream: " + buf + ".");
+
+ // FIXME: If pull is destroyed or socket is closed, segfault will happen here
+ Socket.send(socket, buf.data, buf.offset, buf.length);
+ } catch (Exception e) {
+ System.err.println("[" + this + "] ERROR: " + e.getMessage());
+ closeStream();
+ }
+ }
+
+ @Override
+ public void handleEvent(Event event, Direction direction) {
+ switch (event) {
+ case SOCKET_UPGRADE_TO_SSL:
+ socketWrapper.upgradeToSsl();
+ break;
+ case LINK_SWITCH_TO_PULL_MODE:
+ throw new RuntimeException("[" + this + "] ERROR: Unexpected event: sink recived LINK_SWITCH_TO_PULL_MODE event.");
+ default:
+ super.handleEvent(event, direction);
+ }
+ }
+
+ @Override
+ public void setLink(String padName, Link link, Direction direction) {
+ switch (direction) {
+ case IN:
+ super.setLink(padName, link, direction);
+
+ if (socket == null)
+ // Pause links until data stream will be ready
+ link.pause();
+ break;
+ case OUT:
+ throw new RuntimeException("Cannot assign link to output pad in sink element. Element: " + this + ", pad: " + padName + ", link: " + link + ".");
+ }
+ }
+
+ private void resumeLinks() {
+ for (DataSource source : inputPads.values())
+ ((Link)source).resume();
+ }
+
+ @Override
+ protected void onClose() {
+ closeStream();
+ }
+
+ private void closeStream() {
+ if (socketWrapper.shutdown)
+ return;
+
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Closing stream.");
+
+ try {
+ sendEventToAllPads(Event.STREAM_CLOSE, Direction.IN);
+ } catch (Exception e) {
+ }
+ socketWrapper.shutdown();
+ }
+
+ @Override
+ public String toString() {
+ return "AprSocketSink(" + id + ")";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/48c47101/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketSource.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketSource.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketSource.java
new file mode 100755
index 0000000..0298349
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketSource.java
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer.apr;
+
+import org.apache.tomcat.jni.Socket;
+
+import streamer.BaseElement;
+import streamer.ByteBuffer;
+import streamer.DataSink;
+import streamer.Direction;
+import streamer.Event;
+import streamer.Link;
+
+/**
+ * Source element, which reads data from InputStream.
+ */
+public class AprSocketSource extends BaseElement {
+
+ protected AprSocketWrapperImpl socketWrapper;
+ protected Long socket;
+
+ public AprSocketSource(String id) {
+ super(id);
+ }
+
+ public AprSocketSource(String id, AprSocketWrapperImpl socketWrapper) {
+ super(id);
+ this.socketWrapper = socketWrapper;
+ }
+
+ @Override
+ public void handleEvent(Event event, Direction direction) {
+ switch (event) {
+ case SOCKET_UPGRADE_TO_SSL:
+ socketWrapper.upgradeToSsl();
+ break;
+ case LINK_SWITCH_TO_PULL_MODE:
+ // Do nothing - this is the source
+ break;
+ default:
+ super.handleEvent(event, direction);
+ }
+ }
+
+ @Override
+ public void setLink(String padName, Link link, Direction direction) {
+ switch (direction) {
+ case OUT:
+ super.setLink(padName, link, direction);
+
+ if (socket == null) {
+ // Pause links until data stream will be ready
+ link.pause();
+ }
+ break;
+ case IN:
+ throw new RuntimeException("Cannot assign link to input pad in source element. Element: " + this + ", pad: " + padName + ", link: " + link + ".");
+ }
+ }
+
+ public void setSocket(long socket) {
+ this.socket = socket;
+
+ // Resume links
+ resumeLinks();
+ }
+
+ private void resumeLinks() {
+ for (DataSink sink : outputPads.values())
+ ((Link)sink).resume();
+ }
+
+ /**
+ * Read data from input stream.
+ */
+ @Override
+ public void poll(boolean block) {
+ if (socketWrapper.shutdown) {
+ socketWrapper.destroyPull();
+ return;
+ }
+
+ try {
+ // Create buffer of recommended size and with default offset
+ ByteBuffer buf = new ByteBuffer(incommingBufLength);
+
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Reading data from stream.");
+
+ // FIXME: If pull is destroyed or socket is closed, segfault will happen here
+ int actualLength = (block) ? // Blocking read
+ Socket.recv(socket, buf.data, buf.offset, buf.data.length - buf.offset)
+ : // Non-blocking read
+ Socket.recvt(socket, buf.data, buf.offset, buf.data.length - buf.offset, 1);
+
+ if (socketWrapper.shutdown) {
+ socketWrapper.destroyPull();
+ return;
+ }
+
+ if (actualLength < 0) {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: End of stream.");
+
+ buf.unref();
+ closeStream();
+ sendEventToAllPads(Event.STREAM_CLOSE, Direction.OUT);
+ return;
+ }
+
+ if (actualLength == 0) {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Empty buffer is read from stream.");
+
+ buf.unref();
+ return;
+ }
+
+ buf.length = actualLength;
+
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Data read from stream: " + buf + ".");
+
+ pushDataToAllOuts(buf);
+
+ } catch (Exception e) {
+ System.err.println("[" + this + "] ERROR: " + e.getMessage());
+ closeStream();
+ }
+ }
+
+ @Override
+ protected void onClose() {
+ closeStream();
+ }
+
+ private void closeStream() {
+
+ if (socketWrapper.shutdown)
+ return;
+
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Closing stream.");
+
+ try {
+ sendEventToAllPads(Event.STREAM_CLOSE, Direction.OUT);
+ } catch (Exception e) {
+ }
+ socketWrapper.shutdown();
+ }
+
+ @Override
+ public String toString() {
+ return "AprSocketSource(" + id + ")";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/48c47101/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketWrapperImpl.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketWrapperImpl.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketWrapperImpl.java
new file mode 100755
index 0000000..2ee426b
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/apr/AprSocketWrapperImpl.java
@@ -0,0 +1,281 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer.apr;
+
+import static streamer.debug.MockServer.Packet.PacketType.CLIENT;
+import static streamer.debug.MockServer.Packet.PacketType.SERVER;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+
+import org.apache.tomcat.jni.Address;
+import org.apache.tomcat.jni.Error;
+import org.apache.tomcat.jni.Library;
+import org.apache.tomcat.jni.Pool;
+import org.apache.tomcat.jni.SSL;
+import org.apache.tomcat.jni.SSLContext;
+import org.apache.tomcat.jni.SSLSocket;
+import org.apache.tomcat.jni.Socket;
+
+import streamer.BaseElement;
+import streamer.Direction;
+import streamer.Element;
+import streamer.Event;
+import streamer.Pipeline;
+import streamer.PipelineImpl;
+import streamer.Queue;
+import streamer.SocketWrapper;
+import streamer.debug.MockServer;
+import streamer.debug.MockServer.Packet;
+import streamer.ssl.SSLState;
+import sun.security.x509.X509CertImpl;
+
+public class AprSocketWrapperImpl extends PipelineImpl implements SocketWrapper {
+
+ static {
+ try {
+ Library.initialize(null);
+ SSL.initialize(null);
+ } catch (Exception e) {
+ throw new RuntimeException("Cannot load Tomcat Native Library (Apache Portable Runtime).", e);
+ }
+ }
+
+ private final SSLState sslState;
+
+ final long pool = Pool.create(0);
+ long inetAddress;
+ long socket;
+ private AprSocketSource source;
+ private AprSocketSink sink;
+
+ boolean shutdown = false;
+
+ private final boolean shutdowned = false;
+
+ public AprSocketWrapperImpl(String id, SSLState sslState) {
+ super(id);
+ this.sslState = sslState;
+ }
+
+ @Override
+ protected HashMap<String, Element> initElementMap(String id) {
+ HashMap<String, Element> map = new HashMap<String, Element>();
+
+ source = new AprSocketSource(id + "." + OUT, this);
+ sink = new AprSocketSink(id + "." + IN, this);
+
+ // Pass requests to read data to socket input stream
+ map.put(OUT, source);
+
+ // All incoming data, which is sent to this socket wrapper, will be sent
+ // to socket remote
+ map.put(IN, sink);
+
+ return map;
+ }
+
+ /**
+ * Connect this socket wrapper to remote server and start main loop on
+ * AprSocketSource stdout link, to watch for incoming data, and
+ * AprSocketSink stdin link, to pull for outgoing data.
+ */
+ @Override
+ public void connect(InetSocketAddress address) throws IOException {
+ try {
+ inetAddress = Address.info(address.getHostName(), Socket.APR_UNSPEC, address.getPort(), 0, pool);
+ socket = Socket.create(Address.getInfo(inetAddress).family, Socket.SOCK_STREAM, Socket.APR_PROTO_TCP, pool);
+ } catch (Exception e) {
+ throw new IOException("[" + this + "] ERROR: Cannot create socket for \"" + address + "\".", e);
+ }
+
+ int ret = Socket.connect(socket, inetAddress);
+ if (ret != 0)
+ throw new IOException("[" + this + "] ERROR: Cannot connect to remote host \"" + address + "\": " + Error.strerror(ret));
+
+ source.setSocket(socket);
+ sink.setSocket(socket);
+
+ // Start polling for data to send to remote sever
+ runMainLoop(IN, STDIN, true, true);
+
+ // Push incoming data from server to handlers
+ runMainLoop(OUT, STDOUT, false, false);
+
+ }
+
+ @Override
+ public void handleEvent(Event event, Direction direction) {
+ switch (event) {
+ case SOCKET_UPGRADE_TO_SSL:
+ upgradeToSsl();
+ break;
+ default:
+ super.handleEvent(event, direction);
+ break;
+ }
+ }
+
+ @Override
+ public void validate() {
+
+ if (getPads(Direction.IN).size() == 0)
+ throw new RuntimeException("[ " + this + "] BUG: Input of socket is not connected.");
+
+ if (getPads(Direction.OUT).size() == 0)
+ throw new RuntimeException("[ " + this + "] BUG: Output of socket is not connected.");
+
+ }
+
+ @Override
+ public void upgradeToSsl() {
+
+ try {
+ long sslContext;
+ try {
+ sslContext = SSLContext.make(pool, SSL.SSL_PROTOCOL_TLSV1, SSL.SSL_MODE_CLIENT);
+ } catch (Exception e) {
+ throw new RuntimeException("Cannot create SSL context using Tomcat native library.", e);
+ }
+
+ SSLContext.setOptions(sslContext, SSL.SSL_OP_DONT_INSERT_EMPTY_FRAGMENTS | SSL.SSL_OP_TLS_BLOCK_PADDING_BUG | SSL.SSL_OP_MICROSOFT_BIG_SSLV3_BUFFER
+ | SSL.SSL_OP_MSIE_SSLV2_RSA_PADDING);
+ // FIXME: verify certificate by default
+ SSLContext.setVerify(sslContext, SSL.SSL_CVERIFY_NONE, 0);
+ int ret;
+ try {
+ ret = SSLSocket.attach(sslContext, socket);
+ } catch (Exception e) {
+ throw new RuntimeException("[" + this + "] ERROR: Cannot attach SSL context to socket: ", e);
+ }
+ if (ret != 0)
+ throw new RuntimeException("[" + this + "] ERROR: Cannot attach SSL context to socket(" + ret + "): " + SSL.getLastError());
+
+ try {
+ ret = SSLSocket.handshake(socket);
+ } catch (Exception e) {
+ throw new RuntimeException("[" + this + "] ERROR: Cannot make SSL handshake with server: ", e);
+ }
+ if (ret != 0 && ret != 20014) // 20014: bad certificate signature FIXME: show prompt for self signed certificate
+ throw new RuntimeException("[" + this + "] ERROR: Cannot make SSL handshake with server(" + ret + "): " + SSL.getLastError());
+
+ try {
+ byte[] key = SSLSocket.getInfoB(socket, SSL.SSL_INFO_CLIENT_CERT);
+ //*DEBUG*/System.out.println("DEBUG: Server cert:\n"+new ByteBuffer(key).dump());
+ sslState.serverCertificateSubjectPublicKeyInfo = new X509CertImpl(key).getPublicKey().getEncoded();
+ } catch (Exception e) {
+ throw new RuntimeException("[" + this + "] ERROR: Cannot get server public key: ", e);
+ }
+
+ } catch (RuntimeException e) {
+ shutdown();
+ throw e;
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ if (shutdown)
+ return;
+
+ shutdown = true;
+
+ try {
+ handleEvent(Event.STREAM_CLOSE, Direction.IN);
+ } catch (Exception e) {
+ }
+ try {
+ handleEvent(Event.STREAM_CLOSE, Direction.OUT);
+ } catch (Exception e) {
+ }
+ }
+
+ void destroyPull() {
+ if (shutdowned)
+ return;
+
+ // Causes segfault in AprSocketSource.poll() method, so this function must be called from it
+ try {
+ Socket.close(socket);
+ // or
+ // Socket.shutdown(socket, Socket.APR_SHUTDOWN_READWRITE);
+ Pool.destroy(pool);
+ } catch (Exception e) {
+ }
+
+ }
+
+ @Override
+ public String toString() {
+ return "AprSocketWrapper(" + id + ")";
+ }
+
+ /**
+ * Example.
+ */
+ public static void main(String args[]) {
+
+ try {
+ System.setProperty("streamer.Link.debug", "true");
+ System.setProperty("streamer.Element.debug", "true");
+ System.setProperty("rdpclient.MockServer.debug", "true");
+
+ Pipeline pipeline = new PipelineImpl("echo client");
+
+ AprSocketWrapperImpl socketWrapper = new AprSocketWrapperImpl("socket", null);
+
+ pipeline.add(socketWrapper);
+ pipeline.add(new BaseElement("echo"));
+ pipeline.add(new Queue("queue")); // To decouple input and output
+
+ pipeline.link("socket", "echo", "queue", "socket");
+
+ final byte[] mockData = new byte[] {0x01, 0x02, 0x03};
+ MockServer server = new MockServer(new Packet[] {new Packet("Server hello") {
+ {
+ type = SERVER;
+ data = mockData;
+ }
+ }, new Packet("Client hello") {
+ {
+ type = CLIENT;
+ data = mockData;
+ }
+ }, new Packet("Server hello") {
+ {
+ type = SERVER;
+ data = mockData;
+ }
+ }, new Packet("Client hello") {
+ {
+ type = CLIENT;
+ data = mockData;
+ }
+ }});
+ server.start();
+ InetSocketAddress address = server.getAddress();
+
+ socketWrapper.connect(address);
+
+ } catch (Exception e) {
+ e.printStackTrace(System.err);
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/48c47101/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/bco/BcoSocketWrapperImpl.java
----------------------------------------------------------------------
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/bco/BcoSocketWrapperImpl.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/bco/BcoSocketWrapperImpl.java
new file mode 100755
index 0000000..67e2dbd
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/bco/BcoSocketWrapperImpl.java
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer.bco;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.security.SecureRandom;
+import java.security.Security;
+
+import org.bouncycastle.asn1.x509.X509CertificateStructure;
+import org.bouncycastle.crypto.tls.CertificateVerifyer;
+import org.bouncycastle.crypto.tls.TlsProtocolHandler;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+
+import streamer.Direction;
+import streamer.Event;
+import streamer.SocketWrapperImpl;
+import streamer.ssl.SSLState;
+
+@SuppressWarnings("deprecation")
+public class BcoSocketWrapperImpl extends SocketWrapperImpl {
+
+ static {
+ Security.addProvider(new BouncyCastleProvider());
+ }
+
+ private TlsProtocolHandler bcoSslSocket;
+
+ public BcoSocketWrapperImpl(String id, SSLState sslState) {
+ super(id, sslState);
+ }
+
+ @Override
+ public void upgradeToSsl() {
+
+ if (sslSocket != null)
+ // Already upgraded
+ return;
+
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Upgrading socket to SSL.");
+
+ try {
+
+ SecureRandom secureRandom = new SecureRandom();
+ bcoSslSocket = new TlsProtocolHandler(socket.getInputStream(), socket.getOutputStream(), secureRandom);
+
+ CertificateVerifyer client = new CertificateVerifyer() {
+
+ @Override
+ public boolean isValid(X509CertificateStructure[] chain) {
+
+ try {
+ if (sslState != null) {
+ sslState.serverCertificateSubjectPublicKeyInfo = chain[0].getSubjectPublicKeyInfo().getEncoded();
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot get server public key.", e);
+ }
+
+ return true;
+ }
+ };
+ bcoSslSocket.connect(client);
+
+ InputStream sis = bcoSslSocket.getInputStream();
+ source.setInputStream(sis);
+
+ OutputStream sos = bcoSslSocket.getOutputStream();
+ sink.setOutputStream(sos);
+
+ } catch (Exception e) {
+ throw new RuntimeException("Cannot upgrade socket to SSL: " + e.getMessage(), e);
+ }
+
+ }
+
+ @Override
+ public void shutdown() {
+ try {
+ handleEvent(Event.STREAM_CLOSE, Direction.IN);
+ } catch (Exception e) {
+ }
+ try {
+ handleEvent(Event.STREAM_CLOSE, Direction.OUT);
+ } catch (Exception e) {
+ }
+ try {
+ if (bcoSslSocket != null)
+ bcoSslSocket.close();
+ } catch (Exception e) {
+ }
+ try {
+ socket.close();
+ } catch (Exception e) {
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "BcoSocketWrapper(" + id + ")";
+ }
+
+}