You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openmeetings.apache.org by se...@apache.org on 2021/02/17 07:28:58 UTC

[openmeetings] branch feature/OPENMEETINGS-2577-performance--metrics-disabled-state updated: OPENMEETINGS-2577 Update and externalise Method calls for being able to have Spring style annotations on method calls.

This is an automated email from the ASF dual-hosted git repository.

sebawagner pushed a commit to branch feature/OPENMEETINGS-2577-performance--metrics-disabled-state
in repository https://gitbox.apache.org/repos/asf/openmeetings.git


The following commit(s) were added to refs/heads/feature/OPENMEETINGS-2577-performance--metrics-disabled-state by this push:
     new 4d5fb81  OPENMEETINGS-2577 Update and externalise Method calls for being able to have Spring style annotations on method calls.
4d5fb81 is described below

commit 4d5fb810615ed01ed91152bb7954d9bd2e34afc6
Author: Sebastian Wagner <se...@gmail.com>
AuthorDate: Wed Feb 17 20:28:34 2021 +1300

    OPENMEETINGS-2577 Update and externalise Method calls for being able to have Spring style annotations on method calls.
---
 .../openmeetings/core/remote/StreamProcessor.java  |  92 ++-------------
 .../core/remote/StreamProcessorActions.java        | 127 +++++++++++++++++++++
 2 files changed, 134 insertions(+), 85 deletions(-)

diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java
index 0f7750d..3771c0c 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java
@@ -19,11 +19,9 @@
  */
 package org.apache.openmeetings.core.remote;
 
-import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_CANDIDATE;
 import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_ICE;
 import static org.apache.openmeetings.core.remote.KurentoHandler.activityAllowed;
 import static org.apache.openmeetings.core.remote.KurentoHandler.newKurentoMsg;
-import static org.apache.openmeetings.core.remote.KurentoHandler.sendError;
 import static org.apache.openmeetings.util.OpenmeetingsVariables.isRecordingsEnabled;
 
 import java.util.Collection;
@@ -52,9 +50,6 @@ import org.apache.openmeetings.db.manager.IClientManager;
 import org.apache.openmeetings.db.util.ws.RoomMessage;
 import org.apache.openmeetings.db.util.ws.TextRoomMessage;
 import org.apache.openmeetings.util.logging.TimedApplication;
-import org.apache.wicket.util.string.Strings;
-import org.kurento.client.IceCandidate;
-import org.kurento.client.internal.server.KurentoServerException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -84,6 +79,8 @@ public class StreamProcessor implements IStreamProcessor {
 	private RecordingConverter recordingConverter;
 	@Autowired
 	private InterviewConverter interviewConverter;
+	@Autowired
+	private StreamProcessorActions streamProcessorActions;
 
 	@TimedApplication
 	void onMessage(Client c, final String cmdId, JSONObject msg) {
@@ -109,16 +106,16 @@ public class StreamProcessor implements IStreamProcessor {
 				toggleActivity(c, Activity.valueOf(msg.getString("activity")));
 				break;
 			case "broadcastStarted":
-				handleBroadcastStarted(c, uid, msg);
+				streamProcessorActions.handleBroadcastStarted(c, uid, msg);
 				break;
 			case "broadcastRestarted":
-				handleBroadcastRestarted(c, uid);
+				streamProcessorActions.handleBroadcastRestarted(c, uid);
 				break;
 			case "onIceCandidate":
-				addIceCandidate(msg);
+				streamProcessorActions.addIceCandidate(msg);
 				break;
 			case "addListener":
-				addListener(c, msg);
+				streamProcessorActions.addListener(c, msg);
 				break;
 			case "wannaShare":
 				osd = c.getScreenStream();
@@ -156,80 +153,6 @@ public class StreamProcessor implements IStreamProcessor {
 		}
 	}
 
-	@TimedApplication
-	private void addIceCandidate(JSONObject msg) {
-		final String uid = msg.optString("uid");
-		KStream sender;
-		sender = getByUid(uid);
-		if (sender != null) {
-			JSONObject candidate = msg.getJSONObject(PARAM_CANDIDATE);
-			String candStr = candidate.getString(PARAM_CANDIDATE);
-			if (!Strings.isEmpty(candStr)) {
-				IceCandidate cand = new IceCandidate(
-						candStr
-						, candidate.getString("sdpMid")
-						, candidate.getInt("sdpMLineIndex"));
-				sender.addCandidate(cand, msg.getString("luid"));
-			}
-		}
-	}
-
-	@TimedApplication
-	private void addListener(Client c, JSONObject msg) {
-		KStream sender = getByUid(msg.getString("sender"));
-		if (sender != null) {
-			Client sendClient = cm.getBySid(sender.getSid());
-			StreamDesc sd = sendClient.getStream(sender.getUid());
-			if (sd == null) {
-				return;
-			}
-			if (StreamType.SCREEN == sd.getType() && sd.hasActivity(Activity.RECORD) && !sd.hasActivity(Activity.SCREEN)) {
-				return;
-			}
-			sender.addListener(c.getSid(), c.getUid(), msg.getString("sdpOffer"));
-		}
-	}
-
-	@TimedApplication
-	private void handleBroadcastRestarted(Client c, final String uid) {
-		if (!kHandler.isConnected()) {
-			return;
-		}
-		KStream sender = getByUid(uid);
-		if (sender != null) {
-			sender.broadcastRestarted();
-		}
-	}
-
-	@TimedApplication
-	private void handleBroadcastStarted(Client c, final String uid, JSONObject msg) {
-		if (!kHandler.isConnected()) {
-			return;
-		}
-		StreamDesc sd = c.getStream(uid);
-		KStream sender = getByUid(uid);
-		try {
-			if (sender == null) {
-				KRoom room = kHandler.getRoom(c.getRoomId());
-				sender = room.join(sd, kHandler);
-			}
-			if (msg.has("width")) {
-				sd.setWidth(msg.getInt("width")).setHeight(msg.getInt("height"));
-				cm.update(c);
-			}
-			startBroadcast(sender, sd, msg.getString("sdpOffer"), () -> {
-				if (StreamType.SCREEN == sd.getType() && sd.hasActivity(Activity.RECORD) && !isRecording(c.getRoomId())) {
-					startRecording(c);
-				}
-			});
-		} catch (KurentoServerException e) {
-			sender.release();
-			WebSocketHelper.sendClient(c, newStoppedMsg(sd));
-			sendError(c, "Failed to start broadcast: " + e.getMessage());
-			log.error("Failed to start broadcast", e);
-		}
-	}
-
 	/**
 	 *  Method to start broadcasting.  Externalised for mocking purpose to be able to
 	 *  prevent calling webRTC methods.
@@ -240,7 +163,6 @@ public class StreamProcessor implements IStreamProcessor {
 	 * @param then steps need to be done after broadcast is started
 	 * @return the current KStream
 	 */
-	@TimedApplication
 	void startBroadcast(KStream stream, StreamDesc sd, String sdpOffer, Runnable then) {
 		stream.startBroadcast(sd, sdpOffer, then);
 	}
@@ -606,7 +528,7 @@ public class StreamProcessor implements IStreamProcessor {
 		}
 	}
 
-	private static JSONObject newStoppedMsg(StreamDesc sd) {
+	protected static JSONObject newStoppedMsg(StreamDesc sd) {
 		return newKurentoMsg()
 				.put("id", "broadcastStopped")
 				.put("uid", sd.getUid());
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessorActions.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessorActions.java
new file mode 100644
index 0000000..d6de328
--- /dev/null
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessorActions.java
@@ -0,0 +1,127 @@
+/*
+
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.openmeetings.core.remote;
+
+import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_CANDIDATE;
+import static org.apache.openmeetings.core.remote.KurentoHandler.sendError;
+
+import org.apache.openmeetings.core.util.WebSocketHelper;
+import org.apache.openmeetings.db.entity.basic.Client;
+import org.apache.openmeetings.db.entity.basic.Client.Activity;
+import org.apache.openmeetings.db.entity.basic.Client.StreamDesc;
+import org.apache.openmeetings.db.entity.basic.Client.StreamType;
+import org.apache.openmeetings.db.manager.IClientManager;
+import org.apache.openmeetings.util.logging.TimedApplication;
+import org.apache.wicket.util.string.Strings;
+import org.kurento.client.IceCandidate;
+import org.kurento.client.internal.server.KurentoServerException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import com.github.openjson.JSONObject;
+
+/** Actions externalised from {@link StreamProcessor} to allow Spring-style annotations on these method calls. */
+@Component
+public class StreamProcessorActions {
+	private static final Logger log = LoggerFactory.getLogger(StreamProcessorActions.class);
+
+	@Autowired
+	private IClientManager cm;
+	@Autowired
+	private KurentoHandler kHandler;
+	@Autowired
+	private StreamProcessor streamProcessor;
+
+	/** Adds the ICE candidate from {@code msg} to the stream found by its "uid"; does nothing if there is no such stream. */
+	@TimedApplication
+	protected void addIceCandidate(JSONObject msg) {
+		KStream sender = streamProcessor.getByUid(msg.optString("uid"));
+		if (sender == null) {
+			return;
+		}
+		JSONObject candidate = msg.getJSONObject(PARAM_CANDIDATE);
+		String candStr = candidate.getString(PARAM_CANDIDATE);
+		if (!Strings.isEmpty(candStr)) {
+			IceCandidate cand = new IceCandidate(candStr, candidate.getString("sdpMid"), candidate.getInt("sdpMLineIndex"));
+			sender.addCandidate(cand, msg.getString("luid"));
+		}
+	}
+
+	/** Adds client {@code c} as listener of the stream named by "sender"; screen streams with RECORD but no SCREEN activity are skipped. */
+	@TimedApplication
+	protected void addListener(Client c, JSONObject msg) {
+		KStream sender = streamProcessor.getByUid(msg.getString("sender"));
+		if (sender == null) {
+			return;
+		}
+		Client sendClient = cm.getBySid(sender.getSid());
+		StreamDesc sd = sendClient.getStream(sender.getUid());
+		if (sd == null) {
+			return;
+		}
+		if (StreamType.SCREEN == sd.getType() && sd.hasActivity(Activity.RECORD) && !sd.hasActivity(Activity.SCREEN)) {
+			return;
+		}
+		sender.addListener(c.getSid(), c.getUid(), msg.getString("sdpOffer"));
+	}
+
+	/** Tells the stream with the given {@code uid}, if any, that its broadcast was restarted; ignored while the Kurento handler is not connected. */
+	@TimedApplication
+	protected void handleBroadcastRestarted(Client c, final String uid) {
+		KStream sender = kHandler.isConnected() ? streamProcessor.getByUid(uid) : null;
+		if (sender != null) {
+			sender.broadcastRestarted();
+		}
+	}
+
+	/** Starts broadcasting stream {@code uid} of {@code c}, joining the room first if it has no {@link KStream} yet. */
+	@TimedApplication
+	protected void handleBroadcastStarted(Client c, final String uid, JSONObject msg) {
+		if (!kHandler.isConnected()) {
+			return;
+		}
+		StreamDesc sd = c.getStream(uid);
+		KStream sender = streamProcessor.getByUid(uid);
+		try {
+			if (sender == null) {
+				KRoom room = kHandler.getRoom(c.getRoomId());
+				sender = room.join(sd, kHandler);
+			}
+			if (msg.has("width")) {
+				sd.setWidth(msg.getInt("width")).setHeight(msg.getInt("height"));
+				cm.update(c);
+			}
+			streamProcessor.startBroadcast(sender, sd, msg.getString("sdpOffer"), () -> {
+				if (StreamType.SCREEN == sd.getType() && sd.hasActivity(Activity.RECORD) && !streamProcessor.isRecording(c.getRoomId())) {
+					streamProcessor.startRecording(c);
+				}
+			});
+		} catch (KurentoServerException e) {
+			if (sender != null) { // still null when getRoom()/join() above is what failed
+				sender.release();
+			}
+			WebSocketHelper.sendClient(c, StreamProcessor.newStoppedMsg(sd));
+			sendError(c, "Failed to start broadcast: " + e.getMessage());
+			log.error("Failed to start broadcast", e);
+		}
+	}
+}