You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@guacamole.apache.org by vn...@apache.org on 2018/09/26 12:44:29 UTC

[08/19] guacamole-server git commit: GUACAMOLE-623: Send typed data to Kubernetes via the STDIN channel.

GUACAMOLE-623: Send typed data to Kubernetes via the STDIN channel.


Project: http://git-wip-us.apache.org/repos/asf/guacamole-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/guacamole-server/commit/b7c938c2
Tree: http://git-wip-us.apache.org/repos/asf/guacamole-server/tree/b7c938c2
Diff: http://git-wip-us.apache.org/repos/asf/guacamole-server/diff/b7c938c2

Branch: refs/heads/master
Commit: b7c938c239f0c61b6122c817c6ce44309d61bd1f
Parents: f35517b
Author: Michael Jumper <mj...@apache.org>
Authored: Mon Sep 10 02:16:36 2018 -0700
Committer: Michael Jumper <mj...@apache.org>
Committed: Tue Sep 25 21:30:51 2018 -0700

----------------------------------------------------------------------
 src/protocols/kubernetes/kubernetes.c | 151 ++++++++++++++++++++++++++---
 src/protocols/kubernetes/kubernetes.h |  26 ++++-
 2 files changed, 164 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/guacamole-server/blob/b7c938c2/src/protocols/kubernetes/kubernetes.c
----------------------------------------------------------------------
diff --git a/src/protocols/kubernetes/kubernetes.c b/src/protocols/kubernetes/kubernetes.c
index 62fb6ee..aadb448 100644
--- a/src/protocols/kubernetes/kubernetes.c
+++ b/src/protocols/kubernetes/kubernetes.c
@@ -78,6 +78,127 @@ static void guac_kubernetes_receive_data(guac_client* client,
 }
 
 /**
+ * Requests that the given data be sent along the given channel to the
+ * Kubernetes server when the WebSocket connection is next available for
+ * writing. If the WebSocket connection has not been available for writing for
+ * long enough that the outbound message buffer is full, the request to send
+ * this particular message will be dropped.
+ *
+ * @param client
+ *     The guac_client associated with the Kubernetes connection.
+ *
+ * @param channel
+ *     The Kubernetes channel on which to send the message,
+ *     such as GUAC_KUBERNETES_CHANNEL_STDIN.
+ *
+ * @param data
+ *     A buffer containing the data to send.
+ *
+ * @param length
+ *     The number of bytes to send, at most GUAC_KUBERNETES_MAX_MESSAGE_SIZE.
+ */
+static void guac_kubernetes_send_message(guac_client* client,
+        int channel, const char* data, int length) {
+
+    guac_kubernetes_client* kubernetes_client =
+        (guac_kubernetes_client*) client->data;
+
+    /* Drop data which cannot fit within a single fixed-size message slot */
+    if (length < 0 || length > GUAC_KUBERNETES_MAX_MESSAGE_SIZE) {
+        guac_client_log(client, GUAC_LOG_WARNING, "Outbound message of "
+                "invalid length (%i bytes) dropped.", length);
+        return;
+    }
+
+    pthread_mutex_lock(&(kubernetes_client->outbound_message_lock));
+
+    /* Add message to buffer if space is available */
+    if (kubernetes_client->outbound_messages_waiting
+            < GUAC_KUBERNETES_MAX_OUTBOUND_MESSAGES) {
+        /* Locate the free slot following the newest queued message */
+        int index = (kubernetes_client->outbound_messages_top
+                  + kubernetes_client->outbound_messages_waiting)
+                  % GUAC_KUBERNETES_MAX_OUTBOUND_MESSAGES;
+        guac_kubernetes_message* message =
+            &(kubernetes_client->outbound_messages[index]);
+
+        /* Copy details of message into slot and count it as waiting */
+        message->channel = channel;
+        memcpy(message->data, data, length);
+        message->length = length;
+        kubernetes_client->outbound_messages_waiting++;
+
+        /* Request a writable callback so the pending message gets sent */
+        lws_callback_on_writable(kubernetes_client->wsi);
+        lws_cancel_service(kubernetes_client->context);
+    }
+
+    /* Warn if data has to be dropped */
+    else
+        guac_client_log(client, GUAC_LOG_WARNING, "Send buffer could not be "
+                "flushed in time to handle additional data. Outbound "
+                "message dropped.");
+
+    pthread_mutex_unlock(&(kubernetes_client->outbound_message_lock));
+
+}
+
+/**
+ * Writes the oldest pending message within the outbound message queue,
+ * as scheduled with guac_kubernetes_send_message(), removing that message
+ * from the queue. This function MUST NOT be invoked outside the libwebsockets
+ * event callback and MUST only be invoked in the context of a
+ * LWS_CALLBACK_CLIENT_WRITEABLE event. If no messages are pending, this
+ * function has no effect.
+ *
+ * @param client
+ *     The guac_client associated with the Kubernetes connection.
+ *
+ * @return
+ *     true if messages still remain to be written within the outbound message
+ *     queue, false otherwise.
+ */
+static bool guac_kubernetes_write_pending_message(guac_client* client) {
+
+    bool messages_remain;
+    guac_kubernetes_client* kubernetes_client =
+        (guac_kubernetes_client*) client->data;
+
+    pthread_mutex_lock(&(kubernetes_client->outbound_message_lock));
+
+    /* Send one message from top of buffer */
+    if (kubernetes_client->outbound_messages_waiting > 0) {
+
+        /* Obtain pointer to message at top */
+        int top = kubernetes_client->outbound_messages_top;
+        guac_kubernetes_message* message =
+            &(kubernetes_client->outbound_messages[top]);
+
+        /* Write message including channel index, skipping the leading
+         * padding, and report failure rather than silently ignoring it */
+        if (lws_write(kubernetes_client->wsi,
+                ((unsigned char*) message) + LWS_PRE,
+                message->length + 1, LWS_WRITE_BINARY) < 0)
+            guac_client_log(client, GUAC_LOG_WARNING, "Unable to write "
+                    "pending message to Kubernetes.");
+
+        /* Remove message from queue: advance top, one less is waiting */
+        kubernetes_client->outbound_messages_top =
+            (top + 1) % GUAC_KUBERNETES_MAX_OUTBOUND_MESSAGES;
+        kubernetes_client->outbound_messages_waiting--;
+
+    }
+
+    /* Record whether messages remained at time of completion */
+    messages_remain = (kubernetes_client->outbound_messages_waiting > 0);
+
+    pthread_mutex_unlock(&(kubernetes_client->outbound_message_lock));
+
+    return messages_remain;
+
+}
+
+/**
  * Callback invoked by libwebsockets for events related to a WebSocket being
  * used for communicating with an attached Kubernetes pod.
  *
@@ -132,8 +253,14 @@ static int guac_kubernetes_lws_callback(struct lws* wsi,
             guac_kubernetes_receive_data(client, (const char*) in, length);
             break;
 
-        /* TODO: Only send data here. Request callback for writing via lws_callback_on_writable(some struct lws*) */
+        /* WebSocket is ready for writing */
         case LWS_CALLBACK_CLIENT_WRITEABLE:
+
+            /* Send any pending messages, requesting another callback if
+             * yet more messages remain */
+            if (guac_kubernetes_write_pending_message(client))
+                lws_callback_on_writable(wsi);
+
             break;
 
         /* TODO: Add configure test */
@@ -189,14 +316,15 @@ static void* guac_kubernetes_input_thread(void* data) {
     guac_kubernetes_client* kubernetes_client =
         (guac_kubernetes_client*) client->data;
 
-    char buffer[8192];
+    char buffer[GUAC_KUBERNETES_MAX_MESSAGE_SIZE];
     int bytes_read;
 
     /* Write all data read */
     while ((bytes_read = guac_terminal_read_stdin(kubernetes_client->term, buffer, sizeof(buffer))) > 0) {
 
-        /* TODO: Send to Kubernetes */
-        guac_terminal_write(kubernetes_client->term, buffer, bytes_read);
+        /* Send received data to Kubernetes along STDIN channel */
+        guac_kubernetes_send_message(client, GUAC_KUBERNETES_CHANNEL_STDIN,
+                buffer, bytes_read);
 
     }
 
@@ -206,8 +334,6 @@ static void* guac_kubernetes_input_thread(void* data) {
 
 void* guac_kubernetes_client_thread(void* data) {
 
-    struct lws_context* context = NULL;
-
     guac_client* client = (guac_client*) data;
     guac_kubernetes_client* kubernetes_client =
         (guac_kubernetes_client*) client->data;
@@ -297,15 +423,15 @@ void* guac_kubernetes_client_thread(void* data) {
     }
 
     /* Create libwebsockets context */
-    context = lws_create_context(&context_info);
-    if (!context) {
+    kubernetes_client->context = lws_create_context(&context_info);
+    if (!kubernetes_client->context) {
         guac_client_abort(client, GUAC_PROTOCOL_STATUS_SERVER_ERROR,
                 "Initialization of libwebsockets failed");
         goto fail;
     }
 
     /* FIXME: Generate path dynamically */
-    connection_info.context = context;
+    connection_info.context = kubernetes_client->context;
     connection_info.path = "/api/v1/namespaces/default/pods/my-shell-68974bb7f7-rpjgr/attach?container=my-shell&stdin=true&stdout=true&tty=true";
 
     /* Open WebSocket connection to Kubernetes */
@@ -329,7 +455,8 @@ void* guac_kubernetes_client_thread(void* data) {
     while (client->state == GUAC_CLIENT_RUNNING) {
 
         /* Cease polling libwebsockets if an error condition is signalled */
-        if (lws_service(context, 1000) < 0)
+        if (lws_service(kubernetes_client->context,
+                    GUAC_KUBERNETES_SERVICE_INTERVAL) < 0)
             break;
 
     }
@@ -350,8 +477,8 @@ fail:
         guac_common_recording_free(kubernetes_client->recording);
 
     /* Free WebSocket context if successfully allocated */
-    if (context != NULL)
-        lws_context_destroy(context);
+    if (kubernetes_client->context != NULL)
+        lws_context_destroy(kubernetes_client->context);
 
     guac_client_log(client, GUAC_LOG_INFO, "Kubernetes connection ended.");
     return NULL;

http://git-wip-us.apache.org/repos/asf/guacamole-server/blob/b7c938c2/src/protocols/kubernetes/kubernetes.h
----------------------------------------------------------------------
diff --git a/src/protocols/kubernetes/kubernetes.h b/src/protocols/kubernetes/kubernetes.h
index 8fb917d..761a897 100644
--- a/src/protocols/kubernetes/kubernetes.h
+++ b/src/protocols/kubernetes/kubernetes.h
@@ -56,6 +56,13 @@
 #define GUAC_KUBERNETES_CHANNEL_RESIZE 4
 
 /**
+ * The maximum amount of data to include in any particular WebSocket message
+ * to Kubernetes. This excludes the storage space required for the channel
+ * index.
+ */
+#define GUAC_KUBERNETES_MAX_MESSAGE_SIZE 1024
+
+/**
  * The maximum number of messages to allow within the outbound message buffer.
  * If messages are sent despite the buffer being full, those messages will be
  * dropped.
@@ -63,11 +70,23 @@
 #define GUAC_KUBERNETES_MAX_OUTBOUND_MESSAGES 8
 
 /**
+ * The maximum number of milliseconds to wait for a libwebsockets event to
+ * occur before entering another iteration of the libwebsockets event loop.
+ */
+#define GUAC_KUBERNETES_SERVICE_INTERVAL 1000
+
+/**
  * An outbound message to be received by Kubernetes over WebSocket.
  */
 typedef struct guac_kubernetes_message {
 
     /**
+     * lws_write() requires leading padding of LWS_PRE bytes to provide
+     * scratch space for WebSocket framing.
+     */
+    uint8_t _padding[LWS_PRE];
+
+    /**
      * The index of the channel receiving the data, such as
      * GUAC_KUBERNETES_CHANNEL_STDIN.
      */
@@ -77,7 +96,7 @@ typedef struct guac_kubernetes_message {
      * The data that should be sent to Kubernetes (along with the channel
      * index).
      */
-    char data[1024];
+    char data[GUAC_KUBERNETES_MAX_MESSAGE_SIZE];
 
     /**
      * The length of the data to be sent, excluding the channel index.
@@ -97,6 +116,11 @@ typedef struct guac_kubernetes_client {
     guac_kubernetes_settings* settings;
 
     /**
+     * The libwebsockets context associated with the connected WebSocket.
+     */
+    struct lws_context* context;
+
+    /**
      * The connected WebSocket.
      */
     struct lws* wsi;