You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openmeetings.apache.org by se...@apache.org on 2020/04/25 20:34:51 UTC
[openmeetings] 01/02: OPENMEETINGS-2298 Add KStream to admin panel.
Add methods to create relevant lists
This is an automated email from the ASF dual-hosted git repository.
sebawagner pushed a commit to branch feature/OPENMEETINGS-2298-add-kstream-to-connections-panel
in repository https://gitbox.apache.org/repos/asf/openmeetings.git
commit 7d29a2888958c0ee0f81f4fbb37942f0d3576bc2
Author: Sebastian Wagner <se...@apache.org>
AuthorDate: Sat Apr 25 14:17:53 2020 +1200
OPENMEETINGS-2298 Add KStream to admin panel. Add methods to create relevant lists
---
.../apache/openmeetings/core/remote/KStream.java | 35 +++++
.../openmeetings/core/remote/KurentoHandler.java | 43 ++++++
.../openmeetings/core/remote/StreamProcessor.java | 5 +
.../core/remote/KurentoHandlerTest.java | 83 +++++++++++
.../web/admin/connection/ConnectionsPanel.java | 155 ++++++++++++++-------
.../admin/connection/dto/ConnectionListItem.java | 70 ++++++++++
.../connection/dto/ConnectionListKStreamItem.java | 97 +++++++++++++
7 files changed, 440 insertions(+), 48 deletions(-)
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
index e0e6193..0aaf416 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
@@ -31,6 +31,7 @@ import static org.apache.openmeetings.util.OmFileHelper.getRecUri;
import static org.apache.openmeetings.util.OmFileHelper.getRecordingChunk;
import java.util.Map.Entry;
+import java.util.Date;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -61,6 +62,7 @@ public class KStream extends AbstractStream {
private static final Logger log = LoggerFactory.getLogger(KStream.class);
private final KRoom room;
+ private final Date connectedSince;
private final StreamType streamType;
private MediaProfileSpecType profile;
private RecorderEndpoint recorder;
@@ -74,6 +76,7 @@ public class KStream extends AbstractStream {
super(sd.getSid(), sd.getUid());
this.room = room;
streamType = sd.getType();
+ this.connectedSince = new Date();
//TODO Min/MaxVideoSendBandwidth
//TODO Min/Max Audio/Video RecvBandwidth
}
@@ -355,6 +358,38 @@ public class KStream extends AbstractStream {
public String getUid() {
return uid;
}
+
+ public Date getConnectedSince() {
+ return connectedSince;
+ }
+
+ public KRoom getRoom() {
+ return room;
+ }
+
+ public StreamType getStreamType() {
+ return streamType;
+ }
+
+ public MediaProfileSpecType getProfile() {
+ return profile;
+ }
+
+ public RecorderEndpoint getRecorder() {
+ return recorder;
+ }
+
+ public WebRtcEndpoint getOutgoingMedia() {
+ return outgoingMedia;
+ }
+
+ public Long getChunkId() {
+ return chunkId;
+ }
+
+ public Type getType() {
+ return type;
+ }
public boolean contains(String uid) {
return this.uid.equals(uid) || listeners.containsKey(uid);
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
index c7abd4c..f2b6e8c 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
@@ -24,7 +24,9 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
import java.util.Base64;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
@@ -236,6 +238,47 @@ public class KurentoHandler {
log.debug("Room {} found!", roomId);
return room;
}
+
+ public Collection<KRoom> getRooms() {
+ return rooms.values();
+ }
+
+ /**
+ * This needs to combine two lists as we currently hold a reference to the KStream in two places:
+ * <ul>
+ * <li>{@link StreamProcessor#getStreams()}}</li>
+ * <li>{@link KRoom#getParticipants()}</li>
+ * </ul>
+ * Both are singletons and hold a reference to a stream list and can get out of sync or leak.
+ *
+ * TODO: Investigate if we can have 1 source of truth.
+ *
+ * @return list of KStreams registered
+ */
+ public Collection<KStream> getAllStreams() {
+ Collection<KStream> allStreams = new ArrayList<>();
+
+ allStreams.addAll(streamProcessor.getStreams());
+
+ log.info("Retrieve all Streams, StreamProcessor has {} of streams", allStreams.size());
+
+ // Add any streams from the KRoom that are not in the StreamProcessor
+ getRooms().forEach(
+ room -> {
+ log.info("Retrieve room {}, participants {}", room, room.getParticipants().size());
+ room.getParticipants().forEach(
+ participant -> {
+ if (!allStreams.contains(participant)) {
+ log.warn("Stream was in KRoom but not in StreamProcessor, stream {}", participant);
+ allStreams.add(participant);
+ }
+ }
+ );
+ }
+ );
+
+ return allStreams;
+ }
static JSONObject newKurentoMsg() {
return new JSONObject().put("type", KURENTO_TYPE);
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java
index 1032a8a..902fc55 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java
@@ -25,6 +25,7 @@ import static org.apache.openmeetings.core.remote.KurentoHandler.activityAllowed
import static org.apache.openmeetings.core.remote.KurentoHandler.newKurentoMsg;
import static org.apache.openmeetings.core.remote.KurentoHandler.sendError;
+import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -465,6 +466,10 @@ public class StreamProcessor implements IStreamProcessor {
void addStream(KStream stream) {
streamByUid.put(stream.getUid(), stream);
}
+
+ public Collection<KStream> getStreams() {
+ return streamByUid.values();
+ }
Client getBySid(String sid) {
return cm.getBySid(sid);
diff --git a/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/KurentoHandlerTest.java b/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/KurentoHandlerTest.java
new file mode 100644
index 0000000..e55a2d5
--- /dev/null
+++ b/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/KurentoHandlerTest.java
@@ -0,0 +1,83 @@
+/*
+ * (C) Copyright 2014 Kurento (http://kurento.org/)
+ */
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License") + you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.openmeetings.core.remote;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.apache.openmeetings.db.entity.basic.Client;
+import org.apache.openmeetings.db.entity.basic.Client.Activity;
+import org.apache.openmeetings.db.entity.basic.Client.StreamDesc;
+import org.apache.openmeetings.db.entity.basic.Client.StreamType;
+import org.apache.openmeetings.db.entity.room.Room;
+import org.apache.openmeetings.db.entity.room.Room.Type;
+import org.apache.openmeetings.db.entity.user.User;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KurentoHandlerTest {
+
+ @Mock
+ private StreamProcessor streamProcessor;
+
+ @InjectMocks
+ @Spy
+ private KurentoHandler kurentoHandler;
+
+ @Before
+ public void setup() {
+ Room room1 = new Room();
+ room1.setId(1L);
+ room1.setType(Type.CONFERENCE);
+ KRoom kroom1 = new KRoom(room1, null, null);
+ User user1 = new User();
+ user1.setLogin("login1");
+ Client cl1 = new Client("sessionId1", 1, user1, "pictureUri1");
+ cl1.setRoom(room1);
+ StreamDesc sd1 = cl1.addStream(StreamType.WEBCAM, Activity.AUDIO_VIDEO);
+ kroom1.join(sd1);
+
+ Collection<KRoom> rooms = new ArrayList<>();
+ rooms.add(kroom1);
+
+ // slightly different syntax for mocking Spy
+ Mockito.doReturn(rooms).when(kurentoHandler).getRooms();
+
+ }
+
+ @Test
+ public void getStreamsWithOneKStreamInKRoom() {
+ Collection<KStream> resultingStreams = kurentoHandler.getAllStreams();
+ assertEquals(resultingStreams.size(), 1);
+ }
+
+
+}
diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/admin/connection/ConnectionsPanel.java b/openmeetings-web/src/main/java/org/apache/openmeetings/web/admin/connection/ConnectionsPanel.java
index 3f2682c..836dc8c 100644
--- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/admin/connection/ConnectionsPanel.java
+++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/admin/connection/ConnectionsPanel.java
@@ -25,14 +25,19 @@ import static org.apache.openmeetings.web.common.confirmation.ConfirmableAjaxBor
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Iterator;
import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.openmeetings.core.remote.KStream;
import org.apache.openmeetings.core.remote.KurentoHandler;
import org.apache.openmeetings.db.dao.user.IUserManager;
import org.apache.openmeetings.db.entity.basic.Client;
import org.apache.openmeetings.web.admin.AdminBasePanel;
import org.apache.openmeetings.web.admin.SearchableDataView;
+import org.apache.openmeetings.web.admin.connection.dto.ConnectionListItem;
+import org.apache.openmeetings.web.admin.connection.dto.ConnectionListKStreamItem;
import org.apache.openmeetings.web.app.ClientManager;
import org.apache.openmeetings.web.common.PagedEntityListPanel;
import org.apache.openmeetings.web.data.SearchableDataProvider;
@@ -57,55 +62,104 @@ public class ConnectionsPanel extends AdminBasePanel {
private KurentoHandler scm;
@SpringBean
private IUserManager userManager;
+
+ protected List<ConnectionListItem> getConnections() {
+
+ List<ConnectionListItem> connections = new ArrayList<>();
+ List<Client> clients = cm.list();
+ Collection<KStream> streams = scm.getAllStreams();
+
+ connections.addAll(
+ clients.stream()
+ .map(client -> new ConnectionListItem(client, null))
+ .collect(Collectors.toList())
+ );
+ connections.addAll(
+ streams.stream()
+ .map(stream -> new ConnectionListItem(null,
+ new ConnectionListKStreamItem(
+ stream.getSid(),
+ stream.getUid(),
+ (stream.getRoom() == null) ? null : stream.getRoom().getRoomId(),
+ stream.getConnectedSince(),
+ stream.getStreamType(),
+ stream.getProfile().toString(),
+ (stream.getRecorder() == null) ? null : stream.getRecorder().toString(),
+ stream.getChunkId(),
+ stream.getType()
+ )))
+ .collect(Collectors.toList())
+ );
+ return connections;
+ }
public ConnectionsPanel(String id) {
super(id);
- SearchableDataProvider<Client> sdp = new SearchableDataProvider<>(null) {
+ SearchableDataProvider<ConnectionListItem> sdp = new SearchableDataProvider<>(null) {
private static final long serialVersionUID = 1L;
- private List<Client> list() {
- List<Client> l = new ArrayList<>();
- l.addAll(cm.list());
- return l;
- }
-
@Override
- public Iterator<? extends Client> iterator(long first, long count) {
- List<Client> l = list();
+ public Iterator<? extends ConnectionListItem> iterator(long first, long count) {
+ List<ConnectionListItem> l = getConnections();
return l.subList((int)Math.max(0, first), (int)Math.min(first + count, l.size())).iterator();
}
@Override
public long size() {
- return list().size();
+ return getConnections().size();
}
};
final WebMarkupContainer container = new WebMarkupContainer("container");
final WebMarkupContainer details = new WebMarkupContainer("details");
- SearchableDataView<Client> dataView = new SearchableDataView<>("clientList", sdp) {
+ SearchableDataView<ConnectionListItem> dataView = new SearchableDataView<>("clientList", sdp) {
private static final long serialVersionUID = 1L;
@Override
- protected void populateItem(final Item<Client> item) {
- Client c = item.getModelObject();
- item.add(new Label("type", "html5"));
- item.add(new Label("login", c.getUser().getLogin()));
- item.add(new Label("since", getDateFormat().format(c.getConnectedSince())));
- item.add(new Label("scope", c.getRoom() == null ? "html5" : "" + c.getRoom().getId()));
- item.add(new Label("server", c.getServerId()));
- item.add(new BootstrapAjaxLink<String>("kick", null, Buttons.Type.Outline_Danger, new ResourceModel("603")) {
- private static final long serialVersionUID = 1L;
- {
- setSize(Buttons.Size.Small);
- }
-
- @Override
- public void onClick(AjaxRequestTarget target) {
- cm.invalidate(c.getUserId(), c.getSessionId());
- target.add(container, details.setVisible(false));
- }
- }.add(newOkCancelConfirm(this, getString("605"))));
+ protected void populateItem(final Item<ConnectionListItem> item) {
+ ConnectionListItem connection = item.getModelObject();
+
+ if (connection.getStream() != null) {
+ ConnectionListKStreamItem kStream = connection.getStream();
+ item.add(new Label("type", kStream.getType()));
+ item.add(new Label("login", kStream.getUid()));
+ item.add(new Label("since", getDateFormat().format(kStream.getConnectedSince())));
+ item.add(new Label("scope", kStream.getStreamType()));
+ item.add(new Label("server", ""));
+ item.add(new BootstrapAjaxLink<String>("kick", null, Buttons.Type.Outline_Danger, new ResourceModel("603")) {
+ private static final long serialVersionUID = 1L;
+ {
+ setSize(Buttons.Size.Small);
+ }
+
+ @Override
+ public void onClick(AjaxRequestTarget target) {
+ // TODO come up with method to kick off this KStream
+ target.add(container, details.setVisible(false));
+ }
+ }.add(newOkCancelConfirm(this, getString("605"))));
+ }
+ if (connection.getClient() != null) {
+ Client c = connection.getClient();
+ item.add(new Label("type", "html5"));
+ item.add(new Label("login", c.getUser().getLogin()));
+ item.add(new Label("since", getDateFormat().format(c.getConnectedSince())));
+ item.add(new Label("scope", c.getRoom() == null ? "html5" : "" + c.getRoom().getId()));
+ item.add(new Label("server", c.getServerId()));
+ item.add(new BootstrapAjaxLink<String>("kick", null, Buttons.Type.Outline_Danger, new ResourceModel("603")) {
+ private static final long serialVersionUID = 1L;
+ {
+ setSize(Buttons.Size.Small);
+ }
+
+ @Override
+ public void onClick(AjaxRequestTarget target) {
+ cm.invalidate(c.getUserId(), c.getSessionId());
+ target.add(container, details.setVisible(false));
+ }
+ }.add(newOkCancelConfirm(this, getString("605"))));
+ }
+
item.add(new AjaxEventBehavior(EVT_CLICK) {
private static final long serialVersionUID = 1L;
@@ -113,28 +167,33 @@ public class ConnectionsPanel extends AdminBasePanel {
protected void onEvent(AjaxRequestTarget target) {
Field[] ff = item.getModelObject().getClass().getDeclaredFields();
RepeatingView lines = new RepeatingView("line");
- Client c = item.getModelObject();
- for (Field f : ff) {
- int mod = f.getModifiers();
- if (Modifier.isStatic(mod) || Modifier.isTransient(mod)) {
- continue;
- }
- WebMarkupContainer line = new WebMarkupContainer(lines.newChildId());
- line.add(new Label("name", f.getName()));
- String val = "";
- try {
- f.setAccessible(true);
- val = "" + f.get(c);
- } catch (Exception e) {
- //noop
+ ConnectionListItem connection = item.getModelObject();
+
+ if (connection.getClient() != null) {
+ Client c = connection.getClient();
+ for (Field f : ff) {
+ int mod = f.getModifiers();
+ if (Modifier.isStatic(mod) || Modifier.isTransient(mod)) {
+ continue;
+ }
+ WebMarkupContainer line = new WebMarkupContainer(lines.newChildId());
+ line.add(new Label("name", f.getName()));
+ String val = "";
+ try {
+ f.setAccessible(true);
+ val = "" + f.get(c);
+ } catch (Exception e) {
+ //noop
+ }
+ line.add(new Label("value", val));
+ lines.add(line);
}
- line.add(new Label("value", val));
- lines.add(line);
+ details.addOrReplace(lines);
+ target.add(details.setVisible(true));
}
- details.addOrReplace(lines);
- target.add(details.setVisible(true));
}
});
+
item.add(AttributeModifier.append(ATTR_CLASS, ROW_CLASS));
}
};
diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/admin/connection/dto/ConnectionListItem.java b/openmeetings-web/src/main/java/org/apache/openmeetings/web/admin/connection/dto/ConnectionListItem.java
new file mode 100644
index 0000000..ff352d7
--- /dev/null
+++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/admin/connection/dto/ConnectionListItem.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License") + you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.openmeetings.web.admin.connection.dto;
+
+import java.io.Serializable;
+
+import org.apache.openmeetings.db.entity.IDataProviderEntity;
+import org.apache.openmeetings.db.entity.basic.Client;
+
+/**
+ * Wrapper object for the UI display as the list item might be a client (that
+ * may have also has KStreams referenced).<br>
+ * Or<br>
+ * A single KStream.<br>
+ *
+ * We want to see a total list of connections, a client represents a WebSocket and session.
+ * A KStream is a MediaStream.
+ *
+ * @author sebawagner
+ *
+ */
+public class ConnectionListItem implements IDataProviderEntity, Serializable {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ private Client client;
+ private ConnectionListKStreamItem stream;
+
+ public ConnectionListItem(Client client, ConnectionListKStreamItem stream) {
+ super();
+ this.client = client;
+ this.stream = stream;
+ }
+ public Client getClient() {
+ return client;
+ }
+ public ConnectionListKStreamItem getStream() {
+ return stream;
+ }
+ @Override
+ public Long getId() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+ @Override
+ public void setId(Long id) {
+ // TODO Auto-generated method stub
+
+ }
+
+}
diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/admin/connection/dto/ConnectionListKStreamItem.java b/openmeetings-web/src/main/java/org/apache/openmeetings/web/admin/connection/dto/ConnectionListKStreamItem.java
new file mode 100644
index 0000000..06d60d6
--- /dev/null
+++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/admin/connection/dto/ConnectionListKStreamItem.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License") + you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.openmeetings.web.admin.connection.dto;
+
+import java.io.Serializable;
+import java.util.Date;
+
+import org.apache.openmeetings.db.entity.basic.Client.StreamType;
+import org.apache.openmeetings.db.entity.record.RecordingChunk.Type;
+
+/**
+ * A KStream for the Wicket UI to display. This object is can be serialized, otherwise
+ * Wicket won't render it.
+ *
+ * So It contains NO reference to kurento client objects.
+ *
+ * @author sebawagner
+ *
+ */
+public class ConnectionListKStreamItem implements Serializable {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ private String sid;
+ private String uid;
+ private Long roomId;
+ private Date connectedSince;
+ private StreamType streamType;
+ private String profile;
+ private String recorder;
+ private Long chunkId;
+ private Type type;
+
+ public ConnectionListKStreamItem(String sid, String uid, Long roomId, Date connectedSince, StreamType streamType,
+ String profile, String recorder, Long chunkId, Type type) {
+ super();
+ this.sid = sid;
+ this.uid = uid;
+ this.roomId = roomId;
+ this.connectedSince = connectedSince;
+ this.streamType = streamType;
+ this.profile = profile;
+ this.recorder = recorder;
+ this.chunkId = chunkId;
+ this.type = type;
+ }
+
+ public String getSid() {
+ return sid;
+ }
+ public String getUid() {
+ return uid;
+ }
+ public Long getRoomId() {
+ return roomId;
+ }
+ public Date getConnectedSince() {
+ return connectedSince;
+ }
+ public StreamType getStreamType() {
+ return streamType;
+ }
+ public String getProfile() {
+ return profile;
+ }
+ public String getRecorder() {
+ return recorder;
+ }
+ public Long getChunkId() {
+ return chunkId;
+ }
+ public Type getType() {
+ return type;
+ }
+
+
+
+}