You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openmeetings.apache.org by so...@apache.org on 2018/09/23 11:52:47 UTC
[openmeetings] branch master updated: [OPENMEETINGS-1649] room
video somehow work (not stable)
This is an automated email from the ASF dual-hosted git repository.
solomax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openmeetings.git
The following commit(s) were added to refs/heads/master by this push:
new 267da92 [OPENMEETINGS-1649] room video somehow work (not stable)
267da92 is described below
commit 267da9245a65326f519c889b974b4c6967bcdfde
Author: Maxim Solodovnik <so...@gmail.com>
AuthorDate: Sun Sep 23 18:52:32 2018 +0700
[OPENMEETINGS-1649] room video somehow work (not stable)
---
.../org/apache/openmeetings/core/remote/KRoom.java | 42 +-----
.../apache/openmeetings/core/remote/KStream.java | 158 +++++++++------------
.../openmeetings/core/remote/KTestStream.java | 3 +-
.../openmeetings/core/remote/KurentoHandler.java | 127 ++++++++++++-----
.../openmeetings/core/util/WebSocketHelper.java | 15 ++
.../core/util/ws/WsMessageRoomOthers.java | 35 +++++
.../openmeetings/db/util/ws/RoomMessage.java | 2 -
.../apache/openmeetings/web/room/RoomPanel.java | 43 +-----
.../openmeetings/web/room/raw-video-manager.js | 53 +++++--
.../org/apache/openmeetings/web/room/raw-video.js | 11 --
.../webapp/WEB-INF/classes/applicationContext.xml | 5 +-
11 files changed, 252 insertions(+), 242 deletions(-)
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java
index f06e141..d4b1d77 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java
@@ -24,9 +24,7 @@ package org.apache.openmeetings.core.remote;
import static org.apache.openmeetings.core.remote.KurentoHandler.newKurentoMsg;
import java.io.Closeable;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -66,53 +64,22 @@ public class KRoom implements Closeable {
log.info("ROOM {} has been created", roomId);
}
- public KStream addStream(final KurentoHandler h, StreamDesc sd) {
+ public KStream startBroadcast(final KurentoHandler h, Client c, StreamDesc sd) {
log.info("ROOM {}: adding participant {}", roomId, sd.getUid());
- final KStream u = new KStream(h, sd.getSid(), sd.getUid(), this.roomId, this.pipeline);
+ final KStream u = new KStream(h, c, this.pipeline);
participants.put(u.getUid(), u);
h.usersByUid.put(u.getUid(), u);
- return u;
- }
-
- public KStream join(final KurentoHandler h, Client c, StreamDesc sd) {
- KStream u = addStream(h, sd);
broadcast(h, c, sd);
return u;
}
- public void leave(final KurentoHandler h, KStream user) {
- log.debug("PARTICIPANT {}: Leaving room {}", user.getUid(), this.roomId);
- this.removeParticipant(h, user.getUid());
- user.release();
- }
-
- private void removeParticipant(final KurentoHandler h, String name) {
- participants.remove(name);
-
- log.debug("ROOM {}: notifying all users that {} is leaving the room", this.roomId, name);
-
- final List<String> unnotifiedParticipants = new ArrayList<>();
- final JSONObject msg = newKurentoMsg();
- msg.put("id", "participantLeft");
- msg.put("name", name);
- for (final KStream participant : participants.values()) {
- participant.cancelVideoFrom(name);
- h.sendClient(participant.getSid(), msg);
- }
-
- if (!unnotifiedParticipants.isEmpty()) {
- log.debug("ROOM {}: The users {} could not be notified that {} left the room", this.roomId,
- unnotifiedParticipants, name);
- }
- }
-
private static void broadcast(final KurentoHandler h, Client c, StreamDesc sd) {
final JSONObject msg = newKurentoMsg();
msg.put("id", "broadcast");
msg.put("uid", sd.getUid());
msg.put("stream", new JSONObject(sd));
msg.put("client", c.toJson(true));
- log.debug("User {}: has started broadcast", sd.getSid());
+ log.debug("User {}: has started broadcast", sd.getUid());
h.sendClient(sd.getSid(), msg);
}
@@ -126,11 +93,8 @@ public class KRoom implements Closeable {
for (final KStream user : participants.values()) {
user.release();
}
-
participants.clear();
-
pipeline.release(new Continuation<Void>() {
-
@Override
public void onSuccess(Void result) throws Exception {
log.trace("ROOM {}: Released Pipeline", KRoom.this.roomId);
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
index cef92fb..e47c6d7 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
@@ -36,6 +36,7 @@ import org.kurento.client.EventListener;
import org.kurento.client.IceCandidate;
import org.kurento.client.IceCandidateFoundEvent;
import org.kurento.client.MediaFlowOutStateChangeEvent;
+import org.kurento.client.MediaFlowState;
import org.kurento.client.MediaPipeline;
import org.kurento.client.WebRtcEndpoint;
import org.kurento.jsonrpc.JsonUtils;
@@ -59,25 +60,15 @@ public class KStream implements IKStream {
private final WebRtcEndpoint outgoingMedia;
private final ConcurrentMap<String, WebRtcEndpoint> incomingMedia = new ConcurrentHashMap<>();
- public KStream(final KurentoHandler h, final String sid, final String uid, Long roomId, MediaPipeline pipeline) {
+ //FIXME TODO multiple streams from client
+ public KStream(final KurentoHandler h, final Client c, MediaPipeline pipeline) {
this.pipeline = pipeline;
- this.sid = sid;
- this.uid = uid;
- this.roomId = roomId;
+ this.sid = c.getSid();
+ this.uid = c.getUid();
+ this.roomId = c.getRoomId();
//TODO Min/MaxVideoSendBandwidth
//TODO Min/Max Audio/Video RecvBandwidth
- outgoingMedia = new WebRtcEndpoint.Builder(pipeline).build();
-
- outgoingMedia.addIceCandidateFoundListener(new EventListener<IceCandidateFoundEvent>() {
- @Override
- public void onEvent(IceCandidateFoundEvent event) {
- JSONObject response = newKurentoMsg();
- response.put("id", "iceCandidate");
- response.put("uid", uid);
- response.put("candidate", convert(JsonUtils.toJsonObject(event.getCandidate())));
- h.sendClient(sid, response);
- }
- });
+ outgoingMedia = createEndpoint(h, c);
//TODO add logic here
outgoingMedia.addConnectionStateChangedListener(new EventListener<ConnectionStateChangedEvent>() {
@Override
@@ -89,14 +80,16 @@ public class KStream implements IKStream {
@Override
public void onEvent(MediaFlowOutStateChangeEvent event) {
log.warn("MediaFlowOutStateChange {}", event.getState());
+ if (MediaFlowState.FLOWING == event.getState()) {
+ JSONObject msg = newKurentoMsg();
+ msg.put("id", "newStream");
+ msg.put("client", c.toJson(false));
+ WebSocketHelper.sendRoomOthers(roomId, uid, msg);
+ }
}
});
}
- public WebRtcEndpoint getOutgoingWebRtcPeer() {
- return outgoingMedia;
- }
-
public String getSid() {
return sid;
}
@@ -111,111 +104,74 @@ public class KStream implements IKStream {
* @return The room
*/
public Long getRoomId() {
- return this.roomId;
+ return roomId;
}
- public void receiveVideoFrom(final KurentoHandler h, Client c, KStream sender, String sdpOffer) {
- log.info("USER {}: connecting with {} in room {}", this.uid, sender.getUid(), this.roomId);
+ public boolean contains(String uid) {
+ return this.uid.equals(uid) || incomingMedia.containsKey(uid);
+ }
- log.trace("USER {}: SdpOffer for {} is {}", this.uid, sender.getUid(), sdpOffer);
+ public void videoResponse(final KurentoHandler h, Client c, String sdpOffer) {
+ final boolean self = c.getUid().equals(uid);
+ log.info("USER {}: have started {} in room {}", uid, self ? "broadcasting" : "receiving", roomId);
+ log.trace("USER {}: SdpOffer is {}", c.getUid(), sdpOffer);
- if (c.getUid().equals(sender.getUid())) {
- WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.rightUpdated, c.getUid()));
- WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.newStream, c.getUid()));
- }
- final String sdpAnswer = this.getEndpointForUser(h, sender).processOffer(sdpOffer);
- final JSONObject scParams = newKurentoMsg();
- scParams.put("id", "videoResponse");
- scParams.put("uid", sender.getUid());
- scParams.put("sdpAnswer", sdpAnswer);
+ final WebRtcEndpoint endpoint = getEndpointForUser(h, c);
+ final String sdpAnswer = endpoint.processOffer(sdpOffer);
- log.trace("USER {}: SdpAnswer for {} is {}", this.uid, sender.getUid(), sdpAnswer);
- h.sendClient(sid, scParams);
+ log.trace("USER {}: SdpAnswer is {}", uid, sdpAnswer);
+ h.sendClient(c.getSid(), newKurentoMsg()
+ .put("id", "videoResponse")
+ .put("uid", uid)
+ .put("sdpAnswer", sdpAnswer));
log.debug("gather candidates");
- this.getEndpointForUser(h, sender).gatherCandidates();
+ endpoint.gatherCandidates();
+ if (self) {
+ WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.rightUpdated, c.getUid()));
+ }
}
- private WebRtcEndpoint getEndpointForUser(final KurentoHandler h, final KStream sender) {
- if (sender.getUid().equals(uid)) {
- log.debug("PARTICIPANT {}: configuring loopback", this.uid);
+ private WebRtcEndpoint getEndpointForUser(final KurentoHandler h, final Client c) {
+ if (c.getUid().equals(uid)) {
+ log.debug("PARTICIPANT {}: configuring loopback", uid);
return outgoingMedia;
}
- log.debug("PARTICIPANT {}: receiving video from {}", this.uid, sender.getUid());
+ log.debug("PARTICIPANT {}: receiving video from {}", uid, c.getUid());
- WebRtcEndpoint incoming = incomingMedia.get(sender.getUid());
+ WebRtcEndpoint incoming = incomingMedia.get(c.getUid());
if (incoming == null) {
- log.debug("PARTICIPANT {}: creating new endpoint for {}", this.uid, sender.getUid());
- incoming = new WebRtcEndpoint.Builder(pipeline).build();
-
- incoming.addIceCandidateFoundListener(new EventListener<IceCandidateFoundEvent>() {
-
- @Override
- public void onEvent(IceCandidateFoundEvent event) {
- JSONObject response = newKurentoMsg();
- response.put("id", "iceCandidate");
- response.put("uid", sender.getUid());
- response.put("candidate", convert(JsonUtils.toJsonObject(event.getCandidate())));
- h.sendClient(sid, response);
- }
- });
-
- incomingMedia.put(sender.getUid(), incoming);
+ log.debug("PARTICIPANT {}: creating new endpoint for {}", uid, c.getUid());
+ incoming = createEndpoint(h, c);
+ incomingMedia.put(c.getUid(), incoming);
}
- log.debug("PARTICIPANT {}: obtained endpoint for {}", this.uid, sender.getUid());
- sender.getOutgoingWebRtcPeer().connect(incoming);
+ log.debug("PARTICIPANT {}: obtained endpoint for {}", uid, c.getUid());
+ outgoingMedia.connect(incoming);
return incoming;
}
- public void cancelVideoFrom(final KStream sender) {
- this.cancelVideoFrom(sender.getUid());
- }
-
- public void cancelVideoFrom(final String senderName) {
- log.debug("PARTICIPANT {}: canceling video reception from {}", this.uid, senderName);
- final WebRtcEndpoint incoming = incomingMedia.remove(senderName);
-
- log.debug("PARTICIPANT {}: removing endpoint for {}", this.uid, senderName);
- incoming.release(new Continuation<Void>() {
- @Override
- public void onSuccess(Void result) throws Exception {
- log.trace("PARTICIPANT {}: Released successfully incoming EP for {}", KStream.this.uid, senderName);
- }
-
- @Override
- public void onError(Throwable cause) throws Exception {
- log.warn("PARTICIPANT {}: Could not release incoming EP for {}", KStream.this.uid, senderName);
- }
- });
- }
-
@Override
public void release() {
- log.debug("PARTICIPANT {}: Releasing resources", this.uid);
- for (final String remoteParticipantName : incomingMedia.keySet()) {
-
- log.trace("PARTICIPANT {}: Released incoming EP for {}", this.uid, remoteParticipantName);
+ log.debug("PARTICIPANT {}: Releasing resources", uid);
+ for (final String inUid : incomingMedia.keySet()) {
+ log.trace("PARTICIPANT {}: Released incoming EP for {}", uid, inUid);
- final WebRtcEndpoint ep = this.incomingMedia.get(remoteParticipantName);
+ final WebRtcEndpoint ep = incomingMedia.get(inUid);
ep.release(new Continuation<Void>() {
@Override
public void onSuccess(Void result) throws Exception {
- log.trace("PARTICIPANT {}: Released successfully incoming EP for {}", KStream.this.uid,
- remoteParticipantName);
+ log.trace("PARTICIPANT {}: Released successfully incoming EP for {}", KStream.this.uid, inUid);
}
@Override
public void onError(Throwable cause) throws Exception {
- log.warn("PARTICIPANT {}: Could not release incoming EP for {}", KStream.this.uid,
- remoteParticipantName);
+ log.warn("PARTICIPANT {}: Could not release incoming EP for {}", KStream.this.uid, inUid);
}
});
}
-
outgoingMedia.release(new Continuation<Void>() {
-
@Override
public void onSuccess(Void result) throws Exception {
log.trace("PARTICIPANT {}: Released outgoing EP", KStream.this.uid);
@@ -228,6 +184,24 @@ public class KStream implements IKStream {
});
}
+ private WebRtcEndpoint createEndpoint(final KurentoHandler h, final Client c) {
+ WebRtcEndpoint endpoint = new WebRtcEndpoint.Builder(pipeline).build();
+ endpoint.addTag("suid", uid);
+ endpoint.addTag("uid", c.getUid());
+
+ endpoint.addIceCandidateFoundListener(new EventListener<IceCandidateFoundEvent>() {
+ @Override
+ public void onEvent(IceCandidateFoundEvent event) {
+ JSONObject response = newKurentoMsg();
+ response.put("id", "iceCandidate");
+ response.put("uid", uid);
+ response.put("candidate", convert(JsonUtils.toJsonObject(event.getCandidate())));
+ h.sendClient(c.getSid(), response);
+ }
+ });
+ return endpoint;
+ }
+
public void addCandidate(IceCandidate candidate, String name) {
if (this.uid.compareTo(name) == 0) {
outgoingMedia.addIceCandidate(candidate);
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java
index 761186f..ab034ed 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java
@@ -198,8 +198,7 @@ public class KTestStream implements IKStream {
}
private void sendPlayEnd(IWsClient _c) {
- WebSocketHelper.sendClient(_c, newTestKurentoMsg()
- .put("id", "playStopped"));
+ WebSocketHelper.sendClient(_c, newTestKurentoMsg().put("id", "playStopped"));
release();
}
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
index 627977a..050337a 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
@@ -19,13 +19,14 @@
*/
package org.apache.openmeetings.core.remote;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import org.apache.openmeetings.core.util.WebSocketHelper;
import org.apache.openmeetings.db.entity.basic.Client;
@@ -36,11 +37,15 @@ import org.apache.openmeetings.db.entity.room.Room.Right;
import org.apache.openmeetings.db.manager.IClientManager;
import org.apache.openmeetings.db.util.ws.RoomMessage;
import org.apache.openmeetings.db.util.ws.TextRoomMessage;
+import org.kurento.client.Endpoint;
import org.kurento.client.EventListener;
import org.kurento.client.IceCandidate;
import org.kurento.client.KurentoClient;
+import org.kurento.client.MediaObject;
import org.kurento.client.MediaPipeline;
import org.kurento.client.ObjectCreatedEvent;
+import org.kurento.client.PlayerEndpoint;
+import org.kurento.client.RecorderEndpoint;
import org.kurento.client.Tag;
import org.kurento.client.WebRtcEndpoint;
import org.slf4j.Logger;
@@ -56,6 +61,7 @@ public class KurentoHandler {
private final static String TAG_MODE = "mode";
private final static String TAG_ROOM = "roomId";
public final static String KURENTO_TYPE = "kurento";
+ private long checkTimeout = 200; //ms
private String kurentoWsUrl;
private KurentoClient client;
private String kuid;
@@ -83,38 +89,61 @@ public class KurentoHandler {
// room created
final String roid = evt.getObject().getId();
scheduler.schedule(() -> {
- if (client != null) { // still alive
- MediaPipeline pipe = client.getById(roid, MediaPipeline.class);
- try {
- Map<String, String> tags = tagsAsMap(pipe);
- if (kuid.equals(tags.get(TAG_KUID))) {
- if (MODE_TEST.equals(tags.get(TAG_MODE)) && MODE_TEST.equals(tags.get(TAG_MODE))) {
- return;
- }
- KRoom r = rooms.get(Long.valueOf(tags.get(TAG_ROOM)));
- if (r.getPipelineId().equals(pipe.getId())) {
- return;
- } else if (r != null) {
- rooms.remove(r.getRoomId());
- r.close();
- }
+ if (client == null) {
+ return;
+ }
+ // still alive
+ MediaPipeline pipe = client.getById(roid, MediaPipeline.class);
+ try {
+ Map<String, String> tags = tagsAsMap(pipe);
+ if (validTestPipeline(tags)) {
+ return;
+ }
+ if (kuid.equals(tags.get(TAG_KUID))) {
+ KRoom r = rooms.get(Long.valueOf(tags.get(TAG_ROOM)));
+ if (r.getPipelineId().equals(pipe.getId())) {
+ return;
+ } else if (r != null) {
+ rooms.remove(r.getRoomId());
+ r.close();
}
- } catch(Exception e) {
- //no-op, connect will be dropped
}
- log.warn("Invalid MediaPipeline {} detected, will be dropped", pipe.getId());
- pipe.release();
+ } catch(Exception e) {
+ //no-op, connect will be dropped
}
- }, 2, TimeUnit.SECONDS);
- } else if (evt.getObject() instanceof WebRtcEndpoint) { //FIXME TODO RecordingEndpoint
+ log.warn("Invalid MediaPipeline {} detected, will be dropped", pipe.getId());
+ pipe.release();
+ }, checkTimeout, MILLISECONDS);
+ } else if (evt.getObject() instanceof Endpoint) {
// endpoint created
- final String eoid = evt.getObject().getId();
+ Endpoint _point = (Endpoint)evt.getObject();
+ final String eoid = _point.getId();
+ Class<? extends Endpoint> _clazz = null;
+ if (_point instanceof WebRtcEndpoint) {
+ _clazz = WebRtcEndpoint.class;
+ } else if (_point instanceof RecorderEndpoint) {
+ _clazz = RecorderEndpoint.class;
+ } else if (_point instanceof PlayerEndpoint) {
+ _clazz = PlayerEndpoint.class;
+ }
+ final Class<? extends Endpoint> clazz = _clazz;
scheduler.schedule(() -> {
- if (client != null) { // still alive
- WebRtcEndpoint point = client.getById(eoid, WebRtcEndpoint.class);
- //point.release();
+ if (client == null || clazz == null) {
+ return;
}
- }, 2, TimeUnit.SECONDS);
+ // still alive
+ Endpoint point = client.getById(eoid, clazz);
+ if (validTestPipeline(point.getMediaPipeline())) {
+ return;
+ }
+ Map<String, String> tags = tagsAsMap(point);
+ KStream stream = getByUid(tags.get("suid"));
+ if (stream != null && stream.contains(tags.get("uid"))) {
+ return;
+ }
+ log.warn("Invalid Endpoint {} detected, will be dropped", point.getId());
+ point.release();
+ }, checkTimeout, MILLISECONDS);
}
}
});
@@ -131,7 +160,7 @@ public class KurentoHandler {
}
}
- private static Map<String, String> tagsAsMap(MediaPipeline pipe) {
+ private static Map<String, String> tagsAsMap(MediaObject pipe) {
Map<String, String> map = new HashMap<>();
for (Tag t : pipe.getTags()) {
map.put(t.getKey(), t.getValue());
@@ -139,6 +168,14 @@ public class KurentoHandler {
return map;
}
+ private boolean validTestPipeline(MediaPipeline pipeline) {
+ return validTestPipeline(tagsAsMap(pipeline));
+ }
+
+ private boolean validTestPipeline(Map<String, String> tags) {
+ return kuid.equals(tags.get(TAG_KUID)) && MODE_TEST.equals(tags.get(TAG_MODE)) && MODE_TEST.equals(tags.get(TAG_MODE));
+ }
+
private MediaPipeline createTestPipeline() {
MediaPipeline pipe = client.createMediaPipeline();
pipe.addTag(TAG_KUID, kuid);
@@ -190,14 +227,14 @@ public class KurentoHandler {
case "toggleActivity":
toggleActivity(c, Client.Activity.valueOf(msg.getString("activity")));
break;
- case "receiveVideoFrom":
- final String senderUid = msg.getString("sender");
- final KStream sender = getByUid(senderUid);
+ case "broadcastStarted":
+ {
final String sdpOffer = msg.getString("sdpOffer");
if (user == null) {
return;
}
- user.receiveVideoFrom(this, c, sender, sdpOffer);
+ user.videoResponse(this, c, sdpOffer);
+ }
break;
case "onIceCandidate":
{
@@ -210,6 +247,17 @@ public class KurentoHandler {
user.addCandidate(cand, msg.getString("uid"));
}
break;
+ case "receiveVideo":
+ {
+ final String senderUid = msg.getString("sender");
+ KStream sender = getByUid(senderUid);
+ if (sender == null) {
+ return;
+ }
+ final String sdpOffer = msg.getString("sdpOffer");
+ sender.videoResponse(this, c, sdpOffer);
+ }
+ break;
}
}
}
@@ -245,7 +293,7 @@ public class KurentoHandler {
//close
leaveRoom(cm.update(c.removeStream(c.getUid())));
WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.rightUpdated, c.getUid()));
- WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.closeStream, c.getUid()));
+ //FIXME TODO update interview buttons
} else if (!broadcasting) {
//join
KRoom room = getRoom(c.getRoomId());
@@ -253,7 +301,8 @@ public class KurentoHandler {
sd.setWidth(c.getWidth());
sd.setHeight(c.getHeight());
cm.update(c.addStream(sd));
- room.join(this, c, sd);
+ room.startBroadcast(this, c, sd);
+ //FIXME TODO update interview buttons
} else {
//change constraints
//FIXME TODO WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.rightUpdated, c.getUid()));
@@ -263,9 +312,11 @@ public class KurentoHandler {
public void leaveRoom(Client c) {
remove(c);
- WebSocketHelper.sendClient(c, newKurentoMsg()
+ WebSocketHelper.sendAll(newKurentoMsg()
.put("id", "broadcastStopped")
- .put("uid", c.getUid()));
+ .put("uid", c.getUid())
+ .toString()
+ );
}
public void sendClient(String sid, JSONObject msg) {
@@ -364,4 +415,8 @@ public class KurentoHandler {
}
return r;
}
+
+ public void setCheckTimeout(long checkTimeout) {
+ this.checkTimeout = checkTimeout;
+ }
}
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/WebSocketHelper.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/WebSocketHelper.java
index a0606b8..9d9fe55 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/WebSocketHelper.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/WebSocketHelper.java
@@ -36,6 +36,7 @@ import org.apache.openmeetings.core.util.ws.WsMessageAll;
import org.apache.openmeetings.core.util.ws.WsMessageChat;
import org.apache.openmeetings.core.util.ws.WsMessageRoom;
import org.apache.openmeetings.core.util.ws.WsMessageRoomMsg;
+import org.apache.openmeetings.core.util.ws.WsMessageRoomOthers;
import org.apache.openmeetings.core.util.ws.WsMessageUser;
import org.apache.openmeetings.db.entity.basic.ChatMessage;
import org.apache.openmeetings.db.entity.basic.Client;
@@ -155,6 +156,9 @@ public class WebSocketHelper {
public static void send(IClusterWsMessage _m) {
if (_m instanceof WsMessageRoomMsg) {
sendRoom(((WsMessageRoomMsg)_m).getMsg(), false);
+ } else if (_m instanceof WsMessageRoomOthers) {
+ WsMessageRoomOthers m = (WsMessageRoomOthers)_m;
+ sendRoomOthers(m.getRoomId(), m.getUid(), m.getMsg(), false);
} else if (_m instanceof WsMessageRoom) {
WsMessageRoom m = (WsMessageRoom)_m;
sendRoom(m.getRoomId(), m.getMsg(), false);
@@ -192,6 +196,17 @@ public class WebSocketHelper {
sendRoom(roomId, m, null, null);
}
+ public static void sendRoomOthers(final Long roomId, final String uid, final JSONObject m) {
+ sendRoomOthers(roomId, uid, m, true);
+ }
+
+ private static void sendRoomOthers(final Long roomId, final String uid, final JSONObject m, boolean publish) {
+ if (publish) {
+ publish(new WsMessageRoomOthers(roomId, uid, m));
+ }
+ sendRoom(roomId, m, c -> !uid.equals(c.getUid()), null);
+ }
+
public static void sendRoom(ChatMessage m, JSONObject msg) {
sendRoom(m, msg, true);
}
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageRoomOthers.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageRoomOthers.java
new file mode 100644
index 0000000..df93d2a
--- /dev/null
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageRoomOthers.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License") + you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.openmeetings.core.util.ws;
+
+import com.github.openjson.JSONObject;
+
+public class WsMessageRoomOthers extends WsMessageRoom {
+ private static final long serialVersionUID = 1L;
+ private final String uid;
+
+ public WsMessageRoomOthers(Long roomId, String uid, JSONObject msg) {
+ super(roomId, msg);
+ this.uid = uid;
+ }
+
+ public String getUid() {
+ return uid;
+ }
+}
diff --git a/openmeetings-db/src/main/java/org/apache/openmeetings/db/util/ws/RoomMessage.java b/openmeetings-db/src/main/java/org/apache/openmeetings/db/util/ws/RoomMessage.java
index dbccc8f..b3b05cc 100644
--- a/openmeetings-db/src/main/java/org/apache/openmeetings/db/util/ws/RoomMessage.java
+++ b/openmeetings-db/src/main/java/org/apache/openmeetings/db/util/ws/RoomMessage.java
@@ -52,8 +52,6 @@ public class RoomMessage implements IWebSocketPushMessage {
, requestRightExclusive
, haveQuestion
, kick
- , newStream
- , closeStream
, mute
, exclusive
, quickPollUpdated
diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.java b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.java
index 3de4273..fc9f610 100644
--- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.java
+++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.java
@@ -167,7 +167,7 @@ public class RoomPanel extends BasePanel {
for (Client c: cm.listByRoom(getRoom().getId())) {
for (StreamDesc sd : c.getStreams()) {
JSONObject jo = videoJson(c, c.getSid(), sd);
- sb.append(String.format("VideoManager.play(%s);", jo));
+ //FIXME TODO sb.append(String.format("VideoManager.play(%s);", jo));
hasStreams = true;
}
}
@@ -492,47 +492,6 @@ public class RoomPanel extends BasePanel {
updateInterviewRecordingButtons(handler);
}
break;
- case newStream:
- {
- /* FIXME TODO
- JSONObject obj = new JSONObject(((TextRoomMessage)m).getText());
- String uid = obj.getString("uid");
- Client c = cm.get(uid);
- if (c == null) {
- // screen client, ext video stream
- c = cm.getBySid(obj.getString("sid"));
- }
- if (c == null) {
- log.error("Not existing user in newStream {} !!!!", uid);
- return;
- }
- boolean self = _c.getSid().equals(c.getSid());
- if (!self || Client.Type.room != scm.get(uid).getType()) { // stream from others or self external video
- JSONObject jo = videoJson(c, _c.getSid(), uid);
- handler.appendJavaScript(String.format("VideoManager.play(%s);", jo));
- }
- if (self) {
- cm.update(c.addStream(uid));
- }*/
- updateInterviewRecordingButtons(handler);
- }
- break;
- case closeStream:
- {
- /* FIXME TODO
- JSONObject obj = new JSONObject(((TextRoomMessage)m).getText());
- String uid = obj.getString("uid");
- Client c = cm.getBySid(obj.getString("sid"));
- if (c != null) {
- //c == null means client exits the room
- if (_c.getUid().equals(c.getUid())) {
- cm.update(c.removeStream(uid));
- }
- }
- handler.appendJavaScript(String.format("VideoManager.close('%s');", uid));*/
- updateInterviewRecordingButtons(handler);
- }
- break;
case roomEnter:
sidebar.update(handler);
menu.update(handler);
diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video-manager.js b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video-manager.js
index 9439cbd..e549956 100644
--- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video-manager.js
+++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video-manager.js
@@ -44,26 +44,49 @@ var VideoManager = (function() {
if (error) {
return OmUtil.error(error);
}
- this.generateOffer(v.offerToReceiveVideo);
+ this.generateOffer(function(error, offerSdp, wp) {
+ if (error) {
+ return OmUtil.error('Sender sdp offer error');
+ }
+ OmUtil.log('Invoking Sender SDP offer callback function');
+ VideoManager.sendMessage({
+ id : 'broadcastStarted'
+ , sdpOffer: offerSdp
+ });
+ });
}));
}
- function _receiveVideo(uid) {
- const w = $('#' + VideoUtil.getVid(uid))
+ function _onReceive(msg) {
+ const uid = msg.client.uid;
+ $('#' + VideoUtil.getVid(uid)).remove();
+ const o = VideoSettings.load() //FIXME TODO add multiple streams support
+ //, w = Video().init(msg.client, VideoUtil.getPos(VideoUtil.getRects(VID_SEL), msg.stream.width, msg.stream.height + 25))
+ , w = Video().init(msg.client, VideoUtil.getPos(VideoUtil.getRects(VID_SEL), msg.client.width, msg.client.height + 25))
, v = w.data()
, cl = v.client();
- v.setPeer(new kurentoUtils.WebRtcPeer.WebRtcPeerRecvonly(
- {
- remoteVideo: v.video()
- , onicecandidate: v.onIceCandidate
+ OmUtil.log(uid + " receiving video");
+
+ v.setPeer(new kurentoUtils.WebRtcPeer.WebRtcPeerRecvonly({
+ remoteVideo : v.video()
+ , onicecandidate : v.onIceCandidate
}
- , function (error) {
- if(error) {
+ , function(error) {
+ if (error) {
return OmUtil.error(error);
}
- this.generateOffer(v.offerToReceiveVideo);
- }
- ));
+ this.generateOffer(function onOfferViewer(error, offerSdp) {
+ if (error) {
+ return OmUtil.error('Receiver sdp offer error');
+ }
+ OmUtil.log('Invoking Receiver SDP offer callback function');
+ VideoManager.sendMessage({
+ id : 'receiveVideo'
+ , sender: cl.uid
+ , sdpOffer: offerSdp
+ });
+ });
+ }));
}
function _onWsMessage(jqEvent, msg) {
@@ -100,6 +123,9 @@ var VideoManager = (function() {
});
}
break;
+ case 'newStream':
+ _onReceive(m);
+ break;
default:
//no-op
}
@@ -194,9 +220,6 @@ var VideoManager = (function() {
v.dialog('open');
}
});
- } else if ('sharing' !== c.type) {
- Video().init(c, VideoUtil.getPos(VideoUtil.getRects(VID_SEL), c.width, c.height + 25));
- _receiveVideo(c.uid);
}
}
function _close(uid, showShareBtn) {
diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video.js b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video.js
index 6f590d4..f30275d 100644
--- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video.js
+++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video.js
@@ -333,17 +333,6 @@ var Video = (function() {
, uid: c.uid
});
};
- self.offerToReceiveVideo = function(error, offerSdp, wp) {
- if (error) {
- return OmUtil.error("sdp offer error");
- }
- OmUtil.log('Invoking SDP offer callback function');
- VideoManager.sendMessage({
- id : "receiveVideoFrom"
- , sender: c.uid
- , sdpOffer: offerSdp
- });
- }
self.resizePod = _resizePod;
return self;
});
diff --git a/openmeetings-web/src/main/webapp/WEB-INF/classes/applicationContext.xml b/openmeetings-web/src/main/webapp/WEB-INF/classes/applicationContext.xml
index 2248d18..12cbf71 100644
--- a/openmeetings-web/src/main/webapp/WEB-INF/classes/applicationContext.xml
+++ b/openmeetings-web/src/main/webapp/WEB-INF/classes/applicationContext.xml
@@ -139,7 +139,6 @@
</bean>
<!-- Kurento -->
- <bean id="kurentoHandler" class="org.apache.openmeetings.core.remote.KurentoHandler" init-method="init" destroy-method="destroy">
- <property name="kurentoWsUrl" value="wss://192.168.15.177:8883/kurento"/>
- </bean>
+ <bean id="kurentoHandler" class="org.apache.openmeetings.core.remote.KurentoHandler" init-method="init" destroy-method="destroy"
+ p:kurentoWsUrl="ws://localhost:8888/kurento" p:checkTimeout="200"/>
</beans>