You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openmeetings.apache.org by so...@apache.org on 2018/09/23 11:52:47 UTC

[openmeetings] branch master updated: [OPENMEETINGS-1649] room video somehow work (not stable)

This is an automated email from the ASF dual-hosted git repository.

solomax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openmeetings.git


The following commit(s) were added to refs/heads/master by this push:
     new 267da92  [OPENMEETINGS-1649] room video somehow work (not stable)
267da92 is described below

commit 267da9245a65326f519c889b974b4c6967bcdfde
Author: Maxim Solodovnik <so...@gmail.com>
AuthorDate: Sun Sep 23 18:52:32 2018 +0700

    [OPENMEETINGS-1649] room video somehow work (not stable)
---
 .../org/apache/openmeetings/core/remote/KRoom.java |  42 +-----
 .../apache/openmeetings/core/remote/KStream.java   | 158 +++++++++------------
 .../openmeetings/core/remote/KTestStream.java      |   3 +-
 .../openmeetings/core/remote/KurentoHandler.java   | 127 ++++++++++++-----
 .../openmeetings/core/util/WebSocketHelper.java    |  15 ++
 .../core/util/ws/WsMessageRoomOthers.java          |  35 +++++
 .../openmeetings/db/util/ws/RoomMessage.java       |   2 -
 .../apache/openmeetings/web/room/RoomPanel.java    |  43 +-----
 .../openmeetings/web/room/raw-video-manager.js     |  53 +++++--
 .../org/apache/openmeetings/web/room/raw-video.js  |  11 --
 .../webapp/WEB-INF/classes/applicationContext.xml  |   5 +-
 11 files changed, 252 insertions(+), 242 deletions(-)

diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java
index f06e141..d4b1d77 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java
@@ -24,9 +24,7 @@ package org.apache.openmeetings.core.remote;
 import static org.apache.openmeetings.core.remote.KurentoHandler.newKurentoMsg;
 
 import java.io.Closeable;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -66,53 +64,22 @@ public class KRoom implements Closeable {
 		log.info("ROOM {} has been created", roomId);
 	}
 
-	public KStream addStream(final KurentoHandler h, StreamDesc sd) {
+	public KStream startBroadcast(final KurentoHandler h, Client c, StreamDesc sd) {
 		log.info("ROOM {}: adding participant {}", roomId, sd.getUid());
-		final KStream u = new KStream(h, sd.getSid(), sd.getUid(), this.roomId, this.pipeline);
+		final KStream u = new KStream(h, c, this.pipeline);
 		participants.put(u.getUid(), u);
 		h.usersByUid.put(u.getUid(), u);
-		return u;
-	}
-
-	public KStream join(final KurentoHandler h, Client c, StreamDesc sd) {
-		KStream u = addStream(h, sd);
 		broadcast(h, c, sd);
 		return u;
 	}
 
-	public void leave(final KurentoHandler h, KStream user) {
-		log.debug("PARTICIPANT {}: Leaving room {}", user.getUid(), this.roomId);
-		this.removeParticipant(h, user.getUid());
-		user.release();
-	}
-
-	private void removeParticipant(final KurentoHandler h, String name) {
-		participants.remove(name);
-
-		log.debug("ROOM {}: notifying all users that {} is leaving the room", this.roomId, name);
-
-		final List<String> unnotifiedParticipants = new ArrayList<>();
-		final JSONObject msg = newKurentoMsg();
-		msg.put("id", "participantLeft");
-		msg.put("name", name);
-		for (final KStream participant : participants.values()) {
-			participant.cancelVideoFrom(name);
-			h.sendClient(participant.getSid(), msg);
-		}
-
-		if (!unnotifiedParticipants.isEmpty()) {
-			log.debug("ROOM {}: The users {} could not be notified that {} left the room", this.roomId,
-					unnotifiedParticipants, name);
-		}
-	}
-
 	private static void broadcast(final KurentoHandler h, Client c, StreamDesc sd) {
 		final JSONObject msg = newKurentoMsg();
 		msg.put("id", "broadcast");
 		msg.put("uid", sd.getUid());
 		msg.put("stream", new JSONObject(sd));
 		msg.put("client", c.toJson(true));
-		log.debug("User {}: has started broadcast", sd.getSid());
+		log.debug("User {}: has started broadcast", sd.getUid());
 		h.sendClient(sd.getSid(), msg);
 	}
 
@@ -126,11 +93,8 @@ public class KRoom implements Closeable {
 		for (final KStream user : participants.values()) {
 			user.release();
 		}
-
 		participants.clear();
-
 		pipeline.release(new Continuation<Void>() {
-
 			@Override
 			public void onSuccess(Void result) throws Exception {
 				log.trace("ROOM {}: Released Pipeline", KRoom.this.roomId);
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
index cef92fb..e47c6d7 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
@@ -36,6 +36,7 @@ import org.kurento.client.EventListener;
 import org.kurento.client.IceCandidate;
 import org.kurento.client.IceCandidateFoundEvent;
 import org.kurento.client.MediaFlowOutStateChangeEvent;
+import org.kurento.client.MediaFlowState;
 import org.kurento.client.MediaPipeline;
 import org.kurento.client.WebRtcEndpoint;
 import org.kurento.jsonrpc.JsonUtils;
@@ -59,25 +60,15 @@ public class KStream implements IKStream {
 	private final WebRtcEndpoint outgoingMedia;
 	private final ConcurrentMap<String, WebRtcEndpoint> incomingMedia = new ConcurrentHashMap<>();
 
-	public KStream(final KurentoHandler h, final String sid, final String uid, Long roomId, MediaPipeline pipeline) {
+	//FIXME TODO multiple streams from client
+	public KStream(final KurentoHandler h, final Client c, MediaPipeline pipeline) {
 		this.pipeline = pipeline;
-		this.sid = sid;
-		this.uid = uid;
-		this.roomId = roomId;
+		this.sid = c.getSid();
+		this.uid = c.getUid();
+		this.roomId = c.getRoomId();
 		//TODO Min/MaxVideoSendBandwidth
 		//TODO Min/Max Audio/Video RecvBandwidth
-		outgoingMedia = new WebRtcEndpoint.Builder(pipeline).build();
-
-		outgoingMedia.addIceCandidateFoundListener(new EventListener<IceCandidateFoundEvent>() {
-			@Override
-			public void onEvent(IceCandidateFoundEvent event) {
-				JSONObject response = newKurentoMsg();
-				response.put("id", "iceCandidate");
-				response.put("uid", uid);
-				response.put("candidate", convert(JsonUtils.toJsonObject(event.getCandidate())));
-				h.sendClient(sid, response);
-			}
-		});
+		outgoingMedia = createEndpoint(h, c);
 		//TODO add logic here
 		outgoingMedia.addConnectionStateChangedListener(new EventListener<ConnectionStateChangedEvent>() {
 			@Override
@@ -89,14 +80,16 @@ public class KStream implements IKStream {
 			@Override
 			public void onEvent(MediaFlowOutStateChangeEvent event) {
 				log.warn("MediaFlowOutStateChange {}", event.getState());
+				if (MediaFlowState.FLOWING == event.getState()) {
+					JSONObject msg = newKurentoMsg();
+					msg.put("id", "newStream");
+					msg.put("client", c.toJson(false));
+					WebSocketHelper.sendRoomOthers(roomId, uid, msg);
+				}
 			}
 		});
 	}
 
-	public WebRtcEndpoint getOutgoingWebRtcPeer() {
-		return outgoingMedia;
-	}
-
 	public String getSid() {
 		return sid;
 	}
@@ -111,111 +104,74 @@ public class KStream implements IKStream {
 	 * @return The room
 	 */
 	public Long getRoomId() {
-		return this.roomId;
+		return roomId;
 	}
 
-	public void receiveVideoFrom(final KurentoHandler h, Client c, KStream sender, String sdpOffer) {
-		log.info("USER {}: connecting with {} in room {}", this.uid, sender.getUid(), this.roomId);
+	public boolean contains(String uid) {
+		return this.uid.equals(uid) || incomingMedia.containsKey(uid);
+	}
 
-		log.trace("USER {}: SdpOffer for {} is {}", this.uid, sender.getUid(), sdpOffer);
+	public void videoResponse(final KurentoHandler h, Client c, String sdpOffer) {
+		final boolean self = c.getUid().equals(uid);
+		log.info("USER {}: have started {} in room {}", uid, self ? "broadcasting" : "receiving", roomId);
+		log.trace("USER {}: SdpOffer is {}", c.getUid(), sdpOffer);
 
-		if (c.getUid().equals(sender.getUid())) {
-			WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.rightUpdated, c.getUid()));
-			WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.newStream, c.getUid()));
-		}
-		final String sdpAnswer = this.getEndpointForUser(h, sender).processOffer(sdpOffer);
-		final JSONObject scParams = newKurentoMsg();
-		scParams.put("id", "videoResponse");
-		scParams.put("uid", sender.getUid());
-		scParams.put("sdpAnswer", sdpAnswer);
+		final WebRtcEndpoint endpoint = getEndpointForUser(h, c);
+		final String sdpAnswer = endpoint.processOffer(sdpOffer);
 
-		log.trace("USER {}: SdpAnswer for {} is {}", this.uid, sender.getUid(), sdpAnswer);
-		h.sendClient(sid, scParams);
+		log.trace("USER {}: SdpAnswer is {}", uid, sdpAnswer);
+		h.sendClient(c.getSid(), newKurentoMsg()
+				.put("id", "videoResponse")
+				.put("uid", uid)
+				.put("sdpAnswer", sdpAnswer));
 		log.debug("gather candidates");
-		this.getEndpointForUser(h, sender).gatherCandidates();
+		endpoint.gatherCandidates();
+		if (self) {
+			WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.rightUpdated, c.getUid()));
+		}
 	}
 
-	private WebRtcEndpoint getEndpointForUser(final KurentoHandler h, final KStream sender) {
-		if (sender.getUid().equals(uid)) {
-			log.debug("PARTICIPANT {}: configuring loopback", this.uid);
+	private WebRtcEndpoint getEndpointForUser(final KurentoHandler h, final Client c) {
+		if (c.getUid().equals(uid)) {
+			log.debug("PARTICIPANT {}: configuring loopback", uid);
 			return outgoingMedia;
 		}
 
-		log.debug("PARTICIPANT {}: receiving video from {}", this.uid, sender.getUid());
+		log.debug("PARTICIPANT {}: receiving video from {}", uid, c.getUid());
 
-		WebRtcEndpoint incoming = incomingMedia.get(sender.getUid());
+		WebRtcEndpoint incoming = incomingMedia.get(c.getUid());
 		if (incoming == null) {
-			log.debug("PARTICIPANT {}: creating new endpoint for {}", this.uid, sender.getUid());
-			incoming = new WebRtcEndpoint.Builder(pipeline).build();
-
-			incoming.addIceCandidateFoundListener(new EventListener<IceCandidateFoundEvent>() {
-
-				@Override
-				public void onEvent(IceCandidateFoundEvent event) {
-					JSONObject response = newKurentoMsg();
-					response.put("id", "iceCandidate");
-					response.put("uid", sender.getUid());
-					response.put("candidate", convert(JsonUtils.toJsonObject(event.getCandidate())));
-					h.sendClient(sid, response);
-				}
-			});
-
-			incomingMedia.put(sender.getUid(), incoming);
+			log.debug("PARTICIPANT {}: creating new endpoint for {}", uid, c.getUid());
+			incoming = createEndpoint(h, c);
+			incomingMedia.put(c.getUid(), incoming);
 		}
 
-		log.debug("PARTICIPANT {}: obtained endpoint for {}", this.uid, sender.getUid());
-		sender.getOutgoingWebRtcPeer().connect(incoming);
+		log.debug("PARTICIPANT {}: obtained endpoint for {}", uid, c.getUid());
+		outgoingMedia.connect(incoming);
 
 		return incoming;
 	}
 
-	public void cancelVideoFrom(final KStream sender) {
-		this.cancelVideoFrom(sender.getUid());
-	}
-
-	public void cancelVideoFrom(final String senderName) {
-		log.debug("PARTICIPANT {}: canceling video reception from {}", this.uid, senderName);
-		final WebRtcEndpoint incoming = incomingMedia.remove(senderName);
-
-		log.debug("PARTICIPANT {}: removing endpoint for {}", this.uid, senderName);
-		incoming.release(new Continuation<Void>() {
-			@Override
-			public void onSuccess(Void result) throws Exception {
-				log.trace("PARTICIPANT {}: Released successfully incoming EP for {}", KStream.this.uid, senderName);
-			}
-
-			@Override
-			public void onError(Throwable cause) throws Exception {
-				log.warn("PARTICIPANT {}: Could not release incoming EP for {}", KStream.this.uid, senderName);
-			}
-		});
-	}
-
 	@Override
 	public void release() {
-		log.debug("PARTICIPANT {}: Releasing resources", this.uid);
-		for (final String remoteParticipantName : incomingMedia.keySet()) {
-
-			log.trace("PARTICIPANT {}: Released incoming EP for {}", this.uid, remoteParticipantName);
+		log.debug("PARTICIPANT {}: Releasing resources", uid);
+		for (final String inUid : incomingMedia.keySet()) {
+			log.trace("PARTICIPANT {}: Released incoming EP for {}", uid, inUid);
 
-			final WebRtcEndpoint ep = this.incomingMedia.get(remoteParticipantName);
+			final WebRtcEndpoint ep = incomingMedia.get(inUid);
 			ep.release(new Continuation<Void>() {
 				@Override
 				public void onSuccess(Void result) throws Exception {
-					log.trace("PARTICIPANT {}: Released successfully incoming EP for {}", KStream.this.uid,
-							remoteParticipantName);
+					log.trace("PARTICIPANT {}: Released successfully incoming EP for {}", KStream.this.uid, inUid);
 				}
 
 				@Override
 				public void onError(Throwable cause) throws Exception {
-					log.warn("PARTICIPANT {}: Could not release incoming EP for {}", KStream.this.uid,
-							remoteParticipantName);
+					log.warn("PARTICIPANT {}: Could not release incoming EP for {}", KStream.this.uid, inUid);
 				}
 			});
 		}
-
 		outgoingMedia.release(new Continuation<Void>() {
-
 			@Override
 			public void onSuccess(Void result) throws Exception {
 				log.trace("PARTICIPANT {}: Released outgoing EP", KStream.this.uid);
@@ -228,6 +184,24 @@ public class KStream implements IKStream {
 		});
 	}
 
+	private WebRtcEndpoint createEndpoint(final KurentoHandler h, final Client c) {
+		WebRtcEndpoint endpoint = new WebRtcEndpoint.Builder(pipeline).build();
+		endpoint.addTag("suid", uid);
+		endpoint.addTag("uid", c.getUid());
+
+		endpoint.addIceCandidateFoundListener(new EventListener<IceCandidateFoundEvent>() {
+			@Override
+			public void onEvent(IceCandidateFoundEvent event) {
+				JSONObject response = newKurentoMsg();
+				response.put("id", "iceCandidate");
+				response.put("uid", uid);
+				response.put("candidate", convert(JsonUtils.toJsonObject(event.getCandidate())));
+				h.sendClient(c.getSid(), response);
+			}
+		});
+		return endpoint;
+	}
+
 	public void addCandidate(IceCandidate candidate, String name) {
 		if (this.uid.compareTo(name) == 0) {
 			outgoingMedia.addIceCandidate(candidate);
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java
index 761186f..ab034ed 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java
@@ -198,8 +198,7 @@ public class KTestStream implements IKStream {
 	}
 
 	private void sendPlayEnd(IWsClient _c) {
-		WebSocketHelper.sendClient(_c, newTestKurentoMsg()
-				.put("id", "playStopped"));
+		WebSocketHelper.sendClient(_c, newTestKurentoMsg().put("id", "playStopped"));
 		release();
 	}
 
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
index 627977a..050337a 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
@@ -19,13 +19,14 @@
  */
 package org.apache.openmeetings.core.remote;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.openmeetings.core.util.WebSocketHelper;
 import org.apache.openmeetings.db.entity.basic.Client;
@@ -36,11 +37,15 @@ import org.apache.openmeetings.db.entity.room.Room.Right;
 import org.apache.openmeetings.db.manager.IClientManager;
 import org.apache.openmeetings.db.util.ws.RoomMessage;
 import org.apache.openmeetings.db.util.ws.TextRoomMessage;
+import org.kurento.client.Endpoint;
 import org.kurento.client.EventListener;
 import org.kurento.client.IceCandidate;
 import org.kurento.client.KurentoClient;
+import org.kurento.client.MediaObject;
 import org.kurento.client.MediaPipeline;
 import org.kurento.client.ObjectCreatedEvent;
+import org.kurento.client.PlayerEndpoint;
+import org.kurento.client.RecorderEndpoint;
 import org.kurento.client.Tag;
 import org.kurento.client.WebRtcEndpoint;
 import org.slf4j.Logger;
@@ -56,6 +61,7 @@ public class KurentoHandler {
 	private final static String TAG_MODE = "mode";
 	private final static String TAG_ROOM = "roomId";
 	public final static String KURENTO_TYPE = "kurento";
+	private long checkTimeout = 200; //ms
 	private String kurentoWsUrl;
 	private KurentoClient client;
 	private String kuid;
@@ -83,38 +89,61 @@ public class KurentoHandler {
 						// room created
 						final String roid = evt.getObject().getId();
 						scheduler.schedule(() -> {
-							if (client != null) { // still alive
-								MediaPipeline pipe = client.getById(roid, MediaPipeline.class);
-								try {
-									Map<String, String> tags = tagsAsMap(pipe);
-									if (kuid.equals(tags.get(TAG_KUID))) {
-										if (MODE_TEST.equals(tags.get(TAG_MODE)) && MODE_TEST.equals(tags.get(TAG_MODE))) {
-											return;
-										}
-										KRoom r = rooms.get(Long.valueOf(tags.get(TAG_ROOM)));
-										if (r.getPipelineId().equals(pipe.getId())) {
-											return;
-										} else if (r != null) {
-											rooms.remove(r.getRoomId());
-											r.close();
-										}
+							if (client == null) {
+								return;
+							}
+							// still alive
+							MediaPipeline pipe = client.getById(roid, MediaPipeline.class);
+							try {
+								Map<String, String> tags = tagsAsMap(pipe);
+								if (validTestPipeline(tags)) {
+									return;
+								}
+								if (kuid.equals(tags.get(TAG_KUID))) {
+									KRoom r = rooms.get(Long.valueOf(tags.get(TAG_ROOM)));
+									if (r.getPipelineId().equals(pipe.getId())) {
+										return;
+									} else if (r != null) {
+										rooms.remove(r.getRoomId());
+										r.close();
 									}
-								} catch(Exception e) {
-									//no-op, connect will be dropped
 								}
-								log.warn("Invalid MediaPipeline {} detected, will be dropped", pipe.getId());
-								pipe.release();
+							} catch(Exception e) {
+								//no-op, connect will be dropped
 							}
-						}, 2, TimeUnit.SECONDS);
-					} else if (evt.getObject() instanceof WebRtcEndpoint) { //FIXME TODO RecordingEndpoint
+							log.warn("Invalid MediaPipeline {} detected, will be dropped", pipe.getId());
+							pipe.release();
+						}, checkTimeout, MILLISECONDS);
+					} else if (evt.getObject() instanceof Endpoint) {
 						// endpoint created
-						final String eoid = evt.getObject().getId();
+						Endpoint _point = (Endpoint)evt.getObject();
+						final String eoid = _point.getId();
+						Class<? extends Endpoint> _clazz = null;
+						if (_point instanceof WebRtcEndpoint) {
+							_clazz = WebRtcEndpoint.class;
+						} else if (_point instanceof RecorderEndpoint) {
+							_clazz = RecorderEndpoint.class;
+						} else if (_point instanceof PlayerEndpoint) {
+							_clazz = PlayerEndpoint.class;
+						}
+						final Class<? extends Endpoint> clazz = _clazz;
 						scheduler.schedule(() -> {
-							if (client != null) { // still alive
-								WebRtcEndpoint point = client.getById(eoid, WebRtcEndpoint.class);
-								//point.release();
+							if (client == null || clazz == null) {
+								return;
 							}
-						}, 2, TimeUnit.SECONDS);
+							// still alive
+							Endpoint point = client.getById(eoid, clazz);
+							if (validTestPipeline(point.getMediaPipeline())) {
+								return;
+							}
+							Map<String, String> tags = tagsAsMap(point);
+							KStream stream = getByUid(tags.get("suid"));
+							if (stream != null && stream.contains(tags.get("uid"))) {
+								return;
+							}
+							log.warn("Invalid Endpoint {} detected, will be dropped", point.getId());
+							point.release();
+						}, checkTimeout, MILLISECONDS);
 					}
 				}
 			});
@@ -131,7 +160,7 @@ public class KurentoHandler {
 		}
 	}
 
-	private static Map<String, String> tagsAsMap(MediaPipeline pipe) {
+	private static Map<String, String> tagsAsMap(MediaObject pipe) {
 		Map<String, String> map = new HashMap<>();
 		for (Tag t : pipe.getTags()) {
 			map.put(t.getKey(), t.getValue());
@@ -139,6 +168,14 @@ public class KurentoHandler {
 		return map;
 	}
 
+	private boolean validTestPipeline(MediaPipeline pipeline) {
+		return validTestPipeline(tagsAsMap(pipeline));
+	}
+
+	private boolean validTestPipeline(Map<String, String> tags) {
+		return kuid.equals(tags.get(TAG_KUID)) && MODE_TEST.equals(tags.get(TAG_MODE)) && MODE_TEST.equals(tags.get(TAG_MODE));
+	}
+
 	private MediaPipeline createTestPipeline() {
 		MediaPipeline pipe = client.createMediaPipeline();
 		pipe.addTag(TAG_KUID, kuid);
@@ -190,14 +227,14 @@ public class KurentoHandler {
 				case "toggleActivity":
 					toggleActivity(c, Client.Activity.valueOf(msg.getString("activity")));
 					break;
-				case "receiveVideoFrom":
-					final String senderUid = msg.getString("sender");
-					final KStream sender = getByUid(senderUid);
+				case "broadcastStarted":
+				{
 					final String sdpOffer = msg.getString("sdpOffer");
 					if (user == null) {
 						return;
 					}
-					user.receiveVideoFrom(this, c, sender, sdpOffer);
+					user.videoResponse(this, c, sdpOffer);
+				}
 					break;
 				case "onIceCandidate":
 				{
@@ -210,6 +247,17 @@ public class KurentoHandler {
 					user.addCandidate(cand, msg.getString("uid"));
 				}
 					break;
+				case "receiveVideo":
+				{
+					final String senderUid = msg.getString("sender");
+					KStream sender = getByUid(senderUid);
+					if (sender == null) {
+						return;
+					}
+					final String sdpOffer = msg.getString("sdpOffer");
+					sender.videoResponse(this, c, sdpOffer);
+				}
+					break;
 			}
 		}
 	}
@@ -245,7 +293,7 @@ public class KurentoHandler {
 				//close
 				leaveRoom(cm.update(c.removeStream(c.getUid())));
 				WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.rightUpdated, c.getUid()));
-				WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.closeStream, c.getUid()));
+				//FIXME TODO update interview buttons
 			} else if (!broadcasting) {
 				//join
 				KRoom room = getRoom(c.getRoomId());
@@ -253,7 +301,8 @@ public class KurentoHandler {
 				sd.setWidth(c.getWidth());
 				sd.setHeight(c.getHeight());
 				cm.update(c.addStream(sd));
-				room.join(this, c, sd);
+				room.startBroadcast(this, c, sd);
+				//FIXME TODO update interview buttons
 			} else {
 				//change constraints
 				//FIXME TODO WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.rightUpdated, c.getUid()));
@@ -263,9 +312,11 @@ public class KurentoHandler {
 
 	public void leaveRoom(Client c) {
 		remove(c);
-		WebSocketHelper.sendClient(c, newKurentoMsg()
+		WebSocketHelper.sendAll(newKurentoMsg()
 				.put("id", "broadcastStopped")
-				.put("uid", c.getUid()));
+				.put("uid", c.getUid())
+				.toString()
+			);
 	}
 
 	public void sendClient(String sid, JSONObject msg) {
@@ -364,4 +415,8 @@ public class KurentoHandler {
 		}
 		return r;
 	}
+
+	public void setCheckTimeout(long checkTimeout) {
+		this.checkTimeout = checkTimeout;
+	}
 }
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/WebSocketHelper.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/WebSocketHelper.java
index a0606b8..9d9fe55 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/WebSocketHelper.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/WebSocketHelper.java
@@ -36,6 +36,7 @@ import org.apache.openmeetings.core.util.ws.WsMessageAll;
 import org.apache.openmeetings.core.util.ws.WsMessageChat;
 import org.apache.openmeetings.core.util.ws.WsMessageRoom;
 import org.apache.openmeetings.core.util.ws.WsMessageRoomMsg;
+import org.apache.openmeetings.core.util.ws.WsMessageRoomOthers;
 import org.apache.openmeetings.core.util.ws.WsMessageUser;
 import org.apache.openmeetings.db.entity.basic.ChatMessage;
 import org.apache.openmeetings.db.entity.basic.Client;
@@ -155,6 +156,9 @@ public class WebSocketHelper {
 	public static void send(IClusterWsMessage _m) {
 		if (_m instanceof WsMessageRoomMsg) {
 			sendRoom(((WsMessageRoomMsg)_m).getMsg(), false);
+		} else if (_m instanceof WsMessageRoomOthers) {
+			WsMessageRoomOthers m = (WsMessageRoomOthers)_m;
+			sendRoomOthers(m.getRoomId(), m.getUid(), m.getMsg(), false);
 		} else if (_m instanceof WsMessageRoom) {
 			WsMessageRoom m = (WsMessageRoom)_m;
 			sendRoom(m.getRoomId(), m.getMsg(), false);
@@ -192,6 +196,17 @@ public class WebSocketHelper {
 		sendRoom(roomId, m, null, null);
 	}
 
+	public static void sendRoomOthers(final Long roomId, final String uid, final JSONObject m) {
+		sendRoomOthers(roomId, uid, m, true);
+	}
+
+	private static void sendRoomOthers(final Long roomId, final String uid, final JSONObject m, boolean publish) {
+		if (publish) {
+			publish(new WsMessageRoomOthers(roomId, uid, m));
+		}
+		sendRoom(roomId, m, c -> !uid.equals(c.getUid()), null);
+	}
+
 	public static void sendRoom(ChatMessage m, JSONObject msg) {
 		sendRoom(m, msg, true);
 	}
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageRoomOthers.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageRoomOthers.java
new file mode 100644
index 0000000..df93d2a
--- /dev/null
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/ws/WsMessageRoomOthers.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.openmeetings.core.util.ws;
+
+import com.github.openjson.JSONObject;
+
+public class WsMessageRoomOthers extends WsMessageRoom {
+	private static final long serialVersionUID = 1L;
+	private final String uid;
+
+	public WsMessageRoomOthers(Long roomId, String uid, JSONObject msg) {
+		super(roomId, msg);
+		this.uid = uid;
+	}
+
+	public String getUid() {
+		return uid;
+	}
+}
diff --git a/openmeetings-db/src/main/java/org/apache/openmeetings/db/util/ws/RoomMessage.java b/openmeetings-db/src/main/java/org/apache/openmeetings/db/util/ws/RoomMessage.java
index dbccc8f..b3b05cc 100644
--- a/openmeetings-db/src/main/java/org/apache/openmeetings/db/util/ws/RoomMessage.java
+++ b/openmeetings-db/src/main/java/org/apache/openmeetings/db/util/ws/RoomMessage.java
@@ -52,8 +52,6 @@ public class RoomMessage implements IWebSocketPushMessage {
 		, requestRightExclusive
 		, haveQuestion
 		, kick
-		, newStream
-		, closeStream
 		, mute
 		, exclusive
 		, quickPollUpdated
diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.java b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.java
index 3de4273..fc9f610 100644
--- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.java
+++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.java
@@ -167,7 +167,7 @@ public class RoomPanel extends BasePanel {
 			for (Client c: cm.listByRoom(getRoom().getId())) {
 				for (StreamDesc sd : c.getStreams()) {
 					JSONObject jo = videoJson(c, c.getSid(), sd);
-					sb.append(String.format("VideoManager.play(%s);", jo));
+					//FIXME TODO sb.append(String.format("VideoManager.play(%s);", jo));
 					hasStreams = true;
 				}
 			}
@@ -492,47 +492,6 @@ public class RoomPanel extends BasePanel {
 							updateInterviewRecordingButtons(handler);
 						}
 						break;
-					case newStream:
-					{
-						/* FIXME TODO
-						JSONObject obj = new JSONObject(((TextRoomMessage)m).getText());
-						String uid = obj.getString("uid");
-						Client c = cm.get(uid);
-						if (c == null) {
-							// screen client, ext video stream
-							c = cm.getBySid(obj.getString("sid"));
-						}
-						if (c == null) {
-							log.error("Not existing user in newStream {} !!!!", uid);
-							return;
-						}
-						boolean self = _c.getSid().equals(c.getSid());
-						if (!self || Client.Type.room != scm.get(uid).getType()) { // stream from others or self external video
-							JSONObject jo = videoJson(c, _c.getSid(), uid);
-							handler.appendJavaScript(String.format("VideoManager.play(%s);", jo));
-						}
-						if (self) {
-							cm.update(c.addStream(uid));
-						}*/
-						updateInterviewRecordingButtons(handler);
-					}
-						break;
-					case closeStream:
-					{
-						/* FIXME TODO
-						JSONObject obj = new JSONObject(((TextRoomMessage)m).getText());
-						String uid = obj.getString("uid");
-						Client c = cm.getBySid(obj.getString("sid"));
-						if (c != null) {
-							//c == null means client exits the room
-							if (_c.getUid().equals(c.getUid())) {
-								cm.update(c.removeStream(uid));
-							}
-						}
-						handler.appendJavaScript(String.format("VideoManager.close('%s');", uid));*/
-						updateInterviewRecordingButtons(handler);
-					}
-						break;
 					case roomEnter:
 						sidebar.update(handler);
 						menu.update(handler);
diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video-manager.js b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video-manager.js
index 9439cbd..e549956 100644
--- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video-manager.js
+++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video-manager.js
@@ -44,26 +44,49 @@ var VideoManager = (function() {
 				if (error) {
 					return OmUtil.error(error);
 				}
-				this.generateOffer(v.offerToReceiveVideo);
+				this.generateOffer(function(error, offerSdp, wp) {
+					if (error) {
+						return OmUtil.error('Sender sdp offer error');
+					}
+					OmUtil.log('Invoking Sender SDP offer callback function');
+					VideoManager.sendMessage({
+						id : 'broadcastStarted'
+						, sdpOffer: offerSdp
+					});
+				});
 			}));
 	}
 
-	function _receiveVideo(uid) {
-		const w = $('#' + VideoUtil.getVid(uid))
+	function _onReceive(msg) {
+		const uid = msg.client.uid;
+		$('#' + VideoUtil.getVid(uid)).remove();
+		const o = VideoSettings.load() //FIXME TODO add multiple streams support
+			//, w = Video().init(msg.client, VideoUtil.getPos(VideoUtil.getRects(VID_SEL), msg.stream.width, msg.stream.height + 25))
+			, w = Video().init(msg.client, VideoUtil.getPos(VideoUtil.getRects(VID_SEL), msg.client.width, msg.client.height + 25))
 			, v = w.data()
 			, cl = v.client();
-		v.setPeer(new kurentoUtils.WebRtcPeer.WebRtcPeerRecvonly(
-			{
-				remoteVideo: v.video()
-				, onicecandidate: v.onIceCandidate
+		OmUtil.log(uid + " receiving video");
+
+		v.setPeer(new kurentoUtils.WebRtcPeer.WebRtcPeerRecvonly({
+				remoteVideo : v.video()
+				, onicecandidate : v.onIceCandidate
 			}
-			, function (error) {
-				if(error) {
+			, function(error) {
+				if (error) {
 					return OmUtil.error(error);
 				}
-				this.generateOffer(v.offerToReceiveVideo);
-			}
-		));
+				this.generateOffer(function onOfferViewer(error, offerSdp) {
+					if (error) {
+						return OmUtil.error('Receiver sdp offer error');
+					}
+					OmUtil.log('Invoking Receiver SDP offer callback function');
+					VideoManager.sendMessage({
+						id : 'receiveVideo'
+						, sender: cl.uid
+						, sdpOffer: offerSdp
+					});
+				});
+			}));
 	}
 
 	function _onWsMessage(jqEvent, msg) {
@@ -100,6 +123,9 @@ var VideoManager = (function() {
 							});
 						}
 						break;
+					case 'newStream':
+						_onReceive(m);
+						break;
 					default:
 						//no-op
 				}
@@ -194,9 +220,6 @@ var VideoManager = (function() {
 					v.dialog('open');
 				}
 			});
-		} else if ('sharing' !== c.type) {
-			Video().init(c, VideoUtil.getPos(VideoUtil.getRects(VID_SEL), c.width, c.height + 25));
-			_receiveVideo(c.uid);
 		}
 	}
 	function _close(uid, showShareBtn) {
diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video.js b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video.js
index 6f590d4..f30275d 100644
--- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video.js
+++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video.js
@@ -333,17 +333,6 @@ var Video = (function() {
 			, uid: c.uid
 		});
 	};
-	self.offerToReceiveVideo = function(error, offerSdp, wp) {
-		if (error) {
-			return OmUtil.error("sdp offer error");
-		}
-		OmUtil.log('Invoking SDP offer callback function');
-		VideoManager.sendMessage({
-			id : "receiveVideoFrom"
-			, sender: c.uid
-			, sdpOffer: offerSdp
-		});
-	}
 	self.resizePod = _resizePod;
 	return self;
 });
diff --git a/openmeetings-web/src/main/webapp/WEB-INF/classes/applicationContext.xml b/openmeetings-web/src/main/webapp/WEB-INF/classes/applicationContext.xml
index 2248d18..12cbf71 100644
--- a/openmeetings-web/src/main/webapp/WEB-INF/classes/applicationContext.xml
+++ b/openmeetings-web/src/main/webapp/WEB-INF/classes/applicationContext.xml
@@ -139,7 +139,6 @@
 	</bean>
 
 	<!-- Kurento -->
-	<bean id="kurentoHandler" class="org.apache.openmeetings.core.remote.KurentoHandler" init-method="init" destroy-method="destroy">
-		<property name="kurentoWsUrl" value="wss://192.168.15.177:8883/kurento"/>
-	</bean>
+	<bean id="kurentoHandler" class="org.apache.openmeetings.core.remote.KurentoHandler" init-method="init" destroy-method="destroy"
+			p:kurentoWsUrl="ws://localhost:8888/kurento" p:checkTimeout="200"/>
 </beans>