You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@openmeetings.apache.org by co...@google.com on 2013/02/10 21:42:58 UTC
[red5phone] r74 committed - Support custom ASAO sampling rate;...
Revision: 74
Author: timur.tleukenov@gmail.com
Date: Sun Feb 10 12:42:20 2013
Log: Support custom ASAO sampling rate;
Resampling pcm to SIP codec sampling rate;
Fixing bugs.
http://code.google.com/p/red5phone/source/detail?r=74
Added:
/branches/red5sip/lib/libresample4j-1.0.jar
/branches/red5sip/lib/sources
/branches/red5sip/lib/sources/red5-1.0-sources.jar
/branches/red5sip/src/java/org/red5/sip/app/BytesBuffer.java
Modified:
/branches/red5sip/build.xml
/branches/red5sip/settings.properties
/branches/red5sip/src/java/org/red5/sip/app/Application.java
/branches/red5sip/src/java/org/red5/sip/app/RTMPRoomClient.java
/branches/red5sip/src/java/org/red5/sip/app/RTPStreamForMultiplex.java
/branches/red5sip/src/java/org/red5/sip/app/RTPStreamMultiplexingSender.java
=======================================
--- /dev/null
+++ /branches/red5sip/lib/libresample4j-1.0.jar Sun Feb 10 12:42:20 2013
Binary file, no diff available.
=======================================
--- /dev/null
+++ /branches/red5sip/lib/sources/red5-1.0-sources.jar Sun Feb 10 12:42:20
2013
Binary file, no diff available.
=======================================
--- /dev/null
+++ /branches/red5sip/src/java/org/red5/sip/app/BytesBuffer.java Sun Feb 10
12:42:20 2013
@@ -0,0 +1,70 @@
+package org.red5.sip.app;
+
+public class BytesBuffer {
+ private final int buffersCount;
+ private final int arrayLength;
+ private byte[][] buffer;
+ private int[] bufLen;
+ private int start, end;
+ public BytesBuffer(int arrayLength, int buffersCount) {
+ this.buffersCount = buffersCount;
+ this.arrayLength = arrayLength;
+ this.buffer = new byte[buffersCount][arrayLength];
+ this.bufLen = new int[buffersCount];
+ }
+
+ protected void onBufferOverflow() {
+ clean();
+ }
+
+ protected void onBufferEmpty() {
+
+ }
+
+ public void clean() {
+ end = 0;
+ start = -1;
+ }
+
+ protected int available() {
+ return (end > start) ? (end - start) : (buffersCount - start +
end);
+ }
+
+ protected float bufferUsage() {
+ return available() * 1.0f / buffersCount;
+ }
+
+ public void push(byte[] array, int offset, int length) {
+ if(end == start) {
+ onBufferOverflow();
+ }
+ if(arrayLength < length) {
+ throw new IllegalArgumentException("Array length too much: " +
length);
+ }
+ System.arraycopy(array, offset, buffer[end], 0, length);
+ bufLen[end++] = length;
+ if(end == buffersCount) {
+ end = 0;
+ }
+ if(start == -1) {
+ start = 0;
+ }
+ }
+
+ public int take(byte[] dst, int offset) {
+ int res = -1;
+ if(start >= 0) {
+ System.arraycopy(buffer[start], 0, dst, offset,
Math.min(bufLen[start], dst.length - offset));
+ res = bufLen[start++];
+ if(start == buffersCount) {
+ start = 0;
+ }
+ if(start == end) {
+ start = -1;
+ end = 0;
+ onBufferEmpty();
+ }
+ }
+ return res;
+ }
+}
=======================================
--- /branches/red5sip/build.xml Fri Feb 10 07:37:46 2012
+++ /branches/red5sip/build.xml Sun Feb 10 12:42:20 2013
@@ -85,6 +85,10 @@
<pathelement location="${basedir}/lib/red5.jar"/>
<pathelement location="${basedir}/lib/xmlrpc-2.0.1.jar"/>
</path>
+
+ <path id="library.libresample4j.classpath">
+ <pathelement location="${basedir}/lib/libresample4j-1.0.jar"/>
+ </path>
<path id="library.red5sip.classpath">
<pathelement location="${basedir}/lib/red5sip.jar"/>
@@ -149,6 +153,7 @@
<path refid="library.mjsip.classpath"/>
<path refid="library.red5.classpath"/>
<path refid="library.log.classpath"/>
+ <path refid="library.libresample4j.classpath"/>
<path refid="library.apache-commons.classpath"/>
<path refid="library.openmeetings.classpath"/>
<path refid="library.springframwork.classpath"/>
@@ -160,6 +165,7 @@
<path refid="library.mjsip.classpath"/>
<path refid="library.red5.classpath"/>
<path refid="library.log.classpath"/>
+ <path refid="library.libresample4j.classpath"/>
<path refid="library.apache-commons.classpath"/>
<path refid="library.openmeetings.classpath"/>
<path refid="library.springframwork.classpath"/>
@@ -171,6 +177,7 @@
<path refid="library.mjsip.classpath"/>
<path refid="library.red5.classpath"/>
<path refid="library.log.classpath"/>
+ <path refid="library.libresample4j.classpath"/>
<path refid="library.apache-commons.classpath"/>
<path refid="library.openmeetings.classpath"/>
<path refid="library.springframwork.classpath"/>
@@ -183,6 +190,7 @@
<path refid="library.mjsip.classpath"/>
<path refid="library.red5.classpath"/>
<path refid="library.log.classpath"/>
+ <path refid="library.libresample4j.classpath"/>
<path refid="library.apache-commons.classpath"/>
<path refid="library.openmeetings.classpath"/>
<path refid="library.springframwork.classpath"/>
=======================================
--- /branches/red5sip/settings.properties Mon Feb 4 23:34:05 2013
+++ /branches/red5sip/settings.properties Sun Feb 10 12:42:20 2013
@@ -1,5 +1,6 @@
red5.host=127.0.0.1
red5.codec=asao
+red5.codec.rate=22
sip.obproxy=127.0.0.1
sip.phone=test
sip.authid=test
=======================================
--- /branches/red5sip/src/java/org/red5/sip/app/Application.java Tue Jan 29
03:52:07 2013
+++ /branches/red5sip/src/java/org/red5/sip/app/Application.java Sun Feb 10
12:42:20 2013
@@ -63,6 +63,12 @@
return;
}
props = PropertiesUtils.load(settings);
+ try {
+ RTPStreamMultiplexingSender.sampling =
RTPStreamMultiplexingSender.SAMPLE_RATE.findByShortName(Integer.parseInt(props.getProperty("red5.codec.rate", "22")));
+ } catch (NumberFormatException e) {
+ log.error("Can't parse red5.codec.rate value", e);
+ }
+
}
public void init(DaemonContext daemonContext) throws Exception {
@@ -70,21 +76,34 @@
}
public void start() throws Exception {
- this.rtmpControlClient = new
RTMPControlClient(props.getProperty("red5.host"), "openmeetings") {
- @Override
- protected void startRoomClient(int id) {
- transportMap.put(id, createSIPTransport(props, id));
+ String roomsStr = props.getProperty("rooms",null);
+ if(props.getProperty("rooms.forceStart","no").equals("yes") &&
roomsStr != null) {
+ String[] rooms = roomsStr.split(",");
+ for(String room: rooms) {
+ try {
+ int id = Integer.parseInt(room);
+ transportMap.put(id, createSIPTransport(props, id));
+ } catch (NumberFormatException e) {
+ log.error("Room id parsing error: id=\"" + room
+ "\"");
+ }
}
+ } else {
+ this.rtmpControlClient = new
RTMPControlClient(props.getProperty("red5.host"), "openmeetings") {
+ @Override
+ protected void startRoomClient(int id) {
+ transportMap.put(id, createSIPTransport(props, id));
+ }
- @Override
- protected void stopRoomClient(int id) {
- SIPTransport t = transportMap.remove(id);
- if(t != null) {
- t.close();
+ @Override
+ protected void stopRoomClient(int id) {
+ SIPTransport t = transportMap.remove(id);
+ if(t != null) {
+ t.close();
+ }
}
- }
- };
- this.rtmpControlClient.start();
+ };
+ this.rtmpControlClient.start();
+ }
}
public void stop() throws Exception {
=======================================
--- /branches/red5sip/src/java/org/red5/sip/app/RTMPRoomClient.java Wed
Feb 6 09:31:44 2013
+++ /branches/red5sip/src/java/org/red5/sip/app/RTMPRoomClient.java Sun Feb
10 12:42:20 2013
@@ -147,6 +147,7 @@
private void createPlayStream( long broadCastId ) {
log.debug( "create play stream" );
+ broadcastIds.add((int)broadCastId);
IPendingServiceCallback wrapper = new
CreatePlayStreamCallBack(broadCastId);
invoke( "createStream", null, wrapper );
}
@@ -338,6 +339,9 @@
public void newStream(Client client) {
log.debug("newStream:" + client.getBroadCastID());
+ if(broadcastIds.contains((int)client.getBroadCastID())) {
+ closeStream(client);
+ }
createPlayStream(client.getBroadCastID());
}
=======================================
--- /branches/red5sip/src/java/org/red5/sip/app/RTPStreamForMultiplex.java
Wed Feb 6 09:31:44 2013
+++ /branches/red5sip/src/java/org/red5/sip/app/RTPStreamForMultiplex.java
Sun Feb 10 12:42:20 2013
@@ -1,73 +1,63 @@
package org.red5.sip.app;
+import org.red5.codecs.asao.DecoderMap;
import org.red5.logging.Red5LoggerFactory;
import org.slf4j.Logger;
+import static
org.red5.sip.app.RTPStreamMultiplexingSender.NELLYMOSER_ENCODED_PACKET_SIZE;
+
public class RTPStreamForMultiplex implements IMediaStream {
protected static Logger log =
Red5LoggerFactory.getLogger(RTPStreamForMultiplex.class, "sip");
private int streamId;
- private RTPStreamMultiplexingSender sender;
- private long syncSource;
- private static int BUFFERS_COUNT = 1024;
- private byte[][] buffer = new byte[BUFFERS_COUNT][65];
- private int[] bufLen = new int[BUFFERS_COUNT];
- private int start, end;
private boolean ready = false;
+ protected DecoderMap decoderMap = null;
+ private BytesBuffer buffer = new
BytesBuffer(NELLYMOSER_ENCODED_PACKET_SIZE, 200) {
+ @Override
+ protected void onBufferOverflow() {
+ super.onBufferOverflow();
+ log.error("Stream %d buffer overflow. Buffer is cleared");
+ }
- protected RTPStreamForMultiplex(int streamId, long syncSource,
RTPStreamMultiplexingSender sender) {
+ @Override
+ protected void onBufferEmpty() {
+ super.onBufferEmpty();
+ ready = false;
+ }
+ };
+
+ protected RTPStreamForMultiplex(int streamId) {
this.streamId = streamId;
- this.sender = sender;
- this.syncSource = syncSource;
- end = 0;
- start = -1;
}
public int getStreamId() {
return streamId;
}
- public synchronized void send(long timestamp, byte[] asaoBuffer, int
offset, int num) {
- if(end == start) {
- log.error("Stream buffer overflow: streamId: " + streamId + ",
start: " + start + ", end: " + end);
- return;
- }
- System.arraycopy(asaoBuffer, 0, buffer[end], 0, asaoBuffer.length);
- bufLen[end++] = num;
- if(end == BUFFERS_COUNT) {
- end = 0;
- }
- if(start == -1) {
- start = 0;
+ public void send(long timestamp, byte[] asaoBuffer, int offset, int
num) {
+ System.out.println("Stream " + streamId + " send");
+ for(int i=0;i<num;i+=NELLYMOSER_ENCODED_PACKET_SIZE) {
+ synchronized (this) {
+ buffer.push(asaoBuffer, offset+i,
NELLYMOSER_ENCODED_PACKET_SIZE);
+ }
+ Thread.yield();
}
-
- if(!ready && available() > 10) {
- ready = true;
+ synchronized (this) {
+ if(!ready && buffer.bufferUsage() > 0.2) {
+ ready = true;
+ }
}
}
-
- protected synchronized int available() {
- return (end > start) ? (end - start) : (BUFFERS_COUNT - start +
end);
- }
protected synchronized boolean ready() {
return ready;
}
+
+ protected synchronized float bufferUsage() {
+ return buffer.bufferUsage();
+ }
protected synchronized int read(byte[] dst, int offset) {
- int res = -1;
- if(start >= 0) {
- System.arraycopy(buffer[start], 0, dst, offset, dst.length);
- res = start++;
- if(start == BUFFERS_COUNT) {
- start = 0;
- }
- if(start == end) {
- start = -1;
- end = 0;
- ready = false;
- }
- }
- return res;
+ return buffer.take(dst, offset);
}
}
=======================================
---
/branches/red5sip/src/java/org/red5/sip/app/RTPStreamMultiplexingSender.java
Wed Feb 6 09:31:44 2013
+++
/branches/red5sip/src/java/org/red5/sip/app/RTPStreamMultiplexingSender.java
Sun Feb 10 12:42:20 2013
@@ -1,5 +1,6 @@
package org.red5.sip.app;
+import com.laszlosystems.libresample4j.Resampler;
import local.net.RtpPacket;
import local.net.RtpSocket;
import org.apache.mina.util.ConcurrentHashSet;
@@ -20,9 +21,36 @@
public static int RTP_HEADER_SIZE = 12;
- protected static final int NELLYMOSER_DECODED_PACKET_SIZE = 256;
+ public static enum SAMPLE_RATE {
+ SAMPLING_8000(8000, 8),
+ SAMPLING_16000(16000, 16),
+ SAMPLING_32000(32000, 32),
+ SAMPLING_48000(48000, 48),
+ SAMPLING_11025(11025, 11),
+ SAMPLING_22050(22050, 22),
+ SAMPLING_44100(44100, 44);
- protected static final int NELLYMOSER_ENCODED_PACKET_SIZE = 64;
+ public int rate;
+ public int shortName;
+ SAMPLE_RATE(int rate, int shortName) {
+ this.rate = rate;
+ this.shortName = shortName;
+ }
+
+ public static SAMPLE_RATE findByShortName(int shortName) {
+ for(SAMPLE_RATE s: SAMPLE_RATE.values()) {
+ if(s.shortName == shortName) {
+ return s;
+ }
+ }
+ throw new IllegalArgumentException("Invalid sample rate");
+ }
+ }
+
+ protected static SAMPLE_RATE sampling = SAMPLE_RATE.SAMPLING_22050;
+
+ protected static final int NELLYMOSER_DECODED_PACKET_SIZE = 256;// *
sampling.blocks;
+ protected static final int NELLYMOSER_ENCODED_PACKET_SIZE = 64;// *
sampling.blocks;
RtpSocket rtpSocket = null;
@@ -46,26 +74,19 @@
private int seqn = 0;
private long time = 0;
- private long syncSource = 0;
// Temporary buffer with received PCM audio from FlashPlayer.
- float[] tempBuffer;
+ private float[] tempBuffer;
+ private float[] multiplexedBuffer;
- float[] decodedBuffer1;
- float[] decodedBuffer2;
-
- volatile int multiplexingCount = 0;
+ private Resampler resampler;
+ private double factor = 1;
private Thread sendThread = new Thread(this);
ConcurrentHashSet<WeakReference<RTPStreamForMultiplex>> streamSet =
new ConcurrentHashSet<WeakReference<RTPStreamForMultiplex>>();
//Set<RTPStreamForMultiplex> streamSet =
Collections.synchronizedSet(new WeakHashSet<RTPStreamForMultiplex>());
- private static int SEND_BUFFER_MAX = 100;
- private byte[][] sendBuffer = new byte[SEND_BUFFER_MAX][];
- private int[] sendBufferLength = new int[SEND_BUFFER_MAX];
- private int sendBufferPos = 0;
-
// Floats remaining on temporary buffer.
int tempBufferRemaining = 0;
@@ -168,11 +189,6 @@
mediaReceiver.setSender(this);
this.sipCodec = sipCodec;
this.doSync = do_sync;
- this.sendBufferPos = 0;
-
- for(int i=0; i<SEND_BUFFER_MAX; i++) {
- sendBuffer[i] = new
byte[sipCodec.getOutgoingEncodedFrameSize() + RTP_HEADER_SIZE];
- }
try {
if (src_socket == null) {
@@ -194,24 +210,22 @@
seqn = 0;
time = 0;
- syncSource = 0;
println("start()", "using blocks of " + (packetBuffer.length -
RTP_HEADER_SIZE) + " bytes.");
decoder = new Decoder();
- decoderMap = null;
sendThread.start();
}
private void fillDecodedBuffer(byte[] asaoBuffer, float[] tempBuffer) {
ByteStream audioStream = new ByteStream(asaoBuffer, 1,
NELLYMOSER_ENCODED_PACKET_SIZE);
- decoderMap = decoder.decode(decoderMap, audioStream.bytes, 1,
tempBuffer, 0);
+ decoderMap = decoder.decode(decoderMap, audioStream.bytes, 0,
tempBuffer, 0);
//ResampleUtils.normalize(tempBuffer, tempBuffer.length);
}
- public synchronized IMediaStream createStream(int streamId) {
- RTPStreamForMultiplex stream = new RTPStreamForMultiplex(streamId,
syncSource, this);
+ public IMediaStream createStream(int streamId) {
+ RTPStreamForMultiplex stream = new RTPStreamForMultiplex(streamId);
streamSet.add(new WeakReference<RTPStreamForMultiplex>(stream));
return stream;
}
@@ -219,15 +233,30 @@
public void deleteStream(int streamId) {
for (Iterator<WeakReference<RTPStreamForMultiplex>> iterator =
streamSet.iterator(); iterator.hasNext(); ) {
WeakReference<RTPStreamForMultiplex> ref = iterator.next();
- if (ref.get() != null && ref.get().getStreamId() == streamId )
{
- iterator.remove();
+ RTPStreamForMultiplex stream = ref.get();
+ try {
+ if (stream != null && stream.getStreamId() == streamId ) {
+ iterator.remove();
+ }
+ } catch (NullPointerException ignored) {
+
}
}
}
- private void doRtpDelay() {
+ private void doRtpDelay(float bufferUsage) {
+ //TODO: make proper pause correction.
+ //Pause should not exceed packetization and prevent emtying of
buffer
try {
- Thread.sleep( sipCodec.getOutgoingPacketization() - 2);
+ long pause = sipCodec.getOutgoingPacketization() - 1;
+ if(bufferUsage > .5f) {
+ pause -= 5;
+ }
+ if(bufferUsage > .2f) {
+ pause -= 1;
+ }
+ System.out.println("Sleep pause: " + pause);
+ Thread.sleep( pause, 800000 );
}
catch ( Exception e ) {
}
@@ -235,48 +264,65 @@
public void run() {
if (!hasInitilializedBuffers) {
- tempBuffer = new float[NELLYMOSER_DECODED_PACKET_SIZE];
- decodedBuffer1 = new float[NELLYMOSER_DECODED_PACKET_SIZE];
- decodedBuffer2 = new float[NELLYMOSER_DECODED_PACKET_SIZE];
+ multiplexedBuffer = new float[NELLYMOSER_DECODED_PACKET_SIZE];
encodingBuffer = new
float[sipCodec.getOutgoingDecodedFrameSize()];
- multiplexingCount = 0;
+
+ if(sipCodec.getSampleRate() == sampling.rate) {
+ tempBuffer = new float[NELLYMOSER_DECODED_PACKET_SIZE];
+ resampler = null;
+ } else {
+ factor = sipCodec.getSampleRate() / (double)sampling.rate;
+ resampler = new Resampler(true, factor, factor);
+ tempBuffer = new
float[(int)(NELLYMOSER_DECODED_PACKET_SIZE * factor)];
+ }
hasInitilializedBuffers = true;
}
- byte[] asaoBuffer = new byte[65];
+
+ float[] decodedBuffer = new float[NELLYMOSER_DECODED_PACKET_SIZE];
+ byte[] asaoBuffer = new byte[NELLYMOSER_ENCODED_PACKET_SIZE];
+
+ int disableStream = 0;
+
while(rtpSocket != null) {
- multiplexingCount = 0;
- for(Iterator<WeakReference<RTPStreamForMultiplex>> i =
streamSet.iterator(); i.hasNext();) {
- int len = -1;
- WeakReference<RTPStreamForMultiplex> ref = i.next();
- RTPStreamForMultiplex stream = ref.get();
- if(stream != null) {
- synchronized (stream) {
- System.out.println(String.format("Stream id %d,
avail %d", stream.getStreamId(), stream.available()));
- if(stream != null && stream.ready()) {
+ float bufferUsage = 0;
+ int multiplexingCount = 0;
+ try {
+ for(Iterator<WeakReference<RTPStreamForMultiplex>> i =
streamSet.iterator(); i.hasNext();) {
+ int len = -1;
+ WeakReference<RTPStreamForMultiplex> ref = i.next();
+ RTPStreamForMultiplex stream = ref.get();
+ if(stream != null) {
+ if(stream.ready() && disableStream !=
stream.getStreamId()) {
len = stream.read(asaoBuffer, 0);
+ bufferUsage = Math.max(bufferUsage,
stream.bufferUsage());
+ System.out.println(String.format("Stream
id %d, buffer %f", stream.getStreamId(), stream.bufferUsage()));
} else {
- break;
+ continue;
}
+ } else {
+ i.remove();
+ continue;
}
- } else {
- i.remove();
- break;
- }
- if(len != -1) {
- fillDecodedBuffer(asaoBuffer, decodedBuffer1);
- if (multiplexingCount > 0) {
- ResampleUtils.multiplex(decodedBuffer2,
decodedBuffer1);
- } else {
- BufferUtils.floatBufferIndexedCopy(decodedBuffer2,
0, decodedBuffer1, 0, decodedBuffer1.length);
+ if(len != -1) {
+ ByteStream audioStream = new
ByteStream(asaoBuffer, 1, NELLYMOSER_ENCODED_PACKET_SIZE);
+ stream.decoderMap =
decoder.decode(stream.decoderMap, audioStream.bytes, 0, decodedBuffer, 0);
+ //fillDecodedBuffer(asaoBuffer, decodedBuffer);
+ if (multiplexingCount > 0) {
+ ResampleUtils.multiplex(multiplexedBuffer,
decodedBuffer);
+ } else {
+ System.arraycopy(decodedBuffer, 0,
multiplexedBuffer, 0, decodedBuffer.length);
+ }
+ multiplexingCount++;
}
- multiplexingCount++;
- //System.out.println("Multiplex stream: " + streamId);
+ Thread.yield();
}
- Thread.yield();
+ } catch (Exception e) {
+ log.error("Exception", e);
}
if(multiplexingCount > 0) {
- //System.out.println("Send: multiplexed: " +
multiplexingCount + ", total streams: " + streamSet.size());
- ResampleUtils.normalize(decodedBuffer2,
decodedBuffer2.length);
+ System.out.println("Send: multiplexed: " +
multiplexingCount + ", total streams: " + streamSet.size());
+// ResampleUtils.normalize(multiplexedBuffer,
1.0f/multiplexingCount);
+ ResampleUtils.normalize(multiplexedBuffer,
multiplexedBuffer.length);
try {
asao_buffer_processed = false;
do {
@@ -286,7 +332,8 @@
}
if (encodingOffset ==
sipCodec.getOutgoingDecodedFrameSize()) {
rtpSocketSend(rtpPacket);
- doRtpDelay();
+ doRtpDelay(bufferUsage);
+ //System.out.println("rtpSocketSend,
bufferUsage: " + bufferUsage);
encodingOffset = 0;
}
}
@@ -339,10 +386,13 @@
}
- // Decode new asao packet.
-
+ // Process next buffer
asao_buffer_processed = true;
- BufferUtils.floatBufferIndexedCopy(tempBuffer, 0,
decodedBuffer2, 0, decodedBuffer2.length);
+ if(resampler == null) {
+ System.arraycopy(multiplexedBuffer, 0, tempBuffer, 0,
tempBuffer.length);
+ } else {
+ resampler.process(factor, multiplexedBuffer, 0,
multiplexedBuffer.length, true, tempBuffer, 0, tempBuffer.length);
+ }
// tempBuffer = ResampleUtils.normalize(tempBuffer, 256);
// normalise volume
tempBufferRemaining = tempBuffer.length;
@@ -404,8 +454,6 @@
println("halt", "Terminated");
}
-
- private long lastMS = System.currentTimeMillis();
private void rtpSocketSend(RtpPacket rtpPacket) {
try {