You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openmeetings.apache.org by so...@apache.org on 2018/09/26 16:58:26 UTC

[openmeetings] branch master updated: [OPENMEETINGS-1649] group video seems to work

This is an automated email from the ASF dual-hosted git repository.

solomax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openmeetings.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e13eb1  [OPENMEETINGS-1649] group video seems to work
8e13eb1 is described below

commit 8e13eb18f5c8e9241a9d96f082e47de3a31c31da
Author: Maxim Solodovnik <so...@gmail.com>
AuthorDate: Wed Sep 26 23:58:14 2018 +0700

    [OPENMEETINGS-1649] group video seems to work
---
 .../org/apache/openmeetings/core/remote/KRoom.java | 25 +++---
 .../apache/openmeetings/core/remote/KStream.java   | 89 ++++++++++++----------
 .../openmeetings/core/remote/KurentoHandler.java   | 56 +++++++-------
 3 files changed, 93 insertions(+), 77 deletions(-)

diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java
index 3bc9904..91a66ec 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java
@@ -26,8 +26,6 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
-import javax.annotation.PreDestroy;
-
 import org.apache.openmeetings.db.entity.basic.Client;
 import org.kurento.client.Continuation;
 import org.kurento.client.MediaPipeline;
@@ -55,19 +53,28 @@ public class KRoom implements Closeable {
 		log.info("ROOM {} has been created", roomId);
 	}
 
-	public KStream startBroadcast(final KurentoHandler h, Client c) {
-		log.info("ROOM {}: adding participant {}", roomId, c.getUid());
-		final KStream u = new KStream(h, c, this.pipeline);
-		participants.put(u.getUid(), u);
-		h.usersByUid.put(u.getUid(), u);
-		return u;
+	public KStream join(final KurentoHandler h, final Client c) {
+		log.info("ROOM {}: join participant {}", roomId, c.getUid());
+		final KStream stream = new KStream(h, c, this.pipeline);
+		participants.put(stream.getUid(), stream);
+		h.usersByUid.put(stream.getUid(), stream);
+		return stream;
 	}
 
 	public Collection<KStream> getParticipants() {
 		return participants.values();
 	}
 
-	@PreDestroy
+	public void leave(final Client c) {
+		for (Map.Entry<String, KStream> e : participants.entrySet()) {
+			e.getValue().remove(c);
+		}
+		KStream stream = participants.remove(c.getUid());
+		if (stream != null) {
+			stream.release();
+		}
+	}
+
 	@Override
 	public void close() {
 		for (final KStream user : participants.values()) {
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
index 85b9010..acae079 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
@@ -63,7 +63,7 @@ public class KStream implements IKStream {
 		this.roomId = c.getRoomId();
 		//TODO Min/MaxVideoSendBandwidth
 		//TODO Min/Max Audio/Video RecvBandwidth
-		outgoingMedia = createEndpoint(h, c);
+		outgoingMedia = createEndpoint(h, this);
 		//TODO add logic here
 		outgoingMedia.addConnectionStateChangedListener(new EventListener<ConnectionStateChangedEvent>() {
 			@Override
@@ -76,15 +76,20 @@ public class KStream implements IKStream {
 			public void onEvent(MediaFlowOutStateChangeEvent event) {
 				log.warn("MediaFlowOutStateChange {}", event.getState());
 				if (MediaFlowState.FLOWING == event.getState()) {
-					JSONObject msg = newKurentoMsg();
-					msg.put("id", "newStream");
-					msg.put("client", c.toJson(false));
-					WebSocketHelper.sendRoomOthers(roomId, uid, msg);
+					WebSocketHelper.sendRoomOthers(roomId, uid, newKurentoMsg()
+							.put("id", "newStream")
+							.put("client", c.toJson(false)));
 				}
 			}
 		});
 	}
 
+	public KStream startBroadcast(final KurentoHandler h, final Client c, final String sdpOffer) {
+		videoResponse(h, this, sdpOffer);
+		WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.rightUpdated, c.getUid()));
+		return this;
+	}
+
 	public String getSid() {
 		return sid;
 	}
@@ -106,47 +111,68 @@ public class KStream implements IKStream {
 		return this.uid.equals(uid) || incomingMedia.containsKey(uid);
 	}
 
-	public void videoResponse(final KurentoHandler h, Client c, String sdpOffer) {
-		final boolean self = c.getUid().equals(uid);
+	public void videoResponse(final KurentoHandler h, KStream sender, String sdpOffer) {
+		final boolean self = sender.getUid().equals(uid);
 		log.info("USER {}: have started {} in room {}", uid, self ? "broadcasting" : "receiving", roomId);
-		log.trace("USER {}: SdpOffer is {}", c.getUid(), sdpOffer);
+		log.trace("USER {}: SdpOffer is {}", sender.getUid(), sdpOffer);
 
-		final WebRtcEndpoint endpoint = getEndpointForUser(h, c);
+		final WebRtcEndpoint endpoint = getEndpointForUser(h, sender);
 		final String sdpAnswer = endpoint.processOffer(sdpOffer);
 
 		log.trace("USER {}: SdpAnswer is {}", uid, sdpAnswer);
-		h.sendClient(c.getSid(), newKurentoMsg()
+		h.sendClient(sid, newKurentoMsg()
 				.put("id", "videoResponse")
-				.put("uid", uid)
+				.put("uid", sender.getUid())
 				.put("sdpAnswer", sdpAnswer));
 		log.debug("gather candidates");
 		endpoint.gatherCandidates();
-		if (self) {
-			WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.rightUpdated, c.getUid()));
-		}
 	}
 
-	private WebRtcEndpoint getEndpointForUser(final KurentoHandler h, final Client c) {
-		if (c.getUid().equals(uid)) {
+	private WebRtcEndpoint getEndpointForUser(final KurentoHandler h, final KStream sender) {
+		if (sender.getUid().equals(uid)) {
 			log.debug("PARTICIPANT {}: configuring loopback", uid);
 			return outgoingMedia;
 		}
 
-		log.debug("PARTICIPANT {}: receiving video from {}", uid, c.getUid());
+		log.debug("PARTICIPANT {}: receiving video from {}", uid, sender.getUid());
 
-		WebRtcEndpoint incoming = incomingMedia.get(c.getUid());
+		WebRtcEndpoint incoming = incomingMedia.get(sender.getUid());
 		if (incoming == null) {
-			log.debug("PARTICIPANT {}: creating new endpoint for {}", uid, c.getUid());
-			incoming = createEndpoint(h, c);
-			incomingMedia.put(c.getUid(), incoming);
+			log.debug("PARTICIPANT {}: creating new endpoint for {}", uid, sender.getUid());
+			incoming = createEndpoint(h, sender);
+			incomingMedia.put(sender.getUid(), incoming);
 		}
 
-		log.debug("PARTICIPANT {}: obtained endpoint for {}", uid, c.getUid());
-		outgoingMedia.connect(incoming);
+		log.debug("PARTICIPANT {}: obtained endpoint for {}", uid, sender.getUid());
+		sender.outgoingMedia.connect(incoming);
 
 		return incoming;
 	}
 
+	private WebRtcEndpoint createEndpoint(final KurentoHandler h, final KStream sender) {
+		WebRtcEndpoint endpoint = new WebRtcEndpoint.Builder(pipeline).build();
+		endpoint.addTag("suid", uid);
+		endpoint.addTag("uid", sender.getUid());
+
+		endpoint.addIceCandidateFoundListener(new EventListener<IceCandidateFoundEvent>() {
+			@Override
+			public void onEvent(IceCandidateFoundEvent event) {
+				h.sendClient(sid, newKurentoMsg()
+						.put("id", "iceCandidate")
+						.put("uid", sender.getUid())
+						.put("candidate", convert(JsonUtils.toJsonObject(event.getCandidate()))));
+					}
+		});
+		return endpoint;
+	}
+
+	public void remove(final Client c) {
+		WebRtcEndpoint point = incomingMedia.remove(c.getUid());
+		if (point != null) {
+			point.release();
+		}
+	}
+
 	@Override
 	public void release() {
 		log.debug("PARTICIPANT {}: Releasing resources", uid);
@@ -179,23 +205,6 @@ public class KStream implements IKStream {
 		});
 	}
 
-	private WebRtcEndpoint createEndpoint(final KurentoHandler h, final Client c) {
-		WebRtcEndpoint endpoint = new WebRtcEndpoint.Builder(pipeline).build();
-		endpoint.addTag("suid", uid);
-		endpoint.addTag("uid", c.getUid());
-
-		endpoint.addIceCandidateFoundListener(new EventListener<IceCandidateFoundEvent>() {
-			@Override
-			public void onEvent(IceCandidateFoundEvent event) {
-				h.sendClient(c.getSid(), newKurentoMsg()
-						.put("id", "iceCandidate")
-						.put("uid", uid)
-						.put("candidate", convert(JsonUtils.toJsonObject(event.getCandidate()))));
-					}
-		});
-		return endpoint;
-	}
-
 	public void addCandidate(IceCandidate candidate, String name) {
 		if (this.uid.compareTo(name) == 0) {
 			outgoingMedia.addIceCandidate(candidate);
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
index a0a9987..bccb63d 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
@@ -23,6 +23,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
@@ -156,7 +157,15 @@ public class KurentoHandler {
 
 	public void destroy() {
 		if (client != null) {
-			//FIXME TODO destroy connected objects
+			for (Entry<Long, KRoom> e : rooms.entrySet()) {
+				e.getValue().close();
+			}
+			rooms.clear();
+			for (Entry<String, KTestStream> e : testsByUid.entrySet()) {
+				e.getValue().release();
+			}
+			testsByUid.clear();
+			usersByUid.clear();
 			client.destroy();
 		}
 	}
@@ -204,7 +213,6 @@ public class KurentoHandler {
 				case "iceCandidate":
 				{
 					JSONObject candidate = msg.getJSONObject("candidate");
-
 					if (user != null) {
 						IceCandidate cand = new IceCandidate(candidate.getString("candidate"),
 								candidate.getString("sdpMid"), candidate.getInt("sdpMLineIndex"));
@@ -219,33 +227,27 @@ public class KurentoHandler {
 		} else {
 			final Client c = (Client)_c;
 
-			if (c == null) {
+			if (c == null || c.getRoomId() == null) {
 				log.warn("Incoming message from invalid user");
+				return;
 			}
 			log.debug("Incoming message from user with ID '{}': {}", c.getUserId(), msg);
-			KStream stream = getByUid(_c.getUid());
+			KStream stream = getByUid(c.getUid());
+			if (stream == null) {
+				KRoom room = getRoom(c.getRoomId());
+				stream = room.join(this, c);
+			}
 			//FIXME TODO check client rights here
 			switch (cmdId) {
 				case "toggleActivity":
 					toggleActivity(c, Client.Activity.valueOf(msg.getString("activity")));
 					break;
 				case "broadcastStarted":
-				{
-					if (stream != null) {
-						log.warn("Broadcast started, but strem already exists!");
-						return;
-					}
-					KRoom room = getRoom(c.getRoomId());
-					stream = room.startBroadcast(this, c);
-					stream.videoResponse(this, c, msg.getString("sdpOffer"));
-				}
+					stream.startBroadcast(this, c, msg.getString("sdpOffer"));
 					break;
 				case "onIceCandidate":
 				{
 					JSONObject candidate = msg.getJSONObject("candidate");
-					if (stream == null) {
-						return;
-					}
 					IceCandidate cand = new IceCandidate(
 							candidate.getString("candidate")
 							, candidate.getString("sdpMid")
@@ -254,14 +256,7 @@ public class KurentoHandler {
 				}
 					break;
 				case "receiveVideo":
-				{
-					final String senderUid = msg.getString("sender");
-					KStream sender = getByUid(senderUid);
-					if (sender == null) {
-						return;
-					}
-					sender.videoResponse(this, c, msg.getString("sdpOffer"));
-				}
+					stream.videoResponse(this, getByUid(msg.getString("sender")), msg.getString("sdpOffer"));
 					break;
 			}
 		}
@@ -338,18 +333,23 @@ public class KurentoHandler {
 				.put("message", msg));
 	}
 
-	public void remove(IWsClient c) {
-		if (c == null) {
+	public void remove(IWsClient _c) {
+		if (_c == null) {
 			return;
 		}
-		final String uid = c.getUid();
-		final boolean test = !(c instanceof Client);
+		final String uid = _c.getUid();
+		final boolean test = !(_c instanceof Client);
 		IKStream u = test ? getTestByUid(uid) : getByUid(uid);
 		if (u != null) {
 			u.release();
 			if (test) {
 				testsByUid.remove(uid);
 			} else {
+				Client c = (Client)_c;
+				KRoom room = rooms.get(c.getRoomId());
+				if (room != null) {
+					room.leave(c);
+				}
 				usersByUid.remove(uid);
 			}
 		}