You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openmeetings.apache.org by so...@apache.org on 2019/03/03 10:56:21 UTC
[openmeetings] branch master updated: [OPENMEETINGS-1955] major
work on room recording seems to be done
This is an automated email from the ASF dual-hosted git repository.
solomax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openmeetings.git
The following commit(s) were added to refs/heads/master by this push:
new 1ac19d4 [OPENMEETINGS-1955] major work on room recording seems to be done
1ac19d4 is described below
commit 1ac19d404d781825f653f35bd185e3e6cdb89716
Author: Maxim Solodovnik <so...@gmail.com>
AuthorDate: Sun Mar 3 17:56:08 2019 +0700
[OPENMEETINGS-1955] major work on room recording seems to be done
---
.../remote/{IKStream.java => AbstractStream.java} | 26 +-
.../core/remote/IStreamProcessor.java} | 41 +-
.../org/apache/openmeetings/core/remote/KRoom.java | 36 +-
.../apache/openmeetings/core/remote/KStream.java | 95 +++--
.../openmeetings/core/remote/KTestStream.java | 23 +-
.../openmeetings/core/remote/KurentoHandler.java | 433 ++------------------
.../openmeetings/core/remote/StreamProcessor.java | 442 +++++++++++++++++++++
.../core/remote/TestStreamProcessor.java | 126 ++++++
.../openmeetings/core/remote/BaseMockedTest.java | 11 +-
.../core/remote/TestNotConnectedMocked.java | 8 +-
.../core/remote/TestRoomFlowMocked.java | 10 +-
.../apache/openmeetings/web/room/RoomPanel.java | 9 +-
.../openmeetings/web/room/menu/RoomMenuPanel.java | 5 +-
.../org/apache/openmeetings/web/room/raw-sharer.js | 20 +-
.../openmeetings/web/room/raw-video-manager.js | 6 +-
.../openmeetings/web/room/sidebar/RoomSidebar.java | 6 +-
.../openmeetings/web/room/wb/InterviewWbPanel.java | 10 +-
openmeetings-web/src/test/resources/keystore | Bin 1821 -> 4365 bytes
18 files changed, 753 insertions(+), 554 deletions(-)
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/IKStream.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/AbstractStream.java
similarity index 68%
rename from openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/IKStream.java
rename to openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/AbstractStream.java
index c6979f0..462711b 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/IKStream.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/AbstractStream.java
@@ -24,20 +24,36 @@ import org.kurento.client.PlayerEndpoint;
import org.kurento.client.RecorderEndpoint;
import org.kurento.client.WebRtcEndpoint;
-public interface IKStream {
- void release(KurentoHandler h);
+public abstract class AbstractStream {
+ protected final String sid;
+ protected final String uid;
- default WebRtcEndpoint createWebRtcEndpoint(MediaPipeline pipeline) {
+ public AbstractStream(final String sid, final String uid) {
+ this.sid = sid;
+ this.uid = uid;
+ }
+
+ public String getSid() {
+ return sid;
+ }
+
+ public String getUid() {
+ return uid;
+ }
+
+ public abstract void release(IStreamProcessor processor);
+
+ public WebRtcEndpoint createWebRtcEndpoint(MediaPipeline pipeline) {
return new WebRtcEndpoint.Builder(pipeline).build();
}
- default RecorderEndpoint createRecorderEndpoint(MediaPipeline pipeline, String path, MediaProfileSpecType profile) {
+ public RecorderEndpoint createRecorderEndpoint(MediaPipeline pipeline, String path, MediaProfileSpecType profile) {
return new RecorderEndpoint.Builder(pipeline, path)
.stopOnEndOfStream()
.withMediaProfile(profile).build();
}
- default PlayerEndpoint createPlayerEndpoint(MediaPipeline pipeline, String path) {
+ public PlayerEndpoint createPlayerEndpoint(MediaPipeline pipeline, String path) {
return new PlayerEndpoint.Builder(pipeline, path).build();
}
}
diff --git a/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestNotConnectedMocked.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/IStreamProcessor.java
similarity index 52%
copy from openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestNotConnectedMocked.java
copy to openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/IStreamProcessor.java
index 1877518..d6575ed 100644
--- a/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestNotConnectedMocked.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/IStreamProcessor.java
@@ -1,5 +1,4 @@
/*
-
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -19,41 +18,7 @@
*/
package org.apache.openmeetings.core.remote;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-import org.junit.Test;
-
-import com.github.openjson.JSONObject;
-
-public class TestNotConnectedMocked extends BaseMockedTest {
- @Test
- public void testNotConnected() {
- handler.onMessage(null, MSG_BASE);
- }
-
- @Test
- public void testRecordingAllowed() {
- assertFalse(handler.recordingAllowed(null));
- }
-
- @Test
- public void testStartRecording() {
- handler.startRecording(null);
- }
-
- @Test
- public void testStopRecording() {
- handler.stopRecording(null);
- }
-
- @Test
- public void testIsRecording() {
- assertFalse(handler.isRecording(null));
- }
-
- @Test
- public void testGetRecordingUser() {
- assertEquals(new JSONObject().toString(), handler.getRecordingUser(null).toString());
- }
+public interface IStreamProcessor {
+ void release(AbstractStream stream);
+ void destroy();
}
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java
index 8f5f7dd..1a4925d 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java
@@ -106,9 +106,9 @@ public class KRoom {
return streams.values();
}
- public void onStopBroadcast(KStream stream, final KurentoHandler h) {
+ public void onStopBroadcast(KStream stream, final StreamProcessor processor) {
streams.remove(stream.getUid());
- stream.release(h);
+ stream.release(processor);
WebSocketHelper.sendAll(newKurentoMsg()
.put("id", "broadcastStopped")
.put("uid", stream.getUid())
@@ -118,7 +118,7 @@ public class KRoom {
//FIXME TODO permission can be removed, some listener might be required
}
- public void leave(final KurentoHandler h, final Client c) {
+ public void leave(final StreamProcessor processor, final Client c) {
for (Map.Entry<String, KStream> e : streams.entrySet()) {
e.getValue().remove(c);
}
@@ -128,7 +128,7 @@ public class KRoom {
}
KStream stream = streams.remove(sd.getUid());
if (stream != null) {
- stream.release(h);
+ stream.release(processor);
}
}
}
@@ -190,21 +190,32 @@ public class KRoom {
}
}
- public void stopRecording(KurentoHandler h, Client c, RecordingDao recDao) {
+ public void stopRecording(final StreamProcessor processor, Client c) {
if (recordingStarted.compareAndSet(true, false)) {
log.debug("##REC:: recording in room {} is stopping {} ::", roomId, recordingId);
for (final KStream stream : streams.values()) {
stream.stopRecord();
}
- Recording rec = recDao.get(recordingId);
+ Recording rec = processor.getRecordingDao().get(recordingId);
rec.setRecordEnd(new Date());
- rec = recDao.update(rec);
+ rec = processor.getRecordingDao().update(rec);
recordingUser = new JSONObject();
recordingId = null;
- h.startConvertion(rec);
+ processor.startConvertion(rec);
+ User u;
+ if (c == null) {
+ u = new User();
+ } else {
+ u = c.getUser();
+ Optional<StreamDesc> osd = c.getScreenStream();
+ if (osd.isPresent()) {
+ osd.get().removeActivity(Activity.RECORD);
+ processor.getClientManager().update(c);
+ processor.getHandler().sendShareUpdated(osd.get());
+ }
+ }
// Send notification to all users that the recording has been stopped
- User u = c == null ? new User() : c.getUser();
WebSocketHelper.sendRoom(new RoomMessage(roomId, u, RoomMessage.Type.recordingToggled));
log.debug("##REC:: recording in room {} is stopped ::", roomId);
}
@@ -218,8 +229,9 @@ public class KRoom {
return new JSONObject(sharingUser.toString());
}
- public void startSharing(KurentoHandler h, IClientManager cm, Client c, Optional<StreamDesc> osd, JSONObject msg, Activity a) {
+ public void startSharing(StreamProcessor processor, IClientManager cm, Client c, Optional<StreamDesc> osd, JSONObject msg, Activity a) {
StreamDesc sd = null;
+ KurentoHandler h = processor.getHandler();
if (sharingStarted.compareAndSet(false, true)) {
sharingUser.put("sid", c.getSid());
sd = c.addStream(StreamType.SCREEN, a);
@@ -246,9 +258,9 @@ public class KRoom {
}
}
- public void close(final KurentoHandler h) {
+ public void close(final StreamProcessor processor) {
for (final KStream stream : streams.values()) {
- stream.release(h);
+ stream.release(processor);
}
streams.clear();
pipeline.release(new Continuation<Void>() {
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
index dbbd196..de77ad7 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
@@ -53,11 +53,9 @@ import org.slf4j.LoggerFactory;
import com.github.openjson.JSONObject;
-public class KStream implements IKStream {
+public class KStream extends AbstractStream {
private static final Logger log = LoggerFactory.getLogger(KStream.class);
- private final String sid;
- private final String uid;
private final KRoom room;
private final StreamType streamType;
private MediaProfileSpecType profile;
@@ -68,17 +66,16 @@ public class KStream implements IKStream {
private Type type;
public KStream(final StreamDesc sd, KRoom room) {
+ super(sd.getSid(), sd.getUid());
this.room = room;
- this.sid = sd.getSid();
- this.uid = sd.getUid();
streamType = sd.getType();
//TODO Min/MaxVideoSendBandwidth
//TODO Min/Max Audio/Video RecvBandwidth
}
- public KStream startBroadcast(final KurentoHandler h, final StreamDesc sd, final String sdpOffer) {
+ public KStream startBroadcast(final StreamProcessor processor, final StreamDesc sd, final String sdpOffer) {
if (outgoingMedia != null) {
- release(h);
+ release(processor);
}
final boolean hasAudio = sd.hasActivity(Activity.AUDIO);
final boolean hasVideo = sd.hasActivity(Activity.VIDEO);
@@ -112,7 +109,7 @@ public class KStream implements IKStream {
profile = MediaProfileSpecType.WEBM_VIDEO_ONLY;
break;
}
- outgoingMedia = createEndpoint(h, sd.getSid(), sd.getUid());
+ outgoingMedia = createEndpoint(processor, sd.getSid(), sd.getUid());
outgoingMedia.addMediaSessionTerminatedListener(evt -> {
log.warn("Media stream terminated");
});
@@ -120,16 +117,16 @@ public class KStream implements IKStream {
if (MediaFlowState.NOT_FLOWING == evt.getState()) {
log.warn("Media FlowOut :: {}", evt.getState());
if (StreamType.SCREEN == streamType) {
- h.stopSharing(sid, uid);
+ processor.doStopSharing(sid, uid);
}
- stopBroadcast(h);
+ stopBroadcast(processor);
}
});
outgoingMedia.addMediaFlowInStateChangeListener(evt -> {
log.warn("Media FlowIn :: {}", evt);
});
- h.streamsByUid.put(uid, this);
- addListener(h, sd.getSid(), sd.getUid(), sdpOffer);
+ processor.addStream(this);
+ addListener(processor, sd.getSid(), sd.getUid(), sdpOffer);
if (room.isRecording()) {
startRecord();
}
@@ -137,12 +134,12 @@ public class KStream implements IKStream {
WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.rightUpdated, c.getUid()));
WebSocketHelper.sendRoomOthers(room.getRoomId(), c.getUid(), newKurentoMsg()
.put("id", "newStream")
- .put(PARAM_ICE, h.getTurnServers())
+ .put(PARAM_ICE, processor.getHandler().getTurnServers())
.put("stream", sd.toJson()));
return this;
}
- public void addListener(final KurentoHandler h, String sid, String uid, String sdpOffer) {
+ public void addListener(final StreamProcessor processor, String sid, String uid, String sdpOffer) {
final boolean self = uid.equals(this.uid);
log.info("USER {}: have started {} in room {}", uid, self ? "broadcasting" : "receiving", room.getRoomId());
log.trace("USER {}: SdpOffer is {}", uid, sdpOffer);
@@ -151,11 +148,11 @@ public class KStream implements IKStream {
return;
}
- final WebRtcEndpoint endpoint = getEndpointForUser(h, sid, uid);
+ final WebRtcEndpoint endpoint = getEndpointForUser(processor, sid, uid);
final String sdpAnswer = endpoint.processOffer(sdpOffer);
log.trace("USER {}: SdpAnswer is {}", this.uid, sdpAnswer);
- h.sendClient(sid, newKurentoMsg()
+ processor.getHandler().sendClient(sid, newKurentoMsg()
.put("id", "videoResponse")
.put("uid", this.uid)
.put("sdpAnswer", sdpAnswer));
@@ -163,7 +160,7 @@ public class KStream implements IKStream {
endpoint.gatherCandidates();
}
- private WebRtcEndpoint getEndpointForUser(final KurentoHandler h, String sid, String uid) {
+ private WebRtcEndpoint getEndpointForUser(final StreamProcessor processor, String sid, String uid) {
if (uid.equals(this.uid)) {
log.debug("PARTICIPANT {}: configuring loopback", this.uid);
return outgoingMedia;
@@ -176,11 +173,11 @@ public class KStream implements IKStream {
listener.release();
}
log.debug("PARTICIPANT {}: creating new endpoint for {}", uid, this.uid);
- listener = createEndpoint(h, sid, uid);
+ listener = createEndpoint(processor, sid, uid);
listeners.put(uid, listener);
log.debug("PARTICIPANT {}: obtained endpoint for {}", uid, this.uid);
- Client cur = h.getBySid(this.sid);
+ Client cur = processor.getBySid(this.sid);
if (cur == null) {
log.warn("Client for endpoint dooesn't exists");
} else {
@@ -199,12 +196,12 @@ public class KStream implements IKStream {
return listener;
}
- private WebRtcEndpoint createEndpoint(final KurentoHandler h, String sid, String uid) {
+ private WebRtcEndpoint createEndpoint(final StreamProcessor processor, String sid, String uid) {
WebRtcEndpoint endpoint = createWebRtcEndpoint(room.getPipeline());
endpoint.addTag("outUid", this.uid);
endpoint.addTag("uid", uid);
- endpoint.addIceCandidateFoundListener(evt -> h.sendClient(sid, newKurentoMsg()
+ endpoint.addIceCandidateFoundListener(evt -> processor.getHandler().sendClient(sid, newKurentoMsg()
.put("id", "iceCandidate")
.put("uid", KStream.this.uid)
.put(PARAM_CANDIDATE, convert(JsonUtils.toJsonObject(evt.getCandidate()))))
@@ -258,37 +255,45 @@ public class KStream implements IKStream {
}
}
- public void stopBroadcast(final KurentoHandler h) {
- room.onStopBroadcast(this, h);
+ public void stopBroadcast(final StreamProcessor processor) {
+ room.onStopBroadcast(this, processor);
+ }
+
+ public void pauseSharing() {
+ releaseListeners();
+ }
+
+ private void releaseListeners() {
+ log.debug("PARTICIPANT {}: Releasing listeners", uid);
+ for (Entry<String, WebRtcEndpoint> entry : listeners.entrySet()) {
+ final String inUid = entry.getKey();
+ log.trace("PARTICIPANT {}: Released incoming EP for {}", uid, inUid);
+
+ final WebRtcEndpoint ep = entry.getValue();
+ ep.release(new Continuation<Void>() {
+ @Override
+ public void onSuccess(Void result) throws Exception {
+ log.trace("PARTICIPANT {}: Released successfully incoming EP for {}", KStream.this.uid, inUid);
+ }
+
+ @Override
+ public void onError(Throwable cause) throws Exception {
+ log.warn("PARTICIPANT {}: Could not release incoming EP for {}", KStream.this.uid, inUid);
+ }
+ });
+ }
+ listeners.clear();
}
@Override
- public void release(KurentoHandler h) {
+ public void release(IStreamProcessor processor) {
if (outgoingMedia != null) {
- log.debug("PARTICIPANT {}: Releasing resources", uid);
- for (Entry<String, WebRtcEndpoint> entry : listeners.entrySet()) {
- final String inUid = entry.getKey();
- log.trace("PARTICIPANT {}: Released incoming EP for {}", uid, inUid);
-
- final WebRtcEndpoint ep = entry.getValue();
- ep.release(new Continuation<Void>() {
- @Override
- public void onSuccess(Void result) throws Exception {
- log.trace("PARTICIPANT {}: Released successfully incoming EP for {}", KStream.this.uid, inUid);
- }
-
- @Override
- public void onError(Throwable cause) throws Exception {
- log.warn("PARTICIPANT {}: Could not release incoming EP for {}", KStream.this.uid, inUid);
- }
- });
- }
- listeners.clear();
+ releaseListeners();
outgoingMedia.release();
outgoingMedia = null;
}
releaseRecorder();
- h.streamsByUid.remove(uid);
+ processor.release(this);
}
private void releaseRecorder() {
@@ -315,10 +320,12 @@ public class KStream implements IKStream {
return new JSONObject(o.toString());
}
+ @Override
public String getSid() {
return sid;
}
+ @Override
public String getUid() {
return uid;
}
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java
index 85e1a9c..b3395fd 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java
@@ -20,8 +20,8 @@ package org.apache.openmeetings.core.remote;
import static java.util.UUID.randomUUID;
import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_CANDIDATE;
-import static org.apache.openmeetings.core.remote.KurentoHandler.newTestKurentoMsg;
import static org.apache.openmeetings.core.remote.KurentoHandler.sendError;
+import static org.apache.openmeetings.core.remote.TestStreamProcessor.newTestKurentoMsg;
import static org.apache.openmeetings.util.OmFileHelper.EXTENSION_WEBM;
import static org.apache.openmeetings.util.OmFileHelper.TEST_SETUP_PREFIX;
import static org.apache.openmeetings.util.OmFileHelper.getStreamsDir;
@@ -48,7 +48,7 @@ import org.slf4j.LoggerFactory;
import com.github.openjson.JSONObject;
-public class KTestStream implements IKStream {
+public class KTestStream extends AbstractStream {
private static final Logger log = LoggerFactory.getLogger(KTestStream.class);
private MediaPipeline pipeline;
private WebRtcEndpoint webRtcEndpoint;
@@ -56,13 +56,12 @@ public class KTestStream implements IKStream {
private RecorderEndpoint recorder;
private String recPath = null;
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
- private final String uid;
private ScheduledFuture<?> recHandle;
private int recTime;
- public KTestStream(IWsClient _c, JSONObject msg, MediaPipeline pipeline) {
+ public KTestStream(IWsClient c, JSONObject msg, MediaPipeline pipeline) {
+ super(null, c.getUid());
this.pipeline = pipeline;
- this.uid = _c.getUid();
webRtcEndpoint = createWebRtcEndpoint(pipeline);
webRtcEndpoint.connect(webRtcEndpoint);
@@ -73,7 +72,7 @@ public class KTestStream implements IKStream {
recorder.addRecordingListener(evt -> {
recTime = 0;
recHandle = scheduler.scheduleAtFixedRate(
- () -> WebSocketHelper.sendClient(_c, newTestKurentoMsg().put("id", "recording").put("time", recTime++))
+ () -> WebSocketHelper.sendClient(c, newTestKurentoMsg().put("id", "recording").put("time", recTime++))
, 0, 1, TimeUnit.SECONDS);
scheduler.schedule(() -> {
recorder.stop();
@@ -81,7 +80,7 @@ public class KTestStream implements IKStream {
}, 5, TimeUnit.SECONDS);
});
recorder.addStoppedListener(evt -> {
- WebSocketHelper.sendClient(_c, newTestKurentoMsg().put("id", "recStopped"));
+ WebSocketHelper.sendClient(c, newTestKurentoMsg().put("id", "recStopped"));
releaseRecorder();
});
switch (profile) {
@@ -103,9 +102,9 @@ public class KTestStream implements IKStream {
String sdpOffer = msg.getString("sdpOffer");
String sdpAnswer = webRtcEndpoint.processOffer(sdpOffer);
- addIceListener(_c);
+ addIceListener(c);
- WebSocketHelper.sendClient(_c, newTestKurentoMsg()
+ WebSocketHelper.sendClient(c, newTestKurentoMsg()
.put("id", "startResponse")
.put("sdpAnswer", sdpAnswer));
webRtcEndpoint.gatherCandidates();
@@ -117,7 +116,7 @@ public class KTestStream implements IKStream {
@Override
public void onError(Throwable cause) throws Exception {
- sendError(_c, "Failed to start recording");
+ sendError(c, "Failed to start recording");
log.error("Failed to start recording", cause);
}
});
@@ -217,13 +216,13 @@ public class KTestStream implements IKStream {
}
@Override
- public void release(KurentoHandler h) {
+ public void release(IStreamProcessor processor) {
if (webRtcEndpoint != null) {
webRtcEndpoint.release();
webRtcEndpoint = null;
}
releasePlayer();
releaseRecorder();
- h.testsByUid.remove(uid);
+ processor.release(this);
}
}
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
index afbf733..a940255 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
@@ -26,42 +26,31 @@ import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.stream.Collectors;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import org.apache.directory.api.util.Strings;
-import org.apache.openmeetings.core.converter.IRecordingConverter;
-import org.apache.openmeetings.core.converter.InterviewConverter;
-import org.apache.openmeetings.core.converter.RecordingConverter;
import org.apache.openmeetings.core.util.WebSocketHelper;
import org.apache.openmeetings.db.dao.record.RecordingChunkDao;
-import org.apache.openmeetings.db.dao.record.RecordingDao;
import org.apache.openmeetings.db.dao.room.RoomDao;
import org.apache.openmeetings.db.entity.basic.Client;
import org.apache.openmeetings.db.entity.basic.Client.Activity;
import org.apache.openmeetings.db.entity.basic.Client.StreamDesc;
-import org.apache.openmeetings.db.entity.basic.Client.StreamType;
import org.apache.openmeetings.db.entity.basic.IWsClient;
-import org.apache.openmeetings.db.entity.record.Recording;
import org.apache.openmeetings.db.entity.room.Room;
import org.apache.openmeetings.db.entity.room.Room.Right;
-import org.apache.openmeetings.db.entity.room.Room.RoomElement;
import org.apache.openmeetings.db.entity.user.User;
import org.apache.openmeetings.db.manager.IClientManager;
import org.apache.openmeetings.db.util.ws.RoomMessage;
import org.apache.openmeetings.db.util.ws.TextRoomMessage;
import org.kurento.client.Endpoint;
import org.kurento.client.EventListener;
-import org.kurento.client.IceCandidate;
import org.kurento.client.KurentoClient;
import org.kurento.client.KurentoConnectionListener;
import org.kurento.client.MediaObject;
@@ -75,7 +64,6 @@ import org.kurento.client.WebRtcEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.core.task.TaskExecutor;
import com.github.openjson.JSONArray;
import com.github.openjson.JSONObject;
@@ -86,9 +74,9 @@ public class KurentoHandler {
public static final String PARAM_CANDIDATE = "candidate";
private static final String WARN_NO_KURENTO = "Media Server is not accessible";
public static final String MODE_TEST = "test";
- private static final String TAG_KUID = "kuid";
+ public static final String TAG_KUID = "kuid";
public static final String TAG_MODE = "mode";
- private static final String TAG_ROOM = "roomId";
+ public static final String TAG_ROOM = "roomId";
private static final String HMAC_SHA1_ALGORITHM = "HmacSHA1";
private final ScheduledExecutorService recheckScheduler = Executors.newScheduledThreadPool(1);
public static final String KURENTO_TYPE = "kurento";
@@ -105,8 +93,6 @@ public class KurentoHandler {
private boolean connected = false;
private String kuid;
private final Map<Long, KRoom> rooms = new ConcurrentHashMap<>();
- final Map<String, KStream> streamsByUid = new ConcurrentHashMap<>();
- final Map<String, KTestStream> testsByUid = new ConcurrentHashMap<>();
private Runnable check;
@Autowired
@@ -114,15 +100,11 @@ public class KurentoHandler {
@Autowired
private RoomDao roomDao;
@Autowired
- private RecordingDao recDao;
- @Autowired
private RecordingChunkDao chunkDao;
@Autowired
- private TaskExecutor taskExecutor;
- @Autowired
- private RecordingConverter recordingConverter;
+ private TestStreamProcessor testProcessor;
@Autowired
- private InterviewConverter interviewConverter;
+ private StreamProcessor streamProcessor;
boolean isConnected() {
boolean connctd = client != null && !client.isClosed() && connected;
@@ -151,14 +133,11 @@ public class KurentoHandler {
kuid = randomUUID().toString(); // will be changed to prevent double events
client.destroy();
for (Entry<Long, KRoom> e : rooms.entrySet()) {
- e.getValue().close(this);
+ e.getValue().close(streamProcessor);
}
+ testProcessor.destroy();
+ streamProcessor.destroy();
rooms.clear();
- for (Entry<String, KTestStream> e : testsByUid.entrySet()) {
- e.getValue().release(this);
- }
- testsByUid.clear();
- streamsByUid.clear();
client = null;
}
}
@@ -175,136 +154,6 @@ public class KurentoHandler {
return client.beginTransaction();
}
- private MediaPipeline createTestPipeline() {
- Transaction t = beginTransaction();
- MediaPipeline pipe = client.createMediaPipeline(t);
- pipe.addTag(t, TAG_KUID, kuid);
- pipe.addTag(t, TAG_MODE, MODE_TEST);
- pipe.addTag(t, TAG_ROOM, MODE_TEST);
- t.commit();
- return pipe;
- }
-
- private void onTestMessage(IWsClient _c, final String cmdId, JSONObject msg) {
- KTestStream user = getTestByUid(_c.getUid());
- switch (cmdId) {
- case "wannaRecord":
- WebSocketHelper.sendClient(_c, newTestKurentoMsg()
- .put("id", "canRecord")
- .put(PARAM_ICE, getTurnServers(true))
- );
- break;
- case "record":
- if (user != null) {
- user.release(this);
- }
- user = new KTestStream(_c, msg, createTestPipeline());
- testsByUid.put(_c.getUid(), user);
- break;
- case "iceCandidate":
- JSONObject candidate = msg.getJSONObject(PARAM_CANDIDATE);
- if (user != null) {
- IceCandidate cand = new IceCandidate(candidate.getString(PARAM_CANDIDATE),
- candidate.getString("sdpMid"), candidate.getInt("sdpMLineIndex"));
- user.addCandidate(cand);
- }
- break;
- case "wannaPlay":
- WebSocketHelper.sendClient(_c, newTestKurentoMsg()
- .put("id", "canPlay")
- .put(PARAM_ICE, getTurnServers(true))
- );
- break;
- case "play":
- if (user != null) {
- user.play(_c, msg, createTestPipeline());
- }
- break;
- }
- }
-
- private void onMessage(Client c, final String cmdId, JSONObject msg) {
- final String uid = msg.optString("uid");
- KStream sender;
- StreamDesc sd;
- Optional<StreamDesc> osd;
- log.debug("Incoming message from user with ID '{}': {}", c.getUserId(), msg);
- switch (cmdId) {
- case "devicesAltered":
- if (!msg.getBoolean("audio") && c.hasActivity(Activity.AUDIO)) {
- c.remove(Activity.AUDIO);
- }
- if (!msg.getBoolean("video") && c.hasActivity(Activity.VIDEO)) {
- c.remove(Activity.VIDEO);
- }
- c.getStream(uid).setActivities();
- WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), cm.update(c), RoomMessage.Type.rightUpdated, c.getUid()));
- break;
- case "toggleActivity":
- toggleActivity(c, Activity.valueOf(msg.getString("activity")));
- break;
- case "broadcastStarted":
- sd = c.getStream(uid);
- sender = getByUid(uid);
- if (sender == null) {
- KRoom room = getRoom(c.getRoomId());
- sender = room.join(sd);
- }
- sender.startBroadcast(this, sd, msg.getString("sdpOffer"));
- if (StreamType.SCREEN == sd.getType() && sd.hasActivity(Activity.RECORD) && !isRecording(c.getRoomId())) {
- startRecording(c);
- }
- break;
- case "onIceCandidate":
- sender = getByUid(uid);
- if (sender != null) {
- JSONObject candidate = msg.getJSONObject(PARAM_CANDIDATE);
- IceCandidate cand = new IceCandidate(
- candidate.getString(PARAM_CANDIDATE)
- , candidate.getString("sdpMid")
- , candidate.getInt("sdpMLineIndex"));
- sender.addCandidate(cand, msg.getString("luid"));
- }
- break;
- case "addListener":
- sender = getByUid(msg.getString("sender"));
- if (sender != null) {
- sender.addListener(this, c.getSid(), c.getUid(), msg.getString("sdpOffer"));
- }
- break;
- case "wannaShare":
- osd = c.getScreenStream();
- if (screenShareAllowed(c) || (osd.isPresent() && !osd.get().hasActivity(Activity.SCREEN))) {
- startSharing(c, osd, msg, Activity.SCREEN);
- }
- break;
- case "wannaRecord":
- osd = c.getScreenStream();
- if (recordingAllowed(c)) {
- Room r = c.getRoom();
- if (Room.Type.interview == r.getType()) {
- log.warn("This shouldn't be called for interview room");
- break;
- }
- boolean sharing = isSharing(r.getId());
- startSharing(c, osd, msg, Activity.RECORD);
- if (sharing) {
- startRecording(c);
- }
- }
- break;
- case "stopSharing":
- stopSharing(c, uid);
- break;
- case "stopRecord":
- stopRecording(c);
- break;
- case "errorSharing":
- errorSharing(c);
- break;
- }
- }
-
public void onMessage(IWsClient _c, JSONObject msg) {
if (!isConnected()) {
sendError(_c, "Multimedia server is inaccessible");
@@ -312,7 +161,7 @@ public class KurentoHandler {
}
final String cmdId = msg.getString("id");
if (MODE_TEST.equals(msg.optString(TAG_MODE))) {
- onTestMessage(_c, cmdId, msg);
+ testProcessor.onMessage(_c, cmdId, msg);
} else {
final Client c = (Client)_c;
@@ -320,146 +169,8 @@ public class KurentoHandler {
log.warn("Incoming message from invalid user");
return;
}
- onMessage(c, cmdId, msg);
- }
- }
-
- private static boolean isBroadcasting(final Client c) {
- return c.hasAnyActivity(Activity.AUDIO, Activity.VIDEO);
- }
-
- private void checkStreams(Long roomId) {
- if (!isConnected()) {
- return;
- }
- KRoom room = getRoom(roomId);
- if (room.isSharing()) {
- List<StreamDesc> streams = cm.listByRoom(roomId).parallelStream()
- .flatMap(c -> c.getStreams().stream())
- .filter(sd -> StreamType.SCREEN == sd.getType()).collect(Collectors.toList());
- if (streams.isEmpty()) {
- log.info("No more screen streams in the room, stopping sharing");
- room.stopSharing();
- if (Room.Type.interview != room.getType() && room.isRecording()) {
- log.info("No more screen streams in the non-interview room, stopping recording");
- room.stopRecording(this, null, recDao);
- }
- }
- }
- if (room.isRecording()) {
- List<StreamDesc> streams = cm.listByRoom(roomId).parallelStream()
- .flatMap(c -> c.getStreams().stream())
- .collect(Collectors.toList());
- if (streams.isEmpty()) {
- log.info("No more streams in the room, stopping recording");
- room.stopRecording(this, null, recDao);
- }
- }
- }
-
- public void toggleActivity(Client c, Activity a) {
- log.info("PARTICIPANT {}: trying to toggle activity {}", c, c.getRoomId());
-
- if (!activityAllowed(c, a, c.getRoom())) {
- if (a == Activity.AUDIO || a == Activity.AUDIO_VIDEO) {
- c.allow(Room.Right.audio);
- }
- if (!c.getRoom().isAudioOnly() && (a == Activity.VIDEO || a == Activity.AUDIO_VIDEO)) {
- c.allow(Room.Right.video);
- }
- }
- if (activityAllowed(c, a, c.getRoom())) {
- boolean wasBroadcasting = isBroadcasting(c);
- if (a == Activity.AUDIO && !c.isMicEnabled()) {
- return;
- }
- if (a == Activity.VIDEO && !c.isCamEnabled()) {
- return;
- }
- if (a == Activity.AUDIO_VIDEO && !c.isMicEnabled() && !c.isCamEnabled()) {
- return;
- }
- c.toggle(a);
- if (!isBroadcasting(c)) {
- //close
- boolean changed = false;
- for (StreamDesc sd : c.getStreams()) {
- KStream s = getByUid(sd.getUid());
- if (StreamType.WEBCAM == sd.getType()) {
- if (s != null) {
- s.stopBroadcast(this);
- }
- c.removeStream(sd.getUid());
- changed = true;
- }
- }
- if (changed) {
- cm.update(c);
- checkStreams(c.getRoomId());
- }
- WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.rightUpdated, c.getUid()));
- //FIXME TODO update interview buttons
- } else if (!wasBroadcasting) {
- //join
- StreamDesc sd = c.addStream(StreamType.WEBCAM);
- cm.update(c);
- log.debug("User {}: has started broadcast", sd.getUid());
- sendClient(sd.getSid(), newKurentoMsg()
- .put("id", "broadcast")
- .put("stream", sd.toJson())
- .put(PARAM_ICE, getTurnServers(false)));
- //FIXME TODO update interview buttons
- } else {
- //constraints were changed
- for (StreamDesc sd : c.getStreams()) {
- if (StreamType.WEBCAM == sd.getType()) {
- sd.setActivities();
- cm.update(c);
- break;
- }
- }
- WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.rightUpdated, c.getUid()));
- }
- }
- }
-
- public boolean hasRightsToRecord(Client c) {
- Room r = c.getRoom();
- return r != null && r.isAllowRecording() && c.hasRight(Right.moderator);
- }
-
- public boolean recordingAllowed(Client c) {
- if (!isConnected()) {
- return false;
- }
- Room r = c.getRoom();
- return hasRightsToRecord(c) && !isRecording(r.getId());
- }
-
- public void startRecording(Client c) {
- if (!isConnected() || !hasRightsToRecord(c)) {
- return;
- }
- getRoom(c.getRoomId()).startRecording(cm, c, recDao);
- }
-
- public void stopRecording(Client c) {
- if (!isConnected() || !hasRightsToRecord(c)) {
- return;
+ streamProcessor.onMessage(c, cmdId, msg);
}
- getRoom(c.getRoomId()).stopRecording(this, c, recDao);
- }
-
- void startConvertion(Recording rec) {
- IRecordingConverter conv = rec.isInterview() ? interviewConverter : recordingConverter;
- taskExecutor.execute(() -> conv.startConversion(rec));
- }
-
- public boolean isRecording(Long roomId) {
- if (!isConnected()) {
- return false;
- }
- return getRoom(roomId).isRecording();
}
public JSONObject getRecordingUser(Long roomId) {
@@ -469,74 +180,6 @@ public class KurentoHandler {
return getRoom(roomId).getRecordingUser();
}
- public boolean hasRightsToShare(Client c) {
- Room r = c.getRoom();
- return r != null && Room.Type.interview != r.getType()
- && !r.isHidden(RoomElement.ScreenSharing)
- && r.isAllowRecording() && c.hasRight(Right.share);
- }
-
- public boolean screenShareAllowed(Client c) {
- if (!isConnected()) {
- return false;
- }
- Room r = c.getRoom();
- return hasRightsToShare(c) && !isSharing(r.getId());
- }
-
- private void errorSharing(Client c) {
- if (!isConnected()) {
- return;
- }
- KRoom room = getRoom(c.getRoomId());
- if (!room.isSharing() || !c.getSid().equals(room.getSharingUser().getString("sid"))) {
- return;
- }
- Optional<StreamDesc> osd = c.getScreenStream();
- if (osd.isPresent()) {
- stopSharing(c, osd.get().getUid());
- } else {
- room.stopSharing();
- }
- stopRecording(c);
- }
-
- private void startSharing(Client c, Optional<StreamDesc> osd, JSONObject msg, Activity a) {
- if (isConnected() && c.getRoomId() != null) {
- getRoom(c.getRoomId()).startSharing(this, cm, c, osd, msg, a);
- }
- }
-
- private void stopSharing(Client c, String uid) {
- KStream sender = getByUid(uid);
- StreamDesc sd = stopSharing(c.getSid(), uid);
- if (sender != null && sd != null) {
- sender.stopBroadcast(this);
- }
- }
-
- StreamDesc stopSharing(String sid, String uid) {
- StreamDesc sd = null;
- Client c = getBySid(sid);
- if (c.getRoomId() != null) {
- sd = c.getStream(uid);
- if (sd != null && StreamType.SCREEN == sd.getType()) {
- c.removeStream(uid);
- cm.update(c);
- checkStreams(c.getRoomId());
- WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.rightUpdated, c.getUid()));
- }
- }
- return sd;
- }
-
- public boolean isSharing(Long roomId) {
- if (!isConnected()) {
- return false;
- }
- return getRoom(roomId).isSharing();
- }
-
public void leaveRoom(Client c) {
remove(c);
WebSocketHelper.sendAll(newKurentoMsg()
@@ -546,10 +189,6 @@ public class KurentoHandler {
);
}
- Client getBySid(String sid) {
- return cm.getBySid(sid);
- }
-
void sendShareUpdated(StreamDesc sd) {
sendClient(sd.getSid(), newKurentoMsg()
.put("id", "shareUpdated")
@@ -567,34 +206,18 @@ public class KurentoHandler {
.put("message", msg));
}
- public void remove(IWsClient _c) {
- if (!isConnected() ||_c == null) {
+ public void remove(IWsClient c) {
+ if (!isConnected() ||c == null) {
return;
}
- final String uid = _c.getUid();
- final boolean test = !(_c instanceof Client);
- if (test) {
- IKStream s = getTestByUid(uid);
- if (s != null) {
- s.release(this);
- }
+ if (!(c instanceof Client)) {
+ testProcessor.remove(c);
return;
}
- Client c = (Client)_c;
- for (StreamDesc sd : c.getStreams()) {
- IKStream s = getByUid(sd.getUid());
- if (s != null) {
- s.release(this);
- }
- }
- if (c.getRoomId() != null) {
- KRoom room = getRoom(c.getRoomId());
- room.leave(this, c);
- checkStreams(c.getRoomId());
- }
+ streamProcessor.remove((Client)c);
}
- private KRoom getRoom(Long roomId) {
+ KRoom getRoom(Long roomId) {
log.debug("Searching for room {}", roomId);
KRoom room = rooms.get(roomId);
@@ -613,22 +236,10 @@ public class KurentoHandler {
return room;
}
- private KStream getByUid(String uid) {
- return uid == null ? null : streamsByUid.get(uid);
- }
-
- private KTestStream getTestByUid(String uid) {
- return uid == null ? null : testsByUid.get(uid);
- }
-
static JSONObject newKurentoMsg() {
return new JSONObject().put("type", KURENTO_TYPE);
}
- static JSONObject newTestKurentoMsg() {
- return newKurentoMsg().put(TAG_MODE, MODE_TEST);
- }
-
public static boolean activityAllowed(Client c, Activity a, Room room) {
boolean r = false;
switch (a) {
@@ -651,7 +262,7 @@ public class KurentoHandler {
return getTurnServers(false);
}
- private JSONArray getTurnServers(final boolean test) {
+ JSONArray getTurnServers(final boolean test) {
JSONArray arr = new JSONArray();
if (!Strings.isEmpty(turnUrl)) {
try {
@@ -679,6 +290,14 @@ public class KurentoHandler {
return arr;
}
+ KurentoClient getClient() {
+ return client;
+ }
+
+ String getKuid() {
+ return kuid;
+ }
+
public void setCheckTimeout(long checkTimeout) {
this.checkTimeout = checkTimeout;
}
@@ -784,7 +403,7 @@ public class KurentoHandler {
return;
} else if (r != null) {
rooms.remove(r.getRoomId());
- r.close(KurentoHandler.this);
+ r.close(streamProcessor);
}
}
log.warn("Invalid MediaPipeline {} detected, will be dropped, tags: {}", pipe.getId(), tags);
@@ -813,7 +432,7 @@ public class KurentoHandler {
return;
}
Map<String, String> tags = tagsAsMap(point);
- KStream stream = getByUid(tags.get("outUid"));
+ KStream stream = streamProcessor.getByUid(tags.get("outUid"));
if (stream != null && stream.contains(tags.get("uid"))) {
return;
}
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java
new file mode 100644
index 0000000..4190583
--- /dev/null
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java
@@ -0,0 +1,442 @@
+/*
+
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.openmeetings.core.remote;
+
+import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_CANDIDATE;
+import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_ICE;
+import static org.apache.openmeetings.core.remote.KurentoHandler.activityAllowed;
+import static org.apache.openmeetings.core.remote.KurentoHandler.newKurentoMsg;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import org.apache.openmeetings.core.converter.IRecordingConverter;
+import org.apache.openmeetings.core.converter.InterviewConverter;
+import org.apache.openmeetings.core.converter.RecordingConverter;
+import org.apache.openmeetings.core.util.WebSocketHelper;
+import org.apache.openmeetings.db.dao.record.RecordingDao;
+import org.apache.openmeetings.db.entity.basic.Client;
+import org.apache.openmeetings.db.entity.basic.Client.Activity;
+import org.apache.openmeetings.db.entity.basic.Client.StreamDesc;
+import org.apache.openmeetings.db.entity.basic.Client.StreamType;
+import org.apache.openmeetings.db.entity.record.Recording;
+import org.apache.openmeetings.db.entity.room.Room;
+import org.apache.openmeetings.db.entity.room.Room.Right;
+import org.apache.openmeetings.db.entity.room.Room.RoomElement;
+import org.apache.openmeetings.db.manager.IClientManager;
+import org.apache.openmeetings.db.util.ws.RoomMessage;
+import org.apache.openmeetings.db.util.ws.TextRoomMessage;
+import org.kurento.client.IceCandidate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.task.TaskExecutor;
+import org.springframework.stereotype.Component;
+
+import com.github.openjson.JSONObject;
+
+@Component
+public class StreamProcessor implements IStreamProcessor {
+ private static final Logger log = LoggerFactory.getLogger(StreamProcessor.class);
+ private final Map<String, KStream> streamByUid = new ConcurrentHashMap<>();
+
+ @Autowired
+ private IClientManager cm;
+ @Autowired
+ private RecordingDao recDao;
+ @Autowired
+ private KurentoHandler kHandler;
+ @Autowired
+ private TaskExecutor taskExecutor;
+ @Autowired
+ private RecordingConverter recordingConverter;
+ @Autowired
+ private InterviewConverter interviewConverter;
+
+ void onMessage(Client c, final String cmdId, JSONObject msg) {
+ final String uid = msg.optString("uid");
+ KStream sender;
+ StreamDesc sd;
+ Optional<StreamDesc> osd;
+ log.debug("Incoming message from user with ID '{}': {}", c.getUserId(), msg);
+ switch (cmdId) {
+ case "devicesAltered":
+ if (!msg.getBoolean("audio") && c.hasActivity(Activity.AUDIO)) {
+ c.remove(Activity.AUDIO);
+ }
+ if (!msg.getBoolean("video") && c.hasActivity(Activity.VIDEO)) {
+ c.remove(Activity.VIDEO);
+ }
+ c.getStream(uid).setActivities();
+ WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), cm.update(c), RoomMessage.Type.rightUpdated, c.getUid()));
+ break;
+ case "toggleActivity":
+ toggleActivity(c, Activity.valueOf(msg.getString("activity")));
+ break;
+ case "broadcastStarted":
+ sd = c.getStream(uid);
+ sender = getByUid(uid);
+ if (sender == null) {
+ KRoom room = kHandler.getRoom(c.getRoomId());
+ sender = room.join(sd);
+ }
+ sender.startBroadcast(this, sd, msg.getString("sdpOffer"));
+ if (StreamType.SCREEN == sd.getType() && sd.hasActivity(Activity.RECORD) && !isRecording(c.getRoomId())) {
+ startRecording(c);
+ }
+ break;
+ case "onIceCandidate":
+ sender = getByUid(uid);
+ if (sender != null) {
+ JSONObject candidate = msg.getJSONObject(PARAM_CANDIDATE);
+ IceCandidate cand = new IceCandidate(
+ candidate.getString(PARAM_CANDIDATE)
+ , candidate.getString("sdpMid")
+ , candidate.getInt("sdpMLineIndex"));
+ sender.addCandidate(cand, msg.getString("luid"));
+ }
+ break;
+ case "addListener":
+ sender = getByUid(msg.getString("sender"));
+ if (sender != null) {
+ sender.addListener(this, c.getSid(), c.getUid(), msg.getString("sdpOffer"));
+ }
+ break;
+ case "wannaShare":
+ osd = c.getScreenStream();
+ if (screenShareAllowed(c) || (osd.isPresent() && !osd.get().hasActivity(Activity.SCREEN))) {
+ startSharing(c, osd, msg, Activity.SCREEN);
+ }
+ break;
+ case "wannaRecord":
+ osd = c.getScreenStream();
+ if (recordingAllowed(c)) {
+ Room r = c.getRoom();
+ if (Room.Type.interview == r.getType()) {
+ log.warn("This shouldn't be called for interview room");
+ break;
+ }
+ boolean sharing = isSharing(r.getId());
+ startSharing(c, osd, msg, Activity.RECORD);
+ if (sharing) {
+ startRecording(c);
+ }
+ }
+ break;
+ case "pauseSharing":
+ pauseSharing(c, uid);
+ break;
+ case "stopRecord":
+ stopRecording(c);
+ break;
+ case "errorSharing":
+ errorSharing(c);
+ break;
+ }
+ }
+
+ private static boolean isBroadcasting(final Client c) {
+ return c.hasAnyActivity(Activity.AUDIO, Activity.VIDEO);
+ }
+
+ public void toggleActivity(Client c, Activity a) {
+ log.info("PARTICIPANT {}: trying to toggle activity {}", c, c.getRoomId());
+
+ if (!activityAllowed(c, a, c.getRoom())) {
+ if (a == Activity.AUDIO || a == Activity.AUDIO_VIDEO) {
+ c.allow(Room.Right.audio);
+ }
+ if (!c.getRoom().isAudioOnly() && (a == Activity.VIDEO || a == Activity.AUDIO_VIDEO)) {
+ c.allow(Room.Right.video);
+ }
+ }
+ if (activityAllowed(c, a, c.getRoom())) {
+ boolean wasBroadcasting = isBroadcasting(c);
+ if (a == Activity.AUDIO && !c.isMicEnabled()) {
+ return;
+ }
+ if (a == Activity.VIDEO && !c.isCamEnabled()) {
+ return;
+ }
+ if (a == Activity.AUDIO_VIDEO && !c.isMicEnabled() && !c.isCamEnabled()) {
+ return;
+ }
+ c.toggle(a);
+ if (!isBroadcasting(c)) {
+ //close
+ boolean changed = false;
+ for (StreamDesc sd : c.getStreams()) {
+ KStream s = getByUid(sd.getUid());
+ if (StreamType.WEBCAM == sd.getType()) {
+ if (s != null) {
+ s.stopBroadcast(this);
+ }
+ c.removeStream(sd.getUid());
+ changed = true;
+ }
+ }
+ if (changed) {
+ cm.update(c);
+ checkStreams(c.getRoomId());
+ }
+ WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.rightUpdated, c.getUid()));
+ //FIXME TODO update interview buttons
+ } else if (!wasBroadcasting) {
+ //join
+ StreamDesc sd = c.addStream(StreamType.WEBCAM);
+ cm.update(c);
+ log.debug("User {}: has started broadcast", sd.getUid());
+ kHandler.sendClient(sd.getSid(), newKurentoMsg()
+ .put("id", "broadcast")
+ .put("stream", sd.toJson())
+ .put(PARAM_ICE, kHandler.getTurnServers(false)));
+ //FIXME TODO update interview buttons
+ } else {
+ //constraints were changed
+ for (StreamDesc sd : c.getStreams()) {
+ if (StreamType.WEBCAM == sd.getType()) {
+ sd.setActivities();
+ cm.update(c);
+ break;
+ }
+ }
+ WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.rightUpdated, c.getUid()));
+ }
+ }
+ }
+
+ private void checkStreams(Long roomId) {
+ if (!kHandler.isConnected()) {
+ return;
+ }
+ KRoom room = kHandler.getRoom(roomId);
+ if (room.isSharing()) {
+ List<StreamDesc> streams = cm.listByRoom(roomId).parallelStream()
+ .flatMap(c -> c.getStreams().stream())
+ .filter(sd -> StreamType.SCREEN == sd.getType()).collect(Collectors.toList());
+ if (streams.isEmpty()) {
+ log.info("No more screen streams in the room, stopping sharing");
+ room.stopSharing();
+ if (Room.Type.interview != room.getType() && room.isRecording()) {
+ log.info("No more screen streams in the non-interview room, stopping recording");
+ room.stopRecording(this, null);
+ }
+ }
+ }
+ if (room.isRecording()) {
+ List<StreamDesc> streams = cm.listByRoom(roomId).parallelStream()
+ .flatMap(c -> c.getStreams().stream())
+ .collect(Collectors.toList());
+ if (streams.isEmpty()) {
+ log.info("No more streams in the room, stopping recording");
+ room.stopRecording(this, null);
+ }
+ }
+ }
+
+ // Sharing
+ public boolean hasRightsToShare(Client c) {
+ Room r = c.getRoom();
+ return r != null && Room.Type.interview != r.getType()
+ && !r.isHidden(RoomElement.ScreenSharing)
+ && r.isAllowRecording() && c.hasRight(Right.share);
+ }
+
+ public boolean screenShareAllowed(Client c) {
+ if (!kHandler.isConnected()) {
+ return false;
+ }
+ Room r = c.getRoom();
+ return hasRightsToShare(c) && !isSharing(r.getId());
+ }
+
+ private void errorSharing(Client c) {
+ if (!kHandler.isConnected()) {
+ return;
+ }
+ KRoom room = kHandler.getRoom(c.getRoomId());
+ if (!room.isSharing() || !c.getSid().equals(room.getSharingUser().getString("sid"))) {
+ return;
+ }
+ Optional<StreamDesc> osd = c.getScreenStream();
+ if (osd.isPresent()) {
+ stopSharing(c, osd.get().getUid());
+ } else {
+ room.stopSharing();
+ }
+ stopRecording(c);
+ }
+
+ private void startSharing(Client c, Optional<StreamDesc> osd, JSONObject msg, Activity a) {
+ if (kHandler.isConnected() && c.getRoomId() != null) {
+ kHandler.getRoom(c.getRoomId()).startSharing(this, cm, c, osd, msg, a);
+ }
+ }
+
+ private void pauseSharing(Client c, String uid) {
+ if (!hasRightsToShare(c)) {
+ return;
+ }
+ if (!isSharing(c.getRoomId())) {
+ return;
+ }
+ if (isRecording(c.getRoomId())) {
+ StreamDesc sd = c.getStream(uid);
+ sd.removeActivity(Activity.SCREEN);
+ cm.update(c);
+ KStream sender = getByUid(uid);
+ sender.pauseSharing();
+ kHandler.sendShareUpdated(sd);
+ } else {
+ stopSharing(c, uid);
+ }
+ }
+
+ private void stopSharing(Client c, String uid) {
+ KStream sender = getByUid(uid);
+ StreamDesc sd = doStopSharing(c.getSid(), uid);
+ if (sender != null && sd != null) {
+ sender.stopBroadcast(this);
+ }
+ }
+
+ StreamDesc doStopSharing(String sid, String uid) {
+ return doStopSharing(getBySid(sid), uid);
+ }
+
+ private StreamDesc doStopSharing(Client c, String uid) {
+ StreamDesc sd = null;
+ if (c.getRoomId() != null) {
+ sd = c.getStream(uid);
+ if (sd != null && StreamType.SCREEN == sd.getType()) {
+ c.removeStream(uid);
+ cm.update(c);
+ checkStreams(c.getRoomId());
+ WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.rightUpdated, c.getUid()));
+ }
+ }
+ return sd;
+ }
+
+ public boolean isSharing(Long roomId) {
+ if (!kHandler.isConnected()) {
+ return false;
+ }
+ return kHandler.getRoom(roomId).isSharing();
+ }
+
+ // Recording
+
+ public boolean hasRightsToRecord(Client c) {
+ Room r = c.getRoom();
+ return r != null && r.isAllowRecording() && c.hasRight(Right.moderator);
+ }
+
+ public boolean recordingAllowed(Client c) {
+ if (!kHandler.isConnected()) {
+ return false;
+ }
+ Room r = c.getRoom();
+ return hasRightsToRecord(c) && !isRecording(r.getId());
+ }
+
+ public void startRecording(Client c) {
+ if (!kHandler.isConnected() || !hasRightsToRecord(c)) {
+ return;
+ }
+ kHandler.getRoom(c.getRoomId()).startRecording(cm, c, recDao);
+ }
+
+ public void stopRecording(Client c) {
+ if (!kHandler.isConnected() || !hasRightsToRecord(c)) {
+ return;
+ }
+ kHandler.getRoom(c.getRoomId()).stopRecording(this, c);
+ }
+
+ void startConvertion(Recording rec) {
+ IRecordingConverter conv = rec.isInterview() ? interviewConverter : recordingConverter;
+ taskExecutor.execute(() -> conv.startConversion(rec));
+ }
+
+ public boolean isRecording(Long roomId) {
+ if (!kHandler.isConnected()) {
+ return false;
+ }
+ return kHandler.getRoom(roomId).isRecording();
+ }
+
+ void remove(Client c) {
+ for (StreamDesc sd : c.getStreams()) {
+ AbstractStream s = getByUid(sd.getUid());
+ if (s != null) {
+ s.release(this);
+ }
+ }
+ if (c.getRoomId() != null) {
+ KRoom room = kHandler.getRoom(c.getRoomId());
+ room.leave(this, c);
+ checkStreams(c.getRoomId());
+ }
+ }
+
+ void addStream(KStream stream) {
+ streamByUid.put(stream.getUid(), stream);
+ }
+
+ Client getBySid(String sid) {
+ return cm.getBySid(sid);
+ }
+
+ KStream getByUid(String uid) {
+ return uid == null ? null : streamByUid.get(uid);
+ }
+
+ KurentoHandler getHandler() {
+ return kHandler;
+ }
+
+ IClientManager getClientManager() {
+ return cm;
+ }
+
+ RecordingDao getRecordingDao() {
+ return recDao;
+ }
+
+ @Override
+ public void release(AbstractStream stream) {
+ final String uid = stream.getUid();
+ Client c = cm.getBySid(stream.getSid());
+ if (c != null) {
+ c.removeStream(uid);
+ cm.update(c);
+ }
+ streamByUid.remove(uid);
+ }
+
+ @Override
+ public void destroy() {
+ streamByUid.clear();
+ }
+}
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/TestStreamProcessor.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/TestStreamProcessor.java
new file mode 100644
index 0000000..38a3b20
--- /dev/null
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/TestStreamProcessor.java
@@ -0,0 +1,126 @@
+/*
+
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.openmeetings.core.remote;
+
+import static org.apache.openmeetings.core.remote.KurentoHandler.MODE_TEST;
+import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_CANDIDATE;
+import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_ICE;
+import static org.apache.openmeetings.core.remote.KurentoHandler.TAG_KUID;
+import static org.apache.openmeetings.core.remote.KurentoHandler.TAG_MODE;
+import static org.apache.openmeetings.core.remote.KurentoHandler.TAG_ROOM;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.openmeetings.core.util.WebSocketHelper;
+import org.apache.openmeetings.db.entity.basic.IWsClient;
+import org.kurento.client.IceCandidate;
+import org.kurento.client.MediaPipeline;
+import org.kurento.client.Transaction;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import com.github.openjson.JSONObject;
+
+@Component
+public class TestStreamProcessor implements IStreamProcessor {
+ private final Map<String, KTestStream> streamByUid = new ConcurrentHashMap<>();
+
+ @Autowired
+ private KurentoHandler kHandler;
+
+ void onMessage(IWsClient _c, final String cmdId, JSONObject msg) {
+ KTestStream user = getByUid(_c.getUid());
+ switch (cmdId) {
+ case "wannaRecord":
+ WebSocketHelper.sendClient(_c, newTestKurentoMsg()
+ .put("id", "canRecord")
+ .put(PARAM_ICE, kHandler.getTurnServers(true))
+ );
+ break;
+ case "record":
+ if (user != null) {
+ user.release(this);
+ }
+ user = new KTestStream(_c, msg, createTestPipeline());
+ streamByUid.put(_c.getUid(), user);
+ break;
+ case "iceCandidate":
+ JSONObject candidate = msg.getJSONObject(PARAM_CANDIDATE);
+ if (user != null) {
+ IceCandidate cand = new IceCandidate(candidate.getString(PARAM_CANDIDATE),
+ candidate.getString("sdpMid"), candidate.getInt("sdpMLineIndex"));
+ user.addCandidate(cand);
+ }
+ break;
+ case "wannaPlay":
+ WebSocketHelper.sendClient(_c, newTestKurentoMsg()
+ .put("id", "canPlay")
+ .put(PARAM_ICE, kHandler.getTurnServers(true))
+ );
+ break;
+ case "play":
+ if (user != null) {
+ user.play(_c, msg, createTestPipeline());
+ }
+ break;
+ }
+ }
+
+ private KTestStream getByUid(String uid) {
+ return uid == null ? null : streamByUid.get(uid);
+ }
+
+ private MediaPipeline createTestPipeline() {
+ Transaction t = kHandler.beginTransaction();
+ MediaPipeline pipe = kHandler.getClient().createMediaPipeline(t);
+ pipe.addTag(t, TAG_KUID, kHandler.getKuid());
+ pipe.addTag(t, TAG_MODE, MODE_TEST);
+ pipe.addTag(t, TAG_ROOM, MODE_TEST);
+ t.commit();
+ return pipe;
+ }
+
+ static JSONObject newTestKurentoMsg() {
+ return KurentoHandler.newKurentoMsg().put(TAG_MODE, MODE_TEST);
+ }
+
+ void remove(IWsClient _c) {
+ AbstractStream s = getByUid(_c.getUid());
+ if (s != null) {
+ s.release(this);
+ }
+ }
+
+ @Override
+ public void release(AbstractStream stream) {
+ streamByUid.remove(stream.getUid());
+ }
+
+ @Override
+ public void destroy() {
+ for (Entry<String, KTestStream> e : streamByUid.entrySet()) {
+ e.getValue().release(this);
+ streamByUid.remove(e.getKey());
+ }
+ streamByUid.clear();
+ }
+}
diff --git a/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/BaseMockedTest.java b/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/BaseMockedTest.java
index bde9fa7..84487ff 100644
--- a/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/BaseMockedTest.java
+++ b/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/BaseMockedTest.java
@@ -34,6 +34,7 @@ import org.kurento.client.internal.TransactionImpl;
import org.kurento.client.internal.client.RomManager;
import org.mockito.InjectMocks;
import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
import org.mockito.Spy;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@@ -41,7 +42,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
import com.github.openjson.JSONObject;
@RunWith(PowerMockRunner.class)
-@PrepareForTest({KurentoClient.class, WebSocketHelper.class, IKStream.class})
+@PrepareForTest({KurentoClient.class, WebSocketHelper.class, AbstractStream.class})
public class BaseMockedTest {
@Mock
protected RomManager romManager;
@@ -51,11 +52,19 @@ public class BaseMockedTest {
protected KurentoClient client;
@Spy
@InjectMocks
+ protected StreamProcessor streamProcessor;
+ @Spy
+ @InjectMocks
+ protected TestStreamProcessor testProcessor;
+ @Spy
+ @InjectMocks
protected KurentoHandler handler;
+
protected final static JSONObject MSG_BASE = new JSONObject();
@Before
public void setup() {
+ MockitoAnnotations.initMocks(this);
mockStatic(KurentoClient.class);
mockStatic(WebSocketHelper.class);
when(client.getServerManager()).thenReturn(kServerManager);
diff --git a/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestNotConnectedMocked.java b/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestNotConnectedMocked.java
index 1877518..dd67727 100644
--- a/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestNotConnectedMocked.java
+++ b/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestNotConnectedMocked.java
@@ -34,22 +34,22 @@ public class TestNotConnectedMocked extends BaseMockedTest {
@Test
public void testRecordingAllowed() {
- assertFalse(handler.recordingAllowed(null));
+ assertFalse(streamProcessor.recordingAllowed(null));
}
@Test
public void testStartRecording() {
- handler.startRecording(null);
+ streamProcessor.startRecording(null);
}
@Test
public void testStopRecording() {
- handler.stopRecording(null);
+ streamProcessor.stopRecording(null);
}
@Test
public void testIsRecording() {
- assertFalse(handler.isRecording(null));
+ assertFalse(streamProcessor.isRecording(null));
}
@Test
diff --git a/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestRoomFlowMocked.java b/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestRoomFlowMocked.java
index fa546ee..c589bf4 100644
--- a/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestRoomFlowMocked.java
+++ b/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestRoomFlowMocked.java
@@ -93,15 +93,15 @@ public class TestRoomFlowMocked extends BaseMockedTest {
@Test
public void testRecordingAllowed() {
Client c = getClient();
- assertFalse(handler.recordingAllowed(c));
+ assertFalse(streamProcessor.recordingAllowed(c));
c.setRoom(new Room());
- assertFalse(handler.recordingAllowed(c));
+ assertFalse(streamProcessor.recordingAllowed(c));
c.getRoom().setId(ROOM_ID);
c.getRoom().setAllowRecording(true);
- assertFalse(handler.recordingAllowed(c));
+ assertFalse(streamProcessor.recordingAllowed(c));
c.allow(Room.Right.moderator);
when(roomDao.get(ROOM_ID)).thenReturn(c.getRoom());
- assertTrue(handler.recordingAllowed(c));
+ assertTrue(streamProcessor.recordingAllowed(c));
}
private Client getClientWithRoom() {
@@ -145,7 +145,7 @@ public class TestRoomFlowMocked extends BaseMockedTest {
Client c = getClientFull();
when(roomDao.get(ROOM_ID)).thenReturn(c.getRoom());
handler.onMessage(c, msg);
- assertTrue(handler.isSharing(ROOM_ID));
+ assertTrue(streamProcessor.isSharing(ROOM_ID));
handler.onMessage(c, msg);
}
}
diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.java b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.java
index 3febdfb..211d2db 100644
--- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.java
+++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.java
@@ -35,6 +35,7 @@ import java.util.Map.Entry;
import java.util.Set;
import org.apache.openmeetings.core.remote.KurentoHandler;
+import org.apache.openmeetings.core.remote.StreamProcessor;
import org.apache.openmeetings.core.util.WebSocketHelper;
import org.apache.openmeetings.db.dao.calendar.AppointmentDao;
import org.apache.openmeetings.db.dao.log.ConferenceLogDao;
@@ -179,7 +180,7 @@ public class RoomPanel extends BasePanel {
sb.append("VideoManager.play(").append(streams).append(", ").append(kHandler.getTurnServers()).append(");");
}
}
- if (interview && !kHandler.isRecording(r.getId()) && streams.length() > 0 && _c.hasRight(Right.moderator)) {
+ if (interview && !streamProcessor.isRecording(r.getId()) && streams.length() > 0 && _c.hasRight(Right.moderator)) {
sb.append("WbArea.setRecEnabled(true);");
}
if (!Strings.isEmpty(sb)) {
@@ -241,6 +242,8 @@ public class RoomPanel extends BasePanel {
private QuickPollManager qpollManager;
@SpringBean
private KurentoHandler kHandler;
+ @SpringBean
+ private StreamProcessor streamProcessor;
public RoomPanel(String id, Room r) {
super(id);
@@ -551,7 +554,7 @@ public class RoomPanel extends BasePanel {
private void updateInterviewRecordingButtons(IPartialPageRequestHandler handler) {
Client _c = getClient();
if (interview && _c.hasRight(Right.moderator)) {
- if (kHandler.isRecording(r.getId())) {
+ if (streamProcessor.isRecording(r.getId())) {
handler.appendJavaScript("if (typeof(WbArea) === 'object') {WbArea.setRecStarted(true);}");
} else {
boolean hasStreams = false;
@@ -746,7 +749,7 @@ public class RoomPanel extends BasePanel {
}
public boolean screenShareAllowed() {
- return kHandler.screenShareAllowed(getClient());
+ return streamProcessor.screenShareAllowed(getClient());
}
public RoomSidebar getSidebar() {
diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/menu/RoomMenuPanel.java b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/menu/RoomMenuPanel.java
index a154a00..0f98545 100644
--- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/menu/RoomMenuPanel.java
+++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/menu/RoomMenuPanel.java
@@ -31,6 +31,7 @@ import java.util.List;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.openmeetings.core.remote.KurentoHandler;
+import org.apache.openmeetings.core.remote.StreamProcessor;
import org.apache.openmeetings.core.util.WebSocketHelper;
import org.apache.openmeetings.db.dao.basic.ConfigurationDao;
import org.apache.openmeetings.db.entity.basic.Client;
@@ -103,6 +104,8 @@ public class RoomMenuPanel extends Panel {
private ConfigurationDao cfgDao;
@SpringBean
private KurentoHandler kHandler;
+ @SpringBean
+ private StreamProcessor streamProcessor;
public RoomMenuPanel(String id, final RoomPanel room) {
super(id);
@@ -208,7 +211,7 @@ public class RoomMenuPanel extends Panel {
menuPanel.update(handler);
StringBuilder roomClass = new StringBuilder("room name");
StringBuilder roomTitle = new StringBuilder();
- if (kHandler.isRecording(r.getId())) {
+ if (streamProcessor.isRecording(r.getId())) {
JSONObject ru = kHandler.getRecordingUser(r.getId());
if (!Strings.isEmpty(ru.optString("login"))) {
roomTitle.append(String.format("%s %s %s %s %s", getString("419")
diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-sharer.js b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-sharer.js
index aec324e..7525049 100644
--- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-sharer.js
+++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-sharer.js
@@ -38,15 +38,10 @@ var Sharer = (function() {
, height: height.val()
});
} else {
- const cuid = Room.getOptions().uid
- , v = $('div[data-client-uid="' + cuid + '"][data-client-type="SCREEN"]')
- , uid = v.data().stream().uid;
VideoManager.sendMessage({
- id: 'stopSharing'
- , uid: uid
+ id: 'pauseSharing'
+ , uid: _getShareUid()
});
- VideoManager.close(uid, false);
- _setShareState(SHARE_STOPED);
}
});
width = sharer.find('.width');
@@ -64,15 +59,10 @@ var Sharer = (function() {
, height: height.val()
});
} else {
- const cuid = Room.getOptions().uid
- , v = $('div[data-client-uid="' + cuid + '"][data-client-type="SCREEN"]')
- , uid = v.data().stream().uid;
VideoManager.sendMessage({
id: 'stopRecord'
- , uid: uid
+ , uid: _getShareUid()
});
- VideoManager.close(uid, false);
- _setRecState(SHARE_STOPED);
}
});
}
@@ -177,6 +167,10 @@ var Sharer = (function() {
}
return cnts;
}
+ function _getShareUid() {
+ const v = $('div[data-client-uid="' + Room.getOptions().uid + '"][data-client-type="SCREEN"]');
+ return v && v.data() && v.data().stream() ? v.data().stream().uid : '';
+ }
self.init = _init;
self.open = function() {
diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video-manager.js b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video-manager.js
index 40760df..1e5294d 100644
--- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video-manager.js
+++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video-manager.js
@@ -25,7 +25,11 @@ var VideoManager = (function() {
, uid = sd.uid
, w = $('#' + VideoUtil.getVid(uid))
, v = w.data();
- v.stream().activities = sd.activities;
+ if (!VideoUtil.isSharing(sd) && !VideoUtil.isRecording(sd)) {
+ VideoManager.close(uid, false);
+ } else {
+ v.stream().activities = sd.activities;
+ }
Sharer.setShareState(VideoUtil.isSharing(sd) ? SHARE_STARTED : SHARE_STOPED);
Sharer.setRecState(VideoUtil.isRecording(sd) ? SHARE_STARTED : SHARE_STOPED);
}
diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/sidebar/RoomSidebar.java b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/sidebar/RoomSidebar.java
index 5be3cda..14499b3 100644
--- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/sidebar/RoomSidebar.java
+++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/sidebar/RoomSidebar.java
@@ -24,7 +24,7 @@ import static org.apache.wicket.ajax.attributes.CallbackParameter.explicit;
import java.util.ArrayList;
-import org.apache.openmeetings.core.remote.KurentoHandler;
+import org.apache.openmeetings.core.remote.StreamProcessor;
import org.apache.openmeetings.core.util.WebSocketHelper;
import org.apache.openmeetings.db.entity.basic.Client;
import org.apache.openmeetings.db.entity.room.Room;
@@ -193,7 +193,7 @@ public class RoomSidebar extends Panel {
if (!avInited) {
avInited = true;
if (Room.Type.conference == room.getRoom().getType()) {
- kurento.toggleActivity(c, Client.Activity.AUDIO_VIDEO);
+ streamProcessor.toggleActivity(c, Client.Activity.AUDIO_VIDEO);
}
}
cm.update(c);
@@ -206,7 +206,7 @@ public class RoomSidebar extends Panel {
@SpringBean
private ClientManager cm;
@SpringBean
- private KurentoHandler kurento;
+ private StreamProcessor streamProcessor;
public RoomSidebar(String id, final RoomPanel room) {
super(id);
diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/wb/InterviewWbPanel.java b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/wb/InterviewWbPanel.java
index 63c77f6..c047835 100644
--- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/wb/InterviewWbPanel.java
+++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/wb/InterviewWbPanel.java
@@ -20,7 +20,7 @@ package org.apache.openmeetings.web.room.wb;
import java.io.IOException;
-import org.apache.openmeetings.core.remote.KurentoHandler;
+import org.apache.openmeetings.core.remote.StreamProcessor;
import org.apache.openmeetings.db.entity.basic.Client;
import org.apache.openmeetings.db.entity.file.BaseFileItem;
import org.apache.openmeetings.db.entity.room.Room.Right;
@@ -36,7 +36,7 @@ public class InterviewWbPanel extends AbstractWbPanel {
private static final long serialVersionUID = 1L;
public static final ResourceReference INTERVIEWWB_JS_REFERENCE = new JavaScriptResourceReference(WbPanel.class, "interviewwb.js");
@SpringBean
- private KurentoHandler kurento;
+ private StreamProcessor streamProcessor;
public InterviewWbPanel(String id, RoomPanel rp) {
super(id, rp);
@@ -55,13 +55,13 @@ public class InterviewWbPanel extends AbstractWbPanel {
@Override
protected void processWbAction(WbAction a, JSONObject obj, AjaxRequestTarget target) throws IOException {
Client c = rp.getClient();
- if (kurento.recordingAllowed(c)) {
+ if (streamProcessor.recordingAllowed(c)) {
switch (a) {
case startRecording:
- kurento.startRecording(c);
+ streamProcessor.startRecording(c);
break;
case stopRecording:
- kurento.stopRecording(c);
+ streamProcessor.stopRecording(c);
break;
default:
//no-op
diff --git a/openmeetings-web/src/test/resources/keystore b/openmeetings-web/src/test/resources/keystore
index 8cc07dc..c2a5309 100644
Binary files a/openmeetings-web/src/test/resources/keystore and b/openmeetings-web/src/test/resources/keystore differ