You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openmeetings.apache.org by se...@apache.org on 2021/02/17 07:28:58 UTC
[openmeetings] branch
feature/OPENMEETINGS-2577-performance--metrics-disabled-state updated:
OPENMEETINGS-2577 Update and externalise Method calls for being able to
have Spring style annotations on method calls.
This is an automated email from the ASF dual-hosted git repository.
sebawagner pushed a commit to branch feature/OPENMEETINGS-2577-performance--metrics-disabled-state
in repository https://gitbox.apache.org/repos/asf/openmeetings.git
The following commit(s) were added to refs/heads/feature/OPENMEETINGS-2577-performance--metrics-disabled-state by this push:
new 4d5fb81 OPENMEETINGS-2577 Update and externalise Method calls for being able to have Spring style annotations on method calls.
4d5fb81 is described below
commit 4d5fb810615ed01ed91152bb7954d9bd2e34afc6
Author: Sebastian Wagner <se...@gmail.com>
AuthorDate: Wed Feb 17 20:28:34 2021 +1300
OPENMEETINGS-2577 Update and externalise Method calls for being able to have Spring style annotations on method calls.
---
.../openmeetings/core/remote/StreamProcessor.java | 92 ++-------------
.../core/remote/StreamProcessorActions.java | 127 +++++++++++++++++++++
2 files changed, 134 insertions(+), 85 deletions(-)
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java
index 0f7750d..3771c0c 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java
@@ -19,11 +19,9 @@
*/
package org.apache.openmeetings.core.remote;
-import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_CANDIDATE;
import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_ICE;
import static org.apache.openmeetings.core.remote.KurentoHandler.activityAllowed;
import static org.apache.openmeetings.core.remote.KurentoHandler.newKurentoMsg;
-import static org.apache.openmeetings.core.remote.KurentoHandler.sendError;
import static org.apache.openmeetings.util.OpenmeetingsVariables.isRecordingsEnabled;
import java.util.Collection;
@@ -52,9 +50,6 @@ import org.apache.openmeetings.db.manager.IClientManager;
import org.apache.openmeetings.db.util.ws.RoomMessage;
import org.apache.openmeetings.db.util.ws.TextRoomMessage;
import org.apache.openmeetings.util.logging.TimedApplication;
-import org.apache.wicket.util.string.Strings;
-import org.kurento.client.IceCandidate;
-import org.kurento.client.internal.server.KurentoServerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -84,6 +79,8 @@ public class StreamProcessor implements IStreamProcessor {
private RecordingConverter recordingConverter;
@Autowired
private InterviewConverter interviewConverter;
+ @Autowired
+ private StreamProcessorActions streamProcessorActions;
@TimedApplication
void onMessage(Client c, final String cmdId, JSONObject msg) {
@@ -109,16 +106,16 @@ public class StreamProcessor implements IStreamProcessor {
toggleActivity(c, Activity.valueOf(msg.getString("activity")));
break;
case "broadcastStarted":
- handleBroadcastStarted(c, uid, msg);
+ streamProcessorActions.handleBroadcastStarted(c, uid, msg);
break;
case "broadcastRestarted":
- handleBroadcastRestarted(c, uid);
+ streamProcessorActions.handleBroadcastRestarted(c, uid);
break;
case "onIceCandidate":
- addIceCandidate(msg);
+ streamProcessorActions.addIceCandidate(msg);
break;
case "addListener":
- addListener(c, msg);
+ streamProcessorActions.addListener(c, msg);
break;
case "wannaShare":
osd = c.getScreenStream();
@@ -156,80 +153,6 @@ public class StreamProcessor implements IStreamProcessor {
}
}
- @TimedApplication
- private void addIceCandidate(JSONObject msg) {
- final String uid = msg.optString("uid");
- KStream sender;
- sender = getByUid(uid);
- if (sender != null) {
- JSONObject candidate = msg.getJSONObject(PARAM_CANDIDATE);
- String candStr = candidate.getString(PARAM_CANDIDATE);
- if (!Strings.isEmpty(candStr)) {
- IceCandidate cand = new IceCandidate(
- candStr
- , candidate.getString("sdpMid")
- , candidate.getInt("sdpMLineIndex"));
- sender.addCandidate(cand, msg.getString("luid"));
- }
- }
- }
-
- @TimedApplication
- private void addListener(Client c, JSONObject msg) {
- KStream sender = getByUid(msg.getString("sender"));
- if (sender != null) {
- Client sendClient = cm.getBySid(sender.getSid());
- StreamDesc sd = sendClient.getStream(sender.getUid());
- if (sd == null) {
- return;
- }
- if (StreamType.SCREEN == sd.getType() && sd.hasActivity(Activity.RECORD) && !sd.hasActivity(Activity.SCREEN)) {
- return;
- }
- sender.addListener(c.getSid(), c.getUid(), msg.getString("sdpOffer"));
- }
- }
-
- @TimedApplication
- private void handleBroadcastRestarted(Client c, final String uid) {
- if (!kHandler.isConnected()) {
- return;
- }
- KStream sender = getByUid(uid);
- if (sender != null) {
- sender.broadcastRestarted();
- }
- }
-
- @TimedApplication
- private void handleBroadcastStarted(Client c, final String uid, JSONObject msg) {
- if (!kHandler.isConnected()) {
- return;
- }
- StreamDesc sd = c.getStream(uid);
- KStream sender = getByUid(uid);
- try {
- if (sender == null) {
- KRoom room = kHandler.getRoom(c.getRoomId());
- sender = room.join(sd, kHandler);
- }
- if (msg.has("width")) {
- sd.setWidth(msg.getInt("width")).setHeight(msg.getInt("height"));
- cm.update(c);
- }
- startBroadcast(sender, sd, msg.getString("sdpOffer"), () -> {
- if (StreamType.SCREEN == sd.getType() && sd.hasActivity(Activity.RECORD) && !isRecording(c.getRoomId())) {
- startRecording(c);
- }
- });
- } catch (KurentoServerException e) {
- sender.release();
- WebSocketHelper.sendClient(c, newStoppedMsg(sd));
- sendError(c, "Failed to start broadcast: " + e.getMessage());
- log.error("Failed to start broadcast", e);
- }
- }
-
/**
* Method to start broadcasting. Externalised for mocking purpose to be able to
* prevent calling webRTC methods.
@@ -240,7 +163,6 @@ public class StreamProcessor implements IStreamProcessor {
* @param then steps need to be done after broadcast is started
* @return the current KStream
*/
- @TimedApplication
void startBroadcast(KStream stream, StreamDesc sd, String sdpOffer, Runnable then) {
stream.startBroadcast(sd, sdpOffer, then);
}
@@ -606,7 +528,7 @@ public class StreamProcessor implements IStreamProcessor {
}
}
- private static JSONObject newStoppedMsg(StreamDesc sd) {
+ protected static JSONObject newStoppedMsg(StreamDesc sd) {
return newKurentoMsg()
.put("id", "broadcastStopped")
.put("uid", sd.getUid());
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessorActions.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessorActions.java
new file mode 100644
index 0000000..d6de328
--- /dev/null
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessorActions.java
@@ -0,0 +1,127 @@
+/*
+
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License") + you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.openmeetings.core.remote;
+
+import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_CANDIDATE;
+import static org.apache.openmeetings.core.remote.KurentoHandler.sendError;
+
+import org.apache.openmeetings.core.util.WebSocketHelper;
+import org.apache.openmeetings.db.entity.basic.Client;
+import org.apache.openmeetings.db.entity.basic.Client.Activity;
+import org.apache.openmeetings.db.entity.basic.Client.StreamDesc;
+import org.apache.openmeetings.db.entity.basic.Client.StreamType;
+import org.apache.openmeetings.db.manager.IClientManager;
+import org.apache.openmeetings.util.logging.TimedApplication;
+import org.apache.wicket.util.string.Strings;
+import org.kurento.client.IceCandidate;
+import org.kurento.client.internal.server.KurentoServerException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import com.github.openjson.JSONObject;
+
+@Component
+public class StreamProcessorActions {
+
+ private static final Logger log = LoggerFactory.getLogger(StreamProcessor.class);
+
+ @Autowired
+ private IClientManager cm;
+ @Autowired
+ private KurentoHandler kHandler;
+ @Autowired
+ private StreamProcessor streamProcessor;
+
+ @TimedApplication
+ protected void addIceCandidate(JSONObject msg) {
+ final String uid = msg.optString("uid");
+ KStream sender;
+ sender = streamProcessor.getByUid(uid);
+ if (sender != null) {
+ JSONObject candidate = msg.getJSONObject(PARAM_CANDIDATE);
+ String candStr = candidate.getString(PARAM_CANDIDATE);
+ if (!Strings.isEmpty(candStr)) {
+ IceCandidate cand = new IceCandidate(
+ candStr
+ , candidate.getString("sdpMid")
+ , candidate.getInt("sdpMLineIndex"));
+ sender.addCandidate(cand, msg.getString("luid"));
+ }
+ }
+ }
+
+ @TimedApplication
+ protected void addListener(Client c, JSONObject msg) {
+ KStream sender = streamProcessor.getByUid(msg.getString("sender"));
+ if (sender != null) {
+ Client sendClient = cm.getBySid(sender.getSid());
+ StreamDesc sd = sendClient.getStream(sender.getUid());
+ if (sd == null) {
+ return;
+ }
+ if (StreamType.SCREEN == sd.getType() && sd.hasActivity(Activity.RECORD) && !sd.hasActivity(Activity.SCREEN)) {
+ return;
+ }
+ sender.addListener(c.getSid(), c.getUid(), msg.getString("sdpOffer"));
+ }
+ }
+
+ @TimedApplication
+ protected void handleBroadcastRestarted(Client c, final String uid) {
+ if (!kHandler.isConnected()) {
+ return;
+ }
+ KStream sender = streamProcessor.getByUid(uid);
+ if (sender != null) {
+ sender.broadcastRestarted();
+ }
+ }
+
+ @TimedApplication
+ protected void handleBroadcastStarted(Client c, final String uid, JSONObject msg) {
+ if (!kHandler.isConnected()) {
+ return;
+ }
+ StreamDesc sd = c.getStream(uid);
+ KStream sender = streamProcessor.getByUid(uid);
+ try {
+ if (sender == null) {
+ KRoom room = kHandler.getRoom(c.getRoomId());
+ sender = room.join(sd, kHandler);
+ }
+ if (msg.has("width")) {
+ sd.setWidth(msg.getInt("width")).setHeight(msg.getInt("height"));
+ cm.update(c);
+ }
+ streamProcessor.startBroadcast(sender, sd, msg.getString("sdpOffer"), () -> {
+ if (StreamType.SCREEN == sd.getType() && sd.hasActivity(Activity.RECORD) && !streamProcessor.isRecording(c.getRoomId())) {
+ streamProcessor.startRecording(c);
+ }
+ });
+ } catch (KurentoServerException e) {
+ sender.release();
+ WebSocketHelper.sendClient(c, StreamProcessor.newStoppedMsg(sd));
+ sendError(c, "Failed to start broadcast: " + e.getMessage());
+ log.error("Failed to start broadcast", e);
+ }
+ }
+}