You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2012/12/13 23:56:08 UTC
svn commit: r1421602 - in /tomcat/trunk/java:
javax/websocket/RemoteEndpoint.java
org/apache/tomcat/websocket/WsRemoteEndpoint.java
Author: markt
Date: Thu Dec 13 22:56:06 2012
New Revision: 1421602
URL: http://svn.apache.org/viewvc?rev=1421602&view=rev
Log:
WebSocket 1.0 implementation part 15 of many
Implement enough of the send message code that the WebSocket example works again using the new annotation endpoint
Modified:
tomcat/trunk/java/javax/websocket/RemoteEndpoint.java
tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpoint.java
Modified: tomcat/trunk/java/javax/websocket/RemoteEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/javax/websocket/RemoteEndpoint.java?rev=1421602&r1=1421601&r2=1421602&view=diff
==============================================================================
--- tomcat/trunk/java/javax/websocket/RemoteEndpoint.java (original)
+++ tomcat/trunk/java/javax/websocket/RemoteEndpoint.java Thu Dec 13 22:56:06 2012
@@ -25,8 +25,18 @@ import java.util.concurrent.Future;
public interface RemoteEndpoint {
+ /**
+ * Send the message, blocking until the message is sent.
+ * @param text The text message to send.
+ * @throws IOException
+ */
void sendString(String text) throws IOException;
+ /**
+ * Send the message, blocking until the message is sent.
+ * @param data The binary message to send
+ * @throws IOException
+ */
void sendBytes(ByteBuffer data) throws IOException;
void sendPartialString(String fragment, boolean isLast) throws IOException;
Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpoint.java?rev=1421602&r1=1421601&r2=1421602&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpoint.java Thu Dec 13 22:56:06 2012
@@ -20,6 +20,12 @@ import java.io.IOException;
import java.io.OutputStream;
import java.io.Writer;
import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetEncoder;
+import java.nio.charset.CoderResult;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Future;
import javax.servlet.ServletOutputStream;
@@ -31,38 +37,83 @@ import javax.websocket.SendResult;
public class WsRemoteEndpoint implements RemoteEndpoint {
private final ServletOutputStream sos; // Stream that every outgoing frame is written to and flushed through
+ // Max length for outgoing WebSocket frame header is 10 bytes (1 flag/opcode byte plus up to 9 length bytes; no mask is written)
+ private final ByteBuffer header = ByteBuffer.allocate(10); // Reused for every frame; cleared at the start of sendMessage()
+
+ private final ByteBuffer textToByte = ByteBuffer.allocate(8192); // Scratch buffer that text fragments are encoded into before being sent
+ private final CharsetEncoder encoder = Charset.forName("UTF8").newEncoder(); // Reset at the start of every sendPartialString() call
+ private volatile Boolean isText = null; // Tri-state used to reject mixing text and binary fragments; null means no fragmented message has been recorded
+ private volatile CyclicBarrier writeBarrier = new CyclicBarrier(2); // Two parties: doBlockingWrite() and onWritePossible() both await on it
+
public WsRemoteEndpoint(ServletOutputStream sos) { // Creates an endpoint that sends frames over the given servlet output stream
this.sos = sos; // Kept for the lifetime of the endpoint; written to by doBlockingWrite() and flushed by sendMessage()
}
- public void onWritePossible() {
- // TODO
- }
@Override
public void sendString(String text) throws IOException { // Sends the text as one complete message
- // TODO Auto-generated method stub
+ if (isText != null) { // isText becomes non-null once a fragment has been sent with isLast == false
+ // Another message is being sent using fragments, so a whole message must not be interleaved with it
+ // TODO i18n
+ throw new IllegalStateException();
+ }
+ sendPartialString(text, true); // A complete message is a single fragment that is also the last one
}
@Override
public void sendBytes(ByteBuffer data) throws IOException { // Sends the buffer as one complete binary message
- // TODO Auto-generated method stub
+ if (isText != null) { // isText becomes non-null once a fragment has been sent with isLast == false
+ // Another message is being sent using fragments, so a whole message must not be interleaved with it
+ // TODO i18n
+ throw new IllegalStateException();
+ }
+ sendPartialBytes(data, true); // A complete message is a single fragment that is also the last one
}
@Override
public void sendPartialString(String fragment, boolean isLast)
throws IOException {
- // TODO Auto-generated method stub
+
+ // Sends one fragment of a text message. The fragment is UTF-8 encoded
+ // into textToByte and written as one or more WebSocket frames; isLast
+ // marks the fragment that completes the message.
+ // Throws IllegalStateException if a fragmented binary message is open.
+
+ if (isText != null && !isText.booleanValue()) {
+ // Can't write a text fragment in the middle of a binary message
+ // TODO i18n
+ throw new IllegalStateException();
+ }
+
+ // isText == null means no fragmented message is open, so this fragment
+ // starts a new message and its first frame must carry the text opcode
+ boolean first = (isText == null);
+ encoder.reset();
+ textToByte.clear();
+ CharBuffer cb = CharBuffer.wrap(fragment);
+ CoderResult cr = encoder.encode(cb, textToByte, true);
+ while (cr.isOverflow()) {
+ // textToByte is full: send what has been encoded so far as a
+ // non-final frame, then empty the buffer and keep encoding the
+ // remaining characters. Without the re-encode the overflow result
+ // never changes and this loop would not terminate.
+ sendMessage(Constants.OPCODE_TEXT, textToByte, first, false);
+ first = false;
+ textToByte.clear();
+ cr = encoder.encode(cb, textToByte, true);
+ }
+ sendMessage(Constants.OPCODE_TEXT, textToByte, first, isLast);
+ if (isLast) {
+ // Message complete: allow a new message of either type to start
+ isText = null;
+ } else {
+ // A text message is now open; only text fragments may follow
+ isText = Boolean.TRUE;
+ }
}
@Override
public void sendPartialBytes(ByteBuffer partialByte, boolean isLast)
throws IOException {
- // TODO Auto-generated method stub
+
+ // Sends one fragment of a binary message; isLast marks the fragment
+ // that completes the message.
+ // Throws IllegalStateException if a fragmented text message is open.
+
+ if (isText != null && isText.booleanValue()) {
+ // Can't write a binary fragment in the middle of a text message
+ // TODO i18n
+ throw new IllegalStateException();
+ }
+
+ // isText == null means no fragmented message is open, so this fragment
+ // starts a new message and must carry the binary opcode
+ boolean first = (isText == null);
+ sendMessage(Constants.OPCODE_BINARY, partialByte, first, isLast);
+ if (isLast) {
+ // Message complete: clear the marker, otherwise every later
+ // sendString()/sendBytes() call would be rejected
+ isText = null;
+ } else {
+ // A binary message is now open; only binary fragments may follow
+ isText = Boolean.FALSE;
+ }
}
@@ -127,12 +178,92 @@ public class WsRemoteEndpoint implements
@Override
public void sendPing(ByteBuffer applicationData) { // Sends a ping frame carrying the given payload
- // TODO Auto-generated method stub
+ sendMessage(Constants.OPCODE_PING, applicationData, true, true); // Single unfragmented frame (first and last). NOTE(review): payload size and null are not checked here
}
@Override
public void sendPong(ByteBuffer applicationData) { // Sends a pong frame carrying the given payload
- // TODO Auto-generated method stub
+ sendMessage(Constants.OPCODE_PONG, applicationData, true, true); // Single unfragmented frame (first and last). NOTE(review): payload size and null are not checked here
+ }
+
+
+ /**
+ * Waits on the write barrier, which releases a thread waiting on the same
+ * barrier in doBlockingWrite(). Presumably invoked when the output stream
+ * becomes writable again - confirm against the caller.
+ */
+ public void onWritePossible() {
+ try {
+ writeBarrier.await();
+ } catch (InterruptedException e) {
+ // Restore the interrupt status so the calling thread can still
+ // observe that it was interrupted
+ Thread.currentThread().interrupt();
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } catch (BrokenBarrierException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+
+ private void sendMessage(byte opCode, ByteBuffer message, // Builds the frame header for message, then writes header and payload and flushes
+ boolean isFirstFragment, boolean isLastFragment) {
+ // Clear header, ready for new message (the same header buffer is reused for every frame)
+ header.clear();
+ byte first = 0;
+
+ if (isLastFragment) {
+ // Set the fin bit (-128 as a byte is 0x80, the top bit of the first header byte)
+ first = -128;
+ }
+
+ if (isFirstFragment) {
+ // This is the first fragment of this message, so the opcode goes in the low bits
+ first = (byte) (first + opCode);
+ }
+ // If not the first fragment, it is a continuation with opCode of zero
+
+ message.flip(); // limit becomes the number of bytes put into the buffer. NOTE(review): assumes the buffer is still in write mode - confirm for caller-supplied buffers
+ header.put(first);
+
+ // Next write the length (values below 126 also leave the top bit of this byte clear)
+ if (message.limit() < 126) {
+ header.put((byte) message.limit());
+ } else if (message.limit() < 65536) {
+ header.put((byte) 126); // Marker: a 2-byte length follows, high byte first
+ header.put((byte) (message.limit() >>> 8));
+ header.put((byte) (message.limit() & 0xFF));
+ } else {
+ // Will never be more than 2^31-1, so the upper four bytes of the 8-byte length are always zero
+ header.put((byte) 127); // Marker: an 8-byte length follows, high byte first
+ header.put((byte) 0);
+ header.put((byte) 0);
+ header.put((byte) 0);
+ header.put((byte) 0);
+ header.put((byte) (message.limit() >>> 24));
+ header.put((byte) (message.limit() >>> 16));
+ header.put((byte) (message.limit() >>> 8));
+ header.put((byte) (message.limit() & 0xFF));
+ }
+ header.flip(); // Make the header bytes just written readable for the write below
+
+ doBlockingWrite(header);
+ doBlockingWrite(message);
+ try {
+ sos.flush();
+ } catch (IOException e) {
+ // TODO Auto-generated catch block - NOTE(review): a failed flush is only printed, it is not reported to the caller
+ e.printStackTrace();
+ }
+ }
+
+
+ /**
+ * Writes the readable content of the buffer to the output stream, first
+ * waiting on the write barrier if the stream reports it cannot be
+ * written to. The buffer must be array-backed since its backing array
+ * is accessed directly. The buffer's position is not advanced.
+ *
+ * @param data Buffer positioned for reading
+ */
+ private void doBlockingWrite(ByteBuffer data) {
+ if (!sos.canWrite()) {
+ try {
+ // Wait for onWritePossible() to arrive at the barrier
+ writeBarrier.await();
+ } catch (InterruptedException e) {
+ // Restore the interrupt status so the calling thread can still
+ // observe that it was interrupted
+ Thread.currentThread().interrupt();
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } catch (BrokenBarrierException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ try {
+ // Write exactly the readable region (position to limit) rather than
+ // assuming the position is zero; identical output for flipped buffers
+ sos.write(data.array(), data.arrayOffset() + data.position(),
+ data.remaining());
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org