You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@openmeetings.apache.org by co...@google.com on 2013/03/18 07:45:29 UTC
[red5phone] r108 committed - Video: video stability should be
enhanced with this patch.
Revision: 108
Author: solomax666@gmail.com
Date: Sun Mar 17 23:45:06 2013
Log: Video: video stability should be enhanced with this patch.
http://code.google.com/p/red5phone/source/detail?r=108
Modified:
/branches/red5sip/src/java/org/red5/sip/app/IMediaStream.java
/branches/red5sip/src/java/org/red5/sip/app/PlayNetStream.java
/branches/red5sip/src/java/org/red5/sip/app/RTMPRoomClient.java
/branches/red5sip/src/java/org/red5/sip/app/RTPStream.java
/branches/red5sip/src/java/org/red5/sip/app/RTPStreamForMultiplex.java
/branches/red5sip/src/java/org/red5/sip/app/RTPVideoStream.java
/branches/red5sip/src/java/org/red5/sip/app/SIPVideoConverter.java
=======================================
--- /branches/red5sip/src/java/org/red5/sip/app/IMediaStream.java Thu Feb
21 09:21:48 2013
+++ /branches/red5sip/src/java/org/red5/sip/app/IMediaStream.java Sun Mar
17 23:45:06 2013
@@ -4,4 +4,6 @@
void send(long timestamp, byte[] asaoBuffer, int offset, int num);
+ void stop();
+
}
=======================================
--- /branches/red5sip/src/java/org/red5/sip/app/PlayNetStream.java Tue Mar
5 21:44:18 2013
+++ /branches/red5sip/src/java/org/red5/sip/app/PlayNetStream.java Sun Mar
17 23:45:06 2013
@@ -24,7 +24,7 @@
private IMediaSender videoSender;
- private IMediaStream videoStream;
+ private RTPVideoStream videoStream;
private RTMPRoomClient client;
@@ -52,7 +52,7 @@
audioStream = audioSender.createStream(getStreamId());
}
if (videoSender != null) {
- videoStream = videoSender.createStream(getStreamId());
+ videoStream = (RTPVideoStream) videoSender.createStream(getStreamId());
}
}
@@ -63,6 +63,12 @@
if (videoSender != null) {
videoSender.deleteStream(getStreamId());
}
+ if (audioStream != null) {
+ audioStream.stop();
+ }
+ if (videoStream != null) {
+ videoStream.stop();
+ }
}
public void dispatchEvent(IEvent event) {
@@ -86,7 +92,12 @@
if (rtmpEvent instanceof VideoData) {
int newStreamId = client.getActiveVideoStreamID();
+ if (newStreamId == -1) {
+ newStreamId = rtmpEvent.getHeader().getStreamId();
+ client.setActiveVideoStreamID(newStreamId);
+ }
if (rtmpEvent.getHeader().getStreamId() != newStreamId) {
+ logger.debug("ignoring stream id=" +
rtmpEvent.getHeader().getStreamId() + " current stream is " + newStreamId);
return;
}
@@ -94,6 +105,9 @@
logger.debug("switching video to a new stream: " + newStreamId);
currentStreamID = newStreamId;
keyframeReceived = false;
+ if (videoStream != null) {
+ videoStream.getConverter().resetConverter();
+ }
}
if (((VideoData) rtmpEvent).getFrameType() == FrameType.KEYFRAME) {
=======================================
--- /branches/red5sip/src/java/org/red5/sip/app/RTMPRoomClient.java Wed Mar
13 05:31:08 2013
+++ /branches/red5sip/src/java/org/red5/sip/app/RTMPRoomClient.java Sun Mar
17 23:45:06 2013
@@ -164,6 +164,10 @@
public int getActiveVideoStreamID() {
return activeVideoStreamID;
}
+
+ public void setActiveVideoStreamID(int activeVideoStreamID) {
+ this.activeVideoStreamID = activeVideoStreamID;
+ }
private void createPlayStream(long broadCastId) {
@@ -193,9 +197,6 @@
conn.addClientStream(stream);
play(streamIdInt, "" + broadCastId, -2000, -1000);
stream.start();
- if (activeVideoStreamID == -1) {
- activeVideoStreamID = streamIdInt;
- }
}
}
}
@@ -358,12 +359,8 @@
conn.getStreamById(streamId).stop();
conn.removeClientStream(streamId);
conn.deleteStreamById(streamId);
- if (streamId == activeVideoStreamID) {
- if (clientStreamMap.size() == 0) {
- activeVideoStreamID = -1;
- } else {
- activeVideoStreamID = clientStreamMap.values().iterator().next();
- }
+ if (streamId == getActiveVideoStreamID()) {
+ setActiveVideoStreamID(-1);
}
}
}
=======================================
--- /branches/red5sip/src/java/org/red5/sip/app/RTPStream.java Thu Feb 21
09:21:48 2013
+++ /branches/red5sip/src/java/org/red5/sip/app/RTPStream.java Sun Mar 17
23:45:06 2013
@@ -147,4 +147,9 @@
return finalCopySize;
}
+
+ @Override
+ public void stop() {
+ // nothing to do
+ }
}
=======================================
--- /branches/red5sip/src/java/org/red5/sip/app/RTPStreamForMultiplex.java
Thu Feb 21 09:21:48 2013
+++ /branches/red5sip/src/java/org/red5/sip/app/RTPStreamForMultiplex.java
Sun Mar 17 23:45:06 2013
@@ -60,4 +60,9 @@
protected synchronized int read(byte[] dst, int offset) {
return buffer.take(dst, offset);
}
+
+ @Override
+ public void stop() {
+ // nothing to do
+ }
}
=======================================
--- /branches/red5sip/src/java/org/red5/sip/app/RTPVideoStream.java Wed Mar
13 05:31:08 2013
+++ /branches/red5sip/src/java/org/red5/sip/app/RTPVideoStream.java Sun Mar
17 23:45:06 2013
@@ -1,26 +1,94 @@
package org.red5.sip.app;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
import org.red5.codecs.SIPCodec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import local.net.RtpPacket;
public class RTPVideoStream implements IMediaStream {
+ private static Logger log = LoggerFactory.getLogger(RTPVideoStream.class);
private RTPStreamVideoSender sender;
private SIPVideoConverter converter;
private SIPCodec codec;
+ private boolean running;
+ private ConverterThread converterThread;
public RTPVideoStream(SIPTransport sipTransport, RTPStreamVideoSender
sender, SIPCodec codec) {
this.sender = sender;
this.codec = codec;
converter = new SIPVideoConverter(sipTransport);
+ converterThread = new ConverterThread();
+ converterThread.start();
+ running = true;
}
@Override
public void send(long timestamp, byte[] data, int offset, int num) {
- for (RtpPacket packet: converter.rtmp2rtp(data, timestamp, codec)) {
- sender.send(packet);
+ if (!running) {
+ throw new IllegalStateException("Steam is not started");
}
+ converterThread.addData(data, timestamp);
}
+ @Override
+ public void stop() {
+ running = false;
+ }
+
+ public SIPVideoConverter getConverter() {
+ return converter;
+ }
+
+ private class ConverterThread extends Thread {
+
+ private Queue<QueueItem> queue;
+
+ public ConverterThread() {
+ queue = new ConcurrentLinkedQueue<QueueItem>();
+ }
+
+ public void addData(byte[] data, long ts) {
+ queue.add(new QueueItem(ts, data));
+ }
+
+ @Override
+ public void run() {
+ while (running) {
+ try {
+ QueueItem item = queue.poll();
+ if (item != null) {
+ for (RtpPacket packet: converter.rtmp2rtp(item.data, item.ts,
codec)) {
+ sender.send(packet);
+ }
+ }
+
+ if (queue.size() == 0) {
+ Thread.sleep(50);
+ }
+ } catch (Exception e) {
+ log.error("", e);
+ }
+ }
+ }
+
+ private class QueueItem {
+
+ public final long ts;
+ public final byte[] data;
+
+ public QueueItem(long ts, byte[] data) {
+ super();
+ this.ts = ts;
+ this.data = data;
+ }
+
+ }
+
+ }
+
}
=======================================
--- /branches/red5sip/src/java/org/red5/sip/app/SIPVideoConverter.java Wed
Mar 13 05:31:08 2013
+++ /branches/red5sip/src/java/org/red5/sip/app/SIPVideoConverter.java Sun
Mar 17 23:45:06 2013
@@ -13,22 +13,24 @@
public class SIPVideoConverter {
private static final Logger log =
LoggerFactory.getLogger(SIPVideoConverter.class);
+ private static final int MAX_RTP_PAYLOAD_SIZE = 1446;
// rtp => rtmp
- private byte[] sps1;
+ private byte[] sps;
private byte[] pps;
private boolean sentSeq;
private long lastFIRTime;
private long startTs;
private long startTm;
private long startRelativeTime;
+ private int lastReceivedSequenceNumber;
private List<RtpPacketWrapper> packetsQueue;
private SIPTransport sipTransport;
// rtmp => rtp
private int lenSize;
- private boolean spsSent = false;
- private boolean ppsSent = false;
+ private boolean spsSent;
+ private boolean ppsSent;
public SIPVideoConverter(SIPTransport sipTransport) {
this.sipTransport = sipTransport;
@@ -39,11 +41,14 @@
public void resetConverter() {
packetsQueue = new ArrayList<RtpPacketWrapper>();
lastFIRTime = System.currentTimeMillis();
- sps1 = new byte[0];
+ sps = new byte[0];
pps = new byte[0];
sentSeq = false;
startTs = -1;
startTm = -1;
+ lastReceivedSequenceNumber = -1;
+ spsSent = false;
+ ppsSent = false;
}
public List<RTMPPacketInfo> rtp2rtmp(RtpPacket packet, SIPCodec codec) {
@@ -68,6 +73,7 @@
private List<RtpPacket> rtmp2rtpH254(byte data[], long ts) {
List<RtpPacket> result = new ArrayList<RtpPacket>();
+ long ts1 = ts * 90;
if (data[0] == 0x17 && data[1] == 0) {
byte[] pdata = Arrays.copyOfRange(data, 2, data.length);
int cfgVer = pdata[3];
@@ -98,7 +104,6 @@
this.lenSize = lenSize;
if (sps != null) {
spsSent = true;
- long ts1 = ts * 90;
byte[] buffer = new byte[sps.length + 12];
RtpPacket packet = new RtpPacket(buffer, 0);
packet.setPayload(sps, sps.length);
@@ -108,7 +113,6 @@
}
if (pps != null) {
ppsSent = true;
- long ts1 = ts * 90;
byte[] buffer = new byte[pps.length + 12];
RtpPacket packet = new RtpPacket(buffer, 0);
packet.setPayload(pps, pps.length);
@@ -116,6 +120,8 @@
buffer[1] = (byte) 0xe3;
result.add(packet);
}
+ } else {
+ log.debug("Unsuported cfgVer=" + cfgVer);
}
} else if ((data[0] == 0x17 || data[0] == 0x27) && data[1] == 1) {
if (spsSent && ppsSent) {
@@ -147,10 +153,8 @@
byte[] remaining = nals.get(nals.size() - 1).buildArray();
int nalType = remaining[0] & 0x1f;
int nri = remaining[0] & 0x60;
- int maxSize = 1446;
if (nalType == 5 || nalType == 1) {
- long ts1 = ts * 90;
- if (remaining.length < maxSize) {
+ if (remaining.length < MAX_RTP_PAYLOAD_SIZE) {
byte[] buffer = new byte[remaining.length + 12];
RtpPacket packet = new RtpPacket(buffer, 0);
packet.setPayload(remaining, remaining.length);
@@ -161,8 +165,8 @@
byte start = (byte) 0x80;
remaining = Arrays.copyOfRange(remaining, 1, remaining.length);
while (remaining.length > 0) {
- pdata = Arrays.copyOf(remaining, Math.min(maxSize - 2,
remaining.length));
- remaining = Arrays.copyOfRange(remaining, Math.min(maxSize - 2,
remaining.length), remaining.length);
+ pdata = Arrays.copyOf(remaining, Math.min(MAX_RTP_PAYLOAD_SIZE -
2, remaining.length));
+ remaining = Arrays.copyOfRange(remaining,
Math.min(MAX_RTP_PAYLOAD_SIZE - 2, remaining.length), remaining.length);
byte end = (byte) ((remaining.length > 0)? 0: 0x40);
ByteArrayBuilder payload = new ByteArrayBuilder((byte) (nri | 28),
(byte) (start | end | nalType));
payload.putArray(pdata);
@@ -179,20 +183,30 @@
}
}
}
+ } else {
+ log.debug("Missing rtmp data");
}
return result;
}
private List<RTMPPacketInfo> rtp2rtmpH264(RtpPacket packet) {
+ if (packet.getPayloadType() != 35) {
+ return new ArrayList<RTMPPacketInfo>();
+ }
+ if (lastReceivedSequenceNumber != -1 && (packet.getSequenceNumber() -
lastReceivedSequenceNumber != 1)) {
+ log.debug("Missing new packet sequence number " +
packet.getSequenceNumber());
+ resetConverter();
+ return new ArrayList<RTMPPacketInfo>();
+ }
+ lastReceivedSequenceNumber = packet.getSequenceNumber();
+
List<RTMPPacketInfo> result = new ArrayList<RTMPPacketInfo>();
byte[] payload = packet.getPayload();
int nalType = payload[0] & 0x1f;
- byte[] naldata = null;
-
switch (nalType) {
case 7: // SPS
- sps1 = payload;
- log.debug("SPS received: " + Arrays.toString(sps1));
+ sps = payload;
+ log.debug("SPS received: " + Arrays.toString(sps));
break;
case 8: // PPS
pps = payload;
@@ -200,18 +214,18 @@
break;
default:
if (payload.length > 1) {
- if (nalType == 24) { // for cisco phones
+ if (nalType == 24) { // for aggregated SPS and PPS
payload = Arrays.copyOfRange(payload, 1, payload.length);
while (payload.length > 0) {
- int size = payload[1];
+ int size = payload[0] & 0xff;
payload = Arrays.copyOfRange(payload, 2, payload.length);
- naldata = Arrays.copyOf(payload, size);
+ byte[] naldata = Arrays.copyOf(payload, size);
payload = Arrays.copyOfRange(payload, size, payload.length);
int nt = naldata[0] & 0x1f;
switch (nt) {
case 7:
- sps1 = naldata;
- log.debug("SPS received: " + Arrays.toString(sps1));
+ sps = naldata;
+ log.debug("SPS received: " + Arrays.toString(sps));
break;
case 8:
pps = naldata;
@@ -267,9 +281,9 @@
case 24:
payload = Arrays.copyOfRange(payload, 1, payload.length);
while (payload.length > 0) {
- int size = payload[0];
+ int size = payload[1] & 0xff;
payload = Arrays.copyOfRange(payload, 2, payload.length);
- naldata = Arrays.copyOf(payload, size);
+ byte[] naldata = Arrays.copyOf(payload, size);
payload = Arrays.copyOfRange(payload, size, payload.length);
int nt = naldata[0] & 0x1f;
if (nt == 5 || nt == 1) {
@@ -317,14 +331,13 @@
payloads.add(newdata);
}
- if (!sentSeq && nalType != 5 && pps.length > 0 && sps1.length > 0 ||
sps1.length == 0 || pps.length == 0) {
- packetsQueue.clear();
+ if (!sentSeq && nalType != 5 && pps.length > 0 && sps.length > 0 ||
sps.length == 0 || pps.length == 0) {
if (System.currentTimeMillis() - lastFIRTime > 5000) {
lastFIRTime = System.currentTimeMillis();
requestFIR();
}
} else {
- if (pps.length > 0 && sps1.length > 0 && !sentSeq && nalType == 5) {
+ if (pps.length > 0 && sps.length > 0 && !sentSeq && nalType == 5) {
sentSeq = true;
}
@@ -336,15 +349,15 @@
startTm = System.currentTimeMillis() - startRelativeTime;
}
- long tm = startTm + (packet.getTimestamp() - startTs) / 90; // 90 =
bitrate / 1000
+ long tm = startTm + (packet.getTimestamp() - startTs) / 132;
if (nalType == 5 && payloads.size() > 0) {
ByteArrayBuilder data = new ByteArrayBuilder();
// first byte: 0x17 for intra-frame
// second byte: 0x00 for configuration data
data.putArray(new byte[]{0x17, 0, 0, 0, 0, 1});
- data.putArray(Arrays.copyOfRange(sps1, 1, 4));
- data.putArray((byte) 0xff, (byte) 0xe1, (byte) (sps1.length >>> 8),
(byte) sps1.length);
- data.putArray(sps1);
+ data.putArray(Arrays.copyOfRange(sps, 1, 4));
+ data.putArray((byte) 0xff, (byte) 0xe1, (byte) (sps.length >>> 8),
(byte) sps.length);
+ data.putArray(sps);
data.putArray((byte) 1, (byte) (pps.length >>> 8), (byte) pps.length);
data.putArray(pps);
payloads.add(0, data);
@@ -360,7 +373,6 @@
}
protected void requestFIR() {
- log.debug("requesting FIR...");
sipTransport.requestFIR();
}