You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openmeetings.apache.org by so...@apache.org on 2017/08/01 16:09:26 UTC

openmeetings git commit: [OPENMEETINGS-1677] initial commit on cluster web sockets

Repository: openmeetings
Updated Branches:
  refs/heads/3.3.x 795cf657b -> 9ffba612e


[OPENMEETINGS-1677] initial commit on cluster web sockets


Project: http://git-wip-us.apache.org/repos/asf/openmeetings/repo
Commit: http://git-wip-us.apache.org/repos/asf/openmeetings/commit/9ffba612
Tree: http://git-wip-us.apache.org/repos/asf/openmeetings/tree/9ffba612
Diff: http://git-wip-us.apache.org/repos/asf/openmeetings/diff/9ffba612

Branch: refs/heads/3.3.x
Commit: 9ffba612e2761d161e1fd3408a1f22731e6e9fe2
Parents: 795cf65
Author: Maxim Solodovnik <so...@gmail.com>
Authored: Tue Aug 1 23:09:18 2017 +0700
Committer: Maxim Solodovnik <so...@gmail.com>
Committed: Tue Aug 1 23:09:18 2017 +0700

----------------------------------------------------------------------
 .../openmeetings/core/util/WebSocketHelper.java | 94 +++++++++++++-------
 .../openmeetings/core/util/ws/WsMessageAll.java | 34 +++++++
 .../core/util/ws/WsMessageChat.java             | 43 +++++++++
 .../core/util/ws/WsMessageRoom.java             | 42 +++++++++
 .../core/util/ws/WsMessageRoomMsg.java          | 35 ++++++++
 .../core/util/ws/WsMessageUser.java             | 40 +++++++++
 .../org/apache/openmeetings/IApplication.java   |  4 +
 .../src/site/xdoc/Clustering.xml                |  1 -
 .../openmeetings/util/ws/IClusterWsMessage.java | 24 +++++
 .../openmeetings/web/app/Application.java       | 21 +++++
 10 files changed, 307 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/openmeetings/blob/9ffba612/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/WebSocketHelper.java
----------------------------------------------------------------------
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/WebSocketHelper.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/WebSocketHelper.java
index 7321a5c..681efd5 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/WebSocketHelper.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/WebSocketHelper.java
@@ -19,6 +19,7 @@
 package org.apache.openmeetings.core.util;
 
 import static org.apache.openmeetings.util.OpenmeetingsVariables.webAppRootKey;
+import static org.apache.openmeetings.util.OpenmeetingsVariables.wicketApplicationName;
 import static org.apache.wicket.util.string.Strings.escapeMarkup;
 
 import java.io.IOException;
@@ -31,6 +32,11 @@ import java.util.function.Predicate;
 
 import org.apache.commons.lang3.time.FastDateFormat;
 import org.apache.openmeetings.IApplication;
+import org.apache.openmeetings.core.util.ws.WsMessageAll;
+import org.apache.openmeetings.core.util.ws.WsMessageChat;
+import org.apache.openmeetings.core.util.ws.WsMessageRoom;
+import org.apache.openmeetings.core.util.ws.WsMessageRoomMsg;
+import org.apache.openmeetings.core.util.ws.WsMessageUser;
 import org.apache.openmeetings.db.entity.basic.ChatMessage;
 import org.apache.openmeetings.db.entity.basic.Client;
 import org.apache.openmeetings.db.entity.room.Room.Right;
@@ -38,6 +44,7 @@ import org.apache.openmeetings.db.entity.user.User;
 import org.apache.openmeetings.util.OpenmeetingsVariables;
 import org.apache.openmeetings.util.message.RoomMessage;
 import org.apache.openmeetings.util.message.TextRoomMessage;
+import org.apache.openmeetings.util.ws.IClusterWsMessage;
 import org.apache.wicket.Application;
 import org.apache.wicket.protocol.ws.WebSocketSettings;
 import org.apache.wicket.protocol.ws.api.IWebSocketConnection;
@@ -57,23 +64,6 @@ public class WebSocketHelper {
 	public static final String ID_ROOM_PREFIX = ID_TAB_PREFIX + "r";
 	public static final String ID_USER_PREFIX = ID_TAB_PREFIX + "u";
 
-	public static void sendClient(final Client _c, byte[] b) {
-		if (_c != null) {
-			send(a -> Arrays.asList(_c), (t, c) -> {
-				try {
-					t.sendMessage(b, 0, b.length);
-				} catch (IOException e) {
-					log.error("Error while broadcasting byte[] to room", e);
-				}
-			}, null);
-		}
-	}
-
-	public static void sendRoom(final RoomMessage m) {
-		log.debug("Sending WebSocket message: {} {}", m.getType(), m instanceof TextRoomMessage ? ((TextRoomMessage)m).getText() : "");
-		sendRoom(m.getRoomId(), (t, c) -> t.sendMessage(m), null);
-	}
-
 	private static String getName(User u) {
 		return escapeMarkup(String.format("%s %s", u.getFirstname(), u.getLastname())).toString();
 	}
@@ -116,28 +106,55 @@ public class WebSocketHelper {
 			.put("msg", arr);
 	}
 
+	public static void sendClient(final Client _c, byte[] b) {
+		if (_c != null) {
+			send(a -> Arrays.asList(_c), (t, c) -> {
+				try {
+					t.sendMessage(b, 0, b.length);
+				} catch (IOException e) {
+					log.error("Error while broadcasting byte[] to room", e);
+				}
+			}, null);
+		}
+	}
+
+	public static void send(IClusterWsMessage _m) {
+		if (_m instanceof WsMessageRoomMsg) {
+			sendRoom(((WsMessageRoomMsg)_m).getMsg());
+		} else if (_m instanceof WsMessageRoom) {
+			WsMessageRoom m = (WsMessageRoom)_m;
+			sendRoom(m.getRoomId(), m.getMsg());
+		} else if (_m instanceof WsMessageChat) {
+			WsMessageChat m = (WsMessageChat)_m;
+			sendRoom(m.getChatMessage(), m.getMsg());
+		} else if (_m instanceof WsMessageUser) {
+			WsMessageUser m = (WsMessageUser)_m;
+			sendUser(m.getUserId(), m.getMsg());
+		} else if (_m instanceof WsMessageAll) {
+			sendAll(((WsMessageAll)_m).getMsg());
+		}
+	}
+
+	public static void sendRoom(final RoomMessage m) {
+		publish(new WsMessageRoomMsg(m));
+		log.debug("Sending WebSocket message: {} {}", m.getType(), m instanceof TextRoomMessage ? ((TextRoomMessage)m).getText() : "");
+		sendRoom(m.getRoomId(), (t, c) -> t.sendMessage(m), null);
+	}
+
 	public static void sendRoom(final Long roomId, final JSONObject m) {
+		publish(new WsMessageRoom(roomId, m));
 		sendRoom(roomId, m, null, null);
 	}
 
 	public static void sendRoom(ChatMessage m, JSONObject msg) {
+		publish(new WsMessageChat(m, msg));
 		sendRoom(m.getToRoom().getId(), msg
 				, c -> !m.isNeedModeration() || (m.isNeedModeration() && c.hasRight(Right.moderator))
 				, null);
 	}
 
-	public static void sendRoom(final Long roomId, final JSONObject m, Predicate<Client> check, BiFunction<JSONObject, Client, String> func) {
-		log.debug("Sending WebSocket message: {}", m);
-		sendRoom(roomId, (t, c) -> {
-			try {
-				t.sendMessage(func == null ? m.toString() : func.apply(m, c));
-			} catch (IOException e) {
-				log.error("Error while broadcasting message to room", e);
-			}
-		}, check);
-	}
-
 	public static void sendUser(final Long userId, final String m) {
+		publish(new WsMessageUser(userId, m));
 		send(a -> ((IApplication)a).getOmClients(userId), (t, c) -> {
 			try {
 				t.sendMessage(m);
@@ -149,6 +166,7 @@ public class WebSocketHelper {
 
 	//TODO should this be unified???
 	public static void sendAll(final String m) {
+		publish(new WsMessageAll(m));
 		Application app = Application.get(OpenmeetingsVariables.wicketApplicationName);
 		WebSocketSettings settings = WebSocketSettings.Holder.get(app);
 		IWebSocketConnectionRegistry reg = settings.getConnectionRegistry();
@@ -164,11 +182,27 @@ public class WebSocketHelper {
 		}
 	}
 
-	public static void sendRoom(final Long roomId, BiConsumer<IWebSocketConnection, Client> consumer, Predicate<Client> check) {
+	private static void publish(IClusterWsMessage m) {
+		IApplication iapp = (IApplication)Application.get(wicketApplicationName);
+		iapp.publishWsTopic(m);
+	}
+
+	private static void sendRoom(final Long roomId, final JSONObject m, Predicate<Client> check, BiFunction<JSONObject, Client, String> func) {
+		log.debug("Sending WebSocket message: {}", m);
+		sendRoom(roomId, (t, c) -> {
+			try {
+				t.sendMessage(func == null ? m.toString() : func.apply(m, c));
+			} catch (IOException e) {
+				log.error("Error while broadcasting message to room", e);
+			}
+		}, check);
+	}
+
+	private static void sendRoom(final Long roomId, BiConsumer<IWebSocketConnection, Client> consumer, Predicate<Client> check) {
 		send(a -> ((IApplication)a).getOmRoomClients(roomId), consumer, check);
 	}
 
-	public static void send(
+	private static void send(
 			final Function<Application, List<Client>> func
 			, BiConsumer<IWebSocketConnection, Client> consumer
 			, Predicate<Client> check)

http://git-wip-us.apache.org/repos/asf/openmeetings/blob/9ffba612/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageAll.java
----------------------------------------------------------------------
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageAll.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageAll.java
new file mode 100644
index 0000000..438e2f0
--- /dev/null
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageAll.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.openmeetings.core.util.ws;
+
+import org.apache.openmeetings.util.ws.IClusterWsMessage;
+
+public class WsMessageAll implements IClusterWsMessage {
+	private static final long serialVersionUID = 1L;
+	private final String msg;
+
+	public WsMessageAll(String msg) {
+		this.msg = msg;
+	}
+
+	public String getMsg() {
+		return msg;
+	}
+}

http://git-wip-us.apache.org/repos/asf/openmeetings/blob/9ffba612/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageChat.java
----------------------------------------------------------------------
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageChat.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageChat.java
new file mode 100644
index 0000000..debd5b4
--- /dev/null
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageChat.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.openmeetings.core.util.ws;
+
+import org.apache.openmeetings.db.entity.basic.ChatMessage;
+import org.apache.openmeetings.util.ws.IClusterWsMessage;
+
+import com.github.openjson.JSONObject;
+
+public class WsMessageChat implements IClusterWsMessage {
+	private static final long serialVersionUID = 1L;
+	private final ChatMessage m;
+	private final String msg;
+
+	public WsMessageChat(ChatMessage m, JSONObject msg) {
+		this.m = m;
+		this.msg = msg.toString();
+	}
+
+	public ChatMessage getChatMessage() {
+		return m;
+	}
+
+	public JSONObject getMsg() {
+		return new JSONObject(msg);
+	}
+}

http://git-wip-us.apache.org/repos/asf/openmeetings/blob/9ffba612/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageRoom.java
----------------------------------------------------------------------
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageRoom.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageRoom.java
new file mode 100644
index 0000000..d947eb8
--- /dev/null
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageRoom.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.openmeetings.core.util.ws;
+
+import org.apache.openmeetings.util.ws.IClusterWsMessage;
+
+import com.github.openjson.JSONObject;
+
+public class WsMessageRoom implements IClusterWsMessage {
+	private static final long serialVersionUID = 1L;
+	private final Long roomId;
+	private final String msg;
+
+	public WsMessageRoom(Long roomId, JSONObject msg) {
+		this.roomId = roomId;
+		this.msg = msg.toString();
+	}
+
+	public Long getRoomId() {
+		return roomId;
+	}
+
+	public JSONObject getMsg() {
+		return new JSONObject(msg);
+	}
+}

http://git-wip-us.apache.org/repos/asf/openmeetings/blob/9ffba612/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageRoomMsg.java
----------------------------------------------------------------------
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageRoomMsg.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageRoomMsg.java
new file mode 100644
index 0000000..4f8a3d2
--- /dev/null
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageRoomMsg.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.openmeetings.core.util.ws;
+
+import org.apache.openmeetings.util.message.RoomMessage;
+import org.apache.openmeetings.util.ws.IClusterWsMessage;
+
+public class WsMessageRoomMsg implements IClusterWsMessage {
+	private static final long serialVersionUID = 1L;
+	private final RoomMessage msg;
+
+	public WsMessageRoomMsg(RoomMessage msg) {
+		this.msg = msg;
+	}
+
+	public RoomMessage getMsg() {
+		return msg;
+	}
+}

http://git-wip-us.apache.org/repos/asf/openmeetings/blob/9ffba612/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageUser.java
----------------------------------------------------------------------
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageUser.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageUser.java
new file mode 100644
index 0000000..4b7b32b
--- /dev/null
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageUser.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.openmeetings.core.util.ws;
+
+import org.apache.openmeetings.util.ws.IClusterWsMessage;
+
+public class WsMessageUser implements IClusterWsMessage {
+	private static final long serialVersionUID = 1L;
+	private final Long userId;
+	private final String msg;
+
+	public WsMessageUser(Long userId, String msg) {
+		this.userId = userId;
+		this.msg = msg;
+	}
+
+	public Long getUserId() {
+		return userId;
+	}
+
+	public String getMsg() {
+		return msg;
+	}
+}

http://git-wip-us.apache.org/repos/asf/openmeetings/blob/9ffba612/openmeetings-db/src/main/java/org/apache/openmeetings/IApplication.java
----------------------------------------------------------------------
diff --git a/openmeetings-db/src/main/java/org/apache/openmeetings/IApplication.java b/openmeetings-db/src/main/java/org/apache/openmeetings/IApplication.java
index caf3dd3..afdeb6a 100644
--- a/openmeetings-db/src/main/java/org/apache/openmeetings/IApplication.java
+++ b/openmeetings-db/src/main/java/org/apache/openmeetings/IApplication.java
@@ -30,6 +30,7 @@ import org.apache.openmeetings.db.dao.basic.ConfigurationDao;
 import org.apache.openmeetings.db.entity.basic.Client;
 import org.apache.openmeetings.db.entity.room.Invitation;
 import org.apache.openmeetings.db.entity.room.StreamClient;
+import org.apache.openmeetings.util.ws.IClusterWsMessage;
 import org.apache.wicket.request.IExceptionMapper;
 import org.apache.wicket.request.IRequestMapper;
 import org.apache.wicket.request.mapper.parameter.PageParameters;
@@ -64,4 +65,7 @@ public interface IApplication {
 
 	//JPA
 	void updateJpaAddresses(ConfigurationDao dao);
+
+	//WS
+	void publishWsTopic(IClusterWsMessage msg);
 }

http://git-wip-us.apache.org/repos/asf/openmeetings/blob/9ffba612/openmeetings-server/src/site/xdoc/Clustering.xml
----------------------------------------------------------------------
diff --git a/openmeetings-server/src/site/xdoc/Clustering.xml b/openmeetings-server/src/site/xdoc/Clustering.xml
index 7fc6513..6a35d19 100644
--- a/openmeetings-server/src/site/xdoc/Clustering.xml
+++ b/openmeetings-server/src/site/xdoc/Clustering.xml
@@ -39,7 +39,6 @@
 					Schedulers to drop user sessions as outdated)
 				</li>
 				<li>All servers should be configured to use the same DB</li>
-				<li>Servers should be added in Administration -&gt; Servers section</li>
 			</ul>
 			<subsection name="Network">
 				<p>Multicast should be set up on all servers<br/>

http://git-wip-us.apache.org/repos/asf/openmeetings/blob/9ffba612/openmeetings-util/src/main/java/org/apache/openmeetings/util/ws/IClusterWsMessage.java
----------------------------------------------------------------------
diff --git a/openmeetings-util/src/main/java/org/apache/openmeetings/util/ws/IClusterWsMessage.java b/openmeetings-util/src/main/java/org/apache/openmeetings/util/ws/IClusterWsMessage.java
new file mode 100644
index 0000000..78f2dc5
--- /dev/null
+++ b/openmeetings-util/src/main/java/org/apache/openmeetings/util/ws/IClusterWsMessage.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.openmeetings.util.ws;
+
+import java.io.Serializable;
+
+public interface IClusterWsMessage extends Serializable {
+}

http://git-wip-us.apache.org/repos/asf/openmeetings/blob/9ffba612/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/Application.java
----------------------------------------------------------------------
diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/Application.java b/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/Application.java
index 10b5f03..7a5ec65 100644
--- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/Application.java
+++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/Application.java
@@ -67,6 +67,7 @@ import org.apache.openmeetings.db.entity.user.User.Type;
 import org.apache.openmeetings.util.InitializationContainer;
 import org.apache.openmeetings.util.OpenmeetingsVariables;
 import org.apache.openmeetings.util.message.RoomMessage;
+import org.apache.openmeetings.util.ws.IClusterWsMessage;
 import org.apache.openmeetings.web.pages.AccessDeniedPage;
 import org.apache.openmeetings.web.pages.ActivatePage;
 import org.apache.openmeetings.web.pages.HashPage;
@@ -128,10 +129,13 @@ import org.wicketstuff.datastores.hazelcast.HazelcastDataStore;
 import com.hazelcast.config.XmlConfigBuilder;
 import com.hazelcast.core.Hazelcast;
 import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.ITopic;
 import com.hazelcast.core.Member;
 import com.hazelcast.core.MemberAttributeEvent;
 import com.hazelcast.core.MembershipEvent;
 import com.hazelcast.core.MembershipListener;
+import com.hazelcast.core.Message;
+import com.hazelcast.core.MessageListener;
 
 public class Application extends AuthenticatedWebApplication implements IApplication {
 	private static final Logger log = getLogger(Application.class, webAppRootKey);
@@ -155,6 +159,7 @@ public class Application extends AuthenticatedWebApplication implements IApplica
 	private String xFrameOptions = HEADER_XFRAME_SAMEORIGIN;
 	private String contentSecurityPolicy = OpenmeetingsVariables.HEADER_CSP_SELF;
 	private final HazelcastInstance hazelcast = Hazelcast.getOrCreateHazelcastInstance(new XmlConfigBuilder().build());
+	private ITopic<IClusterWsMessage> hazelWsTopic;
 
 	@Override
 	protected void init() {
@@ -163,6 +168,17 @@ public class Application extends AuthenticatedWebApplication implements IApplica
 		getApplicationSettings().setAccessDeniedPage(AccessDeniedPage.class);
 
 		hazelcast.getCluster().getLocalMember().setStringAttribute(NAME_ATTR_KEY, hazelcast.getName());
+		hazelWsTopic = hazelcast.getTopic("default");
+		hazelWsTopic.addMessageListener(new MessageListener<IClusterWsMessage>() {
+			@Override
+			public void onMessage(Message<IClusterWsMessage> msg) {
+				String serverId = msg.getPublishingMember().getStringAttribute(NAME_ATTR_KEY);
+				if (serverId.equals(hazelcast.getName())) {
+					return;
+				}
+				WebSocketHelper.send(msg.getMessageObject());
+			}
+		});
 		hazelcast.getCluster().addMembershipListener(new MembershipListener() {
 			@Override
 			public void memberRemoved(MembershipEvent evt) {
@@ -833,4 +849,9 @@ public class Application extends AuthenticatedWebApplication implements IApplica
 			throw new WicketRuntimeException(e);
 		}
 	}
+
+	@Override
+	public void publishWsTopic(IClusterWsMessage msg) {
+		hazelWsTopic.publish(msg);
+	}
 }