You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openmeetings.apache.org by so...@apache.org on 2017/08/01 16:09:26 UTC
openmeetings git commit: [OPENMEETINGS-1677] initial commit on
cluster web sockets
Repository: openmeetings
Updated Branches:
refs/heads/3.3.x 795cf657b -> 9ffba612e
[OPENMEETINGS-1677] initial commit on cluster web sockets
Project: http://git-wip-us.apache.org/repos/asf/openmeetings/repo
Commit: http://git-wip-us.apache.org/repos/asf/openmeetings/commit/9ffba612
Tree: http://git-wip-us.apache.org/repos/asf/openmeetings/tree/9ffba612
Diff: http://git-wip-us.apache.org/repos/asf/openmeetings/diff/9ffba612
Branch: refs/heads/3.3.x
Commit: 9ffba612e2761d161e1fd3408a1f22731e6e9fe2
Parents: 795cf65
Author: Maxim Solodovnik <so...@gmail.com>
Authored: Tue Aug 1 23:09:18 2017 +0700
Committer: Maxim Solodovnik <so...@gmail.com>
Committed: Tue Aug 1 23:09:18 2017 +0700
----------------------------------------------------------------------
.../openmeetings/core/util/WebSocketHelper.java | 94 +++++++++++++-------
.../openmeetings/core/util/ws/WsMessageAll.java | 34 +++++++
.../core/util/ws/WsMessageChat.java | 43 +++++++++
.../core/util/ws/WsMessageRoom.java | 42 +++++++++
.../core/util/ws/WsMessageRoomMsg.java | 35 ++++++++
.../core/util/ws/WsMessageUser.java | 40 +++++++++
.../org/apache/openmeetings/IApplication.java | 4 +
.../src/site/xdoc/Clustering.xml | 1 -
.../openmeetings/util/ws/IClusterWsMessage.java | 24 +++++
.../openmeetings/web/app/Application.java | 21 +++++
10 files changed, 307 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/openmeetings/blob/9ffba612/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/WebSocketHelper.java
----------------------------------------------------------------------
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/WebSocketHelper.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/WebSocketHelper.java
index 7321a5c..681efd5 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/WebSocketHelper.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/WebSocketHelper.java
@@ -19,6 +19,7 @@
package org.apache.openmeetings.core.util;
import static org.apache.openmeetings.util.OpenmeetingsVariables.webAppRootKey;
+import static org.apache.openmeetings.util.OpenmeetingsVariables.wicketApplicationName;
import static org.apache.wicket.util.string.Strings.escapeMarkup;
import java.io.IOException;
@@ -31,6 +32,11 @@ import java.util.function.Predicate;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.openmeetings.IApplication;
+import org.apache.openmeetings.core.util.ws.WsMessageAll;
+import org.apache.openmeetings.core.util.ws.WsMessageChat;
+import org.apache.openmeetings.core.util.ws.WsMessageRoom;
+import org.apache.openmeetings.core.util.ws.WsMessageRoomMsg;
+import org.apache.openmeetings.core.util.ws.WsMessageUser;
import org.apache.openmeetings.db.entity.basic.ChatMessage;
import org.apache.openmeetings.db.entity.basic.Client;
import org.apache.openmeetings.db.entity.room.Room.Right;
@@ -38,6 +44,7 @@ import org.apache.openmeetings.db.entity.user.User;
import org.apache.openmeetings.util.OpenmeetingsVariables;
import org.apache.openmeetings.util.message.RoomMessage;
import org.apache.openmeetings.util.message.TextRoomMessage;
+import org.apache.openmeetings.util.ws.IClusterWsMessage;
import org.apache.wicket.Application;
import org.apache.wicket.protocol.ws.WebSocketSettings;
import org.apache.wicket.protocol.ws.api.IWebSocketConnection;
@@ -57,23 +64,6 @@ public class WebSocketHelper {
public static final String ID_ROOM_PREFIX = ID_TAB_PREFIX + "r";
public static final String ID_USER_PREFIX = ID_TAB_PREFIX + "u";
- public static void sendClient(final Client _c, byte[] b) {
- if (_c != null) {
- send(a -> Arrays.asList(_c), (t, c) -> {
- try {
- t.sendMessage(b, 0, b.length);
- } catch (IOException e) {
- log.error("Error while broadcasting byte[] to room", e);
- }
- }, null);
- }
- }
-
- public static void sendRoom(final RoomMessage m) {
- log.debug("Sending WebSocket message: {} {}", m.getType(), m instanceof TextRoomMessage ? ((TextRoomMessage)m).getText() : "");
- sendRoom(m.getRoomId(), (t, c) -> t.sendMessage(m), null);
- }
-
private static String getName(User u) {
return escapeMarkup(String.format("%s %s", u.getFirstname(), u.getLastname())).toString();
}
@@ -116,28 +106,55 @@ public class WebSocketHelper {
.put("msg", arr);
}
+ public static void sendClient(final Client _c, byte[] b) {
+ if (_c != null) {
+ send(a -> Arrays.asList(_c), (t, c) -> {
+ try {
+ t.sendMessage(b, 0, b.length);
+ } catch (IOException e) {
+ log.error("Error while broadcasting byte[] to room", e);
+ }
+ }, null);
+ }
+ }
+
+ public static void send(IClusterWsMessage _m) {
+ if (_m instanceof WsMessageRoomMsg) {
+ sendRoom(((WsMessageRoomMsg)_m).getMsg());
+ } else if (_m instanceof WsMessageRoom) {
+ WsMessageRoom m = (WsMessageRoom)_m;
+ sendRoom(m.getRoomId(), m.getMsg());
+ } else if (_m instanceof WsMessageChat) {
+ WsMessageChat m = (WsMessageChat)_m;
+ sendRoom(m.getChatMessage(), m.getMsg());
+ } else if (_m instanceof WsMessageUser) {
+ WsMessageUser m = (WsMessageUser)_m;
+ sendUser(m.getUserId(), m.getMsg());
+ } else if (_m instanceof WsMessageAll) {
+ sendAll(((WsMessageAll)_m).getMsg());
+ }
+ }
+
+ public static void sendRoom(final RoomMessage m) {
+ publish(new WsMessageRoomMsg(m));
+ log.debug("Sending WebSocket message: {} {}", m.getType(), m instanceof TextRoomMessage ? ((TextRoomMessage)m).getText() : "");
+ sendRoom(m.getRoomId(), (t, c) -> t.sendMessage(m), null);
+ }
+
public static void sendRoom(final Long roomId, final JSONObject m) {
+ publish(new WsMessageRoom(roomId, m));
sendRoom(roomId, m, null, null);
}
public static void sendRoom(ChatMessage m, JSONObject msg) {
+ publish(new WsMessageChat(m, msg));
sendRoom(m.getToRoom().getId(), msg
, c -> !m.isNeedModeration() || (m.isNeedModeration() && c.hasRight(Right.moderator))
, null);
}
- public static void sendRoom(final Long roomId, final JSONObject m, Predicate<Client> check, BiFunction<JSONObject, Client, String> func) {
- log.debug("Sending WebSocket message: {}", m);
- sendRoom(roomId, (t, c) -> {
- try {
- t.sendMessage(func == null ? m.toString() : func.apply(m, c));
- } catch (IOException e) {
- log.error("Error while broadcasting message to room", e);
- }
- }, check);
- }
-
public static void sendUser(final Long userId, final String m) {
+ publish(new WsMessageUser(userId, m));
send(a -> ((IApplication)a).getOmClients(userId), (t, c) -> {
try {
t.sendMessage(m);
@@ -149,6 +166,7 @@ public class WebSocketHelper {
//TODO should this be unified???
public static void sendAll(final String m) {
+ publish(new WsMessageAll(m));
Application app = Application.get(OpenmeetingsVariables.wicketApplicationName);
WebSocketSettings settings = WebSocketSettings.Holder.get(app);
IWebSocketConnectionRegistry reg = settings.getConnectionRegistry();
@@ -164,11 +182,27 @@ public class WebSocketHelper {
}
}
- public static void sendRoom(final Long roomId, BiConsumer<IWebSocketConnection, Client> consumer, Predicate<Client> check) {
+ private static void publish(IClusterWsMessage m) {
+ IApplication iapp = (IApplication)Application.get(wicketApplicationName);
+ iapp.publishWsTopic(m);
+ }
+
+ private static void sendRoom(final Long roomId, final JSONObject m, Predicate<Client> check, BiFunction<JSONObject, Client, String> func) {
+ log.debug("Sending WebSocket message: {}", m);
+ sendRoom(roomId, (t, c) -> {
+ try {
+ t.sendMessage(func == null ? m.toString() : func.apply(m, c));
+ } catch (IOException e) {
+ log.error("Error while broadcasting message to room", e);
+ }
+ }, check);
+ }
+
+ private static void sendRoom(final Long roomId, BiConsumer<IWebSocketConnection, Client> consumer, Predicate<Client> check) {
send(a -> ((IApplication)a).getOmRoomClients(roomId), consumer, check);
}
- public static void send(
+ private static void send(
final Function<Application, List<Client>> func
, BiConsumer<IWebSocketConnection, Client> consumer
, Predicate<Client> check)
http://git-wip-us.apache.org/repos/asf/openmeetings/blob/9ffba612/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageAll.java
----------------------------------------------------------------------
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageAll.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageAll.java
new file mode 100644
index 0000000..438e2f0
--- /dev/null
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageAll.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.openmeetings.core.util.ws;
+
+import org.apache.openmeetings.util.ws.IClusterWsMessage;
+
+public class WsMessageAll implements IClusterWsMessage {
+ private static final long serialVersionUID = 1L;
+ private final String msg;
+
+ public WsMessageAll(String msg) {
+ this.msg = msg;
+ }
+
+ public String getMsg() {
+ return msg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/openmeetings/blob/9ffba612/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageChat.java
----------------------------------------------------------------------
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageChat.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageChat.java
new file mode 100644
index 0000000..debd5b4
--- /dev/null
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageChat.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.openmeetings.core.util.ws;
+
+import org.apache.openmeetings.db.entity.basic.ChatMessage;
+import org.apache.openmeetings.util.ws.IClusterWsMessage;
+
+import com.github.openjson.JSONObject;
+
+public class WsMessageChat implements IClusterWsMessage {
+ private static final long serialVersionUID = 1L;
+ private final ChatMessage m;
+ private final String msg;
+
+ public WsMessageChat(ChatMessage m, JSONObject msg) {
+ this.m = m;
+ this.msg = msg.toString();
+ }
+
+ public ChatMessage getChatMessage() {
+ return m;
+ }
+
+ public JSONObject getMsg() {
+ return new JSONObject(msg);
+ }
+}
http://git-wip-us.apache.org/repos/asf/openmeetings/blob/9ffba612/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageRoom.java
----------------------------------------------------------------------
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageRoom.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageRoom.java
new file mode 100644
index 0000000..d947eb8
--- /dev/null
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageRoom.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.openmeetings.core.util.ws;
+
+import org.apache.openmeetings.util.ws.IClusterWsMessage;
+
+import com.github.openjson.JSONObject;
+
+public class WsMessageRoom implements IClusterWsMessage {
+ private static final long serialVersionUID = 1L;
+ private final Long roomId;
+ private final String msg;
+
+ public WsMessageRoom(Long roomId, JSONObject msg) {
+ this.roomId = roomId;
+ this.msg = msg.toString();
+ }
+
+ public Long getRoomId() {
+ return roomId;
+ }
+
+ public JSONObject getMsg() {
+ return new JSONObject(msg);
+ }
+}
http://git-wip-us.apache.org/repos/asf/openmeetings/blob/9ffba612/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageRoomMsg.java
----------------------------------------------------------------------
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageRoomMsg.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageRoomMsg.java
new file mode 100644
index 0000000..4f8a3d2
--- /dev/null
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageRoomMsg.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.openmeetings.core.util.ws;
+
+import org.apache.openmeetings.util.message.RoomMessage;
+import org.apache.openmeetings.util.ws.IClusterWsMessage;
+
+public class WsMessageRoomMsg implements IClusterWsMessage {
+ private static final long serialVersionUID = 1L;
+ private final RoomMessage msg;
+
+ public WsMessageRoomMsg(RoomMessage msg) {
+ this.msg = msg;
+ }
+
+ public RoomMessage getMsg() {
+ return msg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/openmeetings/blob/9ffba612/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageUser.java
----------------------------------------------------------------------
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageUser.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageUser.java
new file mode 100644
index 0000000..4b7b32b
--- /dev/null
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageUser.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.openmeetings.core.util.ws;
+
+import org.apache.openmeetings.util.ws.IClusterWsMessage;
+
+public class WsMessageUser implements IClusterWsMessage {
+ private static final long serialVersionUID = 1L;
+ private final Long userId;
+ private final String msg;
+
+ public WsMessageUser(Long userId, String msg) {
+ this.userId = userId;
+ this.msg = msg;
+ }
+
+ public Long getUserId() {
+ return userId;
+ }
+
+ public String getMsg() {
+ return msg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/openmeetings/blob/9ffba612/openmeetings-db/src/main/java/org/apache/openmeetings/IApplication.java
----------------------------------------------------------------------
diff --git a/openmeetings-db/src/main/java/org/apache/openmeetings/IApplication.java b/openmeetings-db/src/main/java/org/apache/openmeetings/IApplication.java
index caf3dd3..afdeb6a 100644
--- a/openmeetings-db/src/main/java/org/apache/openmeetings/IApplication.java
+++ b/openmeetings-db/src/main/java/org/apache/openmeetings/IApplication.java
@@ -30,6 +30,7 @@ import org.apache.openmeetings.db.dao.basic.ConfigurationDao;
import org.apache.openmeetings.db.entity.basic.Client;
import org.apache.openmeetings.db.entity.room.Invitation;
import org.apache.openmeetings.db.entity.room.StreamClient;
+import org.apache.openmeetings.util.ws.IClusterWsMessage;
import org.apache.wicket.request.IExceptionMapper;
import org.apache.wicket.request.IRequestMapper;
import org.apache.wicket.request.mapper.parameter.PageParameters;
@@ -64,4 +65,7 @@ public interface IApplication {
//JPA
void updateJpaAddresses(ConfigurationDao dao);
+
+ //WS
+ void publishWsTopic(IClusterWsMessage msg);
}
http://git-wip-us.apache.org/repos/asf/openmeetings/blob/9ffba612/openmeetings-server/src/site/xdoc/Clustering.xml
----------------------------------------------------------------------
diff --git a/openmeetings-server/src/site/xdoc/Clustering.xml b/openmeetings-server/src/site/xdoc/Clustering.xml
index 7fc6513..6a35d19 100644
--- a/openmeetings-server/src/site/xdoc/Clustering.xml
+++ b/openmeetings-server/src/site/xdoc/Clustering.xml
@@ -39,7 +39,6 @@
Schedulers to drop user sessions as outdated)
</li>
<li>All servers should be configured to use the same DB</li>
- <li>Servers should be added in Administration -> Servers section</li>
</ul>
<subsection name="Network">
<p>Multicast should be set up on all servers<br/>
http://git-wip-us.apache.org/repos/asf/openmeetings/blob/9ffba612/openmeetings-util/src/main/java/org/apache/openmeetings/util/ws/IClusterWsMessage.java
----------------------------------------------------------------------
diff --git a/openmeetings-util/src/main/java/org/apache/openmeetings/util/ws/IClusterWsMessage.java b/openmeetings-util/src/main/java/org/apache/openmeetings/util/ws/IClusterWsMessage.java
new file mode 100644
index 0000000..78f2dc5
--- /dev/null
+++ b/openmeetings-util/src/main/java/org/apache/openmeetings/util/ws/IClusterWsMessage.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.openmeetings.util.ws;
+
+import java.io.Serializable;
+
+public interface IClusterWsMessage extends Serializable {
+}
http://git-wip-us.apache.org/repos/asf/openmeetings/blob/9ffba612/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/Application.java
----------------------------------------------------------------------
diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/Application.java b/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/Application.java
index 10b5f03..7a5ec65 100644
--- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/Application.java
+++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/Application.java
@@ -67,6 +67,7 @@ import org.apache.openmeetings.db.entity.user.User.Type;
import org.apache.openmeetings.util.InitializationContainer;
import org.apache.openmeetings.util.OpenmeetingsVariables;
import org.apache.openmeetings.util.message.RoomMessage;
+import org.apache.openmeetings.util.ws.IClusterWsMessage;
import org.apache.openmeetings.web.pages.AccessDeniedPage;
import org.apache.openmeetings.web.pages.ActivatePage;
import org.apache.openmeetings.web.pages.HashPage;
@@ -128,10 +129,13 @@ import org.wicketstuff.datastores.hazelcast.HazelcastDataStore;
import com.hazelcast.config.XmlConfigBuilder;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.ITopic;
import com.hazelcast.core.Member;
import com.hazelcast.core.MemberAttributeEvent;
import com.hazelcast.core.MembershipEvent;
import com.hazelcast.core.MembershipListener;
+import com.hazelcast.core.Message;
+import com.hazelcast.core.MessageListener;
public class Application extends AuthenticatedWebApplication implements IApplication {
private static final Logger log = getLogger(Application.class, webAppRootKey);
@@ -155,6 +159,7 @@ public class Application extends AuthenticatedWebApplication implements IApplica
private String xFrameOptions = HEADER_XFRAME_SAMEORIGIN;
private String contentSecurityPolicy = OpenmeetingsVariables.HEADER_CSP_SELF;
private final HazelcastInstance hazelcast = Hazelcast.getOrCreateHazelcastInstance(new XmlConfigBuilder().build());
+ private ITopic<IClusterWsMessage> hazelWsTopic;
@Override
protected void init() {
@@ -163,6 +168,17 @@ public class Application extends AuthenticatedWebApplication implements IApplica
getApplicationSettings().setAccessDeniedPage(AccessDeniedPage.class);
hazelcast.getCluster().getLocalMember().setStringAttribute(NAME_ATTR_KEY, hazelcast.getName());
+ hazelWsTopic = hazelcast.getTopic("default");
+ hazelWsTopic.addMessageListener(new MessageListener<IClusterWsMessage>() {
+ @Override
+ public void onMessage(Message<IClusterWsMessage> msg) {
+ String serverId = msg.getPublishingMember().getStringAttribute(NAME_ATTR_KEY);
+ if (serverId.equals(hazelcast.getName())) {
+ return;
+ }
+ WebSocketHelper.send(msg.getMessageObject());
+ }
+ });
hazelcast.getCluster().addMembershipListener(new MembershipListener() {
@Override
public void memberRemoved(MembershipEvent evt) {
@@ -833,4 +849,9 @@ public class Application extends AuthenticatedWebApplication implements IApplica
throw new WicketRuntimeException(e);
}
}
+
+ @Override
+ public void publishWsTopic(IClusterWsMessage msg) {
+ hazelWsTopic.publish(msg);
+ }
}