You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openmeetings.apache.org by so...@apache.org on 2018/09/26 16:58:26 UTC
[openmeetings] branch master updated: [OPENMEETINGS-1649] group
video seems to work
This is an automated email from the ASF dual-hosted git repository.
solomax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openmeetings.git
The following commit(s) were added to refs/heads/master by this push:
new 8e13eb1 [OPENMEETINGS-1649] group video seems to work
8e13eb1 is described below
commit 8e13eb18f5c8e9241a9d96f082e47de3a31c31da
Author: Maxim Solodovnik <so...@gmail.com>
AuthorDate: Wed Sep 26 23:58:14 2018 +0700
[OPENMEETINGS-1649] group video seems to work
---
.../org/apache/openmeetings/core/remote/KRoom.java | 25 +++---
.../apache/openmeetings/core/remote/KStream.java | 89 ++++++++++++----------
.../openmeetings/core/remote/KurentoHandler.java | 56 +++++++-------
3 files changed, 93 insertions(+), 77 deletions(-)
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java
index 3bc9904..91a66ec 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java
@@ -26,8 +26,6 @@ import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import javax.annotation.PreDestroy;
-
import org.apache.openmeetings.db.entity.basic.Client;
import org.kurento.client.Continuation;
import org.kurento.client.MediaPipeline;
@@ -55,19 +53,28 @@ public class KRoom implements Closeable {
log.info("ROOM {} has been created", roomId);
}
- public KStream startBroadcast(final KurentoHandler h, Client c) {
- log.info("ROOM {}: adding participant {}", roomId, c.getUid());
- final KStream u = new KStream(h, c, this.pipeline);
- participants.put(u.getUid(), u);
- h.usersByUid.put(u.getUid(), u);
- return u;
+ public KStream join(final KurentoHandler h, final Client c) {
+ log.info("ROOM {}: join participant {}", roomId, c.getUid());
+ final KStream stream = new KStream(h, c, this.pipeline);
+ participants.put(stream.getUid(), stream);
+ h.usersByUid.put(stream.getUid(), stream);
+ return stream;
}
public Collection<KStream> getParticipants() {
return participants.values();
}
- @PreDestroy
+ public void leave(final Client c) {
+ for (Map.Entry<String, KStream> e : participants.entrySet()) {
+ e.getValue().remove(c);
+ }
+ KStream stream = participants.remove(c.getUid());
+ if (stream != null) {
+ stream.release();
+ }
+ }
+
@Override
public void close() {
for (final KStream user : participants.values()) {
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
index 85b9010..acae079 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
@@ -63,7 +63,7 @@ public class KStream implements IKStream {
this.roomId = c.getRoomId();
//TODO Min/MaxVideoSendBandwidth
//TODO Min/Max Audio/Video RecvBandwidth
- outgoingMedia = createEndpoint(h, c);
+ outgoingMedia = createEndpoint(h, this);
//TODO add logic here
outgoingMedia.addConnectionStateChangedListener(new EventListener<ConnectionStateChangedEvent>() {
@Override
@@ -76,15 +76,20 @@ public class KStream implements IKStream {
public void onEvent(MediaFlowOutStateChangeEvent event) {
log.warn("MediaFlowOutStateChange {}", event.getState());
if (MediaFlowState.FLOWING == event.getState()) {
- JSONObject msg = newKurentoMsg();
- msg.put("id", "newStream");
- msg.put("client", c.toJson(false));
- WebSocketHelper.sendRoomOthers(roomId, uid, msg);
+ WebSocketHelper.sendRoomOthers(roomId, uid, newKurentoMsg()
+ .put("id", "newStream")
+ .put("client", c.toJson(false)));
}
}
});
}
+ public KStream startBroadcast(final KurentoHandler h, final Client c, final String sdpOffer) {
+ videoResponse(h, this, sdpOffer);
+ WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.rightUpdated, c.getUid()));
+ return this;
+ }
+
public String getSid() {
return sid;
}
@@ -106,47 +111,68 @@ public class KStream implements IKStream {
return this.uid.equals(uid) || incomingMedia.containsKey(uid);
}
- public void videoResponse(final KurentoHandler h, Client c, String sdpOffer) {
- final boolean self = c.getUid().equals(uid);
+ public void videoResponse(final KurentoHandler h, KStream sender, String sdpOffer) {
+ final boolean self = sender.getUid().equals(uid);
log.info("USER {}: have started {} in room {}", uid, self ? "broadcasting" : "receiving", roomId);
- log.trace("USER {}: SdpOffer is {}", c.getUid(), sdpOffer);
+ log.trace("USER {}: SdpOffer is {}", sender.getUid(), sdpOffer);
- final WebRtcEndpoint endpoint = getEndpointForUser(h, c);
+ final WebRtcEndpoint endpoint = getEndpointForUser(h, sender);
final String sdpAnswer = endpoint.processOffer(sdpOffer);
log.trace("USER {}: SdpAnswer is {}", uid, sdpAnswer);
- h.sendClient(c.getSid(), newKurentoMsg()
+ h.sendClient(sid, newKurentoMsg()
.put("id", "videoResponse")
- .put("uid", uid)
+ .put("uid", sender.getUid())
.put("sdpAnswer", sdpAnswer));
log.debug("gather candidates");
endpoint.gatherCandidates();
- if (self) {
- WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.rightUpdated, c.getUid()));
- }
}
- private WebRtcEndpoint getEndpointForUser(final KurentoHandler h, final Client c) {
- if (c.getUid().equals(uid)) {
+ private WebRtcEndpoint getEndpointForUser(final KurentoHandler h, final KStream sender) {
+ if (sender.getUid().equals(uid)) {
log.debug("PARTICIPANT {}: configuring loopback", uid);
return outgoingMedia;
}
- log.debug("PARTICIPANT {}: receiving video from {}", uid, c.getUid());
+ log.debug("PARTICIPANT {}: receiving video from {}", uid, sender.getUid());
- WebRtcEndpoint incoming = incomingMedia.get(c.getUid());
+ WebRtcEndpoint incoming = incomingMedia.get(sender.getUid());
if (incoming == null) {
- log.debug("PARTICIPANT {}: creating new endpoint for {}", uid, c.getUid());
- incoming = createEndpoint(h, c);
- incomingMedia.put(c.getUid(), incoming);
+ log.debug("PARTICIPANT {}: creating new endpoint for {}", uid, sender.getUid());
+ incoming = createEndpoint(h, sender);
+ incomingMedia.put(sender.getUid(), incoming);
}
- log.debug("PARTICIPANT {}: obtained endpoint for {}", uid, c.getUid());
- outgoingMedia.connect(incoming);
+ log.debug("PARTICIPANT {}: obtained endpoint for {}", uid, sender.getUid());
+ sender.outgoingMedia.connect(incoming);
return incoming;
}
+ private WebRtcEndpoint createEndpoint(final KurentoHandler h, final KStream sender) {
+ WebRtcEndpoint endpoint = new WebRtcEndpoint.Builder(pipeline).build();
+ endpoint.addTag("suid", uid);
+ endpoint.addTag("uid", sender.getUid());
+
+ endpoint.addIceCandidateFoundListener(new EventListener<IceCandidateFoundEvent>() {
+ @Override
+ public void onEvent(IceCandidateFoundEvent event) {
+ h.sendClient(sid, newKurentoMsg()
+ .put("id", "iceCandidate")
+ .put("uid", sender.getUid())
+ .put("candidate", convert(JsonUtils.toJsonObject(event.getCandidate()))));
+ }
+ });
+ return endpoint;
+ }
+
+ public void remove(final Client c) {
+ WebRtcEndpoint point = incomingMedia.remove(c.getUid());
+ if (point != null) {
+ point.release();
+ }
+ }
+
@Override
public void release() {
log.debug("PARTICIPANT {}: Releasing resources", uid);
@@ -179,23 +205,6 @@ public class KStream implements IKStream {
});
}
- private WebRtcEndpoint createEndpoint(final KurentoHandler h, final Client c) {
- WebRtcEndpoint endpoint = new WebRtcEndpoint.Builder(pipeline).build();
- endpoint.addTag("suid", uid);
- endpoint.addTag("uid", c.getUid());
-
- endpoint.addIceCandidateFoundListener(new EventListener<IceCandidateFoundEvent>() {
- @Override
- public void onEvent(IceCandidateFoundEvent event) {
- h.sendClient(c.getSid(), newKurentoMsg()
- .put("id", "iceCandidate")
- .put("uid", uid)
- .put("candidate", convert(JsonUtils.toJsonObject(event.getCandidate()))));
- }
- });
- return endpoint;
- }
-
public void addCandidate(IceCandidate candidate, String name) {
if (this.uid.compareTo(name) == 0) {
outgoingMedia.addIceCandidate(candidate);
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
index a0a9987..bccb63d 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
@@ -23,6 +23,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
import java.util.HashMap;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
@@ -156,7 +157,15 @@ public class KurentoHandler {
public void destroy() {
if (client != null) {
- //FIXME TODO destroy connected objects
+ for (Entry<Long, KRoom> e : rooms.entrySet()) {
+ e.getValue().close();
+ }
+ rooms.clear();
+ for (Entry<String, KTestStream> e : testsByUid.entrySet()) {
+ e.getValue().release();
+ }
+ testsByUid.clear();
+ usersByUid.clear();
client.destroy();
}
}
@@ -204,7 +213,6 @@ public class KurentoHandler {
case "iceCandidate":
{
JSONObject candidate = msg.getJSONObject("candidate");
-
if (user != null) {
IceCandidate cand = new IceCandidate(candidate.getString("candidate"),
candidate.getString("sdpMid"), candidate.getInt("sdpMLineIndex"));
@@ -219,33 +227,27 @@ public class KurentoHandler {
} else {
final Client c = (Client)_c;
- if (c == null) {
+ if (c == null || c.getRoomId() == null) {
log.warn("Incoming message from invalid user");
+ return;
}
log.debug("Incoming message from user with ID '{}': {}", c.getUserId(), msg);
- KStream stream = getByUid(_c.getUid());
+ KStream stream = getByUid(c.getUid());
+ if (stream == null) {
+ KRoom room = getRoom(c.getRoomId());
+ stream = room.join(this, c);
+ }
//FIXME TODO check client rights here
switch (cmdId) {
case "toggleActivity":
toggleActivity(c, Client.Activity.valueOf(msg.getString("activity")));
break;
case "broadcastStarted":
- {
- if (stream != null) {
- log.warn("Broadcast started, but strem already exists!");
- return;
- }
- KRoom room = getRoom(c.getRoomId());
- stream = room.startBroadcast(this, c);
- stream.videoResponse(this, c, msg.getString("sdpOffer"));
- }
+ stream.startBroadcast(this, c, msg.getString("sdpOffer"));
break;
case "onIceCandidate":
{
JSONObject candidate = msg.getJSONObject("candidate");
- if (stream == null) {
- return;
- }
IceCandidate cand = new IceCandidate(
candidate.getString("candidate")
, candidate.getString("sdpMid")
@@ -254,14 +256,7 @@ public class KurentoHandler {
}
break;
case "receiveVideo":
- {
- final String senderUid = msg.getString("sender");
- KStream sender = getByUid(senderUid);
- if (sender == null) {
- return;
- }
- sender.videoResponse(this, c, msg.getString("sdpOffer"));
- }
+ stream.videoResponse(this, getByUid(msg.getString("sender")), msg.getString("sdpOffer"));
break;
}
}
@@ -338,18 +333,23 @@ public class KurentoHandler {
.put("message", msg));
}
- public void remove(IWsClient c) {
- if (c == null) {
+ public void remove(IWsClient _c) {
+ if (_c == null) {
return;
}
- final String uid = c.getUid();
- final boolean test = !(c instanceof Client);
+ final String uid = _c.getUid();
+ final boolean test = !(_c instanceof Client);
IKStream u = test ? getTestByUid(uid) : getByUid(uid);
if (u != null) {
u.release();
if (test) {
testsByUid.remove(uid);
} else {
+ Client c = (Client)_c;
+ KRoom room = rooms.get(c.getRoomId());
+ if (room != null) {
+ room.leave(c);
+ }
usersByUid.remove(uid);
}
}