You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openmeetings.apache.org by so...@apache.org on 2020/10/23 07:39:29 UTC
[openmeetings] branch master updated: [OPENMEETINGS-2492] pipeline is created per KStream
This is an automated email from the ASF dual-hosted git repository.
solomax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openmeetings.git
The following commit(s) were added to refs/heads/master by this push:
new 693bf74 [OPENMEETINGS-2492] pipeline is created per KStream
693bf74 is described below
commit 693bf7496a9c6e1dbcc5ba32e736dc09b1c9f321
Author: Maxim Solodovnik <so...@gmail.com>
AuthorDate: Fri Oct 23 14:39:12 2020 +0700
[OPENMEETINGS-2492] pipeline is created per KStream
---
.../org/apache/openmeetings/core/remote/KRoom.java | 30 +-------
.../apache/openmeetings/core/remote/KStream.java | 69 ++++++++++++++---
.../openmeetings/core/remote/KurentoHandler.java | 88 +++++++++++++---------
.../openmeetings/core/remote/StreamProcessor.java | 25 +++---
.../apache/openmeetings/core/sip/SipManager.java | 2 +-
.../core/remote/TestRecordingFlowMocked.java | 4 +-
.../apache/openmeetings/web/app/ClientManager.java | 8 +-
.../apache/openmeetings/web/app/TimerService.java | 2 +-
.../apache/openmeetings/web/room/RoomPanel.java | 6 +-
.../org/apache/openmeetings/web/room/raw-video.js | 33 ++++----
.../openmeetings/web/app/TestApplication.java | 2 +-
11 files changed, 154 insertions(+), 115 deletions(-)
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java
index c7a8709..8215b37 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java
@@ -25,7 +25,6 @@ import static java.util.UUID.randomUUID;
import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_ICE;
import static org.apache.openmeetings.core.remote.KurentoHandler.newKurentoMsg;
-import java.util.Collection;
import java.util.Date;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -46,8 +45,6 @@ import org.apache.openmeetings.db.manager.IClientManager;
import org.apache.openmeetings.db.util.FormatHelper;
import org.apache.openmeetings.db.util.ws.RoomMessage;
import org.apache.openmeetings.db.util.ws.TextRoomMessage;
-import org.kurento.client.Continuation;
-import org.kurento.client.MediaPipeline;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,7 +64,6 @@ public class KRoom {
private final StreamProcessor processor;
private final RecordingChunkDao chunkDao;
private final IApplication app;
- private final MediaPipeline pipeline;
private final Long roomId;
private final Room.Type type;
private final AtomicBoolean recordingStarted = new AtomicBoolean(false);
@@ -76,13 +72,12 @@ public class KRoom {
private JSONObject recordingUser = new JSONObject();
private JSONObject sharingUser = new JSONObject();
- public KRoom(KurentoHandler handler, Room r, MediaPipeline pipeline) {
+ public KRoom(KurentoHandler handler, Room r) {
this.processor = handler.getStreamProcessor();
this.chunkDao = handler.getChunkDao();
this.app = handler.getApp();
this.roomId = r.getId();
this.type = r.getType();
- this.pipeline = pipeline;
log.info("ROOM {} has been created", roomId);
}
@@ -98,25 +93,17 @@ public class KRoom {
return recordingId;
}
- public MediaPipeline getPipeline() {
- return pipeline;
- }
-
public RecordingChunkDao getChunkDao() {
return chunkDao;
}
- public KStream join(final StreamDesc sd) {
+ public KStream join(final StreamDesc sd, KurentoHandler kHandler) {
log.info("ROOM {}: join client {}, stream: {}", roomId, sd.getClient(), sd.getUid());
- final KStream stream = new KStream(sd, this);
+ final KStream stream = new KStream(sd, this, kHandler);
processor.addStream(stream);
return stream;
}
- public Collection<KStream> getParticipants() {
- return processor.getByRoom(this.getRoomId());
- }
-
public void onStopBroadcast(KStream stream) {
processor.release(stream, true);
WebSocketHelper.sendAll(newKurentoMsg()
@@ -269,17 +256,6 @@ public class KRoom {
processor.getByRoom(this.getRoomId()).forEach(
stream -> stream.release(processor)
);
- pipeline.release(new Continuation<Void>() {
- @Override
- public void onSuccess(Void result) throws Exception {
- log.trace("ROOM {}: Released Pipeline", KRoom.this.roomId);
- }
-
- @Override
- public void onError(Throwable cause) throws Exception {
- log.warn("PARTICIPANT {}: Could not release Pipeline", KRoom.this.roomId);
- }
- });
log.debug("Room {} closed", this.roomId);
}
}
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
index 754ee15..930d4ea 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
@@ -49,6 +49,7 @@ import org.apache.openmeetings.db.util.ws.TextRoomMessage;
import org.kurento.client.Continuation;
import org.kurento.client.IceCandidate;
import org.kurento.client.MediaFlowState;
+import org.kurento.client.MediaPipeline;
import org.kurento.client.MediaProfileSpecType;
import org.kurento.client.MediaType;
import org.kurento.client.RecorderEndpoint;
@@ -62,38 +63,49 @@ import com.github.openjson.JSONObject;
public class KStream extends AbstractStream {
private static final Logger log = LoggerFactory.getLogger(KStream.class);
+ private final KurentoHandler kHandler;
private final KRoom room;
private final Date connectedSince;
private final StreamType streamType;
private MediaProfileSpecType profile;
+ private MediaPipeline pipeline;
private RecorderEndpoint recorder;
private WebRtcEndpoint outgoingMedia = null;
private final ConcurrentMap<String, WebRtcEndpoint> listeners = new ConcurrentHashMap<>();
private Optional<CompletableFuture<Object>> flowoutFuture = Optional.empty();
private Long chunkId;
private Type type;
+ private boolean hasAudio;
+ private boolean hasVideo;
+ private boolean hasScreen;
- public KStream(final StreamDesc sd, KRoom room) {
+ public KStream(final StreamDesc sd, KRoom room, KurentoHandler kHandler) {
super(sd.getSid(), sd.getUid());
this.room = room;
streamType = sd.getType();
this.connectedSince = new Date();
+ this.kHandler = kHandler;
//TODO Min/MaxVideoSendBandwidth
//TODO Min/Max Audio/Video RecvBandwidth
}
- public KStream startBroadcast(final StreamProcessor processor, final StreamDesc sd, final String sdpOffer) {
+ public void startBroadcast(
+ final StreamProcessor processor
+ , final StreamDesc sd
+ , final String sdpOffer
+ , Runnable then)
+ {
if (outgoingMedia != null) {
release(processor, false);
}
- final boolean hasAudio = sd.hasActivity(Activity.AUDIO);
- final boolean hasVideo = sd.hasActivity(Activity.VIDEO);
- final boolean hasScreen = sd.hasActivity(Activity.SCREEN);
+ hasAudio = sd.hasActivity(Activity.AUDIO);
+ hasVideo = sd.hasActivity(Activity.VIDEO);
+ hasScreen = sd.hasActivity(Activity.SCREEN);
if ((sdpOffer.indexOf("m=audio") > -1 && !hasAudio)
|| (sdpOffer.indexOf("m=video") > -1 && !hasVideo && StreamType.SCREEN != streamType))
{
log.warn("Broadcast started without enough rights");
- return this;
+ return;
}
if (StreamType.SCREEN == streamType) {
type = Type.SCREEN;
@@ -119,6 +131,25 @@ public class KStream extends AbstractStream {
profile = MediaProfileSpecType.WEBM_VIDEO_ONLY;
break;
}
+ pipeline = kHandler.createPipiline(room.getRoomId(), sd.getUid(), new Continuation<Void>() {
+ @Override
+ public void onSuccess(Void result) throws Exception {
+ internalStartBroadcast(processor, sd, sdpOffer);
+ then.run();
+ }
+
+ @Override
+ public void onError(Throwable cause) throws Exception {
+ log.warn("Unable to create pipeline {}", KStream.this.uid, cause);
+ }
+ });
+ }
+
+ private void internalStartBroadcast(
+ final StreamProcessor processor
+ , final StreamDesc sd
+ , final String sdpOffer)
+ {
outgoingMedia = createEndpoint(processor, sd.getSid(), sd.getUid());
outgoingMedia.addMediaSessionTerminatedListener(evt -> log.warn("Media stream terminated {}", sd));
outgoingMedia.addMediaFlowOutStateChangeListener(evt -> {
@@ -151,10 +182,9 @@ public class KStream extends AbstractStream {
if (hasAudio || hasVideo || hasScreen) {
WebSocketHelper.sendRoomOthers(room.getRoomId(), c.getUid(), newKurentoMsg()
.put("id", "newStream")
- .put(PARAM_ICE, processor.getHandler().getTurnServers(c))
+ .put(PARAM_ICE, kHandler.getTurnServers(c))
.put("stream", sd.toJson()));
}
- return this;
}
public void addListener(final StreamProcessor processor, String sid, String uid, String sdpOffer) {
@@ -172,7 +202,7 @@ public class KStream extends AbstractStream {
log.debug("gather candidates");
endpoint.gatherCandidates(); // this one might throw Exception
log.trace("USER {}: SdpAnswer is {}", this.uid, sdpAnswer);
- processor.getHandler().sendClient(sid, newKurentoMsg()
+ kHandler.sendClient(sid, newKurentoMsg()
.put("id", "videoResponse")
.put("uid", this.uid)
.put("sdpAnswer", sdpAnswer));
@@ -215,11 +245,11 @@ public class KStream extends AbstractStream {
}
private WebRtcEndpoint createEndpoint(final StreamProcessor processor, String sid, String uid) {
- WebRtcEndpoint endpoint = createWebRtcEndpoint(room.getPipeline());
+ WebRtcEndpoint endpoint = createWebRtcEndpoint(pipeline);
endpoint.addTag("outUid", this.uid);
endpoint.addTag("uid", uid);
- endpoint.addIceCandidateFoundListener(evt -> processor.getHandler().sendClient(sid
+ endpoint.addIceCandidateFoundListener(evt -> kHandler.sendClient(sid
, newKurentoMsg()
.put("id", "iceCandidate")
.put("uid", KStream.this.uid)
@@ -235,7 +265,7 @@ public class KStream extends AbstractStream {
return;
}
final String chunkUid = "rec_" + room.getRecordingId() + "_" + randomUUID();
- recorder = createRecorderEndpoint(room.getPipeline(), getRecUri(getRecordingChunk(room.getRoomId(), chunkUid)), profile);
+ recorder = createRecorderEndpoint(pipeline, getRecUri(getRecordingChunk(room.getRoomId(), chunkUid)), profile);
recorder.addTag("outUid", uid);
recorder.addTag("uid", uid);
@@ -335,6 +365,17 @@ public class KStream extends AbstractStream {
log.warn("PARTICIPANT {}: Could not release", KStream.this.uid, cause);
}
});
+ pipeline.release(new Continuation<Void>() {
+ @Override
+ public void onSuccess(Void result) throws Exception {
+ log.trace("PARTICIPANT {}: Released Pipeline", KStream.this.uid);
+ }
+
+ @Override
+ public void onError(Throwable cause) throws Exception {
+ log.warn("PARTICIPANT {}: Could not release Pipeline", KStream.this.uid, cause);
+ }
+ });
releaseRecorder(false);
outgoingMedia = null;
}
@@ -423,6 +464,10 @@ public class KStream extends AbstractStream {
return room;
}
+ MediaPipeline getPipeline() {
+ return pipeline;
+ }
+
public StreamType getStreamType() {
return streamType;
}
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
index 5f1214e..a43f62c 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
@@ -56,6 +56,7 @@ import org.apache.openmeetings.db.manager.IClientManager;
import org.apache.openmeetings.db.util.ws.RoomMessage;
import org.apache.openmeetings.db.util.ws.TextRoomMessage;
import org.apache.wicket.util.string.Strings;
+import org.kurento.client.Continuation;
import org.kurento.client.Endpoint;
import org.kurento.client.EventListener;
import org.kurento.client.KurentoClient;
@@ -87,6 +88,7 @@ public class KurentoHandler {
public static final String TAG_KUID = "kuid";
public static final String TAG_MODE = "mode";
public static final String TAG_ROOM = "roomId";
+ public static final String TAG_STREAM_UID = "streamUid";
private static final String HMAC_SHA1_ALGORITHM = "HmacSHA1";
private final ScheduledExecutorService kmsRecheckScheduler = Executors.newScheduledThreadPool(1);
public static final String KURENTO_TYPE = "kurento";
@@ -296,19 +298,22 @@ public class KurentoHandler {
streamProcessor.remove((Client)c);
}
+ MediaPipeline createPipiline(Long roomId, String uid, Continuation<Void> continuation) {
+ Transaction t = beginTransaction();
+ MediaPipeline pipe = client.createMediaPipeline(t);
+ pipe.addTag(t, TAG_KUID, kuid);
+ pipe.addTag(t, TAG_ROOM, String.valueOf(roomId));
+ pipe.addTag(t, TAG_STREAM_UID, uid);
+ t.commit(continuation);
+ return pipe;
+ }
+
KRoom getRoom(Long roomId) {
- log.debug("Searching for room {}", roomId);
KRoom room = rooms.computeIfAbsent(roomId, k -> {
log.debug("Room {} does not exist. Will create now!", roomId);
Room r = roomDao.get(roomId);
- Transaction t = beginTransaction();
- MediaPipeline pipe = client.createMediaPipeline(t);
- pipe.addTag(t, TAG_KUID, kuid);
- pipe.addTag(t, TAG_ROOM, String.valueOf(roomId));
- t.commit();
- return new KRoom(this, r, pipe);
+ return new KRoom(this, r);
});
- log.debug("Room {} found!", roomId);
return room;
}
@@ -440,24 +445,30 @@ public class KurentoHandler {
// still alive
MediaPipeline pipe = client.getById(roid, MediaPipeline.class);
Map<String, String> tags = tagsAsMap(pipe);
- final String inKuid = tags.get(TAG_KUID);
- if (ignoredKuids.contains(inKuid)) {
- return;
- }
- if (validTestPipeline(tags)) {
- return;
- }
- if (kuid.equals(inKuid)) {
- KRoom r = rooms.get(Long.valueOf(tags.get(TAG_ROOM)));
- if (r.getPipeline().getId().equals(pipe.getId())) {
+ try {
+ final String inKuid = tags.get(TAG_KUID);
+ if (inKuid != null && ignoredKuids.contains(inKuid)) {
return;
- } else if (r != null) {
- rooms.remove(r.getRoomId());
- r.close();
}
+ if (validTestPipeline(tags)) {
+ return;
+ }
+ if (kuid.equals(inKuid)) {
+ KStream stream = streamProcessor.getByUid(tags.get(TAG_STREAM_UID));
+ if (stream != null) {
+ if (stream.getRoom().getRoomId().equals(Long.valueOf(tags.get(TAG_ROOM)))
+ && stream.getPipeline().getId().equals(pipe.getId()))
+ {
+ return;
+ } else {
+ stream.release(streamProcessor);
+ }
+ }
+ }
+ } catch (Throwable e) {
+ log.warn("Invalid MediaPipeline {} detected, will be dropped, tags: {}", pipe.getId(), tags);
+ pipe.release();
}
- log.warn("Invalid MediaPipeline {} detected, will be dropped, tags: {}", pipe.getId(), tags);
- pipe.release();
}, objCheckTimeout, MILLISECONDS);
} else if (evt.getObject() instanceof Endpoint) {
// endpoint created
@@ -478,22 +489,25 @@ public class KurentoHandler {
}
// still alive
Endpoint point = client.getById(eoid, fClazz);
- Map<String, String> pipeTags = tagsAsMap(point.getMediaPipeline());
- final String inKuid = pipeTags.get(TAG_KUID);
- if (ignoredKuids.contains(inKuid)) {
- return;
- }
- if (validTestPipeline(pipeTags)) {
- return;
- }
Map<String, String> tags = tagsAsMap(point);
- KStream stream = streamProcessor.getByUid(tags.get("outUid"));
- log.debug("New Endpoint {} detected, tags: {}, kStream: {}", point.getId(), tags, stream);
- if (stream != null && stream.contains(tags.get("uid"))) {
- return;
+ try {
+ Map<String, String> pipeTags = tagsAsMap(point.getMediaPipeline());
+ final String inKuid = pipeTags.get(TAG_KUID);
+ if (ignoredKuids.contains(inKuid)) {
+ return;
+ }
+ if (validTestPipeline(pipeTags)) {
+ return;
+ }
+ KStream stream = streamProcessor.getByUid(tags.get("outUid"));
+ log.debug("Kurento::ObjectCreated -> New Endpoint {} detected, tags: {}, kStream: {}", point.getId(), tags, stream);
+ if (stream != null && stream.contains(tags.get("uid"))) {
+ return;
+ }
+ } catch (Throwable e) {
+ log.warn("Kurento::ObjectCreated -> Invalid Endpoint {} detected, will be dropped, tags: {}", point.getId(), tags);
+ point.release();
}
- log.warn("Invalid Endpoint {} detected, will be dropped, tags: {}", point.getId(), tags);
- point.release();
}, objCheckTimeout, MILLISECONDS);
}
}
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java
index 27fbe41..5e4635a 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java
@@ -32,7 +32,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.openmeetings.core.converter.IRecordingConverter;
import org.apache.openmeetings.core.converter.InterviewConverter;
@@ -182,12 +182,13 @@ public class StreamProcessor implements IStreamProcessor {
try {
if (sender == null) {
KRoom room = kHandler.getRoom(c.getRoomId());
- sender = room.join(sd);
- }
- startBroadcast(sender, sd, msg.getString("sdpOffer"));
- if (StreamType.SCREEN == sd.getType() && sd.hasActivity(Activity.RECORD) && !isRecording(c.getRoomId())) {
- startRecording(c);
+ sender = room.join(sd, kHandler);
}
+ startBroadcast(sender, sd, msg.getString("sdpOffer"), () -> {
+ if (StreamType.SCREEN == sd.getType() && sd.hasActivity(Activity.RECORD) && !isRecording(c.getRoomId())) {
+ startRecording(c);
+ }
+ });
} catch (KurentoServerException e) {
sender.release(this);
WebSocketHelper.sendClient(c, newStoppedMsg(sd));
@@ -203,10 +204,11 @@ public class StreamProcessor implements IStreamProcessor {
* @param stream Stream to start
* @param sd StreamDesc to start
* @param sdpOffer the sdpOffer
+ * @param then steps need to be done after broadcast is started
* @return the current KStream
*/
- KStream startBroadcast(KStream stream, StreamDesc sd, String sdpOffer) {
- return stream.startBroadcast(this, sd, sdpOffer);
+ void startBroadcast(KStream stream, StreamDesc sd, String sdpOffer, Runnable then) {
+ stream.startBroadcast(this, sd, sdpOffer, then);
}
private static boolean isBroadcasting(final Client c) {
@@ -501,7 +503,7 @@ public class StreamProcessor implements IStreamProcessor {
}
}
if (c.getRoomId() != null) {
- getByRoom(c.getRoomId()).stream().forEach(stream -> stream.remove(c)); // listeners of existing streams should be cleaned-up
+ getByRoom(c.getRoomId()).forEach(stream -> stream.remove(c)); // listeners of existing streams should be cleaned-up
checkStreams(c.getRoomId());
}
}
@@ -514,10 +516,9 @@ public class StreamProcessor implements IStreamProcessor {
return streamByUid.values();
}
- Collection<KStream> getByRoom(Long roomId) {
+ Stream<KStream> getByRoom(Long roomId) {
return streamByUid.values().stream()
- .filter(stream -> stream.getRoom() != null && stream.getRoom().getRoomId().equals(roomId))
- .collect(Collectors.toList());
+ .filter(stream -> stream.getRoom() != null && stream.getRoom().getRoomId().equals(roomId));
}
Client getBySid(String sid) {
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/sip/SipManager.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/sip/SipManager.java
index 4e0f991..715718a 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/sip/SipManager.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/sip/SipManager.java
@@ -280,7 +280,7 @@ public class SipManager implements ISipManager, SipListenerExt {
ConfbridgeListAction da = new ConfbridgeListAction(confno);
ResponseEvents r = execEvent(da);
if (r != null) {
- log.debug("SipManager::countUsers size == {}", r.getEvents().size());
+ log.trace("SipManager::countUsers size == {}", r.getEvents().size());
// "- 1" here means: ListComplete event
return r.getEvents().size() - 1;
}
diff --git a/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestRecordingFlowMocked.java b/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestRecordingFlowMocked.java
index cf24bc3..c5d5b1e 100644
--- a/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestRecordingFlowMocked.java
+++ b/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestRecordingFlowMocked.java
@@ -91,7 +91,7 @@ class TestRecordingFlowMocked extends BaseMockedTest {
doReturn(c.getRoom()).when(roomDao).get(ROOM_ID);
// Mock out the methods that do webRTC
- doReturn(null).when(streamProcessor).startBroadcast(any(), any(), any());
+ doReturn(null).when(streamProcessor).startBroadcast(any(), any(), any(), any());
}
@@ -175,7 +175,7 @@ class TestRecordingFlowMocked extends BaseMockedTest {
assertTrue(streamProcessor.isSharing(ROOM_ID));
//verify startBroadcast has been invoked
- verify(streamProcessor).startBroadcast(any(), any(), any());
+ verify(streamProcessor).startBroadcast(any(), any(), any(), any());
// Assert that there is still just 1 stream and has only the activities to Record assigned
assertEquals(1, c.getStreams().size());
diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/ClientManager.java b/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/ClientManager.java
index 4dfedcc..e4f38f0 100644
--- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/ClientManager.java
+++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/ClientManager.java
@@ -313,7 +313,7 @@ public class ClientManager implements IClientManager {
.map(id -> onlineRooms.getOrDefault(id, Set.of()))
.stream()
.flatMap(Set::stream)
- .map(uid -> get(uid))
+ .map(this::get)
.filter(Objects::nonNull);
}
@@ -322,10 +322,8 @@ public class ClientManager implements IClientManager {
.map(id -> onlineRooms.getOrDefault(id, Set.of()))
.stream()
.flatMap(Set::stream)
- .map(uid -> get(uid))
- .filter(c -> c != null && c.sameUserId(userId))
- .findAny()
- .isPresent();
+ .map(this::get)
+ .anyMatch(c -> c != null && c.sameUserId(userId));
}
private List<Client> getByKeys(Long userId, String sessionId) {
diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/TimerService.java b/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/TimerService.java
index 799b5a1..2a2800b 100644
--- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/TimerService.java
+++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/TimerService.java
@@ -82,7 +82,7 @@ public class TimerService {
sipCheckMap.put(
roomId
, new CompletableFuture<>().completeAsync(() -> {
- log.warn("Sip room check {}", roomId);
+ log.trace("Sip room check {}", roomId);
Optional<Client> sipClient = cm.streamByRoom(roomId).filter(Client::isSip).findAny();
cm.streamByRoom(roomId).filter(Predicate.not(Client::isSip)).findAny().ifPresentOrElse(c -> {
updateSipLastName(sipClient, c.getRoom());
diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.java b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.java
index 4a4ae17..e0f9780 100644
--- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.java
+++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.java
@@ -619,8 +619,7 @@ public class RoomPanel extends BasePanel {
handler.appendJavaScript("if (typeof(WbArea) === 'object') {WbArea.setRecStarted(true);}");
} else if (streamProcessor.recordingAllowed(getClient())) {
boolean hasStreams = cm.streamByRoom(r.getId())
- .filter(cl -> !cl.getStreams().isEmpty())
- .findAny().isPresent();
+ .anyMatch(cl -> !cl.getStreams().isEmpty());
handler.appendJavaScript(String.format("if (typeof(WbArea) === 'object') {WbArea.setRecStarted(false);WbArea.setRecEnabled(%s);}", hasStreams));
}
}
@@ -636,8 +635,7 @@ public class RoomPanel extends BasePanel {
public static boolean hasRight(ClientManager cm, long userId, long roomId, Right r) {
return cm.streamByRoom(roomId)
- .filter(c -> c.sameUserId(userId) && c.hasRight(r))
- .findAny().isPresent();
+ .anyMatch(c -> c.sameUserId(userId) && c.hasRight(r));
}
@Override
diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video.js b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video.js
index db083a4..215b114 100644
--- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video.js
+++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video.js
@@ -6,6 +6,10 @@ var Video = (function() {
, lm, level, userSpeaks = false, muteOthers
, hasVideo, isSharing, isRecording;
+ function __getVideo(_state) {
+ const vid = self.video(_state);
+ return vid && vid.length > 0 ? vid[0] : null;
+ }
function _resizeDlgArea(_w, _h) {
if (Room.getOptions().interview) {
VideoUtil.setPos(v, VideoUtil.getPos());
@@ -153,7 +157,7 @@ var Video = (function() {
, onicecandidate: self.onIceCandidate
};
if (!isSharing) {
- state.options.localVideo = state.video[0];
+ state.options.localVideo = __getVideo(state);
}
const data = state.data;
data.rtcPeer = new kurentoUtils.WebRtcPeer.WebRtcPeerSendonly(
@@ -203,7 +207,7 @@ var Video = (function() {
function _createResvPeer(msg, state) {
__createVideo(state);
const options = VideoUtil.addIceServers({
- remoteVideo : state.video[0]
+ remoteVideo : __getVideo(state)
, onicecandidate : self.onIceCandidate
}, msg);
const data = state.data;
@@ -418,8 +422,9 @@ var Video = (function() {
}
state.video = $(hasVideo ? '<video>' : '<audio>').attr('id', 'vid' + _id)
.attr('playsinline', 'playsinline')
- .width(vc.width()).height(vc.height())
- .prop('autoplay', true).prop('controls', false);
+ //.attr('autoplay', 'autoplay')
+ .prop('controls', false)
+ .width(vc.width()).height(vc.height());
if (state.data) {
state.video.data(state.data);
}
@@ -568,20 +573,22 @@ var Video = (function() {
return;
}
state.data.rtcPeer.processAnswer(answer, function (error) {
- if (true === this.cleaned || this.peerConnection.signalingState === 'stable') {
+ if (true === this.cleaned) {
return;
}
- if (error) {
- return OmUtil.error(error);
- }
- if (state.video && state.video.paused) {
- state.video.play().catch(function (err) {
+ const video = __getVideo(state);
+ if (this.peerConnection.signalingState === 'stable' && video && video.paused) {
+ video.play().catch(function (err) {
if ('NotAllowedError' === err.name) {
VideoUtil.askPermission(function () {
- state.video.play();
+ video.play();
});
}
});
+ return;
+ }
+ if (error) {
+ return OmUtil.error(error);
}
});
}
@@ -625,8 +632,8 @@ var Video = (function() {
});
};
self.reattachStream = _reattachStream;
- self.video = function() {
- const state = states.length > 0 ? states[0] : null;
+ self.video = function(_state) {
+ const state = _state || (states.length > 0 ? states[0] : null);
if (!state || state.disposed) {
return null;
}
diff --git a/openmeetings-web/src/test/java/org/apache/openmeetings/web/app/TestApplication.java b/openmeetings-web/src/test/java/org/apache/openmeetings/web/app/TestApplication.java
index 415d78c..985d45b 100644
--- a/openmeetings-web/src/test/java/org/apache/openmeetings/web/app/TestApplication.java
+++ b/openmeetings-web/src/test/java/org/apache/openmeetings/web/app/TestApplication.java
@@ -33,7 +33,7 @@ import org.apache.openmeetings.AbstractJUnitDefaults;
import org.apache.openmeetings.db.dao.label.LabelDao;
import org.junit.jupiter.api.Test;
-public class TestApplication extends AbstractJUnitDefaults {
+class TestApplication extends AbstractJUnitDefaults {
@Test
void testMissing() {
assertEquals("[Missing]", app.getOmString("909", Locale.ENGLISH));