You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openmeetings.apache.org by so...@apache.org on 2020/10/23 07:39:29 UTC

[openmeetings] branch master updated: [OPENMEETINGS-2492] pipeline is created per KStream

This is an automated email from the ASF dual-hosted git repository.

solomax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openmeetings.git


The following commit(s) were added to refs/heads/master by this push:
     new 693bf74  [OPENMEETINGS-2492] pipeline is created per KStream
693bf74 is described below

commit 693bf7496a9c6e1dbcc5ba32e736dc09b1c9f321
Author: Maxim Solodovnik <so...@gmail.com>
AuthorDate: Fri Oct 23 14:39:12 2020 +0700

    [OPENMEETINGS-2492] pipeline is created per KStream
---
 .../org/apache/openmeetings/core/remote/KRoom.java | 30 +-------
 .../apache/openmeetings/core/remote/KStream.java   | 69 ++++++++++++++---
 .../openmeetings/core/remote/KurentoHandler.java   | 88 +++++++++++++---------
 .../openmeetings/core/remote/StreamProcessor.java  | 25 +++---
 .../apache/openmeetings/core/sip/SipManager.java   |  2 +-
 .../core/remote/TestRecordingFlowMocked.java       |  4 +-
 .../apache/openmeetings/web/app/ClientManager.java |  8 +-
 .../apache/openmeetings/web/app/TimerService.java  |  2 +-
 .../apache/openmeetings/web/room/RoomPanel.java    |  6 +-
 .../org/apache/openmeetings/web/room/raw-video.js  | 33 ++++----
 .../openmeetings/web/app/TestApplication.java      |  2 +-
 11 files changed, 154 insertions(+), 115 deletions(-)

diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java
index c7a8709..8215b37 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java
@@ -25,7 +25,6 @@ import static java.util.UUID.randomUUID;
 import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_ICE;
 import static org.apache.openmeetings.core.remote.KurentoHandler.newKurentoMsg;
 
-import java.util.Collection;
 import java.util.Date;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -46,8 +45,6 @@ import org.apache.openmeetings.db.manager.IClientManager;
 import org.apache.openmeetings.db.util.FormatHelper;
 import org.apache.openmeetings.db.util.ws.RoomMessage;
 import org.apache.openmeetings.db.util.ws.TextRoomMessage;
-import org.kurento.client.Continuation;
-import org.kurento.client.MediaPipeline;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,7 +64,6 @@ public class KRoom {
 	private final StreamProcessor processor;
 	private final RecordingChunkDao chunkDao;
 	private final IApplication app;
-	private final MediaPipeline pipeline;
 	private final Long roomId;
 	private final Room.Type type;
 	private final AtomicBoolean recordingStarted = new AtomicBoolean(false);
@@ -76,13 +72,12 @@ public class KRoom {
 	private JSONObject recordingUser = new JSONObject();
 	private JSONObject sharingUser = new JSONObject();
 
-	public KRoom(KurentoHandler handler, Room r, MediaPipeline pipeline) {
+	public KRoom(KurentoHandler handler, Room r) {
 		this.processor = handler.getStreamProcessor();
 		this.chunkDao = handler.getChunkDao();
 		this.app = handler.getApp();
 		this.roomId = r.getId();
 		this.type = r.getType();
-		this.pipeline = pipeline;
 		log.info("ROOM {} has been created", roomId);
 	}
 
@@ -98,25 +93,17 @@ public class KRoom {
 		return recordingId;
 	}
 
-	public MediaPipeline getPipeline() {
-		return pipeline;
-	}
-
 	public RecordingChunkDao getChunkDao() {
 		return chunkDao;
 	}
 
-	public KStream join(final StreamDesc sd) {
+	public KStream join(final StreamDesc sd, KurentoHandler kHandler) {
 		log.info("ROOM {}: join client {}, stream: {}", roomId, sd.getClient(), sd.getUid());
-		final KStream stream = new KStream(sd, this);
+		final KStream stream = new KStream(sd, this, kHandler);
 		processor.addStream(stream);
 		return stream;
 	}
 
-	public Collection<KStream> getParticipants() {
-		return processor.getByRoom(this.getRoomId());
-	}
-
 	public void onStopBroadcast(KStream stream) {
 		processor.release(stream, true);
 		WebSocketHelper.sendAll(newKurentoMsg()
@@ -269,17 +256,6 @@ public class KRoom {
 		processor.getByRoom(this.getRoomId()).forEach(
 				stream -> stream.release(processor)
 		);
-		pipeline.release(new Continuation<Void>() {
-			@Override
-			public void onSuccess(Void result) throws Exception {
-				log.trace("ROOM {}: Released Pipeline", KRoom.this.roomId);
-			}
-
-			@Override
-			public void onError(Throwable cause) throws Exception {
-				log.warn("PARTICIPANT {}: Could not release Pipeline", KRoom.this.roomId);
-			}
-		});
 		log.debug("Room {} closed", this.roomId);
 	}
 }
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
index 754ee15..930d4ea 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
@@ -49,6 +49,7 @@ import org.apache.openmeetings.db.util.ws.TextRoomMessage;
 import org.kurento.client.Continuation;
 import org.kurento.client.IceCandidate;
 import org.kurento.client.MediaFlowState;
+import org.kurento.client.MediaPipeline;
 import org.kurento.client.MediaProfileSpecType;
 import org.kurento.client.MediaType;
 import org.kurento.client.RecorderEndpoint;
@@ -62,38 +63,49 @@ import com.github.openjson.JSONObject;
 public class KStream extends AbstractStream {
 	private static final Logger log = LoggerFactory.getLogger(KStream.class);
 
+	private final KurentoHandler kHandler;
 	private final KRoom room;
 	private final Date connectedSince;
 	private final StreamType streamType;
 	private MediaProfileSpecType profile;
+	private MediaPipeline pipeline;
 	private RecorderEndpoint recorder;
 	private WebRtcEndpoint outgoingMedia = null;
 	private final ConcurrentMap<String, WebRtcEndpoint> listeners = new ConcurrentHashMap<>();
 	private Optional<CompletableFuture<Object>> flowoutFuture = Optional.empty();
 	private Long chunkId;
 	private Type type;
+	private boolean hasAudio;
+	private boolean hasVideo;
+	private boolean hasScreen;
 
-	public KStream(final StreamDesc sd, KRoom room) {
+	public KStream(final StreamDesc sd, KRoom room, KurentoHandler kHandler) {
 		super(sd.getSid(), sd.getUid());
 		this.room = room;
 		streamType = sd.getType();
 		this.connectedSince = new Date();
+		this.kHandler = kHandler;
 		//TODO Min/MaxVideoSendBandwidth
 		//TODO Min/Max Audio/Video RecvBandwidth
 	}
 
-	public KStream startBroadcast(final StreamProcessor processor, final StreamDesc sd, final String sdpOffer) {
+	public void startBroadcast(
+			final StreamProcessor processor
+			, final StreamDesc sd
+			, final String sdpOffer
+			, Runnable then)
+	{
 		if (outgoingMedia != null) {
 			release(processor, false);
 		}
-		final boolean hasAudio = sd.hasActivity(Activity.AUDIO);
-		final boolean hasVideo = sd.hasActivity(Activity.VIDEO);
-		final boolean hasScreen = sd.hasActivity(Activity.SCREEN);
+		hasAudio = sd.hasActivity(Activity.AUDIO);
+		hasVideo = sd.hasActivity(Activity.VIDEO);
+		hasScreen = sd.hasActivity(Activity.SCREEN);
 		if ((sdpOffer.indexOf("m=audio") > -1 && !hasAudio)
 				|| (sdpOffer.indexOf("m=video") > -1 && !hasVideo && StreamType.SCREEN != streamType))
 		{
 			log.warn("Broadcast started without enough rights");
-			return this;
+			return;
 		}
 		if (StreamType.SCREEN == streamType) {
 			type = Type.SCREEN;
@@ -119,6 +131,25 @@ public class KStream extends AbstractStream {
 				profile = MediaProfileSpecType.WEBM_VIDEO_ONLY;
 				break;
 		}
+		pipeline = kHandler.createPipiline(room.getRoomId(), sd.getUid(), new Continuation<Void>() {
+			@Override
+			public void onSuccess(Void result) throws Exception {
+				internalStartBroadcast(processor, sd, sdpOffer);
+				then.run();
+			}
+
+			@Override
+			public void onError(Throwable cause) throws Exception {
+				log.warn("Unable to create pipeline {}", KStream.this.uid, cause);
+			}
+		});
+	}
+
+	private void internalStartBroadcast(
+			final StreamProcessor processor
+			, final StreamDesc sd
+			, final String sdpOffer)
+	{
 		outgoingMedia = createEndpoint(processor, sd.getSid(), sd.getUid());
 		outgoingMedia.addMediaSessionTerminatedListener(evt -> log.warn("Media stream terminated {}", sd));
 		outgoingMedia.addMediaFlowOutStateChangeListener(evt -> {
@@ -151,10 +182,9 @@ public class KStream extends AbstractStream {
 		if (hasAudio || hasVideo || hasScreen) {
 			WebSocketHelper.sendRoomOthers(room.getRoomId(), c.getUid(), newKurentoMsg()
 					.put("id", "newStream")
-					.put(PARAM_ICE, processor.getHandler().getTurnServers(c))
+					.put(PARAM_ICE, kHandler.getTurnServers(c))
 					.put("stream", sd.toJson()));
 		}
-		return this;
 	}
 
 	public void addListener(final StreamProcessor processor, String sid, String uid, String sdpOffer) {
@@ -172,7 +202,7 @@ public class KStream extends AbstractStream {
 		log.debug("gather candidates");
 		endpoint.gatherCandidates(); // this one might throw Exception
 		log.trace("USER {}: SdpAnswer is {}", this.uid, sdpAnswer);
-		processor.getHandler().sendClient(sid, newKurentoMsg()
+		kHandler.sendClient(sid, newKurentoMsg()
 				.put("id", "videoResponse")
 				.put("uid", this.uid)
 				.put("sdpAnswer", sdpAnswer));
@@ -215,11 +245,11 @@ public class KStream extends AbstractStream {
 	}
 
 	private WebRtcEndpoint createEndpoint(final StreamProcessor processor, String sid, String uid) {
-		WebRtcEndpoint endpoint = createWebRtcEndpoint(room.getPipeline());
+		WebRtcEndpoint endpoint = createWebRtcEndpoint(pipeline);
 		endpoint.addTag("outUid", this.uid);
 		endpoint.addTag("uid", uid);
 
-		endpoint.addIceCandidateFoundListener(evt -> processor.getHandler().sendClient(sid
+		endpoint.addIceCandidateFoundListener(evt -> kHandler.sendClient(sid
 				, newKurentoMsg()
 					.put("id", "iceCandidate")
 					.put("uid", KStream.this.uid)
@@ -235,7 +265,7 @@ public class KStream extends AbstractStream {
 			return;
 		}
 		final String chunkUid = "rec_" + room.getRecordingId() + "_" + randomUUID();
-		recorder = createRecorderEndpoint(room.getPipeline(), getRecUri(getRecordingChunk(room.getRoomId(), chunkUid)), profile);
+		recorder = createRecorderEndpoint(pipeline, getRecUri(getRecordingChunk(room.getRoomId(), chunkUid)), profile);
 		recorder.addTag("outUid", uid);
 		recorder.addTag("uid", uid);
 
@@ -335,6 +365,17 @@ public class KStream extends AbstractStream {
 					log.warn("PARTICIPANT {}: Could not release", KStream.this.uid, cause);
 				}
 			});
+			pipeline.release(new Continuation<Void>() {
+				@Override
+				public void onSuccess(Void result) throws Exception {
+					log.trace("PARTICIPANT {}: Released Pipeline", KStream.this.uid);
+				}
+
+				@Override
+				public void onError(Throwable cause) throws Exception {
+					log.warn("PARTICIPANT {}: Could not release Pipeline", KStream.this.uid, cause);
+				}
+			});
 			releaseRecorder(false);
 			outgoingMedia = null;
 		}
@@ -423,6 +464,10 @@ public class KStream extends AbstractStream {
 		return room;
 	}
 
+	MediaPipeline getPipeline() {
+		return pipeline;
+	}
+
 	public StreamType getStreamType() {
 		return streamType;
 	}
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
index 5f1214e..a43f62c 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
@@ -56,6 +56,7 @@ import org.apache.openmeetings.db.manager.IClientManager;
 import org.apache.openmeetings.db.util.ws.RoomMessage;
 import org.apache.openmeetings.db.util.ws.TextRoomMessage;
 import org.apache.wicket.util.string.Strings;
+import org.kurento.client.Continuation;
 import org.kurento.client.Endpoint;
 import org.kurento.client.EventListener;
 import org.kurento.client.KurentoClient;
@@ -87,6 +88,7 @@ public class KurentoHandler {
 	public static final String TAG_KUID = "kuid";
 	public static final String TAG_MODE = "mode";
 	public static final String TAG_ROOM = "roomId";
+	public static final String TAG_STREAM_UID = "streamUid";
 	private static final String HMAC_SHA1_ALGORITHM = "HmacSHA1";
 	private final ScheduledExecutorService kmsRecheckScheduler = Executors.newScheduledThreadPool(1);
 	public static final String KURENTO_TYPE = "kurento";
@@ -296,19 +298,22 @@ public class KurentoHandler {
 		streamProcessor.remove((Client)c);
 	}
 
+	MediaPipeline createPipiline(Long roomId, String uid, Continuation<Void> continuation) {
+		Transaction t = beginTransaction();
+		MediaPipeline pipe = client.createMediaPipeline(t);
+		pipe.addTag(t, TAG_KUID, kuid);
+		pipe.addTag(t, TAG_ROOM, String.valueOf(roomId));
+		pipe.addTag(t, TAG_STREAM_UID, uid);
+		t.commit(continuation);
+		return pipe;
+	}
+
 	KRoom getRoom(Long roomId) {
-		log.debug("Searching for room {}", roomId);
 		KRoom room = rooms.computeIfAbsent(roomId, k -> {
 			log.debug("Room {} does not exist. Will create now!", roomId);
 			Room r = roomDao.get(roomId);
-			Transaction t = beginTransaction();
-			MediaPipeline pipe = client.createMediaPipeline(t);
-			pipe.addTag(t, TAG_KUID, kuid);
-			pipe.addTag(t, TAG_ROOM, String.valueOf(roomId));
-			t.commit();
-			return new KRoom(this, r, pipe);
+			return new KRoom(this, r);
 		});
-		log.debug("Room {} found!", roomId);
 		return room;
 	}
 
@@ -440,24 +445,30 @@ public class KurentoHandler {
 					// still alive
 					MediaPipeline pipe = client.getById(roid, MediaPipeline.class);
 					Map<String, String> tags = tagsAsMap(pipe);
-					final String inKuid = tags.get(TAG_KUID);
-					if (ignoredKuids.contains(inKuid)) {
-						return;
-					}
-					if (validTestPipeline(tags)) {
-						return;
-					}
-					if (kuid.equals(inKuid)) {
-						KRoom r = rooms.get(Long.valueOf(tags.get(TAG_ROOM)));
-						if (r.getPipeline().getId().equals(pipe.getId())) {
+					try {
+						final String inKuid = tags.get(TAG_KUID);
+						if (inKuid != null && ignoredKuids.contains(inKuid)) {
 							return;
-						} else if (r != null) {
-							rooms.remove(r.getRoomId());
-							r.close();
 						}
+						if (validTestPipeline(tags)) {
+							return;
+						}
+						if (kuid.equals(inKuid)) {
+							KStream stream = streamProcessor.getByUid(tags.get(TAG_STREAM_UID));
+							if (stream != null) {
+								if (stream.getRoom().getRoomId().equals(Long.valueOf(tags.get(TAG_ROOM)))
+										&& stream.getPipeline().getId().equals(pipe.getId()))
+								{
+									return;
+								} else {
+									stream.release(streamProcessor);
+								}
+							}
+						}
+					} catch (Throwable e) {
+						log.warn("Invalid MediaPipeline {} detected, will be dropped, tags: {}", pipe.getId(), tags);
+						pipe.release();
 					}
-					log.warn("Invalid MediaPipeline {} detected, will be dropped, tags: {}", pipe.getId(), tags);
-					pipe.release();
 				}, objCheckTimeout, MILLISECONDS);
 			} else if (evt.getObject() instanceof Endpoint) {
 				// endpoint created
@@ -478,22 +489,25 @@ public class KurentoHandler {
 					}
 					// still alive
 					Endpoint point = client.getById(eoid, fClazz);
-					Map<String, String> pipeTags = tagsAsMap(point.getMediaPipeline());
-					final String inKuid = pipeTags.get(TAG_KUID);
-					if (ignoredKuids.contains(inKuid)) {
-						return;
-					}
-					if (validTestPipeline(pipeTags)) {
-						return;
-					}
 					Map<String, String> tags = tagsAsMap(point);
-					KStream stream = streamProcessor.getByUid(tags.get("outUid"));
-					log.debug("New Endpoint {} detected, tags: {}, kStream: {}", point.getId(), tags, stream);
-					if (stream != null && stream.contains(tags.get("uid"))) {
-						return;
+					try {
+						Map<String, String> pipeTags = tagsAsMap(point.getMediaPipeline());
+						final String inKuid = pipeTags.get(TAG_KUID);
+						if (ignoredKuids.contains(inKuid)) {
+							return;
+						}
+						if (validTestPipeline(pipeTags)) {
+							return;
+						}
+						KStream stream = streamProcessor.getByUid(tags.get("outUid"));
+						log.debug("Kurento::ObjectCreated -> New Endpoint {} detected, tags: {}, kStream: {}", point.getId(), tags, stream);
+						if (stream != null && stream.contains(tags.get("uid"))) {
+							return;
+						}
+					} catch (Throwable e) {
+						log.warn("Kurento::ObjectCreated -> Invalid Endpoint {} detected, will be dropped, tags: {}", point.getId(), tags);
+						point.release();
 					}
-					log.warn("Invalid Endpoint {} detected, will be dropped, tags: {}", point.getId(), tags);
-					point.release();
 				}, objCheckTimeout, MILLISECONDS);
 			}
 		}
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java
index 27fbe41..5e4635a 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java
@@ -32,7 +32,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.apache.openmeetings.core.converter.IRecordingConverter;
 import org.apache.openmeetings.core.converter.InterviewConverter;
@@ -182,12 +182,13 @@ public class StreamProcessor implements IStreamProcessor {
 		try {
 			if (sender == null) {
 				KRoom room = kHandler.getRoom(c.getRoomId());
-				sender = room.join(sd);
-			}
-			startBroadcast(sender, sd, msg.getString("sdpOffer"));
-			if (StreamType.SCREEN == sd.getType() && sd.hasActivity(Activity.RECORD) && !isRecording(c.getRoomId())) {
-				startRecording(c);
+				sender = room.join(sd, kHandler);
 			}
+			startBroadcast(sender, sd, msg.getString("sdpOffer"), () -> {
+				if (StreamType.SCREEN == sd.getType() && sd.hasActivity(Activity.RECORD) && !isRecording(c.getRoomId())) {
+					startRecording(c);
+				}
+			});
 		} catch (KurentoServerException e) {
 			sender.release(this);
 			WebSocketHelper.sendClient(c, newStoppedMsg(sd));
@@ -203,10 +204,11 @@ public class StreamProcessor implements IStreamProcessor {
 	 * @param stream Stream to start
 	 * @param sd StreamDesc to start
 	 * @param sdpOffer the sdpOffer
+	 * @param then steps need to be done after broadcast is started
 	 * @return the current KStream
 	 */
-	KStream startBroadcast(KStream stream, StreamDesc sd, String sdpOffer) {
-		return stream.startBroadcast(this, sd, sdpOffer);
+	void startBroadcast(KStream stream, StreamDesc sd, String sdpOffer, Runnable then) {
+		stream.startBroadcast(this, sd, sdpOffer, then);
 	}
 
 	private static boolean isBroadcasting(final Client c) {
@@ -501,7 +503,7 @@ public class StreamProcessor implements IStreamProcessor {
 			}
 		}
 		if (c.getRoomId() != null) {
-			getByRoom(c.getRoomId()).stream().forEach(stream -> stream.remove(c)); // listeners of existing streams should be cleaned-up
+			getByRoom(c.getRoomId()).forEach(stream -> stream.remove(c)); // listeners of existing streams should be cleaned-up
 			checkStreams(c.getRoomId());
 		}
 	}
@@ -514,10 +516,9 @@ public class StreamProcessor implements IStreamProcessor {
 		return streamByUid.values();
 	}
 
-	Collection<KStream> getByRoom(Long roomId) {
+	Stream<KStream> getByRoom(Long roomId) {
 		return streamByUid.values().stream()
-				.filter(stream -> stream.getRoom() != null && stream.getRoom().getRoomId().equals(roomId))
-				.collect(Collectors.toList());
+				.filter(stream -> stream.getRoom() != null && stream.getRoom().getRoomId().equals(roomId));
 	}
 
 	Client getBySid(String sid) {
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/sip/SipManager.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/sip/SipManager.java
index 4e0f991..715718a 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/sip/SipManager.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/sip/SipManager.java
@@ -280,7 +280,7 @@ public class SipManager implements ISipManager, SipListenerExt {
 		ConfbridgeListAction da = new ConfbridgeListAction(confno);
 		ResponseEvents r = execEvent(da);
 		if (r != null) {
-			log.debug("SipManager::countUsers size == {}", r.getEvents().size());
+			log.trace("SipManager::countUsers size == {}", r.getEvents().size());
 			// "- 1" here means: ListComplete event
 			return r.getEvents().size() - 1;
 		}
diff --git a/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestRecordingFlowMocked.java b/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestRecordingFlowMocked.java
index cf24bc3..c5d5b1e 100644
--- a/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestRecordingFlowMocked.java
+++ b/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestRecordingFlowMocked.java
@@ -91,7 +91,7 @@ class TestRecordingFlowMocked extends BaseMockedTest {
 		doReturn(c.getRoom()).when(roomDao).get(ROOM_ID);
 
 		// Mock out the methods that do webRTC
-		doReturn(null).when(streamProcessor).startBroadcast(any(), any(), any());
+		doReturn(null).when(streamProcessor).startBroadcast(any(), any(), any(), any());
 
 	}
 
@@ -175,7 +175,7 @@ class TestRecordingFlowMocked extends BaseMockedTest {
 		assertTrue(streamProcessor.isSharing(ROOM_ID));
 
 		//verify startBroadcast has been invoked
-		verify(streamProcessor).startBroadcast(any(), any(), any());
+		verify(streamProcessor).startBroadcast(any(), any(), any(), any());
 
 		// Assert that there is still just 1 stream and has only the activities to Record assigned
 		assertEquals(1, c.getStreams().size());
diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/ClientManager.java b/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/ClientManager.java
index 4dfedcc..e4f38f0 100644
--- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/ClientManager.java
+++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/ClientManager.java
@@ -313,7 +313,7 @@ public class ClientManager implements IClientManager {
 			.map(id -> onlineRooms.getOrDefault(id, Set.of()))
 			.stream()
 			.flatMap(Set::stream)
-			.map(uid -> get(uid))
+			.map(this::get)
 			.filter(Objects::nonNull);
 	}
 
@@ -322,10 +322,8 @@ public class ClientManager implements IClientManager {
 			.map(id -> onlineRooms.getOrDefault(id, Set.of()))
 			.stream()
 			.flatMap(Set::stream)
-			.map(uid -> get(uid))
-			.filter(c -> c != null && c.sameUserId(userId))
-			.findAny()
-			.isPresent();
+			.map(this::get)
+			.anyMatch(c -> c != null && c.sameUserId(userId));
 	}
 
 	private List<Client> getByKeys(Long userId, String sessionId) {
diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/TimerService.java b/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/TimerService.java
index 799b5a1..2a2800b 100644
--- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/TimerService.java
+++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/TimerService.java
@@ -82,7 +82,7 @@ public class TimerService {
 		sipCheckMap.put(
 				roomId
 				, new CompletableFuture<>().completeAsync(() -> {
-					log.warn("Sip room check {}", roomId);
+					log.trace("Sip room check {}", roomId);
 					Optional<Client> sipClient = cm.streamByRoom(roomId).filter(Client::isSip).findAny();
 					cm.streamByRoom(roomId).filter(Predicate.not(Client::isSip)).findAny().ifPresentOrElse(c -> {
 						updateSipLastName(sipClient, c.getRoom());
diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.java b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.java
index 4a4ae17..e0f9780 100644
--- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.java
+++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.java
@@ -619,8 +619,7 @@ public class RoomPanel extends BasePanel {
 				handler.appendJavaScript("if (typeof(WbArea) === 'object') {WbArea.setRecStarted(true);}");
 			} else if (streamProcessor.recordingAllowed(getClient())) {
 				boolean hasStreams = cm.streamByRoom(r.getId())
-						.filter(cl -> !cl.getStreams().isEmpty())
-						.findAny().isPresent();
+						.anyMatch(cl -> !cl.getStreams().isEmpty());
 				handler.appendJavaScript(String.format("if (typeof(WbArea) === 'object') {WbArea.setRecStarted(false);WbArea.setRecEnabled(%s);}", hasStreams));
 			}
 		}
@@ -636,8 +635,7 @@ public class RoomPanel extends BasePanel {
 
 	public static boolean hasRight(ClientManager cm, long userId, long roomId, Right r) {
 		return cm.streamByRoom(roomId)
-				.filter(c -> c.sameUserId(userId) && c.hasRight(r))
-				.findAny().isPresent();
+				.anyMatch(c -> c.sameUserId(userId) && c.hasRight(r));
 	}
 
 	@Override
diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video.js b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video.js
index db083a4..215b114 100644
--- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video.js
+++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video.js
@@ -6,6 +6,10 @@ var Video = (function() {
 		, lm, level, userSpeaks = false, muteOthers
 		, hasVideo, isSharing, isRecording;
 
+	function __getVideo(_state) {
+		const vid = self.video(_state);
+		return vid && vid.length > 0 ? vid[0] : null;
+	}
 	function _resizeDlgArea(_w, _h) {
 		if (Room.getOptions().interview) {
 			VideoUtil.setPos(v, VideoUtil.getPos());
@@ -153,7 +157,7 @@ var Video = (function() {
 			, onicecandidate: self.onIceCandidate
 		};
 		if (!isSharing) {
-			state.options.localVideo = state.video[0];
+			state.options.localVideo = __getVideo(state);
 		}
 		const data = state.data;
 		data.rtcPeer = new kurentoUtils.WebRtcPeer.WebRtcPeerSendonly(
@@ -203,7 +207,7 @@ var Video = (function() {
 	function _createResvPeer(msg, state) {
 		__createVideo(state);
 		const options = VideoUtil.addIceServers({
-			remoteVideo : state.video[0]
+			remoteVideo : __getVideo(state)
 			, onicecandidate : self.onIceCandidate
 		}, msg);
 		const data = state.data;
@@ -418,8 +422,9 @@ var Video = (function() {
 		}
 		state.video = $(hasVideo ? '<video>' : '<audio>').attr('id', 'vid' + _id)
 			.attr('playsinline', 'playsinline')
-			.width(vc.width()).height(vc.height())
-			.prop('autoplay', true).prop('controls', false);
+			//.attr('autoplay', 'autoplay')
+			.prop('controls', false)
+			.width(vc.width()).height(vc.height());
 		if (state.data) {
 			state.video.data(state.data);
 		}
@@ -568,20 +573,22 @@ var Video = (function() {
 			return;
 		}
 		state.data.rtcPeer.processAnswer(answer, function (error) {
-			if (true === this.cleaned || this.peerConnection.signalingState === 'stable') {
+			if (true === this.cleaned) {
 				return;
 			}
-			if (error) {
-				return OmUtil.error(error);
-			}
-			if (state.video && state.video.paused) {
-				state.video.play().catch(function (err) {
+			const video = __getVideo(state);
+			if (this.peerConnection.signalingState === 'stable' && video && video.paused) {
+				video.play().catch(function (err) {
 					if ('NotAllowedError' === err.name) {
 						VideoUtil.askPermission(function () {
-							state.video.play();
+							video.play();
 						});
 					}
 				});
+				return;
+			}
+			if (error) {
+				return OmUtil.error(error);
 			}
 		});
 	}
@@ -625,8 +632,8 @@ var Video = (function() {
 		});
 	};
 	self.reattachStream = _reattachStream;
-	self.video = function() {
-		const state = states.length > 0 ? states[0] : null;
+	self.video = function(_state) {
+		const state = _state || (states.length > 0 ? states[0] : null);
 		if (!state || state.disposed) {
 			return null;
 		}
diff --git a/openmeetings-web/src/test/java/org/apache/openmeetings/web/app/TestApplication.java b/openmeetings-web/src/test/java/org/apache/openmeetings/web/app/TestApplication.java
index 415d78c..985d45b 100644
--- a/openmeetings-web/src/test/java/org/apache/openmeetings/web/app/TestApplication.java
+++ b/openmeetings-web/src/test/java/org/apache/openmeetings/web/app/TestApplication.java
@@ -33,7 +33,7 @@ import org.apache.openmeetings.AbstractJUnitDefaults;
 import org.apache.openmeetings.db.dao.label.LabelDao;
 import org.junit.jupiter.api.Test;
 
-public class TestApplication extends AbstractJUnitDefaults {
+class TestApplication extends AbstractJUnitDefaults {
 	@Test
 	void testMissing() {
 		assertEquals("[Missing]", app.getOmString("909", Locale.ENGLISH));