You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@openmeetings.apache.org by co...@google.com on 2013/03/18 07:45:29 UTC

[red5phone] r108 committed - Video: video stability should be enhanced with this patch.

Revision: 108
Author:   solomax666@gmail.com
Date:     Sun Mar 17 23:45:06 2013
Log:      Video: video stability should be enhanced with this patch.
http://code.google.com/p/red5phone/source/detail?r=108

Modified:
  /branches/red5sip/src/java/org/red5/sip/app/IMediaStream.java
  /branches/red5sip/src/java/org/red5/sip/app/PlayNetStream.java
  /branches/red5sip/src/java/org/red5/sip/app/RTMPRoomClient.java
  /branches/red5sip/src/java/org/red5/sip/app/RTPStream.java
  /branches/red5sip/src/java/org/red5/sip/app/RTPStreamForMultiplex.java
  /branches/red5sip/src/java/org/red5/sip/app/RTPVideoStream.java
  /branches/red5sip/src/java/org/red5/sip/app/SIPVideoConverter.java

=======================================
--- /branches/red5sip/src/java/org/red5/sip/app/IMediaStream.java	Thu Feb  
21 09:21:48 2013
+++ /branches/red5sip/src/java/org/red5/sip/app/IMediaStream.java	Sun Mar  
17 23:45:06 2013
@@ -4,4 +4,6 @@

  	void send(long timestamp, byte[] asaoBuffer, int offset, int num);

+	void stop();
+
  }
=======================================
--- /branches/red5sip/src/java/org/red5/sip/app/PlayNetStream.java	Tue Mar   
5 21:44:18 2013
+++ /branches/red5sip/src/java/org/red5/sip/app/PlayNetStream.java	Sun Mar  
17 23:45:06 2013
@@ -24,7 +24,7 @@

  	private IMediaSender videoSender;

-	private IMediaStream videoStream;
+	private RTPVideoStream videoStream;

  	private RTMPRoomClient client;

@@ -52,7 +52,7 @@
  			audioStream = audioSender.createStream(getStreamId());
  		}
  		if (videoSender != null) {
-			videoStream = videoSender.createStream(getStreamId());
+			videoStream = (RTPVideoStream) videoSender.createStream(getStreamId());
  		}
  	}

@@ -63,6 +63,12 @@
  		if (videoSender != null) {
  			videoSender.deleteStream(getStreamId());
  		}
+		if (audioStream != null) {
+			audioStream.stop();
+		}
+		if (videoStream != null) {
+			videoStream.stop();
+		}
  	}

  	public void dispatchEvent(IEvent event) {
@@ -86,7 +92,12 @@

  		if (rtmpEvent instanceof VideoData) {
  			int newStreamId = client.getActiveVideoStreamID();
+			if (newStreamId == -1) {
+				newStreamId = rtmpEvent.getHeader().getStreamId();
+				client.setActiveVideoStreamID(newStreamId);
+			}
  			if (rtmpEvent.getHeader().getStreamId() != newStreamId) {
+				logger.debug("ignoring stream id=" +  
rtmpEvent.getHeader().getStreamId() + " current stream is " + newStreamId);
  				return;
  			}

@@ -94,6 +105,9 @@
  				logger.debug("switching video to a new stream: " + newStreamId);
  				currentStreamID = newStreamId;
  				keyframeReceived = false;
+				if (videoStream != null) {
+					videoStream.getConverter().resetConverter();
+				}
  			}

  			if (((VideoData) rtmpEvent).getFrameType() == FrameType.KEYFRAME) {
=======================================
--- /branches/red5sip/src/java/org/red5/sip/app/RTMPRoomClient.java	Wed Mar  
13 05:31:08 2013
+++ /branches/red5sip/src/java/org/red5/sip/app/RTMPRoomClient.java	Sun Mar  
17 23:45:06 2013
@@ -164,6 +164,10 @@
  	public int getActiveVideoStreamID() {
  		return activeVideoStreamID;
  	}
+
+	public void setActiveVideoStreamID(int activeVideoStreamID) {
+		this.activeVideoStreamID = activeVideoStreamID;
+	}

  	private void createPlayStream(long broadCastId) {

@@ -193,9 +197,6 @@
  				conn.addClientStream(stream);
  				play(streamIdInt, "" + broadCastId, -2000, -1000);
  				stream.start();
-				if (activeVideoStreamID == -1) {
-					activeVideoStreamID = streamIdInt;
-				}
  			}
  		}
  	}
@@ -358,12 +359,8 @@
  			conn.getStreamById(streamId).stop();
  			conn.removeClientStream(streamId);
  			conn.deleteStreamById(streamId);
-			if (streamId == activeVideoStreamID) {
-				if (clientStreamMap.size() == 0) {
-					activeVideoStreamID = -1;
-				} else {
-					activeVideoStreamID = clientStreamMap.values().iterator().next();
-				}
+			if (streamId == getActiveVideoStreamID()) {
+				setActiveVideoStreamID(-1);
  			}
  		}
  	}
=======================================
--- /branches/red5sip/src/java/org/red5/sip/app/RTPStream.java	Thu Feb 21  
09:21:48 2013
+++ /branches/red5sip/src/java/org/red5/sip/app/RTPStream.java	Sun Mar 17  
23:45:06 2013
@@ -147,4 +147,9 @@

  		return finalCopySize;
  	}
+
+	@Override
+	public void stop() {
+		// nothing to do
+	}
  }
=======================================
--- /branches/red5sip/src/java/org/red5/sip/app/RTPStreamForMultiplex.java	 
Thu Feb 21 09:21:48 2013
+++ /branches/red5sip/src/java/org/red5/sip/app/RTPStreamForMultiplex.java	 
Sun Mar 17 23:45:06 2013
@@ -60,4 +60,9 @@
  	protected synchronized int read(byte[] dst, int offset) {
  		return buffer.take(dst, offset);
  	}
+
+	@Override
+	public void stop() {
+		// nothing to do
+	}
  }
=======================================
--- /branches/red5sip/src/java/org/red5/sip/app/RTPVideoStream.java	Wed Mar  
13 05:31:08 2013
+++ /branches/red5sip/src/java/org/red5/sip/app/RTPVideoStream.java	Sun Mar  
17 23:45:06 2013
@@ -1,26 +1,94 @@
  package org.red5.sip.app;

+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
  import org.red5.codecs.SIPCodec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

  import local.net.RtpPacket;

  public class RTPVideoStream implements IMediaStream {

+	private static Logger log = LoggerFactory.getLogger(RTPVideoStream.class);
  	private RTPStreamVideoSender sender;
  	private SIPVideoConverter converter;
  	private SIPCodec codec;
+	private boolean running;
+	private ConverterThread converterThread;

  	public RTPVideoStream(SIPTransport sipTransport, RTPStreamVideoSender  
sender, SIPCodec codec) {
  		this.sender = sender;
  		this.codec = codec;
  		converter = new SIPVideoConverter(sipTransport);
+		converterThread = new ConverterThread();
+		converterThread.start();
+		running = true;
  	}

  	@Override
  	public void send(long timestamp, byte[] data, int offset, int num) {
-		for (RtpPacket packet: converter.rtmp2rtp(data, timestamp, codec)) {
-			sender.send(packet);
+		if (!running) {
+			throw new IllegalStateException("Steam is not started");
  		}
+		converterThread.addData(data, timestamp);
  	}

+	@Override
+	public void stop() {
+		running = false;
+	}
+
+	public SIPVideoConverter getConverter() {
+		return converter;
+	}
+
+	private class ConverterThread extends Thread {
+
+		private Queue<QueueItem> queue;
+
+		public ConverterThread() {
+			queue = new ConcurrentLinkedQueue<QueueItem>();
+		}
+
+		public void addData(byte[] data, long ts) {
+			queue.add(new QueueItem(ts, data));
+		}
+
+		@Override
+		public void run() {
+			while (running) {
+				try {
+					QueueItem item = queue.poll();
+					if (item != null) {
+						for (RtpPacket packet: converter.rtmp2rtp(item.data, item.ts,  
codec)) {
+							sender.send(packet);
+						}
+					}
+
+					if (queue.size() == 0) {
+						Thread.sleep(50);
+					}
+				} catch (Exception e) {
+					log.error("", e);
+				}
+			}
+		}
+
+		private class QueueItem {
+
+			public final long ts;
+			public final byte[] data;
+
+			public QueueItem(long ts, byte[] data) {
+				super();
+				this.ts = ts;
+				this.data = data;
+			}
+
+		}
+
+	}
+
  }
=======================================
--- /branches/red5sip/src/java/org/red5/sip/app/SIPVideoConverter.java	Wed  
Mar 13 05:31:08 2013
+++ /branches/red5sip/src/java/org/red5/sip/app/SIPVideoConverter.java	Sun  
Mar 17 23:45:06 2013
@@ -13,22 +13,24 @@
  public class SIPVideoConverter {

  	private static final Logger log =  
LoggerFactory.getLogger(SIPVideoConverter.class);
+	private static final int MAX_RTP_PAYLOAD_SIZE = 1446;

  	// rtp => rtmp
-	private byte[] sps1;
+	private byte[] sps;
  	private byte[] pps;
  	private boolean sentSeq;
  	private long lastFIRTime;
  	private long startTs;
  	private long startTm;
  	private long startRelativeTime;
+	private int lastReceivedSequenceNumber;
  	private List<RtpPacketWrapper> packetsQueue;
  	private SIPTransport sipTransport;

  	// rtmp => rtp
  	private int lenSize;
-	private boolean spsSent = false;
-	private boolean ppsSent = false;
+	private boolean spsSent;
+	private boolean ppsSent;

  	public SIPVideoConverter(SIPTransport sipTransport) {
  		this.sipTransport = sipTransport;
@@ -39,11 +41,14 @@
  	public void resetConverter() {
  		packetsQueue = new ArrayList<RtpPacketWrapper>();
  		lastFIRTime = System.currentTimeMillis();
-		sps1 = new byte[0];
+		sps = new byte[0];
  		pps = new byte[0];
  		sentSeq = false;
  		startTs = -1;
  		startTm = -1;
+		lastReceivedSequenceNumber = -1;
+		spsSent = false;
+		ppsSent = false;
  	}

  	public List<RTMPPacketInfo> rtp2rtmp(RtpPacket packet, SIPCodec codec) {
@@ -68,6 +73,7 @@

  	private List<RtpPacket> rtmp2rtpH254(byte data[], long ts) {
  		List<RtpPacket> result = new ArrayList<RtpPacket>();
+		long ts1 = ts * 90;
  		if (data[0] == 0x17 && data[1] == 0) {
  			byte[] pdata = Arrays.copyOfRange(data, 2, data.length);
  			int cfgVer = pdata[3];
@@ -98,7 +104,6 @@
  				this.lenSize = lenSize;
  				if (sps != null) {
  					spsSent = true;
-					long ts1 = ts * 90;
  					byte[] buffer = new byte[sps.length + 12];
  					RtpPacket packet = new RtpPacket(buffer, 0);
  					packet.setPayload(sps, sps.length);
@@ -108,7 +113,6 @@
  				}
  				if (pps != null) {
  					ppsSent = true;
-					long ts1 = ts * 90;
  					byte[] buffer = new byte[pps.length + 12];
  					RtpPacket packet = new RtpPacket(buffer, 0);
  					packet.setPayload(pps, pps.length);
@@ -116,6 +120,8 @@
  					buffer[1] = (byte) 0xe3;
  					result.add(packet);
  				}
+			} else {
+				log.debug("Unsuported cfgVer=" + cfgVer);
  			}
  		} else if ((data[0] == 0x17 || data[0] == 0x27) && data[1] == 1) {
  			if (spsSent && ppsSent) {
@@ -147,10 +153,8 @@
  					byte[] remaining = nals.get(nals.size() - 1).buildArray();
  					int nalType = remaining[0] & 0x1f;
  					int nri = remaining[0] & 0x60;
-					int maxSize = 1446;
  					if (nalType == 5 || nalType == 1) {
-						long ts1 = ts * 90;
-						if (remaining.length < maxSize) {
+						if (remaining.length < MAX_RTP_PAYLOAD_SIZE) {
  							byte[] buffer = new byte[remaining.length + 12];
  							RtpPacket packet = new RtpPacket(buffer, 0);
  							packet.setPayload(remaining, remaining.length);
@@ -161,8 +165,8 @@
  							byte start = (byte) 0x80;
  							remaining = Arrays.copyOfRange(remaining, 1, remaining.length);
  							while (remaining.length > 0) {
-								pdata = Arrays.copyOf(remaining, Math.min(maxSize - 2,  
remaining.length));
-								remaining = Arrays.copyOfRange(remaining, Math.min(maxSize - 2,  
remaining.length), remaining.length);
+								pdata = Arrays.copyOf(remaining, Math.min(MAX_RTP_PAYLOAD_SIZE -  
2, remaining.length));
+								remaining = Arrays.copyOfRange(remaining,  
Math.min(MAX_RTP_PAYLOAD_SIZE - 2, remaining.length), remaining.length);
  								byte end = (byte) ((remaining.length > 0)? 0: 0x40);
  								ByteArrayBuilder payload = new ByteArrayBuilder((byte) (nri | 28),  
(byte) (start | end | nalType));
  								payload.putArray(pdata);
@@ -179,20 +183,30 @@
  					}
  				}
  			}
+		} else {
+			log.debug("Missing rtmp data");
  		}
  		return result;
  	}

  	private List<RTMPPacketInfo> rtp2rtmpH264(RtpPacket packet) {
+		if (packet.getPayloadType() != 35) {
+			return new ArrayList<RTMPPacketInfo>();
+		}
+		if (lastReceivedSequenceNumber != -1 && (packet.getSequenceNumber() -  
lastReceivedSequenceNumber != 1)) {
+			log.debug("Missing new packet sequence number " +  
packet.getSequenceNumber());
+			resetConverter();
+			return new ArrayList<RTMPPacketInfo>();
+		}
+		lastReceivedSequenceNumber = packet.getSequenceNumber();
+
  		List<RTMPPacketInfo> result = new ArrayList<RTMPPacketInfo>();
  		byte[] payload = packet.getPayload();
  		int nalType = payload[0] & 0x1f;
-		byte[] naldata = null;
-
  		switch (nalType) {
  		case 7: // SPS
-			sps1 = payload;
-			log.debug("SPS received: " + Arrays.toString(sps1));
+			sps = payload;
+			log.debug("SPS received: " + Arrays.toString(sps));
  			break;
  		case 8: // PPS
  			pps = payload;
@@ -200,18 +214,18 @@
  			break;
  		default:
  			if (payload.length > 1) {
-				if (nalType == 24) { // for cisco phones
+				if (nalType == 24) { // for aggregated SPS and PPS
  					payload = Arrays.copyOfRange(payload, 1, payload.length);
  					while (payload.length > 0) {
-						int size = payload[1];
+						int size = payload[0] & 0xff;
  						payload = Arrays.copyOfRange(payload, 2, payload.length);
-						naldata = Arrays.copyOf(payload, size);
+						byte[] naldata = Arrays.copyOf(payload, size);
  						payload = Arrays.copyOfRange(payload, size, payload.length);
  						int nt = naldata[0] & 0x1f;
  						switch (nt) {
  						case 7:
-							sps1 = naldata;
-							log.debug("SPS received: " + Arrays.toString(sps1));
+							sps = naldata;
+							log.debug("SPS received: " + Arrays.toString(sps));
  							break;
  						case 8:
  							pps = naldata;
@@ -267,9 +281,9 @@
  				case 24:
  					payload = Arrays.copyOfRange(payload, 1, payload.length);
  					while (payload.length > 0) {
-						int size = payload[0];
+						int size = payload[1] & 0xff;
  						payload = Arrays.copyOfRange(payload, 2, payload.length);
-						naldata = Arrays.copyOf(payload, size);
+						byte[] naldata = Arrays.copyOf(payload, size);
  						payload = Arrays.copyOfRange(payload, size, payload.length);
  						int nt = naldata[0] & 0x1f;
  						if (nt == 5 || nt == 1) {
@@ -317,14 +331,13 @@
  				payloads.add(newdata);
  			}

-			if (!sentSeq && nalType != 5 && pps.length > 0 && sps1.length > 0 ||  
sps1.length == 0 || pps.length == 0) {
-				packetsQueue.clear();
+			if (!sentSeq && nalType != 5 && pps.length > 0 && sps.length > 0 ||  
sps.length == 0 || pps.length == 0) {
  				if (System.currentTimeMillis() - lastFIRTime > 5000) {
  					lastFIRTime = System.currentTimeMillis();
  					requestFIR();
  				}
  			} else {
-				if (pps.length > 0 && sps1.length > 0 && !sentSeq && nalType == 5) {
+				if (pps.length > 0 && sps.length > 0 && !sentSeq && nalType == 5) {
  					sentSeq = true;
  				}

@@ -336,15 +349,15 @@
  					startTm = System.currentTimeMillis() - startRelativeTime;
  				}

-				long tm = startTm + (packet.getTimestamp() - startTs) / 90; // 90 =  
bitrate / 1000
+				long tm = startTm + (packet.getTimestamp() - startTs) / 132;
  				if (nalType == 5 && payloads.size() > 0) {
  					ByteArrayBuilder data = new ByteArrayBuilder();
  					// first byte: 0x17 for intra-frame
  					// second byte: 0x00 for configuration data
  					data.putArray(new byte[]{0x17, 0, 0, 0, 0, 1});
-					data.putArray(Arrays.copyOfRange(sps1, 1, 4));
-					data.putArray((byte) 0xff, (byte) 0xe1, (byte) (sps1.length >>> 8),  
(byte) sps1.length);
-					data.putArray(sps1);
+					data.putArray(Arrays.copyOfRange(sps, 1, 4));
+					data.putArray((byte) 0xff, (byte) 0xe1, (byte) (sps.length >>> 8),  
(byte) sps.length);
+					data.putArray(sps);
  					data.putArray((byte) 1, (byte) (pps.length >>> 8), (byte) pps.length);
  					data.putArray(pps);
  					payloads.add(0, data);
@@ -360,7 +373,6 @@
  	}

  	protected void requestFIR() {
-		log.debug("requesting FIR...");
  		sipTransport.requestFIR();
  	}