You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openmeetings.apache.org by so...@apache.org on 2019/03/03 10:56:21 UTC

[openmeetings] branch master updated: [OPENMEETINGS-1955] major work on room recording seems to be done

This is an automated email from the ASF dual-hosted git repository.

solomax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openmeetings.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ac19d4  [OPENMEETINGS-1955] major work on room recording seems to be done
1ac19d4 is described below

commit 1ac19d404d781825f653f35bd185e3e6cdb89716
Author: Maxim Solodovnik <so...@gmail.com>
AuthorDate: Sun Mar 3 17:56:08 2019 +0700

    [OPENMEETINGS-1955] major work on room recording seems to be done
---
 .../remote/{IKStream.java => AbstractStream.java}  |  26 +-
 .../core/remote/IStreamProcessor.java}             |  41 +-
 .../org/apache/openmeetings/core/remote/KRoom.java |  36 +-
 .../apache/openmeetings/core/remote/KStream.java   |  95 +++--
 .../openmeetings/core/remote/KTestStream.java      |  23 +-
 .../openmeetings/core/remote/KurentoHandler.java   | 433 ++------------------
 .../openmeetings/core/remote/StreamProcessor.java  | 442 +++++++++++++++++++++
 .../core/remote/TestStreamProcessor.java           | 126 ++++++
 .../openmeetings/core/remote/BaseMockedTest.java   |  11 +-
 .../core/remote/TestNotConnectedMocked.java        |   8 +-
 .../core/remote/TestRoomFlowMocked.java            |  10 +-
 .../apache/openmeetings/web/room/RoomPanel.java    |   9 +-
 .../openmeetings/web/room/menu/RoomMenuPanel.java  |   5 +-
 .../org/apache/openmeetings/web/room/raw-sharer.js |  20 +-
 .../openmeetings/web/room/raw-video-manager.js     |   6 +-
 .../openmeetings/web/room/sidebar/RoomSidebar.java |   6 +-
 .../openmeetings/web/room/wb/InterviewWbPanel.java |  10 +-
 openmeetings-web/src/test/resources/keystore       | Bin 1821 -> 4365 bytes
 18 files changed, 753 insertions(+), 554 deletions(-)

diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/IKStream.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/AbstractStream.java
similarity index 68%
rename from openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/IKStream.java
rename to openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/AbstractStream.java
index c6979f0..462711b 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/IKStream.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/AbstractStream.java
@@ -24,20 +24,36 @@ import org.kurento.client.PlayerEndpoint;
 import org.kurento.client.RecorderEndpoint;
 import org.kurento.client.WebRtcEndpoint;
 
-public interface IKStream {
-	void release(KurentoHandler h);
+public abstract class AbstractStream {
+	protected final String sid;
+	protected final String uid;
 
-	default WebRtcEndpoint createWebRtcEndpoint(MediaPipeline pipeline) {
+	public AbstractStream(final String sid, final String uid) {
+		this.sid = sid;
+		this.uid = uid;
+	}
+
+	public String getSid() {
+		return sid;
+	}
+
+	public String getUid() {
+		return uid;
+	}
+
+	public abstract void release(IStreamProcessor processor);
+
+	public WebRtcEndpoint createWebRtcEndpoint(MediaPipeline pipeline) {
 		return new WebRtcEndpoint.Builder(pipeline).build();
 	}
 
-	default RecorderEndpoint createRecorderEndpoint(MediaPipeline pipeline, String path, MediaProfileSpecType profile) {
+	public RecorderEndpoint createRecorderEndpoint(MediaPipeline pipeline, String path, MediaProfileSpecType profile) {
 		return new RecorderEndpoint.Builder(pipeline, path)
 				.stopOnEndOfStream()
 				.withMediaProfile(profile).build();
 	}
 
-	default PlayerEndpoint createPlayerEndpoint(MediaPipeline pipeline, String path) {
+	public PlayerEndpoint createPlayerEndpoint(MediaPipeline pipeline, String path) {
 		return new PlayerEndpoint.Builder(pipeline, path).build();
 	}
 }
diff --git a/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestNotConnectedMocked.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/IStreamProcessor.java
similarity index 52%
copy from openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestNotConnectedMocked.java
copy to openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/IStreamProcessor.java
index 1877518..d6575ed 100644
--- a/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestNotConnectedMocked.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/IStreamProcessor.java
@@ -1,5 +1,4 @@
 /*
-
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -19,41 +18,7 @@
  */
 package org.apache.openmeetings.core.remote;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-import org.junit.Test;
-
-import com.github.openjson.JSONObject;
-
-public class TestNotConnectedMocked extends BaseMockedTest {
-	@Test
-	public void testNotConnected() {
-		handler.onMessage(null, MSG_BASE);
-	}
-
-	@Test
-	public void testRecordingAllowed() {
-		assertFalse(handler.recordingAllowed(null));
-	}
-
-	@Test
-	public void testStartRecording() {
-		handler.startRecording(null);
-	}
-
-	@Test
-	public void testStopRecording() {
-		handler.stopRecording(null);
-	}
-
-	@Test
-	public void testIsRecording() {
-		assertFalse(handler.isRecording(null));
-	}
-
-	@Test
-	public void testGetRecordingUser() {
-		assertEquals(new JSONObject().toString(), handler.getRecordingUser(null).toString());
-	}
+public interface IStreamProcessor {
+	void release(AbstractStream stream);
+	void destroy();
 }
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java
index 8f5f7dd..1a4925d 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java
@@ -106,9 +106,9 @@ public class KRoom {
 		return streams.values();
 	}
 
-	public void onStopBroadcast(KStream stream, final KurentoHandler h) {
+	public void onStopBroadcast(KStream stream, final StreamProcessor processor) {
 		streams.remove(stream.getUid());
-		stream.release(h);
+		stream.release(processor);
 		WebSocketHelper.sendAll(newKurentoMsg()
 				.put("id", "broadcastStopped")
 				.put("uid", stream.getUid())
@@ -118,7 +118,7 @@ public class KRoom {
 		//FIXME TODO permission can be removed, some listener might be required
 	}
 
-	public void leave(final KurentoHandler h, final Client c) {
+	public void leave(final StreamProcessor processor, final Client c) {
 		for (Map.Entry<String, KStream> e : streams.entrySet()) {
 			e.getValue().remove(c);
 		}
@@ -128,7 +128,7 @@ public class KRoom {
 			}
 			KStream stream = streams.remove(sd.getUid());
 			if (stream != null) {
-				stream.release(h);
+				stream.release(processor);
 			}
 		}
 	}
@@ -190,21 +190,32 @@ public class KRoom {
 		}
 	}
 
-	public void stopRecording(KurentoHandler h, Client c, RecordingDao recDao) {
+	public void stopRecording(final StreamProcessor processor, Client c) {
 		if (recordingStarted.compareAndSet(true, false)) {
 			log.debug("##REC:: recording in room {} is stopping {} ::", roomId, recordingId);
 			for (final KStream stream : streams.values()) {
 				stream.stopRecord();
 			}
-			Recording rec = recDao.get(recordingId);
+			Recording rec = processor.getRecordingDao().get(recordingId);
 			rec.setRecordEnd(new Date());
-			rec = recDao.update(rec);
+			rec = processor.getRecordingDao().update(rec);
 			recordingUser = new JSONObject();
 			recordingId = null;
 
-			h.startConvertion(rec);
+			processor.startConvertion(rec);
+			User u;
+			if (c == null) {
+				u = new User();
+			} else {
+				u = c.getUser();
+				Optional<StreamDesc> osd = c.getScreenStream();
+				if (osd.isPresent()) {
+					osd.get().removeActivity(Activity.RECORD);
+					processor.getClientManager().update(c);
+					processor.getHandler().sendShareUpdated(osd.get());
+				}
+			}
 			// Send notification to all users that the recording has been stopped
-			User u = c == null ? new User() : c.getUser();
 			WebSocketHelper.sendRoom(new RoomMessage(roomId, u, RoomMessage.Type.recordingToggled));
 			log.debug("##REC:: recording in room {} is stopped ::", roomId);
 		}
@@ -218,8 +229,9 @@ public class KRoom {
 		return new JSONObject(sharingUser.toString());
 	}
 
-	public void startSharing(KurentoHandler h, IClientManager cm, Client c, Optional<StreamDesc> osd, JSONObject msg, Activity a) {
+	public void startSharing(StreamProcessor processor, IClientManager cm, Client c, Optional<StreamDesc> osd, JSONObject msg, Activity a) {
 		StreamDesc sd = null;
+		KurentoHandler h = processor.getHandler();
 		if (sharingStarted.compareAndSet(false, true)) {
 			sharingUser.put("sid", c.getSid());
 			sd = c.addStream(StreamType.SCREEN, a);
@@ -246,9 +258,9 @@ public class KRoom {
 		}
 	}
 
-	public void close(final KurentoHandler h) {
+	public void close(final StreamProcessor processor) {
 		for (final KStream stream : streams.values()) {
-			stream.release(h);
+			stream.release(processor);
 		}
 		streams.clear();
 		pipeline.release(new Continuation<Void>() {
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
index dbbd196..de77ad7 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
@@ -53,11 +53,9 @@ import org.slf4j.LoggerFactory;
 
 import com.github.openjson.JSONObject;
 
-public class KStream implements IKStream {
+public class KStream extends AbstractStream {
 	private static final Logger log = LoggerFactory.getLogger(KStream.class);
 
-	private final String sid;
-	private final String uid;
 	private final KRoom room;
 	private final StreamType streamType;
 	private MediaProfileSpecType profile;
@@ -68,17 +66,16 @@ public class KStream implements IKStream {
 	private Type type;
 
 	public KStream(final StreamDesc sd, KRoom room) {
+		super(sd.getSid(), sd.getUid());
 		this.room = room;
-		this.sid = sd.getSid();
-		this.uid = sd.getUid();
 		streamType = sd.getType();
 		//TODO Min/MaxVideoSendBandwidth
 		//TODO Min/Max Audio/Video RecvBandwidth
 	}
 
-	public KStream startBroadcast(final KurentoHandler h, final StreamDesc sd, final String sdpOffer) {
+	public KStream startBroadcast(final StreamProcessor processor, final StreamDesc sd, final String sdpOffer) {
 		if (outgoingMedia != null) {
-			release(h);
+			release(processor);
 		}
 		final boolean hasAudio = sd.hasActivity(Activity.AUDIO);
 		final boolean hasVideo = sd.hasActivity(Activity.VIDEO);
@@ -112,7 +109,7 @@ public class KStream implements IKStream {
 				profile = MediaProfileSpecType.WEBM_VIDEO_ONLY;
 				break;
 		}
-		outgoingMedia = createEndpoint(h, sd.getSid(), sd.getUid());
+		outgoingMedia = createEndpoint(processor, sd.getSid(), sd.getUid());
 		outgoingMedia.addMediaSessionTerminatedListener(evt -> {
 			log.warn("Media stream terminated");
 		});
@@ -120,16 +117,16 @@ public class KStream implements IKStream {
 			if (MediaFlowState.NOT_FLOWING == evt.getState()) {
 				log.warn("Media FlowOut :: {}", evt.getState());
 				if (StreamType.SCREEN == streamType) {
-					h.stopSharing(sid, uid);
+					processor.doStopSharing(sid, uid);
 				}
-				stopBroadcast(h);
+				stopBroadcast(processor);
 			}
 		});
 		outgoingMedia.addMediaFlowInStateChangeListener(evt -> {
 			log.warn("Media FlowIn :: {}", evt);
 		});
-		h.streamsByUid.put(uid, this);
-		addListener(h, sd.getSid(), sd.getUid(), sdpOffer);
+		processor.addStream(this);
+		addListener(processor, sd.getSid(), sd.getUid(), sdpOffer);
 		if (room.isRecording()) {
 			startRecord();
 		}
@@ -137,12 +134,12 @@ public class KStream implements IKStream {
 		WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.rightUpdated, c.getUid()));
 		WebSocketHelper.sendRoomOthers(room.getRoomId(), c.getUid(), newKurentoMsg()
 				.put("id", "newStream")
-				.put(PARAM_ICE, h.getTurnServers())
+				.put(PARAM_ICE, processor.getHandler().getTurnServers())
 				.put("stream", sd.toJson()));
 		return this;
 	}
 
-	public void addListener(final KurentoHandler h, String sid, String uid, String sdpOffer) {
+	public void addListener(final StreamProcessor processor, String sid, String uid, String sdpOffer) {
 		final boolean self = uid.equals(this.uid);
 		log.info("USER {}: have started {} in room {}", uid, self ? "broadcasting" : "receiving", room.getRoomId());
 		log.trace("USER {}: SdpOffer is {}", uid, sdpOffer);
@@ -151,11 +148,11 @@ public class KStream implements IKStream {
 			return;
 		}
 
-		final WebRtcEndpoint endpoint = getEndpointForUser(h, sid, uid);
+		final WebRtcEndpoint endpoint = getEndpointForUser(processor, sid, uid);
 		final String sdpAnswer = endpoint.processOffer(sdpOffer);
 
 		log.trace("USER {}: SdpAnswer is {}", this.uid, sdpAnswer);
-		h.sendClient(sid, newKurentoMsg()
+		processor.getHandler().sendClient(sid, newKurentoMsg()
 				.put("id", "videoResponse")
 				.put("uid", this.uid)
 				.put("sdpAnswer", sdpAnswer));
@@ -163,7 +160,7 @@ public class KStream implements IKStream {
 		endpoint.gatherCandidates();
 	}
 
-	private WebRtcEndpoint getEndpointForUser(final KurentoHandler h, String sid, String uid) {
+	private WebRtcEndpoint getEndpointForUser(final StreamProcessor processor, String sid, String uid) {
 		if (uid.equals(this.uid)) {
 			log.debug("PARTICIPANT {}: configuring loopback", this.uid);
 			return outgoingMedia;
@@ -176,11 +173,11 @@ public class KStream implements IKStream {
 			listener.release();
 		}
 		log.debug("PARTICIPANT {}: creating new endpoint for {}", uid, this.uid);
-		listener = createEndpoint(h, sid, uid);
+		listener = createEndpoint(processor, sid, uid);
 		listeners.put(uid, listener);
 
 		log.debug("PARTICIPANT {}: obtained endpoint for {}", uid, this.uid);
-		Client cur = h.getBySid(this.sid);
+		Client cur = processor.getBySid(this.sid);
 		if (cur == null) {
 			log.warn("Client for endpoint dooesn't exists");
 		} else {
@@ -199,12 +196,12 @@ public class KStream implements IKStream {
 		return listener;
 	}
 
-	private WebRtcEndpoint createEndpoint(final KurentoHandler h, String sid, String uid) {
+	private WebRtcEndpoint createEndpoint(final StreamProcessor processor, String sid, String uid) {
 		WebRtcEndpoint endpoint = createWebRtcEndpoint(room.getPipeline());
 		endpoint.addTag("outUid", this.uid);
 		endpoint.addTag("uid", uid);
 
-		endpoint.addIceCandidateFoundListener(evt -> h.sendClient(sid, newKurentoMsg()
+		endpoint.addIceCandidateFoundListener(evt -> processor.getHandler().sendClient(sid, newKurentoMsg()
 						.put("id", "iceCandidate")
 						.put("uid", KStream.this.uid)
 						.put(PARAM_CANDIDATE, convert(JsonUtils.toJsonObject(evt.getCandidate()))))
@@ -258,37 +255,45 @@ public class KStream implements IKStream {
 		}
 	}
 
-	public void stopBroadcast(final KurentoHandler h) {
-		room.onStopBroadcast(this, h);
+	public void stopBroadcast(final StreamProcessor processor) {
+		room.onStopBroadcast(this, processor);
+	}
+
+	public void pauseSharing() {
+		releaseListeners();
+	}
+
+	private void releaseListeners() {
+		log.debug("PARTICIPANT {}: Releasing listeners", uid);
+		for (Entry<String, WebRtcEndpoint> entry : listeners.entrySet()) {
+			final String inUid = entry.getKey();
+			log.trace("PARTICIPANT {}: Released incoming EP for {}", uid, inUid);
+
+			final WebRtcEndpoint ep = entry.getValue();
+			ep.release(new Continuation<Void>() {
+				@Override
+				public void onSuccess(Void result) throws Exception {
+					log.trace("PARTICIPANT {}: Released successfully incoming EP for {}", KStream.this.uid, inUid);
+				}
+
+				@Override
+				public void onError(Throwable cause) throws Exception {
+					log.warn("PARTICIPANT {}: Could not release incoming EP for {}", KStream.this.uid, inUid);
+				}
+			});
+		}
+		listeners.clear();
 	}
 
 	@Override
-	public void release(KurentoHandler h) {
+	public void release(IStreamProcessor processor) {
 		if (outgoingMedia != null) {
-			log.debug("PARTICIPANT {}: Releasing resources", uid);
-			for (Entry<String, WebRtcEndpoint> entry : listeners.entrySet()) {
-				final String inUid = entry.getKey();
-				log.trace("PARTICIPANT {}: Released incoming EP for {}", uid, inUid);
-
-				final WebRtcEndpoint ep = entry.getValue();
-				ep.release(new Continuation<Void>() {
-					@Override
-					public void onSuccess(Void result) throws Exception {
-						log.trace("PARTICIPANT {}: Released successfully incoming EP for {}", KStream.this.uid, inUid);
-					}
-
-					@Override
-					public void onError(Throwable cause) throws Exception {
-						log.warn("PARTICIPANT {}: Could not release incoming EP for {}", KStream.this.uid, inUid);
-					}
-				});
-			}
-			listeners.clear();
+			releaseListeners();
 			outgoingMedia.release();
 			outgoingMedia = null;
 		}
 		releaseRecorder();
-		h.streamsByUid.remove(uid);
+		processor.release(this);
 	}
 
 	private void releaseRecorder() {
@@ -315,10 +320,12 @@ public class KStream implements IKStream {
 		return new JSONObject(o.toString());
 	}
 
+	@Override
 	public String getSid() {
 		return sid;
 	}
 
+	@Override
 	public String getUid() {
 		return uid;
 	}
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java
index 85e1a9c..b3395fd 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java
@@ -20,8 +20,8 @@ package org.apache.openmeetings.core.remote;
 
 import static java.util.UUID.randomUUID;
 import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_CANDIDATE;
-import static org.apache.openmeetings.core.remote.KurentoHandler.newTestKurentoMsg;
 import static org.apache.openmeetings.core.remote.KurentoHandler.sendError;
+import static org.apache.openmeetings.core.remote.TestStreamProcessor.newTestKurentoMsg;
 import static org.apache.openmeetings.util.OmFileHelper.EXTENSION_WEBM;
 import static org.apache.openmeetings.util.OmFileHelper.TEST_SETUP_PREFIX;
 import static org.apache.openmeetings.util.OmFileHelper.getStreamsDir;
@@ -48,7 +48,7 @@ import org.slf4j.LoggerFactory;
 
 import com.github.openjson.JSONObject;
 
-public class KTestStream implements IKStream {
+public class KTestStream extends AbstractStream {
 	private static final Logger log = LoggerFactory.getLogger(KTestStream.class);
 	private MediaPipeline pipeline;
 	private WebRtcEndpoint webRtcEndpoint;
@@ -56,13 +56,12 @@ public class KTestStream implements IKStream {
 	private RecorderEndpoint recorder;
 	private String recPath = null;
 	private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
-	private final String uid;
 	private ScheduledFuture<?> recHandle;
 	private int recTime;
 
-	public KTestStream(IWsClient _c, JSONObject msg, MediaPipeline pipeline) {
+	public KTestStream(IWsClient c, JSONObject msg, MediaPipeline pipeline) {
+		super(null, c.getUid());
 		this.pipeline = pipeline;
-		this.uid = _c.getUid();
 		webRtcEndpoint = createWebRtcEndpoint(pipeline);
 		webRtcEndpoint.connect(webRtcEndpoint);
 
@@ -73,7 +72,7 @@ public class KTestStream implements IKStream {
 		recorder.addRecordingListener(evt -> {
 				recTime = 0;
 				recHandle = scheduler.scheduleAtFixedRate(
-						() -> WebSocketHelper.sendClient(_c, newTestKurentoMsg().put("id", "recording").put("time", recTime++))
+						() -> WebSocketHelper.sendClient(c, newTestKurentoMsg().put("id", "recording").put("time", recTime++))
 						, 0, 1, TimeUnit.SECONDS);
 				scheduler.schedule(() -> {
 						recorder.stop();
@@ -81,7 +80,7 @@ public class KTestStream implements IKStream {
 					}, 5, TimeUnit.SECONDS);
 			});
 		recorder.addStoppedListener(evt -> {
-				WebSocketHelper.sendClient(_c, newTestKurentoMsg().put("id", "recStopped"));
+				WebSocketHelper.sendClient(c, newTestKurentoMsg().put("id", "recStopped"));
 				releaseRecorder();
 			});
 		switch (profile) {
@@ -103,9 +102,9 @@ public class KTestStream implements IKStream {
 		String sdpOffer = msg.getString("sdpOffer");
 		String sdpAnswer = webRtcEndpoint.processOffer(sdpOffer);
 
-		addIceListener(_c);
+		addIceListener(c);
 
-		WebSocketHelper.sendClient(_c, newTestKurentoMsg()
+		WebSocketHelper.sendClient(c, newTestKurentoMsg()
 				.put("id", "startResponse")
 				.put("sdpAnswer", sdpAnswer));
 		webRtcEndpoint.gatherCandidates();
@@ -117,7 +116,7 @@ public class KTestStream implements IKStream {
 
 			@Override
 			public void onError(Throwable cause) throws Exception {
-				sendError(_c, "Failed to start recording");
+				sendError(c, "Failed to start recording");
 				log.error("Failed to start recording", cause);
 			}
 		});
@@ -217,13 +216,13 @@ public class KTestStream implements IKStream {
 	}
 
 	@Override
-	public void release(KurentoHandler h) {
+	public void release(IStreamProcessor processor) {
 		if (webRtcEndpoint != null) {
 			webRtcEndpoint.release();
 			webRtcEndpoint = null;
 		}
 		releasePlayer();
 		releaseRecorder();
-		h.testsByUid.remove(uid);
+		processor.release(this);
 	}
 }
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
index afbf733..a940255 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
@@ -26,42 +26,31 @@ import java.security.InvalidKeyException;
 import java.security.NoSuchAlgorithmException;
 import java.util.Base64;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.stream.Collectors;
 
 import javax.crypto.Mac;
 import javax.crypto.spec.SecretKeySpec;
 
 import org.apache.directory.api.util.Strings;
-import org.apache.openmeetings.core.converter.IRecordingConverter;
-import org.apache.openmeetings.core.converter.InterviewConverter;
-import org.apache.openmeetings.core.converter.RecordingConverter;
 import org.apache.openmeetings.core.util.WebSocketHelper;
 import org.apache.openmeetings.db.dao.record.RecordingChunkDao;
-import org.apache.openmeetings.db.dao.record.RecordingDao;
 import org.apache.openmeetings.db.dao.room.RoomDao;
 import org.apache.openmeetings.db.entity.basic.Client;
 import org.apache.openmeetings.db.entity.basic.Client.Activity;
 import org.apache.openmeetings.db.entity.basic.Client.StreamDesc;
-import org.apache.openmeetings.db.entity.basic.Client.StreamType;
 import org.apache.openmeetings.db.entity.basic.IWsClient;
-import org.apache.openmeetings.db.entity.record.Recording;
 import org.apache.openmeetings.db.entity.room.Room;
 import org.apache.openmeetings.db.entity.room.Room.Right;
-import org.apache.openmeetings.db.entity.room.Room.RoomElement;
 import org.apache.openmeetings.db.entity.user.User;
 import org.apache.openmeetings.db.manager.IClientManager;
 import org.apache.openmeetings.db.util.ws.RoomMessage;
 import org.apache.openmeetings.db.util.ws.TextRoomMessage;
 import org.kurento.client.Endpoint;
 import org.kurento.client.EventListener;
-import org.kurento.client.IceCandidate;
 import org.kurento.client.KurentoClient;
 import org.kurento.client.KurentoConnectionListener;
 import org.kurento.client.MediaObject;
@@ -75,7 +64,6 @@ import org.kurento.client.WebRtcEndpoint;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.core.task.TaskExecutor;
 
 import com.github.openjson.JSONArray;
 import com.github.openjson.JSONObject;
@@ -86,9 +74,9 @@ public class KurentoHandler {
 	public static final String PARAM_CANDIDATE = "candidate";
 	private static final String WARN_NO_KURENTO = "Media Server is not accessible";
 	public static final String MODE_TEST = "test";
-	private static final String TAG_KUID = "kuid";
+	public static final String TAG_KUID = "kuid";
 	public static final String TAG_MODE = "mode";
-	private static final String TAG_ROOM = "roomId";
+	public static final String TAG_ROOM = "roomId";
 	private static final String HMAC_SHA1_ALGORITHM = "HmacSHA1";
 	private final ScheduledExecutorService recheckScheduler = Executors.newScheduledThreadPool(1);
 	public static final String KURENTO_TYPE = "kurento";
@@ -105,8 +93,6 @@ public class KurentoHandler {
 	private boolean connected = false;
 	private String kuid;
 	private final Map<Long, KRoom> rooms = new ConcurrentHashMap<>();
-	final Map<String, KStream> streamsByUid = new ConcurrentHashMap<>();
-	final Map<String, KTestStream> testsByUid = new ConcurrentHashMap<>();
 	private Runnable check;
 
 	@Autowired
@@ -114,15 +100,11 @@ public class KurentoHandler {
 	@Autowired
 	private RoomDao roomDao;
 	@Autowired
-	private RecordingDao recDao;
-	@Autowired
 	private RecordingChunkDao chunkDao;
 	@Autowired
-	private TaskExecutor taskExecutor;
-	@Autowired
-	private RecordingConverter recordingConverter;
+	private TestStreamProcessor testProcessor;
 	@Autowired
-	private InterviewConverter interviewConverter;
+	private StreamProcessor streamProcessor;
 
 	boolean isConnected() {
 		boolean connctd = client != null && !client.isClosed() && connected;
@@ -151,14 +133,11 @@ public class KurentoHandler {
 			kuid = randomUUID().toString(); // will be changed to prevent double events
 			client.destroy();
 			for (Entry<Long, KRoom> e : rooms.entrySet()) {
-				e.getValue().close(this);
+				e.getValue().close(streamProcessor);
 			}
+			testProcessor.destroy();
+			streamProcessor.destroy();
 			rooms.clear();
-			for (Entry<String, KTestStream> e : testsByUid.entrySet()) {
-				e.getValue().release(this);
-			}
-			testsByUid.clear();
-			streamsByUid.clear();
 			client = null;
 		}
 	}
@@ -175,136 +154,6 @@ public class KurentoHandler {
 		return client.beginTransaction();
 	}
 
-	private MediaPipeline createTestPipeline() {
-		Transaction t = beginTransaction();
-		MediaPipeline pipe = client.createMediaPipeline(t);
-		pipe.addTag(t, TAG_KUID, kuid);
-		pipe.addTag(t, TAG_MODE, MODE_TEST);
-		pipe.addTag(t, TAG_ROOM, MODE_TEST);
-		t.commit();
-		return pipe;
-	}
-
-	private void onTestMessage(IWsClient _c, final String cmdId, JSONObject msg) {
-		KTestStream user = getTestByUid(_c.getUid());
-		switch (cmdId) {
-			case "wannaRecord":
-				WebSocketHelper.sendClient(_c, newTestKurentoMsg()
-						.put("id", "canRecord")
-						.put(PARAM_ICE, getTurnServers(true))
-						);
-				break;
-			case "record":
-				if (user != null) {
-					user.release(this);
-				}
-				user = new KTestStream(_c, msg, createTestPipeline());
-				testsByUid.put(_c.getUid(), user);
-				break;
-			case "iceCandidate":
-				JSONObject candidate = msg.getJSONObject(PARAM_CANDIDATE);
-				if (user != null) {
-					IceCandidate cand = new IceCandidate(candidate.getString(PARAM_CANDIDATE),
-							candidate.getString("sdpMid"), candidate.getInt("sdpMLineIndex"));
-					user.addCandidate(cand);
-				}
-				break;
-			case "wannaPlay":
-				WebSocketHelper.sendClient(_c, newTestKurentoMsg()
-						.put("id", "canPlay")
-						.put(PARAM_ICE, getTurnServers(true))
-						);
-				break;
-			case "play":
-				if (user != null) {
-					user.play(_c, msg, createTestPipeline());
-				}
-				break;
-		}
-	}
-
-	private void onMessage(Client c, final String cmdId, JSONObject msg) {
-		final String uid = msg.optString("uid");
-		KStream sender;
-		StreamDesc sd;
-		Optional<StreamDesc> osd;
-		log.debug("Incoming message from user with ID '{}': {}", c.getUserId(), msg);
-		switch (cmdId) {
-			case "devicesAltered":
-				if (!msg.getBoolean("audio") && c.hasActivity(Activity.AUDIO)) {
-					c.remove(Activity.AUDIO);
-				}
-				if (!msg.getBoolean("video") && c.hasActivity(Activity.VIDEO)) {
-					c.remove(Activity.VIDEO);
-				}
-				c.getStream(uid).setActivities();
-				WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), cm.update(c), RoomMessage.Type.rightUpdated, c.getUid()));
-				break;
-			case "toggleActivity":
-				toggleActivity(c, Activity.valueOf(msg.getString("activity")));
-				break;
-			case "broadcastStarted":
-				sd = c.getStream(uid);
-				sender = getByUid(uid);
-				if (sender == null) {
-					KRoom room = getRoom(c.getRoomId());
-					sender = room.join(sd);
-				}
-				sender.startBroadcast(this, sd, msg.getString("sdpOffer"));
-				if (StreamType.SCREEN == sd.getType() && sd.hasActivity(Activity.RECORD) && !isRecording(c.getRoomId())) {
-					startRecording(c);
-				}
-				break;
-			case "onIceCandidate":
-				sender = getByUid(uid);
-				if (sender != null) {
-					JSONObject candidate = msg.getJSONObject(PARAM_CANDIDATE);
-					IceCandidate cand = new IceCandidate(
-							candidate.getString(PARAM_CANDIDATE)
-							, candidate.getString("sdpMid")
-							, candidate.getInt("sdpMLineIndex"));
-					sender.addCandidate(cand, msg.getString("luid"));
-				}
-				break;
-			case "addListener":
-				sender = getByUid(msg.getString("sender"));
-				if (sender != null) {
-					sender.addListener(this, c.getSid(), c.getUid(), msg.getString("sdpOffer"));
-				}
-				break;
-			case "wannaShare":
-				osd = c.getScreenStream();
-				if (screenShareAllowed(c) || (osd.isPresent() && !osd.get().hasActivity(Activity.SCREEN))) {
-					startSharing(c, osd, msg, Activity.SCREEN);
-				}
-				break;
-			case "wannaRecord":
-				osd = c.getScreenStream();
-				if (recordingAllowed(c)) {
-					Room r = c.getRoom();
-					if (Room.Type.interview == r.getType()) {
-						log.warn("This shouldn't be called for interview room");
-						break;
-					}
-					boolean sharing = isSharing(r.getId());
-					startSharing(c, osd, msg, Activity.RECORD);
-					if (sharing) {
-						startRecording(c);
-					}
-				}
-				break;
-			case "stopSharing":
-				stopSharing(c, uid);
-				break;
-			case "stopRecord":
-				stopRecording(c);
-				break;
-			case "errorSharing":
-				errorSharing(c);
-				break;
-		}
-	}
-
 	public void onMessage(IWsClient _c, JSONObject msg) {
 		if (!isConnected()) {
 			sendError(_c, "Multimedia server is inaccessible");
@@ -312,7 +161,7 @@ public class KurentoHandler {
 		}
 		final String cmdId = msg.getString("id");
 		if (MODE_TEST.equals(msg.optString(TAG_MODE))) {
-			onTestMessage(_c, cmdId, msg);
+			testProcessor.onMessage(_c, cmdId, msg);
 		} else {
 			final Client c = (Client)_c;
 
@@ -320,146 +169,8 @@ public class KurentoHandler {
 				log.warn("Incoming message from invalid user");
 				return;
 			}
-			onMessage(c, cmdId, msg);
-		}
-	}
-
-	private static boolean isBroadcasting(final Client c) {
-		return c.hasAnyActivity(Activity.AUDIO, Activity.VIDEO);
-	}
-
-	private void checkStreams(Long roomId) {
-		if (!isConnected()) {
-			return;
-		}
-		KRoom room = getRoom(roomId);
-		if (room.isSharing()) {
-			List<StreamDesc> streams = cm.listByRoom(roomId).parallelStream()
-					.flatMap(c -> c.getStreams().stream())
-					.filter(sd -> StreamType.SCREEN == sd.getType()).collect(Collectors.toList());
-			if (streams.isEmpty()) {
-				log.info("No more screen streams in the room, stopping sharing");
-				room.stopSharing();
-				if (Room.Type.interview != room.getType() && room.isRecording()) {
-					log.info("No more screen streams in the non-interview room, stopping recording");
-					room.stopRecording(this, null, recDao);
-				}
-			}
-		}
-		if (room.isRecording()) {
-			List<StreamDesc> streams = cm.listByRoom(roomId).parallelStream()
-					.flatMap(c -> c.getStreams().stream())
-					.collect(Collectors.toList());
-			if (streams.isEmpty()) {
-				log.info("No more streams in the room, stopping recording");
-				room.stopRecording(this, null, recDao);
-			}
-		}
-	}
-
-	public void toggleActivity(Client c, Activity a) {
-		log.info("PARTICIPANT {}: trying to toggle activity {}", c, c.getRoomId());
-
-		if (!activityAllowed(c, a, c.getRoom())) {
-			if (a == Activity.AUDIO || a == Activity.AUDIO_VIDEO) {
-				c.allow(Room.Right.audio);
-			}
-			if (!c.getRoom().isAudioOnly() && (a == Activity.VIDEO || a == Activity.AUDIO_VIDEO)) {
-				c.allow(Room.Right.video);
-			}
-		}
-		if (activityAllowed(c, a, c.getRoom())) {
-			boolean wasBroadcasting = isBroadcasting(c);
-			if (a == Activity.AUDIO && !c.isMicEnabled()) {
-				return;
-			}
-			if (a == Activity.VIDEO && !c.isCamEnabled()) {
-				return;
-			}
-			if (a == Activity.AUDIO_VIDEO && !c.isMicEnabled() && !c.isCamEnabled()) {
-				return;
-			}
-			c.toggle(a);
-			if (!isBroadcasting(c)) {
-				//close
-				boolean changed = false;
-				for (StreamDesc sd : c.getStreams()) {
-					KStream s = getByUid(sd.getUid());
-					if (StreamType.WEBCAM == sd.getType()) {
-						if (s != null) {
-							s.stopBroadcast(this);
-						}
-						c.removeStream(sd.getUid());
-						changed = true;
-					}
-				}
-				if (changed) {
-					cm.update(c);
-					checkStreams(c.getRoomId());
-				}
-				WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.rightUpdated, c.getUid()));
-				//FIXME TODO update interview buttons
-			} else if (!wasBroadcasting) {
-				//join
-				StreamDesc sd = c.addStream(StreamType.WEBCAM);
-				cm.update(c);
-				log.debug("User {}: has started broadcast", sd.getUid());
-				sendClient(sd.getSid(), newKurentoMsg()
-						.put("id", "broadcast")
-						.put("stream", sd.toJson())
-						.put(PARAM_ICE, getTurnServers(false)));
-				//FIXME TODO update interview buttons
-			} else {
-				//constraints were changed
-				for (StreamDesc sd : c.getStreams()) {
-					if (StreamType.WEBCAM == sd.getType()) {
-						sd.setActivities();
-						cm.update(c);
-						break;
-					}
-				}
-				WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.rightUpdated, c.getUid()));
-			}
-		}
-	}
-
-	public boolean hasRightsToRecord(Client c) {
-		Room r = c.getRoom();
-		return r != null && r.isAllowRecording() && c.hasRight(Right.moderator);
-	}
-
-	public boolean recordingAllowed(Client c) {
-		if (!isConnected()) {
-			return false;
-		}
-		Room r = c.getRoom();
-		return hasRightsToRecord(c) && !isRecording(r.getId());
-	}
-
-	public void startRecording(Client c) {
-		if (!isConnected() || !hasRightsToRecord(c)) {
-			return;
-		}
-		getRoom(c.getRoomId()).startRecording(cm, c, recDao);
-	}
-
-	public void stopRecording(Client c) {
-		if (!isConnected() || !hasRightsToRecord(c)) {
-			return;
+			streamProcessor.onMessage(c, cmdId, msg);
 		}
-		getRoom(c.getRoomId()).stopRecording(this, c, recDao);
-	}
-
-	void startConvertion(Recording rec) {
-		IRecordingConverter conv = rec.isInterview() ? interviewConverter : recordingConverter;
-		taskExecutor.execute(() -> conv.startConversion(rec));
-	}
-
-	public boolean isRecording(Long roomId) {
-		if (!isConnected()) {
-			return false;
-		}
-		return getRoom(roomId).isRecording();
 	}
 
 	public JSONObject getRecordingUser(Long roomId) {
@@ -469,74 +180,6 @@ public class KurentoHandler {
 		return getRoom(roomId).getRecordingUser();
 	}
 
-	public boolean hasRightsToShare(Client c) {
-		Room r = c.getRoom();
-		return r != null && Room.Type.interview != r.getType()
-				&& !r.isHidden(RoomElement.ScreenSharing)
-				&& r.isAllowRecording() && c.hasRight(Right.share);
-	}
-
-	public boolean screenShareAllowed(Client c) {
-		if (!isConnected()) {
-			return false;
-		}
-		Room r = c.getRoom();
-		return hasRightsToShare(c) && !isSharing(r.getId());
-	}
-
-	private void errorSharing(Client c) {
-		if (!isConnected()) {
-			return;
-		}
-		KRoom room = getRoom(c.getRoomId());
-		if (!room.isSharing() || !c.getSid().equals(room.getSharingUser().getString("sid"))) {
-			return;
-		}
-		Optional<StreamDesc> osd = c.getScreenStream();
-		if (osd.isPresent()) {
-			stopSharing(c, osd.get().getUid());
-		} else {
-			room.stopSharing();
-		}
-		stopRecording(c);
-	}
-
-	private void startSharing(Client c, Optional<StreamDesc> osd, JSONObject msg, Activity a) {
-		if (isConnected() && c.getRoomId() != null) {
-			getRoom(c.getRoomId()).startSharing(this, cm, c, osd, msg, a);
-		}
-	}
-
-	private void stopSharing(Client c, String uid) {
-		KStream sender = getByUid(uid);
-		StreamDesc sd = stopSharing(c.getSid(), uid);
-		if (sender != null && sd != null) {
-			sender.stopBroadcast(this);
-		}
-	}
-
-	StreamDesc stopSharing(String sid, String uid) {
-		StreamDesc sd = null;
-		Client c = getBySid(sid);
-		if (c.getRoomId() != null) {
-			sd = c.getStream(uid);
-			if (sd != null && StreamType.SCREEN == sd.getType()) {
-				c.removeStream(uid);
-				cm.update(c);
-				checkStreams(c.getRoomId());
-				WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.rightUpdated, c.getUid()));
-			}
-		}
-		return sd;
-	}
-
-	public boolean isSharing(Long roomId) {
-		if (!isConnected()) {
-			return false;
-		}
-		return getRoom(roomId).isSharing();
-	}
-
 	public void leaveRoom(Client c) {
 		remove(c);
 		WebSocketHelper.sendAll(newKurentoMsg()
@@ -546,10 +189,6 @@ public class KurentoHandler {
 			);
 	}
 
-	Client getBySid(String sid) {
-		return cm.getBySid(sid);
-	}
-
 	void sendShareUpdated(StreamDesc sd) {
 		sendClient(sd.getSid(), newKurentoMsg()
 				.put("id", "shareUpdated")
@@ -567,34 +206,18 @@ public class KurentoHandler {
 				.put("message", msg));
 	}
 
-	public void remove(IWsClient _c) {
-		if (!isConnected() ||_c == null) {
+	public void remove(IWsClient c) {
+		if (!isConnected() ||c == null) {
 			return;
 		}
-		final String uid = _c.getUid();
-		final boolean test = !(_c instanceof Client);
-		if (test) {
-			IKStream s = getTestByUid(uid);
-			if (s != null) {
-				s.release(this);
-			}
+		if (!(c instanceof Client)) {
+			testProcessor.remove(c);
 			return;
 		}
-		Client c = (Client)_c;
-		for (StreamDesc sd : c.getStreams()) {
-			IKStream s = getByUid(sd.getUid());
-			if (s != null) {
-				s.release(this);
-			}
-		}
-		if (c.getRoomId() != null) {
-			KRoom room = getRoom(c.getRoomId());
-			room.leave(this, c);
-			checkStreams(c.getRoomId());
-		}
+		streamProcessor.remove((Client)c);
 	}
 
-	private KRoom getRoom(Long roomId) {
+	KRoom getRoom(Long roomId) {
 		log.debug("Searching for room {}", roomId);
 		KRoom room = rooms.get(roomId);
 
@@ -613,22 +236,10 @@ public class KurentoHandler {
 		return room;
 	}
 
-	private KStream getByUid(String uid) {
-		return uid == null ? null : streamsByUid.get(uid);
-	}
-
-	private KTestStream getTestByUid(String uid) {
-		return uid == null ? null : testsByUid.get(uid);
-	}
-
 	static JSONObject newKurentoMsg() {
 		return new JSONObject().put("type", KURENTO_TYPE);
 	}
 
-	static JSONObject newTestKurentoMsg() {
-		return newKurentoMsg().put(TAG_MODE, MODE_TEST);
-	}
-
 	public static boolean activityAllowed(Client c, Activity a, Room room) {
 		boolean r = false;
 		switch (a) {
@@ -651,7 +262,7 @@ public class KurentoHandler {
 		return getTurnServers(false);
 	}
 
-	private JSONArray getTurnServers(final boolean test) {
+	JSONArray getTurnServers(final boolean test) {
 		JSONArray arr = new JSONArray();
 		if (!Strings.isEmpty(turnUrl)) {
 			try {
@@ -679,6 +290,14 @@ public class KurentoHandler {
 		return arr;
 	}
 
+	KurentoClient getClient() {
+		return client;
+	}
+
+	String getKuid() {
+		return kuid;
+	}
+
 	public void setCheckTimeout(long checkTimeout) {
 		this.checkTimeout = checkTimeout;
 	}
@@ -784,7 +403,7 @@ public class KurentoHandler {
 							return;
 						} else if (r != null) {
 							rooms.remove(r.getRoomId());
-							r.close(KurentoHandler.this);
+							r.close(streamProcessor);
 						}
 					}
 					log.warn("Invalid MediaPipeline {} detected, will be dropped, tags: {}", pipe.getId(), tags);
@@ -813,7 +432,7 @@ public class KurentoHandler {
 						return;
 					}
 					Map<String, String> tags = tagsAsMap(point);
-					KStream stream = getByUid(tags.get("outUid"));
+					KStream stream = streamProcessor.getByUid(tags.get("outUid"));
 					if (stream != null && stream.contains(tags.get("uid"))) {
 						return;
 					}
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java
new file mode 100644
index 0000000..4190583
--- /dev/null
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java
@@ -0,0 +1,442 @@
+/*
+
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.openmeetings.core.remote;
+
+import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_CANDIDATE;
+import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_ICE;
+import static org.apache.openmeetings.core.remote.KurentoHandler.activityAllowed;
+import static org.apache.openmeetings.core.remote.KurentoHandler.newKurentoMsg;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import org.apache.openmeetings.core.converter.IRecordingConverter;
+import org.apache.openmeetings.core.converter.InterviewConverter;
+import org.apache.openmeetings.core.converter.RecordingConverter;
+import org.apache.openmeetings.core.util.WebSocketHelper;
+import org.apache.openmeetings.db.dao.record.RecordingDao;
+import org.apache.openmeetings.db.entity.basic.Client;
+import org.apache.openmeetings.db.entity.basic.Client.Activity;
+import org.apache.openmeetings.db.entity.basic.Client.StreamDesc;
+import org.apache.openmeetings.db.entity.basic.Client.StreamType;
+import org.apache.openmeetings.db.entity.record.Recording;
+import org.apache.openmeetings.db.entity.room.Room;
+import org.apache.openmeetings.db.entity.room.Room.Right;
+import org.apache.openmeetings.db.entity.room.Room.RoomElement;
+import org.apache.openmeetings.db.manager.IClientManager;
+import org.apache.openmeetings.db.util.ws.RoomMessage;
+import org.apache.openmeetings.db.util.ws.TextRoomMessage;
+import org.kurento.client.IceCandidate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.task.TaskExecutor;
+import org.springframework.stereotype.Component;
+
+import com.github.openjson.JSONObject;
+
+@Component
+public class StreamProcessor implements IStreamProcessor {
+	private static final Logger log = LoggerFactory.getLogger(StreamProcessor.class);
+	private final Map<String, KStream> streamByUid = new ConcurrentHashMap<>();
+
+	@Autowired
+	private IClientManager cm;
+	@Autowired
+	private RecordingDao recDao;
+	@Autowired
+	private KurentoHandler kHandler;
+	@Autowired
+	private TaskExecutor taskExecutor;
+	@Autowired
+	private RecordingConverter recordingConverter;
+	@Autowired
+	private InterviewConverter interviewConverter;
+
+	void onMessage(Client c, final String cmdId, JSONObject msg) {
+		final String uid = msg.optString("uid");
+		KStream sender;
+		StreamDesc sd;
+		Optional<StreamDesc> osd;
+		log.debug("Incoming message from user with ID '{}': {}", c.getUserId(), msg);
+		switch (cmdId) {
+			case "devicesAltered":
+				if (!msg.getBoolean("audio") && c.hasActivity(Activity.AUDIO)) {
+					c.remove(Activity.AUDIO);
+				}
+				if (!msg.getBoolean("video") && c.hasActivity(Activity.VIDEO)) {
+					c.remove(Activity.VIDEO);
+				}
+				c.getStream(uid).setActivities();
+				WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), cm.update(c), RoomMessage.Type.rightUpdated, c.getUid()));
+				break;
+			case "toggleActivity":
+				toggleActivity(c, Activity.valueOf(msg.getString("activity")));
+				break;
+			case "broadcastStarted":
+				sd = c.getStream(uid);
+				sender = getByUid(uid);
+				if (sender == null) {
+					KRoom room = kHandler.getRoom(c.getRoomId());
+					sender = room.join(sd);
+				}
+				sender.startBroadcast(this, sd, msg.getString("sdpOffer"));
+				if (StreamType.SCREEN == sd.getType() && sd.hasActivity(Activity.RECORD) && !isRecording(c.getRoomId())) {
+					startRecording(c);
+				}
+				break;
+			case "onIceCandidate":
+				sender = getByUid(uid);
+				if (sender != null) {
+					JSONObject candidate = msg.getJSONObject(PARAM_CANDIDATE);
+					IceCandidate cand = new IceCandidate(
+							candidate.getString(PARAM_CANDIDATE)
+							, candidate.getString("sdpMid")
+							, candidate.getInt("sdpMLineIndex"));
+					sender.addCandidate(cand, msg.getString("luid"));
+				}
+				break;
+			case "addListener":
+				sender = getByUid(msg.getString("sender"));
+				if (sender != null) {
+					sender.addListener(this, c.getSid(), c.getUid(), msg.getString("sdpOffer"));
+				}
+				break;
+			case "wannaShare":
+				osd = c.getScreenStream();
+				if (screenShareAllowed(c) || (osd.isPresent() && !osd.get().hasActivity(Activity.SCREEN))) {
+					startSharing(c, osd, msg, Activity.SCREEN);
+				}
+				break;
+			case "wannaRecord":
+				osd = c.getScreenStream();
+				if (recordingAllowed(c)) {
+					Room r = c.getRoom();
+					if (Room.Type.interview == r.getType()) {
+						log.warn("This shouldn't be called for interview room");
+						break;
+					}
+					boolean sharing = isSharing(r.getId());
+					startSharing(c, osd, msg, Activity.RECORD);
+					if (sharing) {
+						startRecording(c);
+					}
+				}
+				break;
+			case "pauseSharing":
+				pauseSharing(c, uid);
+				break;
+			case "stopRecord":
+				stopRecording(c);
+				break;
+			case "errorSharing":
+				errorSharing(c);
+				break;
+		}
+	}
+
+	private static boolean isBroadcasting(final Client c) {
+		return c.hasAnyActivity(Activity.AUDIO, Activity.VIDEO);
+	}
+
+	/**
+	 * Toggles activity {@code a} for client {@code c}; a missing audio/video right is granted first.
+	 * Afterwards the WEBCAM stream is closed, requested ("broadcast" message) or re-configured.
+	 */
+	public void toggleActivity(Client c, Activity a) {
+		log.info("PARTICIPANT {}: trying to toggle activity {}", c, a);
+
+		if (!activityAllowed(c, a, c.getRoom())) {
+			if (a == Activity.AUDIO || a == Activity.AUDIO_VIDEO) {
+				c.allow(Room.Right.audio);
+			}
+			if (!c.getRoom().isAudioOnly() && (a == Activity.VIDEO || a == Activity.AUDIO_VIDEO)) {
+				c.allow(Room.Right.video);
+			}
+		}
+		if (activityAllowed(c, a, c.getRoom())) {
+			boolean wasBroadcasting = isBroadcasting(c);
+			if ((a == Activity.AUDIO && !c.isMicEnabled())
+					|| (a == Activity.VIDEO && !c.isCamEnabled())
+					|| (a == Activity.AUDIO_VIDEO && !c.isMicEnabled() && !c.isCamEnabled())) {
+				return; // nothing to toggle: mic/cam needed for this activity is not enabled
+			}
+			c.toggle(a);
+			if (!isBroadcasting(c)) {
+				//close
+				boolean changed = false;
+				for (StreamDesc sd : c.getStreams()) {
+					KStream s = getByUid(sd.getUid());
+					if (StreamType.WEBCAM == sd.getType()) {
+						if (s != null) {
+							s.stopBroadcast(this);
+						}
+						c.removeStream(sd.getUid());
+						changed = true;
+					}
+				}
+				if (changed) {
+					cm.update(c);
+					checkStreams(c.getRoomId());
+				}
+				WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.rightUpdated, c.getUid()));
+				//FIXME TODO update interview buttons
+			} else if (!wasBroadcasting) {
+				//join
+				StreamDesc sd = c.addStream(StreamType.WEBCAM);
+				cm.update(c);
+				log.debug("User {}: has started broadcast", sd.getUid());
+				kHandler.sendClient(sd.getSid(), newKurentoMsg()
+						.put("id", "broadcast")
+						.put("stream", sd.toJson())
+						.put(PARAM_ICE, kHandler.getTurnServers(false)));
+				//FIXME TODO update interview buttons
+			} else {
+				//constraints were changed
+				for (StreamDesc sd : c.getStreams()) {
+					if (StreamType.WEBCAM == sd.getType()) {
+						sd.setActivities();
+						cm.update(c);
+						break;
+					}
+				}
+				WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.rightUpdated, c.getUid()));
+			}
+		}
+	}
+
+	private void checkStreams(Long roomId) {
+		if (!kHandler.isConnected()) {
+			return;
+		}
+		KRoom room = kHandler.getRoom(roomId);
+		if (room.isSharing()) {
+			List<StreamDesc> streams = cm.listByRoom(roomId).parallelStream()
+					.flatMap(c -> c.getStreams().stream())
+					.filter(sd -> StreamType.SCREEN == sd.getType()).collect(Collectors.toList());
+			if (streams.isEmpty()) {
+				log.info("No more screen streams in the room, stopping sharing");
+				room.stopSharing();
+				if (Room.Type.interview != room.getType() && room.isRecording()) {
+					log.info("No more screen streams in the non-interview room, stopping recording");
+					room.stopRecording(this, null);
+				}
+			}
+		}
+		if (room.isRecording()) {
+			List<StreamDesc> streams = cm.listByRoom(roomId).parallelStream()
+					.flatMap(c -> c.getStreams().stream())
+					.collect(Collectors.toList());
+			if (streams.isEmpty()) {
+				log.info("No more streams in the room, stopping recording");
+				room.stopRecording(this, null);
+			}
+		}
+	}
+
+	// Sharing
+	public boolean hasRightsToShare(Client c) {
+		Room r = c.getRoom();
+		return r != null && Room.Type.interview != r.getType()
+				&& !r.isHidden(RoomElement.ScreenSharing)
+				&& r.isAllowRecording() && c.hasRight(Right.share);
+	}
+
+	public boolean screenShareAllowed(Client c) {
+		if (!kHandler.isConnected()) {
+			return false;
+		}
+		Room r = c.getRoom();
+		return hasRightsToShare(c) && !isSharing(r.getId());
+	}
+
+	private void errorSharing(Client c) {
+		if (!kHandler.isConnected()) {
+			return;
+		}
+		KRoom room = kHandler.getRoom(c.getRoomId());
+		if (!room.isSharing() || !c.getSid().equals(room.getSharingUser().getString("sid"))) {
+			return;
+		}
+		Optional<StreamDesc> osd = c.getScreenStream();
+		if (osd.isPresent()) {
+			stopSharing(c, osd.get().getUid());
+		} else {
+			room.stopSharing();
+		}
+		stopRecording(c);
+	}
+
+	private void startSharing(Client c, Optional<StreamDesc> osd, JSONObject msg, Activity a) {
+		if (kHandler.isConnected() && c.getRoomId() != null) {
+			kHandler.getRoom(c.getRoomId()).startSharing(this, cm, c, osd, msg, a);
+		}
+	}
+
+	/**
+	 * Pauses sharing of stream {@code uid}: SCREEN activity is removed while recording, else sharing stops.
+	 */
+	private void pauseSharing(Client c, String uid) {
+		if (!hasRightsToShare(c) || !isSharing(c.getRoomId())) {
+			return;
+		}
+		StreamDesc sd = c.getStream(uid);
+		KStream sender = getByUid(uid);
+		if (!isRecording(c.getRoomId())) {
+			stopSharing(c, uid);
+		} else if (sd != null && sender != null) { // 'uid' is client-supplied and may be unknown
+			sd.removeActivity(Activity.SCREEN);
+			cm.update(c);
+			sender.pauseSharing();
+			kHandler.sendShareUpdated(sd);
+		}
+	}
+
+	private void stopSharing(Client c, String uid) {
+		KStream sender = getByUid(uid);
+		StreamDesc sd = doStopSharing(c.getSid(), uid);
+		if (sender != null && sd != null) {
+			sender.stopBroadcast(this);
+		}
+	}
+
+	StreamDesc doStopSharing(String sid, String uid) {
+		return doStopSharing(getBySid(sid), uid);
+	}
+
+	private StreamDesc doStopSharing(Client c, String uid) {
+		StreamDesc sd = null;
+		if (c.getRoomId() != null) {
+			sd = c.getStream(uid);
+			if (sd != null && StreamType.SCREEN == sd.getType()) {
+				c.removeStream(uid);
+				cm.update(c);
+				checkStreams(c.getRoomId());
+				WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.rightUpdated, c.getUid()));
+			}
+		}
+		return sd;
+	}
+
+	public boolean isSharing(Long roomId) {
+		if (!kHandler.isConnected()) {
+			return false;
+		}
+		return kHandler.getRoom(roomId).isSharing();
+	}
+
+	// Recording
+
+	public boolean hasRightsToRecord(Client c) {
+		Room r = c.getRoom();
+		return r != null && r.isAllowRecording() && c.hasRight(Right.moderator);
+	}
+
+	public boolean recordingAllowed(Client c) {
+		if (!kHandler.isConnected()) {
+			return false;
+		}
+		Room r = c.getRoom();
+		return hasRightsToRecord(c) && !isRecording(r.getId());
+	}
+
+	public void startRecording(Client c) {
+		if (!kHandler.isConnected() || !hasRightsToRecord(c)) {
+			return;
+		}
+		kHandler.getRoom(c.getRoomId()).startRecording(cm, c, recDao);
+	}
+
+	public void stopRecording(Client c) {
+		if (!kHandler.isConnected() || !hasRightsToRecord(c)) {
+			return;
+		}
+		kHandler.getRoom(c.getRoomId()).stopRecording(this, c);
+	}
+
+	void startConvertion(Recording rec) {
+		IRecordingConverter conv = rec.isInterview() ? interviewConverter : recordingConverter;
+		taskExecutor.execute(() -> conv.startConversion(rec));
+	}
+
+	public boolean isRecording(Long roomId) {
+		if (!kHandler.isConnected()) {
+			return false;
+		}
+		return kHandler.getRoom(roomId).isRecording();
+	}
+
+	void remove(Client c) {
+		for (StreamDesc sd : c.getStreams()) {
+			AbstractStream s = getByUid(sd.getUid());
+			if (s != null) {
+				s.release(this);
+			}
+		}
+		if (c.getRoomId() != null) {
+			KRoom room = kHandler.getRoom(c.getRoomId());
+			room.leave(this, c);
+			checkStreams(c.getRoomId());
+		}
+	}
+
+	void addStream(KStream stream) {
+		streamByUid.put(stream.getUid(), stream);
+	}
+
+	Client getBySid(String sid) {
+		return cm.getBySid(sid);
+	}
+
+	KStream getByUid(String uid) {
+		return uid == null ? null : streamByUid.get(uid);
+	}
+
+	KurentoHandler getHandler() {
+		return kHandler;
+	}
+
+	IClientManager getClientManager() {
+		return cm;
+	}
+
+	RecordingDao getRecordingDao() {
+		return recDao;
+	}
+
+	@Override
+	public void release(AbstractStream stream) {
+		final String uid = stream.getUid();
+		Client c = cm.getBySid(stream.getSid());
+		if (c != null) {
+			c.removeStream(uid);
+			cm.update(c);
+		}
+		streamByUid.remove(uid);
+	}
+
+	@Override
+	public void destroy() {
+		streamByUid.clear();
+	}
+}
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/TestStreamProcessor.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/TestStreamProcessor.java
new file mode 100644
index 0000000..38a3b20
--- /dev/null
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/TestStreamProcessor.java
@@ -0,0 +1,126 @@
+/*
+
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.openmeetings.core.remote;
+
+import static org.apache.openmeetings.core.remote.KurentoHandler.MODE_TEST;
+import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_CANDIDATE;
+import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_ICE;
+import static org.apache.openmeetings.core.remote.KurentoHandler.TAG_KUID;
+import static org.apache.openmeetings.core.remote.KurentoHandler.TAG_MODE;
+import static org.apache.openmeetings.core.remote.KurentoHandler.TAG_ROOM;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.openmeetings.core.util.WebSocketHelper;
+import org.apache.openmeetings.db.entity.basic.IWsClient;
+import org.kurento.client.IceCandidate;
+import org.kurento.client.MediaPipeline;
+import org.kurento.client.Transaction;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import com.github.openjson.JSONObject;
+
+@Component
+public class TestStreamProcessor implements IStreamProcessor {
+	private final Map<String, KTestStream> streamByUid = new ConcurrentHashMap<>();
+
+	@Autowired
+	private KurentoHandler kHandler;
+
+	void onMessage(IWsClient _c, final String cmdId, JSONObject msg) {
+		KTestStream user = getByUid(_c.getUid());
+		switch (cmdId) {
+			case "wannaRecord":
+				WebSocketHelper.sendClient(_c, newTestKurentoMsg()
+						.put("id", "canRecord")
+						.put(PARAM_ICE, kHandler.getTurnServers(true))
+						);
+				break;
+			case "record":
+				if (user != null) {
+					user.release(this);
+				}
+				user = new KTestStream(_c, msg, createTestPipeline());
+				streamByUid.put(_c.getUid(), user);
+				break;
+			case "iceCandidate":
+				JSONObject candidate = msg.getJSONObject(PARAM_CANDIDATE);
+				if (user != null) {
+					IceCandidate cand = new IceCandidate(candidate.getString(PARAM_CANDIDATE),
+							candidate.getString("sdpMid"), candidate.getInt("sdpMLineIndex"));
+					user.addCandidate(cand);
+				}
+				break;
+			case "wannaPlay":
+				WebSocketHelper.sendClient(_c, newTestKurentoMsg()
+						.put("id", "canPlay")
+						.put(PARAM_ICE, kHandler.getTurnServers(true))
+						);
+				break;
+			case "play":
+				if (user != null) {
+					user.play(_c, msg, createTestPipeline());
+				}
+				break;
+		}
+	}
+
+	private KTestStream getByUid(String uid) {
+		return uid == null ? null : streamByUid.get(uid);
+	}
+
+	private MediaPipeline createTestPipeline() {
+		Transaction t = kHandler.beginTransaction();
+		MediaPipeline pipe = kHandler.getClient().createMediaPipeline(t);
+		pipe.addTag(t, TAG_KUID, kHandler.getKuid());
+		pipe.addTag(t, TAG_MODE, MODE_TEST);
+		pipe.addTag(t, TAG_ROOM, MODE_TEST);
+		t.commit();
+		return pipe;
+	}
+
+	static JSONObject newTestKurentoMsg() {
+		return KurentoHandler.newKurentoMsg().put(TAG_MODE, MODE_TEST);
+	}
+
+	void remove(IWsClient _c) {
+		AbstractStream s = getByUid(_c.getUid());
+		if (s != null) {
+			s.release(this);
+		}
+	}
+
+	@Override
+	public void release(AbstractStream stream) {
+		streamByUid.remove(stream.getUid());
+	}
+
+	@Override
+	public void destroy() {
+		for (Entry<String, KTestStream> e : streamByUid.entrySet()) {
+			e.getValue().release(this);
+			streamByUid.remove(e.getKey());
+		}
+		streamByUid.clear();
+	}
+}
diff --git a/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/BaseMockedTest.java b/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/BaseMockedTest.java
index bde9fa7..84487ff 100644
--- a/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/BaseMockedTest.java
+++ b/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/BaseMockedTest.java
@@ -34,6 +34,7 @@ import org.kurento.client.internal.TransactionImpl;
 import org.kurento.client.internal.client.RomManager;
 import org.mockito.InjectMocks;
 import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
 import org.mockito.Spy;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
@@ -41,7 +42,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
 import com.github.openjson.JSONObject;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({KurentoClient.class, WebSocketHelper.class, IKStream.class})
+@PrepareForTest({KurentoClient.class, WebSocketHelper.class, AbstractStream.class})
 public class BaseMockedTest {
 	@Mock
 	protected RomManager romManager;
@@ -51,11 +52,19 @@ public class BaseMockedTest {
 	protected KurentoClient client;
 	@Spy
 	@InjectMocks
+	protected StreamProcessor streamProcessor;
+	@Spy
+	@InjectMocks
+	protected TestStreamProcessor testProcessor;
+	@Spy
+	@InjectMocks
 	protected KurentoHandler handler;
+
 	protected final static JSONObject MSG_BASE = new JSONObject();
 
 	@Before
 	public void setup() {
+		MockitoAnnotations.initMocks(this);
 		mockStatic(KurentoClient.class);
 		mockStatic(WebSocketHelper.class);
 		when(client.getServerManager()).thenReturn(kServerManager);
diff --git a/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestNotConnectedMocked.java b/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestNotConnectedMocked.java
index 1877518..dd67727 100644
--- a/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestNotConnectedMocked.java
+++ b/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestNotConnectedMocked.java
@@ -34,22 +34,22 @@ public class TestNotConnectedMocked extends BaseMockedTest {
 
 	@Test
 	public void testRecordingAllowed() {
-		assertFalse(handler.recordingAllowed(null));
+		assertFalse(streamProcessor.recordingAllowed(null));
 	}
 
 	@Test
 	public void testStartRecording() {
-		handler.startRecording(null);
+		streamProcessor.startRecording(null);
 	}
 
 	@Test
 	public void testStopRecording() {
-		handler.stopRecording(null);
+		streamProcessor.stopRecording(null);
 	}
 
 	@Test
 	public void testIsRecording() {
-		assertFalse(handler.isRecording(null));
+		assertFalse(streamProcessor.isRecording(null));
 	}
 
 	@Test
diff --git a/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestRoomFlowMocked.java b/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestRoomFlowMocked.java
index fa546ee..c589bf4 100644
--- a/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestRoomFlowMocked.java
+++ b/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestRoomFlowMocked.java
@@ -93,15 +93,15 @@ public class TestRoomFlowMocked extends BaseMockedTest {
 	@Test
 	public void testRecordingAllowed() {
 		Client c = getClient();
-		assertFalse(handler.recordingAllowed(c));
+		assertFalse(streamProcessor.recordingAllowed(c));
 		c.setRoom(new Room());
-		assertFalse(handler.recordingAllowed(c));
+		assertFalse(streamProcessor.recordingAllowed(c));
 		c.getRoom().setId(ROOM_ID);
 		c.getRoom().setAllowRecording(true);
-		assertFalse(handler.recordingAllowed(c));
+		assertFalse(streamProcessor.recordingAllowed(c));
 		c.allow(Room.Right.moderator);
 		when(roomDao.get(ROOM_ID)).thenReturn(c.getRoom());
-		assertTrue(handler.recordingAllowed(c));
+		assertTrue(streamProcessor.recordingAllowed(c));
 	}
 
 	private Client getClientWithRoom() {
@@ -145,7 +145,7 @@ public class TestRoomFlowMocked extends BaseMockedTest {
 		Client c = getClientFull();
 		when(roomDao.get(ROOM_ID)).thenReturn(c.getRoom());
 		handler.onMessage(c, msg);
-		assertTrue(handler.isSharing(ROOM_ID));
+		assertTrue(streamProcessor.isSharing(ROOM_ID));
 		handler.onMessage(c, msg);
 	}
 }
diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.java b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.java
index 3febdfb..211d2db 100644
--- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.java
+++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.java
@@ -35,6 +35,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.openmeetings.core.remote.KurentoHandler;
+import org.apache.openmeetings.core.remote.StreamProcessor;
 import org.apache.openmeetings.core.util.WebSocketHelper;
 import org.apache.openmeetings.db.dao.calendar.AppointmentDao;
 import org.apache.openmeetings.db.dao.log.ConferenceLogDao;
@@ -179,7 +180,7 @@ public class RoomPanel extends BasePanel {
 					sb.append("VideoManager.play(").append(streams).append(", ").append(kHandler.getTurnServers()).append(");");
 				}
 			}
-			if (interview && !kHandler.isRecording(r.getId()) && streams.length() > 0 && _c.hasRight(Right.moderator)) {
+			if (interview && !streamProcessor.isRecording(r.getId()) && streams.length() > 0 && _c.hasRight(Right.moderator)) {
 				sb.append("WbArea.setRecEnabled(true);");
 			}
 			if (!Strings.isEmpty(sb)) {
@@ -241,6 +242,8 @@ public class RoomPanel extends BasePanel {
 	private QuickPollManager qpollManager;
 	@SpringBean
 	private KurentoHandler kHandler;
+	@SpringBean
+	private StreamProcessor streamProcessor;
 
 	public RoomPanel(String id, Room r) {
 		super(id);
@@ -551,7 +554,7 @@ public class RoomPanel extends BasePanel {
 	private void updateInterviewRecordingButtons(IPartialPageRequestHandler handler) {
 		Client _c = getClient();
 		if (interview && _c.hasRight(Right.moderator)) {
-			if (kHandler.isRecording(r.getId())) {
+			if (streamProcessor.isRecording(r.getId())) {
 				handler.appendJavaScript("if (typeof(WbArea) === 'object') {WbArea.setRecStarted(true);}");
 			} else {
 				boolean hasStreams = false;
@@ -746,7 +749,7 @@ public class RoomPanel extends BasePanel {
 	}
 
 	public boolean screenShareAllowed() {
-		return kHandler.screenShareAllowed(getClient());
+		return streamProcessor.screenShareAllowed(getClient());
 	}
 
 	public RoomSidebar getSidebar() {
diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/menu/RoomMenuPanel.java b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/menu/RoomMenuPanel.java
index a154a00..0f98545 100644
--- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/menu/RoomMenuPanel.java
+++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/menu/RoomMenuPanel.java
@@ -31,6 +31,7 @@ import java.util.List;
 
 import org.apache.commons.lang3.time.FastDateFormat;
 import org.apache.openmeetings.core.remote.KurentoHandler;
+import org.apache.openmeetings.core.remote.StreamProcessor;
 import org.apache.openmeetings.core.util.WebSocketHelper;
 import org.apache.openmeetings.db.dao.basic.ConfigurationDao;
 import org.apache.openmeetings.db.entity.basic.Client;
@@ -103,6 +104,8 @@ public class RoomMenuPanel extends Panel {
 	private ConfigurationDao cfgDao;
 	@SpringBean
 	private KurentoHandler kHandler;
+	@SpringBean
+	private StreamProcessor streamProcessor;
 
 	public RoomMenuPanel(String id, final RoomPanel room) {
 		super(id);
@@ -208,7 +211,7 @@ public class RoomMenuPanel extends Panel {
 		menuPanel.update(handler);
 		StringBuilder roomClass = new StringBuilder("room name");
 		StringBuilder roomTitle = new StringBuilder();
-		if (kHandler.isRecording(r.getId())) {
+		if (streamProcessor.isRecording(r.getId())) {
 			JSONObject ru = kHandler.getRecordingUser(r.getId());
 			if (!Strings.isEmpty(ru.optString("login"))) {
 				roomTitle.append(String.format("%s %s %s %s %s", getString("419")
diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-sharer.js b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-sharer.js
index aec324e..7525049 100644
--- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-sharer.js
+++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-sharer.js
@@ -38,15 +38,10 @@ var Sharer = (function() {
 					, height: height.val()
 				});
 			} else {
-				const cuid = Room.getOptions().uid
-					, v = $('div[data-client-uid="' + cuid + '"][data-client-type="SCREEN"]')
-					, uid = v.data().stream().uid;
 				VideoManager.sendMessage({
-					id: 'stopSharing'
-					, uid: uid
+					id: 'pauseSharing'
+					, uid: _getShareUid()
 				});
-				VideoManager.close(uid, false);
-				_setShareState(SHARE_STOPED);
 			}
 		});
 		width = sharer.find('.width');
@@ -64,15 +59,10 @@ var Sharer = (function() {
 					, height: height.val()
 				});
 			} else {
-				const cuid = Room.getOptions().uid
-					, v = $('div[data-client-uid="' + cuid + '"][data-client-type="SCREEN"]')
-					, uid = v.data().stream().uid;
 				VideoManager.sendMessage({
 					id: 'stopRecord'
-					, uid: uid
+					, uid: _getShareUid()
 				});
-				VideoManager.close(uid, false);
-				_setRecState(SHARE_STOPED);
 			}
 		});
 	}
@@ -177,6 +167,10 @@ var Sharer = (function() {
 		}
 		return cnts;
 	}
+	function _getShareUid() {
+		const v = $('div[data-client-uid="' + Room.getOptions().uid + '"][data-client-type="SCREEN"]');
+		return v && v.data() && v.data().stream() ? v.data().stream().uid : '';
+	}
 
 	self.init = _init;
 	self.open = function() {
diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video-manager.js b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video-manager.js
index 40760df..1e5294d 100644
--- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video-manager.js
+++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video-manager.js
@@ -25,7 +25,11 @@ var VideoManager = (function() {
 			, uid = sd.uid
 			, w = $('#' + VideoUtil.getVid(uid))
 			, v = w.data();
-		v.stream().activities = sd.activities;
+		if (!VideoUtil.isSharing(sd) && !VideoUtil.isRecording(sd)) {
+			VideoManager.close(uid, false);
+		} else {
+			v.stream().activities = sd.activities;
+		}
 		Sharer.setShareState(VideoUtil.isSharing(sd) ? SHARE_STARTED : SHARE_STOPED);
 		Sharer.setRecState(VideoUtil.isRecording(sd) ? SHARE_STARTED : SHARE_STOPED);
 	}
diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/sidebar/RoomSidebar.java b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/sidebar/RoomSidebar.java
index 5be3cda..14499b3 100644
--- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/sidebar/RoomSidebar.java
+++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/sidebar/RoomSidebar.java
@@ -24,7 +24,7 @@ import static org.apache.wicket.ajax.attributes.CallbackParameter.explicit;
 
 import java.util.ArrayList;
 
-import org.apache.openmeetings.core.remote.KurentoHandler;
+import org.apache.openmeetings.core.remote.StreamProcessor;
 import org.apache.openmeetings.core.util.WebSocketHelper;
 import org.apache.openmeetings.db.entity.basic.Client;
 import org.apache.openmeetings.db.entity.room.Room;
@@ -193,7 +193,7 @@ public class RoomSidebar extends Panel {
 				if (!avInited) {
 					avInited = true;
 					if (Room.Type.conference == room.getRoom().getType()) {
-						kurento.toggleActivity(c, Client.Activity.AUDIO_VIDEO);
+						streamProcessor.toggleActivity(c, Client.Activity.AUDIO_VIDEO);
 					}
 				}
 				cm.update(c);
@@ -206,7 +206,7 @@ public class RoomSidebar extends Panel {
 	@SpringBean
 	private ClientManager cm;
 	@SpringBean
-	private KurentoHandler kurento;
+	private StreamProcessor streamProcessor;
 
 	public RoomSidebar(String id, final RoomPanel room) {
 		super(id);
diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/wb/InterviewWbPanel.java b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/wb/InterviewWbPanel.java
index 63c77f6..c047835 100644
--- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/wb/InterviewWbPanel.java
+++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/wb/InterviewWbPanel.java
@@ -20,7 +20,7 @@ package org.apache.openmeetings.web.room.wb;
 
 import java.io.IOException;
 
-import org.apache.openmeetings.core.remote.KurentoHandler;
+import org.apache.openmeetings.core.remote.StreamProcessor;
 import org.apache.openmeetings.db.entity.basic.Client;
 import org.apache.openmeetings.db.entity.file.BaseFileItem;
 import org.apache.openmeetings.db.entity.room.Room.Right;
@@ -36,7 +36,7 @@ public class InterviewWbPanel extends AbstractWbPanel {
 	private static final long serialVersionUID = 1L;
 	public static final ResourceReference INTERVIEWWB_JS_REFERENCE = new JavaScriptResourceReference(WbPanel.class, "interviewwb.js");
 	@SpringBean
-	private KurentoHandler kurento;
+	private StreamProcessor streamProcessor;
 
 	public InterviewWbPanel(String id, RoomPanel rp) {
 		super(id, rp);
@@ -55,13 +55,13 @@ public class InterviewWbPanel extends AbstractWbPanel {
 	@Override
 	protected void processWbAction(WbAction a, JSONObject obj, AjaxRequestTarget target) throws IOException {
 		Client c = rp.getClient();
-		if (kurento.recordingAllowed(c)) {
+		if (streamProcessor.recordingAllowed(c)) {
 			switch (a) {
 				case startRecording:
-					kurento.startRecording(c);
+					streamProcessor.startRecording(c);
 					break;
 				case stopRecording:
-					kurento.stopRecording(c);
+					streamProcessor.stopRecording(c);
 					break;
 				default:
 					//no-op
diff --git a/openmeetings-web/src/test/resources/keystore b/openmeetings-web/src/test/resources/keystore
index 8cc07dc..c2a5309 100644
Binary files a/openmeetings-web/src/test/resources/keystore and b/openmeetings-web/src/test/resources/keystore differ